You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:43:46 UTC
svn commit: r961074 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-stomp/ activemq-stomp/src/main/resources/
activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ activ...
Author: chirino
Date: Wed Jul 7 03:43:46 2010
New Revision: 961074
URL: http://svn.apache.org/viewvc?rev=961074&view=rev
Log:
Better logging
Added:
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties
- copied, changed from r961073, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/resources/log4j.properties
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/resources/log4j.properties
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961074&r1=961073&r2=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul 7 03:43:46 2010
@@ -143,6 +143,7 @@ class Broker() extends Service with Logg
}
def onAccept(transport: Transport): Unit = {
+ debug("Accepted connection from: %s", transport.getRemoteAddress)
var connection = new BrokerConnection(Broker.this)
connection.transport = transport
clientConnections.add(connection)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961074&r1=961073&r2=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul 7 03:43:46 2010
@@ -26,9 +26,8 @@ import _root_.java.lang.{String}
import _root_.org.apache.activemq.util.{FactoryFinder, IOExceptionSupport}
import _root_.org.apache.activemq.wireformat.WireFormat
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-class ConnectionConfig {
+import java.util.concurrent.atomic.AtomicLong
-}
abstract class Connection() extends TransportListener with Service {
val dispatchQueue = createQueue("connection")
@@ -70,12 +69,19 @@ abstract class Connection() extends Tran
}
-object BrokerConnection extends Log
+object BrokerConnection extends Log {
+ val id_generator = new AtomicLong()
+}
class BrokerConnection(val broker: Broker) extends Connection with Logging {
+
override protected def log = BrokerConnection
+ override protected def log_map(message:String) = "connection:"+id+" | "+message
+
+ import BrokerConnection._
var protocolHandler: ProtocolHandler = null;
+ val id = id_generator.incrementAndGet
exceptionListener = new ExceptionListener() {
def exceptionThrown(error:Exception) = {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala?rev=961074&r1=961073&r2=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala Wed Jul 7 03:43:46 2010
@@ -34,7 +34,7 @@ trait Destination {
def getDestinations():Seq[Destination]
}
-object Destination {
+object DestinationParser {
/**
* Parses a simple destination.
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala?rev=961074&r1=961073&r2=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala Wed Jul 7 03:43:46 2010
@@ -20,39 +20,192 @@ import _root_.java.util.{LinkedHashMap,
import _root_.java.lang.{Throwable, String}
import _root_.org.apache.commons.logging.LogFactory
import _root_.org.apache.commons.logging.{Log => Logger}
+import java.util.concurrent.atomic.AtomicLong
trait Log {
val log = LogFactory.getLog(getClass.getName)
}
+object Logging {
+ val exception_id_generator = new AtomicLong(System.currentTimeMillis)
+ def next_exception_id = exception_id_generator.incrementAndGet.toHexString
+}
+
/**
* A Logging trait you can mix into an implementation class without affecting its public API
*/
trait Logging {
+ import Logging._
protected def log: Log
+ protected def log_map(message:String) = message
- protected def error(message: => String): Unit = log.log.error(message)
-
- protected def error(e: Throwable): Unit = log.log.error(e.getMessage, e)
-
- protected def error(message: => String, e: Throwable): Unit = log.log.error(message, e)
-
- protected def warn(message: => String): Unit = log.log.warn(message)
-
- protected def warn(message: => String, e: Throwable): Unit = log.log.warn(message, e)
-
- protected def info(message: => String): Unit = log.log.info(message)
-
- protected def info(message: => String, e: Throwable): Unit = log.log.info(message, e)
-
- protected def debug(message: => String): Unit = log.log.debug(message)
-
- protected def debug(message: => String, e: Throwable): Unit = log.log.debug(message, e)
-
- protected def trace(message: => String): Unit = log.log.trace(message)
-
- protected def trace(message: => String, e: Throwable): Unit = log.log.trace(message, e)
+ protected def error(message: => String, args:Any*): Unit = {
+ val l = log.log
+ if( l.isErrorEnabled ) {
+ val m = if( args.isEmpty ) {
+ message
+ } else {
+ format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
+ }
+ l.error(log_map(m))
+ }
+ }
+
+ protected def error(e: Throwable, message: => String, args:Any*): Unit = {
+ val l = log.log
+ if( l.isErrorEnabled ) {
+ val m = if( args.isEmpty ) {
+ message
+ } else {
+ format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
+ }
+ val exception_id = next_exception_id
+ log.log.error(log_map(m)+" [id:"+exception_id+"]")
+ log.log.debug("[id:"+exception_id+"]", e)
+ }
+ }
+
+ protected def error(e: Throwable): Unit = {
+ val l = log.log
+ if( l.isErrorEnabled ) {
+ val exception_id = next_exception_id
+ log.log.error(log_map(e.getMessage)+" [id:"+exception_id+"]")
+ log.log.debug("[id:"+exception_id+"]", e)
+ }
+ }
+
+ protected def warn(message: => String, args:Any*): Unit = {
+ val l = log.log
+ if( l.isWarnEnabled ) {
+ val m = if( args.isEmpty ) {
+ message
+ } else {
+ format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
+ }
+ l.warn(log_map(m))
+ }
+ }
+
+ protected def warn(e: Throwable, message: => String, args:Any*): Unit = {
+ val l = log.log
+ if( l.isWarnEnabled ) {
+ val m = if( args.isEmpty ) {
+ message
+ } else {
+ format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
+ }
+ val exception_id = next_exception_id
+ log.log.warn(log_map(m)+" [id:"+exception_id+"]")
+ log.log.debug("[id:"+exception_id+"]", e)
+ }
+ }
+
+ protected def warn(e: Throwable): Unit = {
+ val l = log.log
+ if( l.isWarnEnabled ) {
+ val exception_id = next_exception_id
+ log.log.warn(log_map(e.getMessage)+" [id:"+exception_id+"]")
+ log.log.debug("[id:"+exception_id+"]", e)
+ }
+ }
+
+ protected def info(message: => String, args:Any*): Unit = {
+ val l = log.log
+ if( l.isInfoEnabled ) {
+ val m = if( args.isEmpty ) {
+ message
+ } else {
+ format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
+ }
+ l.info(log_map(m))
+ }
+ }
+
+ protected def info(e: Throwable, message: => String, args:Any*): Unit = {
+ val l = log.log
+ if( l.isInfoEnabled ) {
+ val m = if( args.isEmpty ) {
+ message
+ } else {
+ format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
+ }
+ val exception_id = next_exception_id
+ log.log.info(log_map(m)+" [id:"+exception_id+"]")
+ log.log.debug("[id:"+exception_id+"]", e)
+ }
+ }
+
+ protected def info(e: Throwable): Unit = {
+ val l = log.log
+ if( l.isInfoEnabled ) {
+ val exception_id = next_exception_id
+ log.log.info(log_map(e.getMessage)+" [id:"+exception_id+"]")
+ log.log.debug("[id:"+exception_id+"]", e)
+ }
+ }
+
+
+ protected def debug(message: => String, args:Any*): Unit = {
+ val l = log.log
+ if( l.isDebugEnabled ) {
+ val m = if( args.isEmpty ) {
+ message
+ } else {
+ format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
+ }
+ l.debug(log_map(m))
+ }
+ }
+
+ protected def debug(e: Throwable, message: => String, args:Any*): Unit = {
+ val l = log.log
+ if( l.isDebugEnabled ) {
+ val m = if( args.isEmpty ) {
+ message
+ } else {
+ format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
+ }
+ log.log.debug(log_map(m), e)
+ }
+ }
+
+ protected def debug(e: Throwable): Unit = {
+ val l = log.log
+ if( l.isDebugEnabled ) {
+ log.log.debug(log_map(e.getMessage), e)
+ }
+ }
+
+ protected def trace(message: => String, args:Any*): Unit = {
+ val l = log.log
+ if( l.isTraceEnabled ) {
+ val m = if( args.isEmpty ) {
+ message
+ } else {
+ format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
+ }
+ l.trace(log_map(m))
+ }
+ }
+
+ protected def trace(e: Throwable, message: => String, args:Any*): Unit = {
+ val l = log.log
+ if( l.isTraceEnabled ) {
+ val m = if( args.isEmpty ) {
+ message
+ } else {
+ format(message, args.map(_.asInstanceOf[AnyRef]) : _*)
+ }
+ log.log.trace(log_map(m), e)
+ }
+ }
+
+ protected def trace(e: Throwable): Unit = {
+ val l = log.log
+ if( l.isTraceEnabled ) {
+ log.log.trace(log_map(e.getMessage), e)
+ }
+ }
}
-
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml?rev=961074&r1=961073&r2=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml Wed Jul 7 03:43:46 2010
@@ -89,7 +89,7 @@
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
- <scope>test</scope>
+ <!--<scope>test</scope>-->
<version>${log4j-version}</version>
</dependency>
Copied: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties (from r961073, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/resources/log4j.properties)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties?p2=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties&p1=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/resources/log4j.properties&r1=961073&r2=961074&rev=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/resources/log4j.properties (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties Wed Jul 7 03:43:46 2010
@@ -19,13 +19,13 @@
# The logging properties used during tests..
#
log4j.rootLogger=WARN, console, file
-log4j.logger.org.apache.activemq=INFO
+log4j.logger.org.apache.activemq=DEBUG
# Console will only display warnnings
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d | %-5p | %m%n
-log4j.appender.console.threshold=WARN
+log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
+log4j.appender.console.threshold=DEBUG
# File appender will contain all info messages
log4j.appender.file=org.apache.log4j.FileAppender
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala?rev=961074&r1=961073&r2=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala Wed Jul 7 03:43:46 2010
@@ -27,11 +27,12 @@ object StompBroker {
var port = 61613
def main(args:Array[String]) = {
- println("Starting stomp broker...")
+ val uri = "tcp://"+address+":"+port+"?wireFormat=multi"
+
+ println("Starting stomp broker: "+uri)
val broker = new Broker()
- val uri = "tcp://"+address+":"+port+"?wireFormat=multi"
val server = TransportFactory.bind(uri)
broker.transportServers.add(server)
broker.start
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961074&r1=961073&r2=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:43:46 2010
@@ -51,11 +51,22 @@ object StompConstants {
import StompConstants._
-class StompProtocolHandler extends ProtocolHandler {
+object StompProtocolHandler extends Log
+
+class StompProtocolHandler extends ProtocolHandler with Logging {
+
+ override protected def log = StompProtocolHandler
+ override protected def log_map(message:String) = {
+ if( connection==null )
+ message
+ else
+ "connection:"+connection.id+" | "+message
+ }
class SimpleConsumer(val dest:AsciiBuffer) extends BaseRetained with DeliveryConsumer {
+
val queue = StompProtocolHandler.this.dispatchQueue
val session_manager = new DeliverySessionManager(outboundChannel, queue)
@@ -106,25 +117,45 @@ class StompProtocolHandler extends Proto
}
}
-
private def queue = connection.dispatchQueue
def setConnection(connection:BrokerConnection) = {
this.connection = connection
+ }
- // We will be using the default virtual host
+ def setWireFormat(wireformat:WireFormat) = { this.wireformat = wireformat}
+
+ def start = {
+ info("start")
connection.transport.suspendRead
connection.broker.runtime.getDefaultVirtualHost(
queue.wrap { (host)=>
+ info("got host.. resuming")
this.host=host
connection.transport.resumeRead
}
)
}
- def setWireFormat(wireformat:WireFormat) = { this.wireformat = wireformat}
+ def stop = {
+ if( !closed ) {
+ info("stop")
+ closed=true;
+ if( producerRoute!=null ) {
+ host.router.disconnect(producerRoute)
+ producerRoute=null
+ }
+ if( consumer!=null ) {
+ host.router.unbind(consumer.dest, consumer::Nil)
+ consumer=null
+ }
+ connection.stop
+ }
+ }
+
def onCommand(command:Any) = {
+ info("got command: %s", command)
try {
command match {
case StompFrame(Commands.SEND, headers, content) =>
@@ -153,7 +184,6 @@ class StompProtocolHandler extends Proto
def on_stomp_connect(headers:HeaderMap) = {
- println("connected on: "+Thread.currentThread.getName);
connection.transport.oneway(StompFrame(Responses.CONNECTED))
}
@@ -257,22 +287,5 @@ class StompProtocolHandler extends Proto
stop
}
- def start = {
- }
-
- def stop = {
- if( !closed ) {
- closed=true;
- if( producerRoute!=null ) {
- host.router.disconnect(producerRoute)
- producerRoute=null
- }
- if( consumer!=null ) {
- host.router.unbind(consumer.dest, consumer::Nil)
- consumer=null
- }
- connection.stop
- }
- }
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=961074&r1=961073&r2=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Wed Jul 7 03:43:46 2010
@@ -36,7 +36,7 @@ object StompLoadClient {
import StompLoadClient._
implicit def toAsciiBuffer(value: String) = new AsciiBuffer(value)
- var producerSleep = 0;
+ var producerSleep = 1000*30;
var consumerSleep = 0;
var producers = 1;
var consumers = 1;
@@ -46,7 +46,7 @@ object StompLoadClient {
var messageSize = 1024;
var useContentLength=true
- var destinationType = "queue";
+ var destinationType = "topic";
var destinationCount = 1;
val producerCounter = new AtomicLong();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961074&r1=961073&r2=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul 7 03:43:46 2010
@@ -95,6 +95,7 @@ public class TcpTransport implements Tra
public void connected(SocketChannel channel) {
this.channel = channel;
+ this.remoteAddress = channel.socket().getRemoteSocketAddress().toString();
this.socketState = CONNECTED;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=961074&r1=961073&r2=961074&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Wed Jul 7 03:43:46 2010
@@ -120,7 +120,10 @@ public class TcpTransportServer implemen
public void run() {
try {
SocketChannel client = channel.accept();
- handleSocket(client);
+ while( client!=null ) {
+ handleSocket(client);
+ client = channel.accept();
+ }
} catch (IOException e) {
listener.onAcceptError(e);
}