You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/30 15:33:13 UTC
svn commit: r1487858 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/
Author: chirino
Date: Thu May 30 13:33:12 2013
New Revision: 1487858
URL: http://svn.apache.org/r1487858
Log:
Adding debugging details for the openwire protocol too.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1487858&r1=1487857&r2=1487858&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Thu May 30 13:33:12 2013
@@ -419,8 +419,8 @@ abstract class DeliveryProducerRoute(rou
", retained: "+reained_base.retained()+
", is_connected: "+is_connected+
", dispatch_delivery: "+dispatch_delivery+
- ", dispatch_sessions: "+dispatch_sessions.size+
- ", "+super.toString
+ ", dispatch_sessions: "+dispatch_sessions+
+ ", "+super.toString +
", targets: "+targets
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1487858&r1=1487857&r2=1487858&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Thu May 30 13:33:12 2013
@@ -129,7 +129,7 @@ class Topic(val router:LocalRouter, val
case class ProxyConsumerSession(proxy:ProxyDeliveryConsumer, session:DeliverySession) extends DeliverySession with SessionSinkFilter[Delivery] {
- override def toString = proxy.consumer.toString + " (via "+address+")"
+ override def toString = ""+address+"->"+session
def downstream = session
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1487858&r1=1487857&r2=1487858&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Thu May 30 13:33:12 2013
@@ -17,34 +17,38 @@
package org.apache.activemq.apollo.openwire
-import OpenwireConstants._
+import collection.mutable.{ListBuffer, HashMap}
import org.fusesource.hawtdispatch._
import org.fusesource.hawtbuf._
-import collection.mutable.{ListBuffer, HashMap}
+import org.fusesource.hawtbuf.Buffer._
import java.io.IOException
-import org.apache.activemq.apollo.selector.SelectorParser
-import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException}
-import org.apache.activemq.apollo.broker.store._
-import org.apache.activemq.apollo.util._
import java.util.concurrent.TimeUnit
import java.util.Map.Entry
import org.fusesource.hawtdispatch.transport._
-import codec.OpenWireFormat
-import command._
-import org.apache.activemq.apollo.openwire.dto.{OpenwireConnectionStatusDTO,OpenwireDTO}
-import org.apache.activemq.apollo.dto.{AcceptingConnectorDTO, TopicDestinationDTO, DurableSubscriptionDestinationDTO, DestinationDTO}
+import org.apache.activemq.apollo.util._
+import org.apache.activemq.apollo.util.path.Path
import org.apache.activemq.apollo.broker._
-import path.Path
-import protocol._
-import security.SecurityContext
-import DestinationConverter._
-import Buffer._
-import java.net.InetSocketAddress
-import java.security.cert.X509Certificate
+import org.apache.activemq.apollo.broker.store._
+import org.apache.activemq.apollo.broker.protocol._
+import org.apache.activemq.apollo.broker.security.SecurityContext
+import org.apache.activemq.apollo.dto.{AcceptingConnectorDTO, DestinationDTO}
+import org.apache.activemq.apollo.selector.SelectorParser
+import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException}
+import org.apache.activemq.apollo.openwire.codec.OpenWireFormat
+import org.apache.activemq.apollo.openwire.command._
+import org.apache.activemq.apollo.openwire.dto.{OpenwireConnectionStatusDTO,OpenwireDTO}
+import org.apache.activemq.apollo.openwire.DestinationConverter._
+import org.apache.activemq.apollo.openwire.OpenwireConstants._
+
+object ZeroSizer extends Sizer[(Session[Delivery], Delivery)] {
+ def size(value:(Session[Delivery], Delivery)):Int = 0
+}
object OpenwireProtocolHandler extends Log {
+
+
def unit:Unit = {}
val DEFAULT_DIE_DELAY = 5 * 1000L
@@ -136,6 +140,25 @@ class OpenwireProtocolHandler extends Pr
rc.messages_sent = messages_sent
rc.messages_received = messages_received
rc.waiting_on = waiting_on()
+
+ if( debug ) {
+ import collection.JavaConversions._
+ val out = new StringBuilder
+ out.append("\n--- connection ---\n")
+ out.append("--- producers ---\n")
+ for( p <- producerRoutes.values() ) {
+ out.append(" { "+p+" }\n")
+ }
+ out.append("--- consumers ---\n")
+ for( c <- all_consumers.values ) {
+ out.append(" { "+c+" }\n")
+ }
+ out.append("--- transactions ---\n")
+ for( t <- all_transactions.values ) {
+ out.append(" { "+t+" }\n")
+ }
+ rc.debug = out.toString()
+ }
rc
}
@@ -642,6 +665,8 @@ class OpenwireProtocolHandler extends Pr
resume_read
}
}
+
+ override def toString: String = "addresses: ["+addresses.mkString(", ")+"],"+super.toString
}
def perform_send(msg:ActiveMQMessage, uow:StoreUOW=null): Unit = {
@@ -832,7 +857,7 @@ class OpenwireProtocolHandler extends Pr
// r.release
// }
- override def toString = "openwire consumer id:"+info.getConsumerId+", remote address: "+security_context.remote_address
+ override def toString = "openwire consumer:"+info.getConsumerId+", session_manager: "+session_manager
var selector_expression:BooleanExpression = _
var addresses:Array[_ <: BindAddress] = _
@@ -854,7 +879,7 @@ class OpenwireProtocolHandler extends Pr
}
messages_sent += 1
dispatch
- }, SessionDeliverySizer)
+ }, ZeroSizer)
credit_window_filter.credit(info.getPrefetchSize, 0)
@@ -978,6 +1003,8 @@ class OpenwireProtocolHandler extends Pr
producer.dispatch_queue.assertExecuting()
retain
+ override def toString = "openwire consumer session:"+info.getConsumerId+", remote address: "+security_context.remote_address+", downstream: "+downstream
+
val downstream = session_manager.open(producer.dispatch_queue)
var closed = false