You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/30 15:33:13 UTC

svn commit: r1487858 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/

Author: chirino
Date: Thu May 30 13:33:12 2013
New Revision: 1487858

URL: http://svn.apache.org/r1487858
Log:
Adding debugging details for the openwire protocol too.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1487858&r1=1487857&r2=1487858&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Thu May 30 13:33:12 2013
@@ -419,8 +419,8 @@ abstract class DeliveryProducerRoute(rou
     ", retained: "+reained_base.retained()+
     ", is_connected: "+is_connected+
     ", dispatch_delivery: "+dispatch_delivery+
-    ", dispatch_sessions: "+dispatch_sessions.size+
-    ", "+super.toString
+    ", dispatch_sessions: "+dispatch_sessions+
+    ", "+super.toString +
     ", targets: "+targets
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1487858&r1=1487857&r2=1487858&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Thu May 30 13:33:12 2013
@@ -129,7 +129,7 @@ class Topic(val router:LocalRouter, val 
 
   case class ProxyConsumerSession(proxy:ProxyDeliveryConsumer, session:DeliverySession) extends DeliverySession with SessionSinkFilter[Delivery] {
 
-    override def toString = proxy.consumer.toString + " (via "+address+")"
+    override def toString = ""+address+"->"+session
 
     def downstream = session
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1487858&r1=1487857&r2=1487858&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Thu May 30 13:33:12 2013
@@ -17,34 +17,38 @@
 
 package org.apache.activemq.apollo.openwire
 
-import OpenwireConstants._
+import collection.mutable.{ListBuffer, HashMap}
 
 import org.fusesource.hawtdispatch._
 import org.fusesource.hawtbuf._
-import collection.mutable.{ListBuffer, HashMap}
+import org.fusesource.hawtbuf.Buffer._
 
 import java.io.IOException
-import org.apache.activemq.apollo.selector.SelectorParser
-import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException}
-import org.apache.activemq.apollo.broker.store._
-import org.apache.activemq.apollo.util._
 import java.util.concurrent.TimeUnit
 import java.util.Map.Entry
 import org.fusesource.hawtdispatch.transport._
-import codec.OpenWireFormat
-import command._
-import org.apache.activemq.apollo.openwire.dto.{OpenwireConnectionStatusDTO,OpenwireDTO}
-import org.apache.activemq.apollo.dto.{AcceptingConnectorDTO, TopicDestinationDTO, DurableSubscriptionDestinationDTO, DestinationDTO}
+import org.apache.activemq.apollo.util._
+import org.apache.activemq.apollo.util.path.Path
 import org.apache.activemq.apollo.broker._
-import path.Path
-import protocol._
-import security.SecurityContext
-import DestinationConverter._
-import Buffer._
-import java.net.InetSocketAddress
-import java.security.cert.X509Certificate
+import org.apache.activemq.apollo.broker.store._
+import org.apache.activemq.apollo.broker.protocol._
+import org.apache.activemq.apollo.broker.security.SecurityContext
+import org.apache.activemq.apollo.dto.{AcceptingConnectorDTO, DestinationDTO}
+import org.apache.activemq.apollo.selector.SelectorParser
+import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException}
+import org.apache.activemq.apollo.openwire.codec.OpenWireFormat
+import org.apache.activemq.apollo.openwire.command._
+import org.apache.activemq.apollo.openwire.dto.{OpenwireConnectionStatusDTO,OpenwireDTO}
+import org.apache.activemq.apollo.openwire.DestinationConverter._
+import org.apache.activemq.apollo.openwire.OpenwireConstants._
+
+object ZeroSizer extends Sizer[(Session[Delivery], Delivery)] {
+  def size(value:(Session[Delivery], Delivery)):Int = 0
+}
 
 object OpenwireProtocolHandler extends Log {
+
+
   def unit:Unit = {}
 
   val DEFAULT_DIE_DELAY = 5 * 1000L
@@ -136,6 +140,25 @@ class OpenwireProtocolHandler extends Pr
     rc.messages_sent = messages_sent
     rc.messages_received = messages_received
     rc.waiting_on = waiting_on()
+
+    if( debug ) {
+      import collection.JavaConversions._
+      val out = new StringBuilder
+      out.append("\n--- connection ---\n")
+      out.append("--- producers ---\n")
+      for( p <- producerRoutes.values() ) {
+        out.append("  { "+p+" }\n")
+      }
+      out.append("--- consumers ---\n")
+      for( c <- all_consumers.values ) {
+        out.append("  { "+c+" }\n")
+      }
+      out.append("--- transactions ---\n")
+      for( t <- all_transactions.values ) {
+        out.append("  { "+t+" }\n")
+      }
+      rc.debug = out.toString()
+    }
     rc
   }
 
@@ -642,6 +665,8 @@ class OpenwireProtocolHandler extends Pr
         resume_read
       }
     }
+
+    override def toString: String = "addresses: ["+addresses.mkString(", ")+"],"+super.toString
   }
 
   def perform_send(msg:ActiveMQMessage, uow:StoreUOW=null): Unit = {
@@ -832,7 +857,7 @@ class OpenwireProtocolHandler extends Pr
 //      r.release
 //    }
 
-    override def toString = "openwire consumer id:"+info.getConsumerId+", remote address: "+security_context.remote_address
+    override def toString = "openwire consumer:"+info.getConsumerId+", session_manager: "+session_manager
 
     var selector_expression:BooleanExpression = _
     var addresses:Array[_ <: BindAddress] = _
@@ -854,7 +879,7 @@ class OpenwireProtocolHandler extends Pr
       }
       messages_sent += 1
       dispatch
-    }, SessionDeliverySizer)
+    }, ZeroSizer)
 
     credit_window_filter.credit(info.getPrefetchSize, 0)
 
@@ -978,6 +1003,8 @@ class OpenwireProtocolHandler extends Pr
       producer.dispatch_queue.assertExecuting()
       retain
 
+      override def toString = "openwire consumer session:"+info.getConsumerId+", remote address: "+security_context.remote_address+", downstream: "+downstream
+
       val downstream = session_manager.open(producer.dispatch_queue)
       var closed = false