You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/08 21:36:07 UTC
svn commit: r1480418 - in /activemq/activemq-apollo/trunk:
apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protoc...
Author: chirino
Date: Wed May 8 19:36:06 2013
New Revision: 1480418
URL: http://svn.apache.org/r1480418
Log:
Adding support for a query parameter to include detailed debug info about a connection's state.
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttSession.java
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Wed May 8 19:36:06 2013
@@ -124,7 +124,7 @@ class AmqpProtocolHandler extends Protoc
var messages_sent = 0L
var messages_received = 0L
- override def create_connection_status = {
+ override def create_connection_status(debug:Boolean) = {
var rc = new AmqpConnectionStatusDTO
rc.protocol_version = "1.0.0"
rc.user = security_context.user
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed May 8 19:36:06 2013
@@ -150,11 +150,11 @@ class BrokerConnection(var connector: Co
protected override def on_transport_failure(error: IOException) = protocol_handler.on_transport_failure(error)
- def get_connection_status = {
+ def get_connection_status(debug:Boolean=false) = {
val result = if( protocol_handler==null ) {
new ConnectionStatusDTO
} else {
- protocol_handler.create_connection_status
+ protocol_handler.create_connection_status(debug)
}
result.id = id.toString
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Wed May 8 19:36:06 2013
@@ -157,7 +157,7 @@ class AcceptingConnector(val broker:Brok
for( (id, connection) <- broker.connections if connection.connector eq this ) {
result.connections.add( new LongIdLabeledDTO(id, connection.transport.getRemoteAddress.toString ) )
- val status = connection.get_connection_status
+ val status = connection.get_connection_status(false)
if( status!=null ) {
result.messages_sent += status.messages_sent
result.messages_received += status.messages_received
@@ -328,7 +328,7 @@ class AcceptingConnector(val broker:Brok
val at_limit = at_connection_limit
if( broker.connections.remove(connection.id).isDefined ) {
connected.decrementAndGet()
- val status = connection.get_connection_status
+ val status = connection.get_connection_status(false)
if( status!=null ) {
dead_messages_sent += status.messages_sent
dead_messages_received += status.messages_received
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed May 8 19:36:06 2013
@@ -270,4 +270,19 @@ class Delivery {
def redelivered = redeliveries = ((redeliveries+1).min(Short.MaxValue)).toShort
+ override def toString = {
+ "Delivery(" +
+ "sender:"+sender+", "+
+ "size:"+size+", "+
+ "message codec:"+message.codec.id+", "+
+ "expiration:"+expiration+", "+
+ "persistent:"+persistent+", "+
+ "redeliveries:"+redeliveries+", "+
+ "seq:"+seq+", "+
+ "storeKey:"+storeKey+", "+
+ "storeLocator:"+storeLocator+", "+
+ "uow:"+uow+", "+
+ "ack:"+(ack!=null)+
+ ")"
+ }
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed May 8 19:36:06 2013
@@ -1219,7 +1219,15 @@ class Queue(val router: LocalRouter, val
class QueueDeliverySession(val producer: DeliveryProducer) extends DeliverySession with SessionSinkFilter[Delivery]{
retain
- override def toString = Queue.this.toString
+ def odlToString = Queue.this.toString
+ override def toString = {
+ "QueueDeliverySession("+
+ "queue: "+Queue.this.id +
+ ", full:"+full+
+ ", "+downstream+
+ ")"
+ }
+
override def consumer = Queue.this
val downstream = session_manager.open(producer.dispatch_queue)
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed May 8 19:36:06 2013
@@ -414,5 +414,14 @@ abstract class DeliveryProducerRoute(rou
}
}
+ override def toString = { // debug summary of this route; every piece is chained with a trailing '+'
+ "last_send: "+last_send+
+ ", retained: "+reained_base.retained()+
+ ", is_connected: "+is_connected+
+ ", dispatch_delivery: "+dispatch_delivery+
+ ", dispatch_sessions: "+dispatch_sessions.size+
+ ", "+super.toString+
+ ", targets: "+targets
+ }
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Wed May 8 19:36:06 2013
@@ -56,9 +56,13 @@ abstract class Sink[T] {
def map[Y](func: Y=>T ):Sink[Y] = new SinkMapper[Y,T] {
def passing(value: Y) = func(value)
def downstream = Sink.this
+ override def toString: String = downstream.toString
}
def flatMap[Y](func: Y=>Option[T]):Sink[Y] = new Sink[Y] with SinkFilter[T] {
+
+ override def toString: String = downstream.toString
+
def downstream = Sink.this
def offer(value:Y) = {
if( full ) {
@@ -152,6 +156,7 @@ class TransportSink(val transport:Transp
var refiller:Task = NOOP
def full:Boolean = transport.full
def offer(value:AnyRef) = transport.offer(value)
+ override def toString: String = "TransportSink(full:"+full+")"
}
/**
@@ -161,7 +166,14 @@ class TransportSink(val transport:Transp
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class OverflowSink[T](val downstream:Sink[T]) extends AbstractOverflowSink[T]
+class OverflowSink[T](val downstream:Sink[T]) extends AbstractOverflowSink[T] {
+ override def toString: String = {
+ "OverflowSink("+
+ super.toString+
+ ", "+downstream+
+ ")"
+ }
+}
abstract class AbstractOverflowSink[T] extends Sink[T] {
@@ -171,6 +183,11 @@ abstract class AbstractOverflowSink[T] e
val overflow = new LinkedList[T]()
+ override def toString = { // debug summary: overflow contents followed by the full flag
+ "overflow: "+overflow+
+ ", full: "+full
+ }
+
def overflowed = !overflow.isEmpty
def full = overflowed || downstream.full
@@ -229,7 +246,6 @@ abstract class AbstractOverflowSink[T] e
*/
protected def onDelivered(value:T) = {
}
-
}
@@ -279,6 +295,15 @@ class SinkMux[T](val downstream:Sink[T])
class ManagedSink extends Sink[T] {
+
+ override def toString: String = {
+ "ManagedSink("+
+ "rejection_handler: "+(rejection_handler)+
+ ", full: "+(full)+
+ ", "+downstream+
+ ")"
+ }
+
var rejection_handler:(T)=>Unit = _
var refiller:Task = NOOP
@@ -321,12 +346,23 @@ class CreditWindowFilter[T](val downstre
var delivery_credits = 0
var byte_credits = 0
- var disabled = true
+ var enabled = true
+
- override def full: Boolean = downstream.full || ( disabled && byte_credits <= 0 && delivery_credits <= 0 )
+ override def toString: String = {
+ "CreditWindowFilter("+
+ "enabled:"+enabled+
+ ", delivery_credits:"+delivery_credits+
+ ", byte_credits:"+byte_credits+
+ ", full:"+full+
+ ", "+downstream+
+ ")"
+ }
+
+ override def full: Boolean = downstream.full || ( enabled && byte_credits <= 0 && delivery_credits <= 0 )
def disable = {
- disabled = false
+ enabled = false
refiller.run()
}
@@ -400,6 +436,14 @@ class SessionSinkMux[T](val downstream:S
var sessions = HashSet[Session[T]]()
var overflowed_sessions = new LinkedNodeList[SessionLinkedNode[T]]()
+ override def toString: String = {
+ "SessionSinkMux(" +
+ "sessions: "+sessions.size+
+ ", overflowed_sessions: "+overflowed_sessions.size+
+ ", "+downstream+
+ ")"
+ }
+
def open(producer_queue:DispatchQueue):SessionSink[T] = {
val session = new Session[T](this, producer_queue)
consumer_queue <<| ^{
@@ -560,6 +604,21 @@ class Session[T](mux:SessionSinkMux[T],
@volatile
var enqueue_ts = mux.time_stamp
+
+ override def toString: String = {
+ "Session("+
+ "enqueue_item_counter:"+enqueue_item_counter+
+ ", enqueue_size_counter:"+enqueue_size_counter+
+ ", delivery_credits:"+delivery_credits+
+ ", size_credits:"+size_credits+
+ ", size_credits:"+size_credits+
+ ", overflow:"+overflow.size()+
+ ", stall_counter:"+stall_counter+
+ ", size_bonus:"+size_bonus +
+ ", full:"+full+
+ ")"
+ }
+
def credit(delivery_credits:Int, size_credits:Int) = {
if( delivery_credits!=0 || size_credits!=0 ) {
credit_adder.merge((delivery_credits, size_credits))
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Wed May 8 19:36:06 2013
@@ -73,17 +73,20 @@ class Topic(val router:LocalRouter, val
def matches(message: Delivery) = true
def is_persistent = false
def dispatch_queue = null
- def connect(producer: DeliveryProducer) = ProxyProducerSession(producer)
+ def connect(producer: DeliveryProducer) = new ProxyProducerSession(producer)
}
- case class ProxyProducerSession(val producer:DeliveryProducer) extends DeliverySession {
+ class ProxyProducerSession(val producer:DeliveryProducer) extends DeliverySession {
+
dispatch_queue {
proxy_sessions.add(this)
}
+ override def toString: String = "ProxyProducerSession(topic="+id+")"
+
def remaining_capacity = 1
var enqueue_ts = 0L
var enqueue_size_counter = 0L
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala Wed May 8 19:36:06 2013
@@ -93,7 +93,7 @@ trait ProtocolHandler {
this.connection = brokerConnection
}
- def create_connection_status = new ConnectionStatusDTO
+ def create_connection_status(debug:Boolean) = new ConnectionStatusDTO
def on_transport_failure(error:IOException) = {
trace(error)
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/UdpProtocol.scala Wed May 8 19:36:06 2013
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import org.fusesource.hawtdispatch._
import java.nio.channels.{DatagramChannel, WritableByteChannel, ReadableByteChannel}
import java.net.SocketAddress
-import org.apache.activemq.apollo.dto.{ProtocolDTO, UdpDTO, AcceptingConnectorDTO}
+import org.apache.activemq.apollo.dto.{ConnectionStatusDTO, ProtocolDTO, UdpDTO, AcceptingConnectorDTO}
import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
import java.util.Map.Entry
import org.apache.activemq.apollo.util._
@@ -135,8 +135,8 @@ abstract class UdpProtocolHandler extend
def broker = connection.connector.broker
def queue = connection.dispatch_queue
- override def create_connection_status = {
- var rc = super.create_connection_status
+ override def create_connection_status(debug:Boolean) = {
+ var rc = new ConnectionStatusDTO
rc.waiting_on = waiting_on
rc.messages_received = messages_received
rc
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala Wed May 8 19:36:06 2013
@@ -219,7 +219,7 @@ class BrokerFunSuiteSupport extends FunS
broker.dispatch_queue {
info(" -- Connections -- ")
for(connection <- broker.connections.values) {
- info(json(connection.get_connection_status))
+ info(json(connection.get_connection_status(false)))
}
val router = broker.default_virtual_host.local_router
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java Wed May 8 19:36:06 2013
@@ -127,4 +127,11 @@ public class ConnectionStatusDTO extends
*/
@XmlAttribute(name="subscription_count")
public int subscription_count;
+
+ /**
+ * Holds detailed state data used to debug connections.
+ */
+ @XmlAttribute(name="debug")
+ public String debug;
+
}
Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.java Wed May 8 19:36:06 2013
@@ -517,7 +517,7 @@ public class MqttProtocolHandler extends
LongCounter messages_received = new LongCounter(0);
int subscription_count = 0;
- public MqttConnectionStatusDTO create_connection_status() {
+ public MqttConnectionStatusDTO create_connection_status(boolean debug) {
MqttConnectionStatusDTO rc = new MqttConnectionStatusDTO();
rc.protocol_version = "3.1";
rc.messages_sent = messages_sent.get();
Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttSession.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttSession.java?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttSession.java (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/java/org/apache/activemq/apollo/mqtt/MqttSession.java Wed May 8 19:36:06 2013
@@ -562,7 +562,7 @@ public class MqttSession {
if (route.full()) {
// but once it gets full.. suspend to flow control the producer.
route.suspended = true;
- handler._suspend_read("blocked sending to: " + route.dispatch_sessions().mkString(", "));
+ handler._suspend_read("blocked sending to: " + route.address);
}
} else {
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed May 8 19:36:06 2013
@@ -128,7 +128,7 @@ class OpenwireProtocolHandler extends Pr
var messages_received = 0L
val preferred_wireformat_settings = new WireFormatInfo();
- override def create_connection_status = {
+ override def create_connection_status(debug:Boolean) = {
var rc = new OpenwireConnectionStatusDTO
rc.protocol_version = ""+(if (wire_format == null) 0 else wire_format.getVersion)
rc.user = login.map(_.toString).getOrElse(null)
@@ -712,7 +712,7 @@ class OpenwireProtocolHandler extends Pr
// but once it gets full.. suspend, so that we get more messages
// until it's not full anymore.
route.suspended = true
- suspend_read("blocked destination: "+route.dispatch_sessions.mkString(", "))
+ suspend_read("blocked sending to: "+route.addresses.mkString(", ")) // join once; mkString on the resulting String would split it per character
}
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed May 8 19:36:06 2013
@@ -238,7 +238,26 @@ class StompProtocolHandler extends Proto
// }
- override def toString = "stomp subscription"+subscription_id.map(" id: "+_+",").getOrElse("")+" remote address: "+security_context.remote_address
+ override def toString ={
+ // Lets setup some helpers
+ def b( x:Boolean, y: =>String):String = if(x) y else ""
+ def h( x:AnyRef, y: =>String):String = if(x!=null) y else ""
+ def o[T]( x:Option[T], y: =>String):String = if(x.isDefined) y else ""
+
+ "StompConsumer("+
+ o(subscription_id, "subscription_id: "+subscription_id.get+", ")+
+ "addresses: "+addresses.mkString(",")+", "+
+ "ack_mode: "+ack_mode+", "+
+ initial_credit_window+
+ h(selector, ", selector: "+selector._1)+
+ b(browser, ", browser")+
+ b(close_on_drain, ", close_on_drain")+
+ b(exclusive, ", exclusive")+
+ b(from_seq!=0, ", from_seq: "+from_seq)+
+ o(include_seq, ", include_seq: "+include_seq)+
+ ", "+session_manager+
+ ")"
+ }
override def start_from_tail = from_seq == -1
@@ -597,7 +616,13 @@ class StompProtocolHandler extends Proto
val downstream = session_manager.open(producer.dispatch_queue)
- override def toString = "connection to "+StompProtocolHandler.this.connection.transport.getRemoteAddress
+ override def toString = { // debug summary of this consumer session; every piece is chained with a trailing '+'
+ "StompConsumerSession("+
+ "connection to: "+StompProtocolHandler.this.connection.transport.getRemoteAddress+", "+
+ "closed: "+closed+", "+
+ "downstream: "+downstream+
+ ")"
+ }
def consumer = StompConsumer.this
var closed = false
@@ -768,7 +793,7 @@ class StompProtocolHandler extends Proto
var messages_sent = 0L
var messages_received = 0L
- override def create_connection_status = {
+ override def create_connection_status(debug:Boolean) = {
var rc = new StompConnectionStatusDTO
rc.protocol_version = if( protocol_version == null ) null else protocol_version.toString
rc.user = security_context.user
@@ -776,6 +801,25 @@ class StompProtocolHandler extends Proto
rc.waiting_on = waiting_on()
rc.messages_sent = messages_sent
rc.messages_received = messages_received
+ if( debug ) {
+ import collection.JavaConversions._
+ val out = new StringBuilder
+ out.append("\n--- connection ---\n")
+ out.append(" { routing_size:"+routing_size+" }\n")
+ out.append("--- producers ---\n")
+ for( p <- producer_routes.values() ) {
+ out.append(" { "+p+" }\n")
+ }
+ out.append("--- consumers ---\n")
+ for( c <- consumers.values ) {
+ out.append(" { "+c+" }\n")
+ }
+ out.append("--- transactions ---\n")
+ for( t <- transactions.values ) {
+ out.append(" { "+t+" }\n")
+ }
+ rc.debug = out.toString()
+ }
rc
}
@@ -1221,6 +1265,10 @@ class StompProtocolHandler extends Proto
}
super.offer(delivery)
}
+
+ override def toString = {
+ "addresses:"+key+", routing_items:"+routing_items+", "+super.toString
+ }
}
@@ -1434,7 +1482,7 @@ class StompProtocolHandler extends Proto
// but once it gets full.. suspend, so that we get more stomp messages
// until it's not full anymore.
route.suspended = true
- suspend_read("blocked sending to: "+route.dispatch_sessions.mkString(", "))
+ suspend_read("blocked sending to: "+route.addresses.mkString(", "))
}
frame.release
}
@@ -1718,6 +1766,11 @@ class StompProtocolHandler extends Proto
val queue = ListBuffer[((StoreUOW)=>Unit, ()=>Unit)]()
+
+ override def toString: String = {
+ "{ actions: "+queue.size+" }"
+ }
+
def add(on_commit:(StoreUOW)=>Unit, on_rollback:()=>Unit=null):Unit = {
queue += ((on_commit, on_rollback))
}
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1480418&r1=1480417&r2=1480418&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala Wed May 8 19:36:06 2013
@@ -967,7 +967,7 @@ class BrokerResource() extends Resource
monitoring(broker) {
val records = sync_all(broker.connections.values) { value =>
- value.get_connection_status
+ value.get_connection_status(false)
}
val rc:FutureResult[DataPageDTO] = records.map(narrow(classOf[ConnectionStatusDTO], _, f, q, p, ps, o))
@@ -978,10 +978,10 @@ class BrokerResource() extends Resource
@GET @Path("/connections/{id}")
@ApiOperation(value = "Gets that status of a connection.")
- def connection(@PathParam("id") id : Long):ConnectionStatusDTO = {
+ def connection(@PathParam("id") id : Long, @QueryParam("debug") debug:Boolean):ConnectionStatusDTO = {
with_connection(id){ connection=>
monitoring(connection.connector.broker) {
- connection.get_connection_status
+ connection.get_connection_status(debug)
}
}
}
@@ -1027,10 +1027,11 @@ class BrokerResource() extends Resource
@GET
@Path("/hawtdispatch/profile")
@ApiOperation(value="Enables or disables profiling")
- def hawtdispatch_profile(@QueryParam("enabled") enabled : Boolean) = ok {
+ def hawtdispatch_profile(@QueryParam("enabled") enabled : Boolean):String = {
with_broker { broker =>
admining(broker) {
Dispatch.profile(enabled)
+ ""
}
}
}