You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:43:46 UTC

svn commit: r961074 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-stomp/ activemq-stomp/src/main/resources/ activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ activ...

Author: chirino
Date: Wed Jul  7 03:43:46 2010
New Revision: 961074

URL: http://svn.apache.org/viewvc?rev=961074&view=rev
Log:
Better logging

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties
      - copied, changed from r961073, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/resources/log4j.properties
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/resources/log4j.properties
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961074&r1=961073&r2=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul  7 03:43:46 2010
@@ -143,6 +143,7 @@ class Broker() extends Service with Logg
       }
 
       def onAccept(transport: Transport): Unit = {
+        debug("Accepted connection from: %s", transport.getRemoteAddress)
         var connection = new BrokerConnection(Broker.this)
         connection.transport = transport
         clientConnections.add(connection)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961074&r1=961073&r2=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul  7 03:43:46 2010
@@ -26,9 +26,8 @@ import _root_.java.lang.{String}
 import _root_.org.apache.activemq.util.{FactoryFinder, IOExceptionSupport}
 import _root_.org.apache.activemq.wireformat.WireFormat
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-class ConnectionConfig {
+import java.util.concurrent.atomic.AtomicLong
 
-}
 abstract class Connection() extends TransportListener with Service {
 
   val dispatchQueue = createQueue("connection")
@@ -70,12 +69,19 @@ abstract class Connection() extends Tran
 
 }
 
-object BrokerConnection extends Log
+object BrokerConnection extends Log {
+  val id_generator = new AtomicLong()
+}
 
 class BrokerConnection(val broker: Broker) extends Connection with Logging {
+
   override protected def log = BrokerConnection
+  override protected def log_map(message:String) = "connection:"+id+" | "+message
+
+  import BrokerConnection._
 
   var protocolHandler: ProtocolHandler = null;
+  val id = id_generator.incrementAndGet
 
   exceptionListener = new ExceptionListener() {
     def exceptionThrown(error:Exception) = {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala?rev=961074&r1=961073&r2=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala Wed Jul  7 03:43:46 2010
@@ -34,7 +34,7 @@ trait Destination {
   def getDestinations():Seq[Destination]
 }
 
-object Destination {
+object DestinationParser {
 
     /**
      * Parses a simple destination.

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala?rev=961074&r1=961073&r2=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala Wed Jul  7 03:43:46 2010
@@ -20,39 +20,192 @@ import _root_.java.util.{LinkedHashMap, 
 import _root_.java.lang.{Throwable, String}
 import _root_.org.apache.commons.logging.LogFactory
 import _root_.org.apache.commons.logging.{Log => Logger}
+import java.util.concurrent.atomic.AtomicLong
 
 trait Log {
   val log = LogFactory.getLog(getClass.getName)
 }
 
+object Logging {
+  val exception_id_generator = new AtomicLong(System.currentTimeMillis)
+  def next_exception_id = exception_id_generator.incrementAndGet.toHexString
+}
+
 /**
  * A Logging trait you can mix into an implementation class without affecting its public API
  */
 trait Logging {
 
+  import Logging._
   protected def log: Log
+  protected def log_map(message:String) = message
 
-  protected def error(message: => String): Unit = log.log.error(message)
-
-  protected def error(e: Throwable): Unit = log.log.error(e.getMessage, e)
-
-  protected def error(message: => String, e: Throwable): Unit = log.log.error(message, e)
-
-  protected def warn(message: => String): Unit = log.log.warn(message)
-
-  protected def warn(message: => String, e: Throwable): Unit = log.log.warn(message, e)
-
-  protected def info(message: => String): Unit = log.log.info(message)
-
-  protected def info(message: => String, e: Throwable): Unit = log.log.info(message, e)
-
-  protected def debug(message: => String): Unit = log.log.debug(message)
-
-  protected def debug(message: => String, e: Throwable): Unit = log.log.debug(message, e)
-
-  protected def trace(message: => String): Unit = log.log.trace(message)
-
-  protected def trace(message: => String, e: Throwable): Unit = log.log.trace(message, e)
+  protected def error(message: => String, args:Any*): Unit = {
+    val l = log.log
+    if( l.isErrorEnabled ) {
+      val m = if( args.isEmpty ) {
+        message
+      } else {
+        format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
+      }
+      l.error(log_map(m))
+    }
+  }
+
+  protected def error(e: Throwable, message: => String, args:Any*): Unit = {
+    val l = log.log
+    if( l.isErrorEnabled ) {
+      val m = if( args.isEmpty ) {
+        message
+      } else {
+        format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
+      }
+      val exception_id = next_exception_id
+      log.log.error(log_map(m)+" [id:"+exception_id+"]")
+      log.log.debug("[id:"+exception_id+"]", e)
+    }
+  }
+
+  protected def error(e: Throwable): Unit = {
+    val l = log.log
+    if( l.isErrorEnabled ) {
+      val exception_id = next_exception_id
+      log.log.error(log_map(e.getMessage)+" [id:"+exception_id+"]")
+      log.log.debug("[id:"+exception_id+"]", e)
+    }
+  }
+
+  protected def warn(message: => String, args:Any*): Unit = {
+    val l = log.log
+    if( l.isWarnEnabled ) {
+      val m = if( args.isEmpty ) {
+        message
+      } else {
+        format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
+      }
+      l.warn(log_map(m))
+    }
+  }
+
+  protected def warn(e: Throwable, message: => String, args:Any*): Unit = {
+    val l = log.log
+    if( l.isWarnEnabled ) {
+      val m = if( args.isEmpty ) {
+        message
+      } else {
+        format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
+      }
+      val exception_id = next_exception_id
+      log.log.warn(log_map(m)+" [id:"+exception_id+"]")
+      log.log.debug("[id:"+exception_id+"]", e)
+    }
+  }
+
+  protected def warn(e: Throwable): Unit = {
+    val l = log.log
+    if( l.isWarnEnabled ) {
+      val exception_id = next_exception_id
+      log.log.warn(log_map(e.getMessage)+" [id:"+exception_id+"]")
+      log.log.debug("[id:"+exception_id+"]", e)
+    }
+  }
+
+  protected def info(message: => String, args:Any*): Unit = {
+    val l = log.log
+    if( l.isInfoEnabled ) {
+      val m = if( args.isEmpty ) {
+        message
+      } else {
+        format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
+      }
+      l.info(log_map(m))
+    }
+  }
+
+  protected def info(e: Throwable, message: => String, args:Any*): Unit = {
+    val l = log.log
+    if( l.isInfoEnabled ) {
+      val m = if( args.isEmpty ) {
+        message
+      } else {
+        format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
+      }
+      val exception_id = next_exception_id
+      log.log.info(log_map(m)+" [id:"+exception_id+"]")
+      log.log.debug("[id:"+exception_id+"]", e)
+    }
+  }
+
+  protected def info(e: Throwable): Unit = {
+    val l = log.log
+    if( l.isInfoEnabled ) {
+      val exception_id = next_exception_id
+      log.log.info(log_map(e.getMessage)+" [id:"+exception_id+"]")
+      log.log.debug("[id:"+exception_id+"]", e)
+    }
+  }
+
+
+  protected def debug(message: => String, args:Any*): Unit = {
+    val l = log.log
+    if( l.isDebugEnabled ) {
+      val m = if( args.isEmpty ) {
+        message
+      } else {
+        format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
+      }
+      l.debug(log_map(m))
+    }
+  }
+
+  protected def debug(e: Throwable, message: => String, args:Any*): Unit = {
+    val l = log.log
+    if( l.isDebugEnabled ) {
+      val m = if( args.isEmpty ) {
+        message
+      } else {
+        format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
+      }
+      log.log.debug(log_map(m), e)
+    }
+  }
+
+  protected def debug(e: Throwable): Unit = {
+    val l = log.log
+    if( l.isDebugEnabled ) {
+      log.log.debug(log_map(e.getMessage), e)
+    }
+  }
+
+  protected def trace(message: => String, args:Any*): Unit = {
+    val l = log.log
+    if( l.isTraceEnabled ) {
+      val m = if( args.isEmpty ) {
+        message
+      } else {
+        format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
+      }
+      l.trace(log_map(m))
+    }
+  }
+
+  protected def trace(e: Throwable, message: => String, args:Any*): Unit = {
+    val l = log.log
+    if( l.isTraceEnabled ) {
+      val m = if( args.isEmpty ) {
+        message
+      } else {
+        format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
+      }
+      log.log.trace(log_map(m), e)
+    }
+  }
+
+  protected def trace(e: Throwable): Unit = {
+    val l = log.log
+    if( l.isTraceEnabled ) {
+      log.log.trace(log_map(e.getMessage), e)
+    }
+  }
 
 }
-

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml?rev=961074&r1=961073&r2=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml Wed Jul  7 03:43:46 2010
@@ -89,7 +89,7 @@
     <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
-      <scope>test</scope>
+      <!--<scope>test</scope>-->
       <version>${log4j-version}</version>
     </dependency>
     

Copied: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties (from r961073, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/resources/log4j.properties)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties?p2=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties&p1=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/resources/log4j.properties&r1=961073&r2=961074&rev=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/resources/log4j.properties (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties Wed Jul  7 03:43:46 2010
@@ -19,13 +19,13 @@
 # The logging properties used during tests..
 #
 log4j.rootLogger=WARN, console, file
-log4j.logger.org.apache.activemq=INFO 
+log4j.logger.org.apache.activemq=DEBUG
 
 # Console will only display warnnings
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d | %-5p | %m%n
-log4j.appender.console.threshold=WARN
+log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
+log4j.appender.console.threshold=DEBUG
 
 # File appender will contain all info messages
 log4j.appender.file=org.apache.log4j.FileAppender

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala?rev=961074&r1=961073&r2=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala Wed Jul  7 03:43:46 2010
@@ -27,11 +27,12 @@ object StompBroker {
   var port = 61613
 
   def main(args:Array[String]) = {
-    println("Starting stomp broker...")
+    val uri = "tcp://"+address+":"+port+"?wireFormat=multi"
+
+    println("Starting stomp broker: "+uri)
 
     val broker = new Broker()
 
-    val uri = "tcp://"+address+":"+port+"?wireFormat=multi"
     val server = TransportFactory.bind(uri)
     broker.transportServers.add(server)
     broker.start

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961074&r1=961073&r2=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 03:43:46 2010
@@ -51,11 +51,22 @@ object StompConstants {
 
 import StompConstants._
 
-class StompProtocolHandler extends ProtocolHandler {
+object StompProtocolHandler extends Log
+
+class StompProtocolHandler extends ProtocolHandler with Logging {
+
+  override protected def log = StompProtocolHandler
+  override protected def log_map(message:String) = {
+    if( connection==null )
+      message
+    else
+      "connection:"+connection.id+" | "+message
+  }
 
 
   class SimpleConsumer(val dest:AsciiBuffer) extends BaseRetained with DeliveryConsumer {
 
+
     val queue = StompProtocolHandler.this.dispatchQueue
     val session_manager = new DeliverySessionManager(outboundChannel, queue)
 
@@ -106,25 +117,45 @@ class StompProtocolHandler extends Proto
     }
   }
 
-
   private def queue = connection.dispatchQueue
 
   def setConnection(connection:BrokerConnection) = {
     this.connection = connection
+  }
 
-    // We will be using the default virtual host
+  def setWireFormat(wireformat:WireFormat) = { this.wireformat = wireformat}
+
+  def start = {
+    info("start")
     connection.transport.suspendRead
     connection.broker.runtime.getDefaultVirtualHost(
       queue.wrap { (host)=>
+        info("got host.. resuming")
         this.host=host
         connection.transport.resumeRead
       }
     )
   }
 
-  def setWireFormat(wireformat:WireFormat) = { this.wireformat = wireformat}
+  def stop = {
+    if( !closed ) {
+      info("stop")
+      closed=true;
+      if( producerRoute!=null ) {
+        host.router.disconnect(producerRoute)
+        producerRoute=null
+      }
+      if( consumer!=null ) {
+        host.router.unbind(consumer.dest, consumer::Nil)
+        consumer=null
+      }
+      connection.stop
+    }
+  }
+
 
   def onCommand(command:Any) = {
+    info("got command: %s", command)
     try {
       command match {
         case StompFrame(Commands.SEND, headers, content) =>
@@ -153,7 +184,6 @@ class StompProtocolHandler extends Proto
 
 
   def on_stomp_connect(headers:HeaderMap) = {
-    println("connected on: "+Thread.currentThread.getName);
     connection.transport.oneway(StompFrame(Responses.CONNECTED))
   }
 
@@ -257,22 +287,5 @@ class StompProtocolHandler extends Proto
     stop
   }
 
-  def start = {
-  }
-  
-  def stop = {
-    if( !closed ) {
-      closed=true;
-      if( producerRoute!=null ) {
-        host.router.disconnect(producerRoute)
-        producerRoute=null
-      }
-      if( consumer!=null ) {
-        host.router.unbind(consumer.dest, consumer::Nil)
-        consumer=null
-      }
-      connection.stop
-    }
-  }
 }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=961074&r1=961073&r2=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Wed Jul  7 03:43:46 2010
@@ -36,7 +36,7 @@ object StompLoadClient {
   import StompLoadClient._
   implicit def toAsciiBuffer(value: String) = new AsciiBuffer(value)
 
-  var producerSleep = 0;
+  var producerSleep = 1000*30;
   var consumerSleep = 0;
   var producers = 1;
   var consumers = 1;
@@ -46,7 +46,7 @@ object StompLoadClient {
   var messageSize = 1024;
   var useContentLength=true
 
-  var destinationType = "queue";
+  var destinationType = "topic";
   var destinationCount = 1;
 
   val producerCounter = new AtomicLong();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961074&r1=961073&r2=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul  7 03:43:46 2010
@@ -95,6 +95,7 @@ public class TcpTransport implements Tra
 
     public void connected(SocketChannel channel) {
         this.channel = channel;
+        this.remoteAddress = channel.socket().getRemoteSocketAddress().toString();
         this.socketState = CONNECTED;
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=961074&r1=961073&r2=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Wed Jul  7 03:43:46 2010
@@ -120,7 +120,10 @@ public class TcpTransportServer implemen
             public void run() {
                 try {
                     SocketChannel client = channel.accept();
-                    handleSocket(client);
+                    while( client!=null ) {
+                        handleSocket(client);
+                        client = channel.accept();
+                    }
                 } catch (IOException e) {
                     listener.onAcceptError(e);
                 }