You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:42:45 UTC

svn commit: r961071 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/ activemq-jaxb/src/main/java/org/apache/activemq/a...

Author: chirino
Date: Wed Jul  7 03:42:44 2010
New Revision: 961071

URL: http://svn.apache.org/viewvc?rev=961071&view=rev
Log:
working on getting a simple use case running

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
      - copied, changed from r961070, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/StatefulWireFormat.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (from r961070, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala&r1=961070&r2=961071&rev=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul  7 03:42:44 2010
@@ -27,6 +27,7 @@ import _root_.org.fusesource.hawtdispatc
 import _root_.org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
 
 import _root_.scala.collection.JavaConversions._
+import _root_.scala.reflect.BeanProperty
 
 object BrokerFactory {
 
@@ -94,56 +95,25 @@ class Broker() extends Service with Logg
   import BrokerConstants._
   override protected def log = BrokerConstants
 
-  class BrokerAcceptListener extends TransportAcceptListener {
-    def onAcceptError(error: Exception): Unit = {
-      warn("Accept error: " + error)
-      debug("Accept error details: ", error)
-    }
-
-    def onAccept(transport: Transport): Unit = {
-      var connection = new BrokerConnection(Broker.this)
-      connection.transport = transport
-      clientConnections.add(connection)
-      try {
-        connection.start
-      }
-      catch {
-        case e1: Exception => {
-          onAcceptError(e1)
-        }
-      }
-    }
-  }
-
-  val q = createQueue("broker");
-
-  var connectUris: List[String] = Nil
+  // The configuration state of the broker... It can be modified directly until the broker
+  // is started.
+  @BeanProperty
+  val connectUris: ArrayList[String] = new ArrayList[String]
+  @BeanProperty
   val virtualHosts: LinkedHashMap[AsciiBuffer, VirtualHost] = new LinkedHashMap[AsciiBuffer, VirtualHost]
+  @BeanProperty
   val transportServers: ArrayList[TransportServer] = new ArrayList[TransportServer]
-  val clientConnections: ArrayList[Connection] = new ArrayList[Connection]
+  @BeanProperty
   var dataDirectory: File = null
-  var state = CONFIGURATION
+  @BeanProperty
   var name = "broker";
+  @BeanProperty
   var defaultVirtualHost: VirtualHost = null
 
-  def removeConnectUri(uri: String): Unit = ^ {
-    this.connectUris = this.connectUris.filterNot(_==uri)
-  } ->: q
-
-  def getVirtualHost(name: AsciiBuffer, cb: (VirtualHost) => Unit) = callback(cb) {
-    virtualHosts.get(name)
-  } ->: q
-
-  def getConnectUris(cb: (List[String]) => Unit) = callback(cb) {
-    connectUris
-  } ->: q
+  def start = runtime.start
+  def stop = runtime.stop
 
-
-  def getDefaultVirtualHost(cb: (VirtualHost) => Unit) = callback(cb) {
-    defaultVirtualHost
-  } ->: q
-
-  def addVirtualHost(host: VirtualHost) = ^ {
+  def addVirtualHost(host: VirtualHost) = {
     if (host.names.isEmpty) {
       throw new IllegalArgumentException("Virtual host must be configured with at least one host name.")
     }
@@ -156,138 +126,127 @@ class Broker() extends Service with Logg
       virtualHosts.put(name, host)
     }
     if (defaultVirtualHost == null) {
-      setDefaultVirtualHost(host)
-    }
-  } ->: q
-
-  def addTransportServer(server: TransportServer) = ^ {
-    state match {
-      case RUNNING =>
-        start(server)
-      case CONFIGURATION =>
-        this.transportServers.add(server)
-      case _ =>
-        throw new IllegalStateException("Cannot add a transport server when broker is: " + state)
-    }
-  } ->: q
-
-  def removeTransportServer(server: TransportServer) = ^ {
-    state match {
-      case RUNNING =>
-        stopTransportServerWrapException(server)
-      case STOPPED =>
-        this.transportServers.remove(server)
-      case CONFIGURATION =>
-        this.transportServers.remove(server)
-      case _ =>
-        throw new IllegalStateException("Cannot add a transport server when broker is: " + state)
-    }
-  } ->: q
-
-
-  def getState(cb: (String) => Unit) = callback(cb) {state} ->: q
-
-
-  def addConnectUri(uri: String) = ^ {
-    this.connectUris = this.connectUris ::: uri::Nil 
-  } ->: q
-
-  def removeVirtualHost(host: VirtualHost) = ^ {
-    for (name <- host.names) {
-      virtualHosts.remove(name)
-    }
-    if (host == defaultVirtualHost) {
-      if (virtualHosts.isEmpty) {
-        defaultVirtualHost = null
-      }
-      else {
-        defaultVirtualHost = virtualHosts.values.iterator.next
-      }
+      defaultVirtualHost = host
     }
-  } ->: q
-
-  def setDefaultVirtualHost(defaultVirtualHost: VirtualHost) = ^ {
-    this.defaultVirtualHost = defaultVirtualHost
-  } ->: q
-
-  def getName(cb: (String) => Unit) = callback(cb) {
-    name;
-  } ->: q
-
-
-  private def start(server: TransportServer): Unit = {
-    server.setDispatchQueue(q)
-    server.setAcceptListener(new BrokerAcceptListener)
-    server.start
   }
 
-
-  final def stop: Unit = ^ {
-    if (state == RUNNING) {
-      state = STOPPING
-
-      for (server <- transportServers) {
-        stop(server)
-      }
-      for (connection <- clientConnections) {
-        stop(connection)
-      }
-      for (virtualHost <- virtualHosts.values) {
-        stop(virtualHost)
+  // Holds the runtime state of the broker. All access should be serialized
+  // via the dispatch queue, and therefore all requests are set up to return
+  // results via callbacks.
+  object runtime {
+
+    class BrokerAcceptListener extends TransportAcceptListener {
+      def onAcceptError(error: Exception): Unit = {
+        error.printStackTrace
+        warn("Accept error: " + error)
+        debug("Accept error details: ", error)
+      }
+
+      def onAccept(transport: Transport): Unit = {
+        var connection = new BrokerConnection(Broker.this)
+        connection.transport = transport
+        clientConnections.add(connection)
+        try {
+          connection.start
+        }
+        catch {
+          case e1: Exception => {
+            onAcceptError(e1)
+          }
+        }
       }
-      state = STOPPED;
     }
 
-  } ->: q
-
-  def getVirtualHosts(cb: (ArrayList[VirtualHost]) => Unit) = callback(cb) {
-    new ArrayList[VirtualHost](virtualHosts.values)
-  } ->: q
-
-  def getTransportServers(cb: (ArrayList[TransportServer]) => Unit) = callback(cb) {
-    new ArrayList[TransportServer](transportServers)
-  } ->: q
-
-
-
+    var state = CONFIGURATION
+    val dispatchQueue = createQueue("broker");
+    val clientConnections: ArrayList[Connection] = new ArrayList[Connection]
+
+    def removeConnectUri(uri: String): Unit = ^ {
+      connectUris.remove(uri)
+    } ->: dispatchQueue
+
+    def getVirtualHost(name: AsciiBuffer, cb: (VirtualHost) => Unit) = callback(cb) {
+      virtualHosts.get(name)
+    } ->: dispatchQueue
+
+    def getConnectUris(cb: (ArrayList[String]) => Unit) = callback(cb) {
+      new ArrayList(connectUris)
+    } ->: dispatchQueue
+
+
+    def getDefaultVirtualHost(cb: (VirtualHost) => Unit) = callback(cb) {
+      defaultVirtualHost
+    } ->: dispatchQueue
+
+    def addVirtualHost(host: VirtualHost) = ^ {
+      Broker.this.addVirtualHost(host)
+    } ->: dispatchQueue
+
+    def getState(cb: (String) => Unit) = callback(cb) {state} ->: dispatchQueue
+
+    def addConnectUri(uri: String) = ^ {
+      connectUris.add(uri)
+    } ->: dispatchQueue
+
+    def getName(cb: (String) => Unit) = callback(cb) {
+      name;
+    } ->: dispatchQueue
+
+    def getVirtualHosts(cb: (ArrayList[VirtualHost]) => Unit) = callback(cb) {
+      new ArrayList[VirtualHost](virtualHosts.values)
+    } ->: dispatchQueue
+
+    def getTransportServers(cb: (ArrayList[TransportServer]) => Unit) = callback(cb) {
+      new ArrayList[TransportServer](transportServers)
+    } ->: dispatchQueue
+
+    def start = ^ {
+      if (state == CONFIGURATION) {
+        // We can apply defaults now
+        if (dataDirectory == null) {
+          dataDirectory = new File(IOHelper.getDefaultDataDirectory)
+        }
 
-  def start = ^ {
-    if (state == CONFIGURATION) {
-      // We can apply defaults now
-      if (dataDirectory == null) {
-        dataDirectory = new File(IOHelper.getDefaultDataDirectory)
-      }
+        if (defaultVirtualHost == null) {
+          defaultVirtualHost = new VirtualHost()
+          defaultVirtualHost.broker = Broker.this
+          defaultVirtualHost.names = DEFAULT_VIRTUAL_HOST_NAME.toString :: Nil
+          virtualHosts.put(DEFAULT_VIRTUAL_HOST_NAME, defaultVirtualHost)
+        }
 
-      if (defaultVirtualHost == null) {
-        defaultVirtualHost = new VirtualHost()
-        defaultVirtualHost.broker = Broker.this
-        defaultVirtualHost.names = DEFAULT_VIRTUAL_HOST_NAME.toString :: Nil
-        virtualHosts.put(DEFAULT_VIRTUAL_HOST_NAME, defaultVirtualHost)
-      }
+        state = STARTING
 
-      state = STARTING
+        for (virtualHost <- virtualHosts.values) {
+          virtualHost.start
+        }
+        for (server <- transportServers) {
+          server.setDispatchQueue(dispatchQueue)
+          server.setAcceptListener(new BrokerAcceptListener)
+          server.start
+        }
+        state = RUNNING
+      } else {
+        warn("Can only start a broker that is in the " + CONFIGURATION + " state.  Broker was " + state)
+      }
+    } ->: dispatchQueue
+
+    def stop: Unit = ^ {
+      if (state == RUNNING) {
+        state = STOPPING
 
-      for (virtualHost <- virtualHosts.values) {
-        virtualHost.start
-      }
-      for (server <- transportServers) {
-        start(server)
+        for (server <- transportServers) {
+          stopService(server)
+        }
+        for (connection <- clientConnections) {
+          stopService(connection)
+        }
+        for (virtualHost <- virtualHosts.values) {
+          stopService(virtualHost)
+        }
+        state = STOPPED;
       }
-      state = RUNNING
-    } else {
-      warn("Can only start a broker that is in the " + CONFIGURATION + " state.  Broker was " + state)
-    }
-  } ->: q
 
-  private def stopTransportServerWrapException(server: TransportServer): Unit = {
-    try {
-      server.stop
-    }
-    catch {
-      case e: Exception => {
-        throw new RuntimeException(e)
-      }
-    }
+    } ->: dispatchQueue
   }
 
 
@@ -295,7 +254,7 @@ class Broker() extends Service with Logg
    * Helper method to help stop broker services and log error if they fail to start.
    * @param server
    */
-  private def stop(server: Service): Unit = {
+  private def stopService(server: Service): Unit = {
     try {
       server.stop
     } catch {
@@ -339,16 +298,16 @@ object Queue {
  */
 class Queue(val destination:Destination) extends BaseRetained with Route with DeliveryConsumer with DeliveryProducer {
 
-
-
   override val queue:DispatchQueue = createQueue("queue:"+destination);
   queue.setTargetQueue(getRandomThreadQueue)
   setDisposer(^{
     queue.release
   })
 
-
   val delivery_buffer  = new DeliveryBuffer
+  delivery_buffer.eventHandler = ^{ drain_delivery_buffer }
+
+  val delivery_sessions = new DeliveryCreditBufferProtocol(delivery_buffer, queue)
 
   class ConsumerState(val consumer:DeliverySession) {
     var bound=true
@@ -381,7 +340,7 @@ class Queue(val destination:Destination)
         allConsumers += consumer->cs
         readyConsumers.addLast(cs)
       }
-      delivery_buffer.eventHandler.run
+      drain_delivery_buffer
     } ->: queue
 
   def unbind(consumers:List[DeliveryConsumer]) = releasing(consumers) {
@@ -407,22 +366,22 @@ class Queue(val destination:Destination)
   }
 
 
-  delivery_buffer.eventHandler = ^{
-    while( !readyConsumers.isEmpty && !delivery_buffer.isEmpty ) {
+  def drain_delivery_buffer: Unit = {
+    while (!readyConsumers.isEmpty && !delivery_buffer.isEmpty) {
       val cs = readyConsumers.removeFirst
       val delivery = delivery_buffer.receive
       cs.deliver(delivery)
     }
   }
 
-
-  val deliveryQueue = new DeliveryCreditBufferProtocol(delivery_buffer, queue)
   def open_session(producer_queue:DispatchQueue) = new DeliverySession {
-    val session = deliveryQueue.session(producer_queue)
+
+    val session = delivery_sessions.session(producer_queue)
     val consumer = Queue.this
     retain
 
     def deliver(delivery:Delivery) = session.send(delivery)
+
     def close = {
       session.close
       release

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul  7 03:42:44 2010
@@ -38,29 +38,28 @@ abstract class Connection() extends Tran
   var transport:Transport = null
   var exceptionListener:ExceptionListener = null;
 
-  def start() = {
+  def start() = ^{
     transport.setDispatchQueue(dispatchQueue);
-    transport.getDispatchQueue.release
-    transport.setTransportListener(this);
+    transport.setTransportListener(Connection.this);
     transport.start()
-  }
+  } ->: dispatchQueue
 
-  def stop() = {
+  def stop() = ^{
     stopping=true
     transport.stop()
     dispatchQueue.release
-  }
+  } ->: dispatchQueue
 
   def onException(error:IOException) = {
-      if (!stopping) {
-          onFailure(error);
-      }
+    if (!stopping) {
+        onFailure(error);
+    }
   }
 
   def onFailure(error:Exception) = {
-      if (exceptionListener != null) {
-          exceptionListener.exceptionThrown(error);
-      }
+    if (exceptionListener != null) {
+        exceptionListener.exceptionThrown(error);
+    }
   }
 
   def onDisconnected() = {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala Wed Jul  7 03:42:44 2010
@@ -89,7 +89,7 @@ object Destination {
     }
 }
 
-class SingleDestination(var domain:AsciiBuffer=null, var name:AsciiBuffer=null) extends Destination {
+case class SingleDestination(var domain:AsciiBuffer=null, var name:AsciiBuffer=null) extends Destination {
 
   def getDestinations():Seq[Destination] = null;
   def getDomain():AsciiBuffer = domain
@@ -97,8 +97,7 @@ class SingleDestination(var domain:Ascii
 
   override def toString() = ""+domain+":"+name
 }
-
-class MultiDestination(var destinations:List[Destination]=Nil) extends Destination {
+case class MultiDestination(var destinations:List[Destination]=Nil) extends Destination {
 
   def getDestinations():Seq[Destination] = destinations;
   def getDomain():AsciiBuffer = null

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala Wed Jul  7 03:42:44 2010
@@ -18,15 +18,10 @@ package org.apache.activemq.apollo.broke
 
 import _root_.java.util.{LinkedHashMap, HashMap}
 
-class TransactionManagerConfig {
-
-  def apply(host:VirtualHost): TransactionManager = {
-    new TransactionManager(host, this);
-  }
-}
+class TransactionManager() {
 
+  var virtualHost:VirtualHost = null
 
-class TransactionManager(val virtualHost:VirtualHost, config:TransactionManagerConfig) {
 // TODO:
 //    private static final Log LOG = LogFactory.getLog(TransactionManager.class);
 //    private static final String TX_QUEUE_PREFIX = "TX-";

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 03:42:44 2010
@@ -17,6 +17,7 @@
 package org.apache.activemq.apollo.broker;
 
 import _root_.java.util.{LinkedHashMap, ArrayList, HashMap}
+import _root_.org.apache.activemq.broker.store.memory.MemoryStore
 import _root_.org.apache.activemq.broker.store.{Store}
 import _root_.org.apache.activemq.Service
 import _root_.java.lang.{String}
@@ -50,9 +51,9 @@ class VirtualHost() extends Service with
   }
 
   @BeanProperty
-  var database:BrokerDatabase = null
+  var database:BrokerDatabase = new BrokerDatabase
   @BeanProperty
-  var txnManager:TransactionManager = null
+  var transactionManager:TransactionManager = new TransactionManager
 
 
   def start():Unit = {
@@ -60,6 +61,7 @@ class VirtualHost() extends Service with
           return;
       }
 
+      database.virtualHost = this
       database.start();
 
 //      router.setDatabase(database);
@@ -84,7 +86,8 @@ class VirtualHost() extends Service with
 //        }
 
       //Recover transactions:
-      txnManager.loadTransactions();
+      transactionManager.virtualHost = this
+      transactionManager.loadTransactions();
       started = true;
   }
 
@@ -212,7 +215,7 @@ class VirtualHost() extends Service with
 class BrokerDatabase() extends Service {
 
   @BeanProperty
-  var store:Store=null;
+  var store:Store=new MemoryStore;
 
   @BeanProperty
   var virtualHost:VirtualHost=null;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala Wed Jul  7 03:42:44 2010
@@ -32,6 +32,7 @@ import _root_.org.junit.{Test, Before}
 import org.apache.activemq.transport.TransportFactory
 
 import _root_.scala.collection.JavaConversions._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 
 
 abstract class RemoteConsumer extends Connection {
@@ -48,6 +49,10 @@ abstract class RemoteConsumer extends Co
     totalConsumerRate.add(consumerRate);
     transport = TransportFactory.connect(uri);
     super.start();
+  }
+
+
+  override def onConnected() = {
     setupSubscription();
   }
 
@@ -69,6 +74,7 @@ abstract class RemoteProducer extends Co
   var property: String = null
   var totalProducerRate: MetricAggregator = null
   var next: Delivery = null
+  var thinkTime: Long = 0
 
   var filler: String = null
   var payloadSize = 20
@@ -89,10 +95,13 @@ abstract class RemoteProducer extends Co
 
     transport = TransportFactory.connect(uri);
     super.start();
-    setupProducer();
 
   }
 
+  override def onConnected() = {
+    setupProducer();
+  }
+
   def setupProducer()
 
 def createPayload(): String = {
@@ -119,7 +128,7 @@ def createPayload(): String = {
 }
 
 object BaseBrokerPerfTest {
-  var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "3"))
+  var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "3000000"))
   var IO_WORK_AMOUNT = 0
   var FANIN_COUNT = 10
   var FANOUT_COUNT = 10
@@ -229,6 +238,7 @@ abstract class BaseBrokerPerfTest {
     consumerCount = 1;
 
     createConnections();
+    producers.get(0).thinkTime = 50;
 
     // Start 'em up.
     startClients();
@@ -573,8 +583,6 @@ abstract class BaseBrokerPerfTest {
     val broker = new Broker()
     broker.transportServers.add(TransportFactory.bind(new URI(bindURI)))
     broker.connectUris.add(connectUri)
-    //     TODO:
-    //    broker.defaultVirtualHost.setStore(createStore(broker))
     broker
   }
 
@@ -609,14 +617,15 @@ abstract class BaseBrokerPerfTest {
   }
 
   private def startClients() = {
-
-    for (connection <- consumers) {
-      connection.start();
-    }
-
-    for (connection <- producers) {
-      connection.start();
-    }
+    // Start the clients after a delay to give the server a chance to startup.
+    getGlobalQueue.dispatchAfter(200, TimeUnit.MILLISECONDS, ^{
+      for (connection <- consumers) {
+        connection.start();
+      }
+      for (connection <- producers) {
+        connection.start();
+      }
+    })
   }
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java Wed Jul  7 03:42:44 2010
@@ -57,10 +57,10 @@ public class BrokerXml {
 			} catch (Exception e) {
 				throw new Exception("Unable to bind transport server '"+element+" due to: "+e.getMessage(), e);
 			}
-			rc.addTransportServer(server);
+			rc.transportServers().add(server);
 		}
 		for (String element : connectUris) {
-			rc.addConnectUri(element);
+			rc.connectUris().add(element);
 		}
 		
 		return rc;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul  7 03:42:44 2010
@@ -20,6 +20,7 @@ import _root_.java.util.LinkedList
 import _root_.org.apache.activemq.apollo.broker.{BufferConversions, Destination, Message}
 import _root_.org.apache.activemq.filter.{Expression, MessageEvaluationContext}
 import _root_.org.apache.activemq.util.buffer._
+import collection.mutable.ListBuffer
 
 /**
  *
@@ -27,6 +28,7 @@ import _root_.org.apache.activemq.util.b
  */
 object StompFrameConstants {
   type HeaderMap = List[(AsciiBuffer, AsciiBuffer)]
+  type HeaderMapBuffer = ListBuffer[(AsciiBuffer, AsciiBuffer)]
   var NO_DATA = new Buffer(0);
 }
 
@@ -141,6 +143,7 @@ case class StompFrame(action:AsciiBuffer
         destination = value
       case (Stomp.Headers.Message.EXPIRATION_TIME, value) =>
         expiration = java.lang.Long.parseLong(value)
+      case _ =>
     }
   }
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 03:42:44 2010
@@ -1,5 +1,5 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
@@ -16,14 +16,14 @@
  */
 package org.apache.activemq.apollo.stomp
 
+import _root_.java.io.{DataOutput, DataInput, EOFException, IOException}
+import _root_.java.nio.channels.{ReadableByteChannel, SocketChannel}
 import _root_.java.util.{LinkedList, ArrayList}
 import _root_.org.apache.activemq.apollo.broker._
 
-import _root_.org.apache.activemq.wireformat.WireFormat
+import _root_.org.apache.activemq.wireformat.{WireFormatFactory, WireFormat}
 import _root_.org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
-import java.nio.channels.{SocketChannel}
 import java.nio.ByteBuffer
-import java.io.{EOFException, IOException}
 import _root_.org.apache.activemq.util.buffer._
 import collection.mutable.{ListBuffer, HashMap}
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
@@ -101,9 +101,11 @@ class StompProtocolHandler extends Proto
     this.connection = connection
 
     // We will be using the default virtual host
-    connection.broker.getDefaultVirtualHost(
+    connection.transport.suspendRead
+    connection.broker.runtime.getDefaultVirtualHost(
       queue.wrap { (host)=>
         this.host=host
+        connection.transport.resumeRead
       }
     )
   }
@@ -111,20 +113,24 @@ class StompProtocolHandler extends Proto
   def setWireFormat(wireformat:WireFormat) = { this.wireformat = wireformat}
 
   def onCommand(command:Any) = {
-    val frame = command.asInstanceOf[StompFrame]
-    frame match {
-      case StompFrame(Commands.CONNECT, headers, _) =>
-        on_stomp_connect(headers)
+    command match {
       case StompFrame(Commands.SEND, headers, content) =>
-        on_stomp_send(frame)
-      case StompFrame(Commands.SUBSCRIBE, headers, content) =>
-        on_stomp_subscribe(headers)
+        on_stomp_send(command.asInstanceOf[StompFrame])
       case StompFrame(Commands.ACK, headers, content) =>
         // TODO:
+      case StompFrame(Commands.SUBSCRIBE, headers, content) =>
+        on_stomp_subscribe(headers)
+      case StompFrame(Commands.CONNECT, headers, _) =>
+        on_stomp_connect(headers)
       case StompFrame(Commands.DISCONNECT, headers, content) =>
         stop
+      case s:StompWireFormat =>
+        // this is passed on to us by the protocol discriminator
+        // so we know which wire format is being used.
       case StompFrame(unknown, _, _) =>
         die("Unsupported STOMP command: "+unknown);
+      case _ =>
+        die("Unsupported command: "+command);
     }
   }
 
@@ -230,6 +236,7 @@ class StompProtocolHandler extends Proto
 
   def onException(error:Exception) = {
     println("Shutting connection down due to: "+error)
+    error.printStackTrace
     stop
   }
 
@@ -252,6 +259,28 @@ class StompProtocolHandler extends Proto
   }
 }
 
+/**
+ * Creates WireFormat objects that marshal the <a href="http://activemq.apache.org/stomp/">Stomp</a> protocol.
+ */
+class StompWireFormatFactory extends WireFormatFactory {
+  import Stomp.Commands.CONNECT
+
+    def createWireFormat() = new StompWireFormat();
+
+    def isDiscriminatable() = true
+
+    def maxWireformatHeaderLength() = CONNECT.length+10;
+
+    def matchesWireformatHeader(header:Buffer) = {
+        if( header.length < CONNECT.length) {
+          false
+        } else {
+          // the magic can be preceded with newlines / whitespace..
+          header.trimFront.startsWith(CONNECT);
+        }
+    }
+}
+
 object StompWireFormat {
     val READ_BUFFFER_SIZE = 1024*64;
     val MAX_COMMAND_LENGTH = 1024;
@@ -262,7 +291,7 @@ object StompWireFormat {
     val SIZE_CHECK=false
   }
 
-class StompWireFormat {
+class StompWireFormat extends WireFormat {
   import StompWireFormat._
 
   implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
@@ -270,48 +299,10 @@ class StompWireFormat {
     ByteBuffer.wrap(Array(x));
   }
 
-  var outbound_frame: ByteBuffer = null
-  /**
-   * @retruns true if the source has been drained of StompFrame objects and they are fully written to the socket
-   */
-  def drain_source(socket:SocketChannel)(source: =>StompFrame ):Boolean = {
-    while(true) {
-      // if we have a pending frame that is being sent over the socket...
-      if( outbound_frame!=null ) {
-        socket.write(outbound_frame)
-        if( outbound_frame.remaining != 0 ) {
-          // non blocking socket returned before the buffers were fully written to disk..
-          // we are not yet fully drained.. but need to quit now.
-          return false
-        } else {
-          outbound_frame = null
-        }
-      } else {
-
-        // marshall all the available frames..
-        val buffer = new ByteArrayOutputStream()
-        var frame = source
-        while( frame!=null ) {
-          marshall(buffer, frame)
-          frame = source
-        }
-
-
-        if( buffer.size() ==0 ) {
-          // the source is now drained...
-          return true
-        } else {
-          val b = buffer.toBuffer;
-          outbound_frame = ByteBuffer.wrap(b.data, b.offset, b.length)
-        }
-      }
-    }
-    true
-  }
-
-  def marshall(buffer:ByteArrayOutputStream, frame:StompFrame) = {
-    buffer.write(frame.action)
-    buffer.write(NEWLINE)
+  def marshal(command:Any, os:DataOutput) = {
+    val frame = command.asInstanceOf[StompFrame]
+    frame.action.writeTo(os)
+    os.write(NEWLINE)
 
     // we can optimize a little if the headers and content are in the same buffer..
     if( !frame.headers.isEmpty && !frame.content.isEmpty &&
@@ -321,224 +312,218 @@ class StompWireFormat {
       val buffer1 = frame.headers.head._1;
       val buffer2 = frame.content;
       val length = (buffer2.offset-buffer1.offset)+buffer2.length
-      buffer.write( buffer1.data, offset, length)
+      os.write( buffer1.data, offset, length)
 
     } else {
       for( (key, value) <- frame.headers ) {
-        buffer.write(key)
-        buffer.write(SEPERATOR)
-        buffer.write(value)
-        buffer.write(NEWLINE)
+        key.writeTo(os)
+        os.write(SEPERATOR)
+        value.writeTo(os)
+        os.write(NEWLINE)
       }
-
-      buffer.write(NEWLINE)
-      buffer.write(frame.content)
+      os.write(NEWLINE)
+      frame.content.writeTo(os)
     }
-    buffer.write(END_OF_FRAME_BUFFER)
+    END_OF_FRAME_BUFFER.writeTo(os)
   }
 
+  def marshal(command:Any):Buffer= {
+    val frame = command.asInstanceOf[StompFrame]
+    // make a little bigger since size can be an estimate and we want to avoid
+    // a capacity re-size.
+    val os = new DataByteArrayOutputStream(frame.size + 100);
+    marshal(frame, os)
+    os.toBuffer
+  }
 
-  var read_pos = 0
-  var read_offset = 0
-  var read_data:Array[Byte] = new Array[Byte](READ_BUFFFER_SIZE)
-  var read_bytebuffer:ByteBuffer = ByteBuffer.wrap(read_data)
+  def unmarshal(packet:Buffer) = {
+    throw new UnsupportedOperationException
+  }
+  def unmarshal(in: DataInput):Object = {
+    throw new UnsupportedOperationException
+  }
 
-  def drain_socket(socket:SocketChannel)(handler:(StompFrame)=>Boolean) = {
-    var done = false
+  def getName() = "stomp"
 
-    // keep going until the socket buffer is drained.
-    while( !done ) {
-      val frame = unmarshall()
-      if( frame!=null ) {
-        // the handler might want us to stop looping..
-        done = handler(frame)
-      } else {
+  def getWireFormatFactory() = new StompWireFormatFactory
 
-        // do we need to read in more data???
-        if( read_pos==read_bytebuffer.position ) {
+  //
+  // state associated with un-marshalling stomp frames from
+  // a non-blocking NIO channel.
+  //
+  def createUnmarshalSession() = new StompUnmarshalSession
 
-          // do we need a new data buffer to read data into??
-          if(read_bytebuffer.remaining==0) {
+  class StompUnmarshalSession extends UnmarshalSession {
 
-            // The capacity needed grows by powers of 2...
-            val new_capacity = if( read_offset != 0 ) { READ_BUFFFER_SIZE } else { read_data.length << 2 }
-            val tmp_buffer = new Array[Byte](new_capacity)
-
-            // If there was un-consummed data.. copy it over...
-            val size = read_pos - read_offset
-            if( size > 0 ) {
-              System.arraycopy(read_data, read_offset, tmp_buffer, 0, size)
-            }
-            read_data = tmp_buffer
-            read_bytebuffer = ByteBuffer.wrap(read_data)
-            read_bytebuffer.position(size)
-            read_offset = 0;
-            read_pos = size
+    type FrameReader = (ByteBuffer)=>StompFrame
 
-          }
+    var next_action:FrameReader = read_action
+    var end = 0
+    var start = 0
 
-          // Try to fill the buffer with data from the nio socket..
-          var p = read_bytebuffer.position
-          if( socket.read(read_bytebuffer) == -1 ) {
-            throw new EOFException();
-          }
-          // we are done if there was no data on the socket.
-          done = read_bytebuffer.position==p
-        }
+    def getStartPos() = start
+    def setStartPos(pos:Int):Unit = {start=pos}
+
+    def getEndPos() = end
+    def setEndPos(pos:Int):Unit = { end = pos }
+
+    def unmarshal(buffer:ByteBuffer):Object = {
+      // keep running the next action until
+      // a frame is decoded or we run out of input
+      var rc:StompFrame = null
+      while( rc == null && end!=buffer.position ) {
+        rc = next_action(buffer)
       }
+      rc
     }
-  }
 
+    def read_line(buffer:ByteBuffer, maxLength:Int, errorMessage:String):Buffer = {
+        val read_limit = buffer.position
+        while( end < read_limit ) {
+          if( buffer.array()(end) =='\n') {
+            var rc = new Buffer(buffer.array, start, end-start)
+            end += 1;
+            start = end;
+            return rc
+          }
+          if (SIZE_CHECK && end-start > maxLength) {
+              throw new IOException(errorMessage);
+          }
+          end += 1;
+        }
+        return null;
+    }
 
-  type FrameReader = ()=>StompFrame
-  var unmarshall:FrameReader = read_action
-
-  def read_line( maxLength:Int, errorMessage:String):Buffer = {
-      val read_limit = read_bytebuffer.position
-      while( read_pos < read_limit ) {
-        if( read_data(read_pos) =='\n') {
-          var rc = new Buffer(read_data, read_offset, read_pos-read_offset)
-          read_pos += 1;
-          read_offset = read_pos;
-          return rc
+    def read_action:FrameReader = (buffer)=> {
+      val line = read_line(buffer, MAX_COMMAND_LENGTH, "The maximum command length was exceeded")
+      if( line !=null ) {
+        var action = line
+        if( TRIM ) {
+            action = action.trim();
         }
-        if (SIZE_CHECK && read_pos-read_offset > maxLength) {
-            throw new IOException(errorMessage);
+        if (action.length() > 0) {
+            next_action = read_headers(action)
         }
-        read_pos += 1;
-      }
-      return null;
-  }
-
-
-  def read_action:FrameReader = ()=> {
-    val line = read_line(MAX_COMMAND_LENGTH, "The maximum command length was exceeded")
-    if( line !=null ) {
-      var action = line
-      if( TRIM ) {
-          action = action.trim();
-      }
-      if (action.length() > 0) {
-          unmarshall = read_headers(action)
       }
+      null
     }
-    null
-  }
-
-  def read_headers(action:Buffer, headers:HeaderMap=Nil):FrameReader = ()=> {
-    val line = read_line(MAX_HEADER_LENGTH, "The maximum header length was exceeded")
-    if( line !=null ) {
-      if( line.trim().length() > 0 ) {
 
-        if (SIZE_CHECK && headers.size > MAX_HEADERS) {
-            throw new IOException("The maximum number of headers was exceeded");
-        }
+    def read_headers(action:Buffer, headers:HeaderMapBuffer=new HeaderMapBuffer()):FrameReader = (buffer)=> {
+      val line = read_line(buffer, MAX_HEADER_LENGTH, "The maximum header length was exceeded")
+      if( line !=null ) {
+        if( line.trim().length() > 0 ) {
 
-        try {
-            val seperatorIndex = line.indexOf(SEPERATOR);
-            if( seperatorIndex<0 ) {
-                throw new IOException("Header line missing seperator [" + ascii(line) + "]");
-            }
-            var name = line.slice(0, seperatorIndex);
-            if( TRIM ) {
-                name = name.trim();
-            }
-            var value = line.slice(seperatorIndex + 1, line.length());
-            if( TRIM ) {
-                value = value.trim();
-            }
-            headers.add((ascii(name), ascii(value)));
-        } catch {
-            case e:Exception=>
-              throw new IOException("Unable to parser header line [" + line + "]");
-        }
+          if (SIZE_CHECK && headers.size > MAX_HEADERS) {
+              throw new IOException("The maximum number of headers was exceeded");
+          }
 
-      } else {
-        val contentLength = get(headers, CONTENT_LENGTH)
-        if (contentLength.isDefined) {
-          // Bless the client, he's telling us how much data to read in.
-          var length=0;
           try {
-              length = Integer.parseInt(contentLength.get.trim().toString());
+              val seperatorIndex = line.indexOf(SEPERATOR);
+              if( seperatorIndex<0 ) {
+                  throw new IOException("Header line missing seperator [" + ascii(line) + "]");
+              }
+              var name = line.slice(0, seperatorIndex);
+              if( TRIM ) {
+                  name = name.trim();
+              }
+              var value = line.slice(seperatorIndex + 1, line.length());
+              if( TRIM ) {
+                  value = value.trim();
+              }
+              headers.add((ascii(name), ascii(value)))
           } catch {
-            case e:NumberFormatException=>
-              throw new IOException("Specified content-length is not a valid integer");
+              case e:Exception=>
+                e.printStackTrace
+                throw new IOException("Unable to parser header line [" + line + "]");
           }
 
-          if (SIZE_CHECK && length > MAX_DATA_LENGTH) {
-              throw new IOException("The maximum data length was exceeded");
-          }
-          unmarshall = read_binary_body(action, headers, length)
-
         } else {
-          unmarshall = read_text_body(action, headers)
+          val contentLength = get(headers, CONTENT_LENGTH)
+          if (contentLength.isDefined) {
+            // Bless the client, he's telling us how much data to read in.
+            var length=0;
+            try {
+                length = Integer.parseInt(contentLength.get.trim().toString());
+            } catch {
+              case e:NumberFormatException=>
+                throw new IOException("Specified content-length is not a valid integer");
+            }
+
+            if (SIZE_CHECK && length > MAX_DATA_LENGTH) {
+                throw new IOException("The maximum data length was exceeded");
+            }
+            next_action = read_binary_body(action, headers, length)
+
+          } else {
+            next_action = read_text_body(action, headers)
+          }
         }
       }
+      null
     }
-    null
-  }
 
-  def get(headers:HeaderMap, name:AsciiBuffer):Option[AsciiBuffer] = {
-    val i = headers.iterator
-    while( i.hasNext ) {
-      val entry = i.next
-      if( entry._1 == name ) {
-        return Some(entry._2)
+    def get(headers:HeaderMapBuffer, name:AsciiBuffer):Option[AsciiBuffer] = {
+      val i = headers.iterator
+      while( i.hasNext ) {
+        val entry = i.next
+        if( entry._1 == name ) {
+          return Some(entry._2)
+        }
       }
+      None
     }
-    None
-  }
 
 
-  def read_binary_body(action:Buffer, headers:HeaderMap, contentLength:Int):FrameReader = ()=> {
-    val content:Buffer=read_content(contentLength)
-    if( content != null ) {
-      unmarshall = read_action
-      new StompFrame(ascii(action), headers, content)
-    } else {
-      null
+    def read_binary_body(action:Buffer, headers:HeaderMapBuffer, contentLength:Int):FrameReader = (buffer)=> {
+      val content:Buffer=read_content(buffer, contentLength)
+      if( content != null ) {
+        next_action = read_action
+        new StompFrame(ascii(action), headers.toList, content)
+      } else {
+        null
+      }
     }
-  }
 
 
-  def read_content(contentLength:Int):Buffer = {
-      val read_limit = read_bytebuffer.position
-      if( (read_limit-read_offset) < contentLength+1 ) {
-        read_pos = read_limit;
-        null
-      } else {
-        if( read_data(read_offset+contentLength)!= 0 ) {
-           throw new IOException("Exected null termintor after "+contentLength+" content bytes");
+    def read_content(buffer:ByteBuffer, contentLength:Int):Buffer = {
+        val read_limit = buffer.position
+        if( (read_limit-start) < contentLength+1 ) {
+          end = read_limit;
+          null
+        } else {
+          if( buffer.array()(start+contentLength)!= 0 ) {
+             throw new IOException("Exected null termintor after "+contentLength+" content bytes");
+          }
+          var rc = new Buffer(buffer.array, start, contentLength)
+          end = start+contentLength+1;
+          start = end;
+          rc;
         }
-        var rc = new Buffer(read_data, read_offset, contentLength)
-        read_pos = read_offset+contentLength+1;
-        read_offset = read_pos;
-        rc;
-      }
-  }
+    }
 
-  def read_to_null():Buffer = {
-      val read_limit = read_bytebuffer.position
-      while( read_pos < read_limit ) {
-        if( read_data(read_pos) ==0) {
-          var rc = new Buffer(read_data, read_offset, read_pos-read_offset)
-          read_pos += 1;
-          read_offset = read_pos;
-          return rc;
+    def read_to_null(buffer:ByteBuffer):Buffer = {
+        val read_limit = buffer.position
+        while( end < read_limit ) {
+          if( buffer.array()(end) ==0) {
+            var rc = new Buffer(buffer.array, start, end-start)
+            end += 1;
+            start = end;
+            return rc;
+          }
+          end += 1;
         }
-        read_pos += 1;
-      }
-      return null;
-  }
+        return null;
+    }
 
 
-  def read_text_body(action:Buffer, headers:HeaderMap):FrameReader = ()=> {
-    val content:Buffer=read_to_null
-    if( content != null ) {
-      unmarshall = read_action
-      new StompFrame(ascii(action), headers, content)
-    } else {
-      null
+    def read_text_body(action:Buffer, headers:HeaderMapBuffer):FrameReader = (buffer)=> {
+      val content:Buffer=read_to_null(buffer)
+      if( content != null ) {
+        next_action = read_action
+        new StompFrame(ascii(action), headers.toList, content)
+      } else {
+        null
+      }
     }
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul  7 03:42:44 2010
@@ -30,6 +30,13 @@ import Stomp._
 import _root_.org.apache.activemq.apollo.stomp.StompFrame
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 
+object StompBrokerPerfTest {
+  def main(args:Array[String]) = {
+    val test = new StompBrokerPerfTest();
+    test.setUp
+    test.benchmark_1_1_1
+  }
+}
 class StompBrokerPerfTest extends BaseBrokerPerfTest {
 
     override def createProducer() =  new StompRemoteProducer()
@@ -65,6 +72,8 @@ class StompRemoteConsumer extends Remote
         case StompFrame(Responses.CONNECTED, headers, _) =>
         case StompFrame(Responses.MESSAGE, headers, content) =>
           messageReceived();
+        case StompFrame(Responses.ERROR, headers, content) =>
+          onFailure(new Exception("Server reported an error: " + frame.content));
         case _ =>
           onFailure(new Exception("Unexpected stomp command: " + frame.action));
       }
@@ -93,20 +102,27 @@ class StompRemoteProducer extends Remote
     val send_next:CompletionCallback = new CompletionCallback() {
       def onCompletion() = {
         rate.increment();
-        if( !stopping ) {
+        val task = ^ {
+          if( !stopping ) {
 
-          var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
-          headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
-          if (property != null) {
-              headers ::= (ascii(property), ascii(property));
-          }
+            var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
+            headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
+            if (property != null) {
+                headers ::= (ascii(property), ascii(property));
+            }
 //          var p = this.priority;
 //          if (priorityMod > 0) {
 //              p = if ((counter % priorityMod) == 0) { 0 } else { priority }
 //          }
 
-          var content = ascii(createPayload());
-          transport.oneway(StompFrame(Stomp.Commands.SEND, headers, content), send_next)
+            var content = ascii(createPayload());
+            transport.oneway(StompFrame(Stomp.Commands.SEND, headers, content), send_next)
+          }
+        } 
+        if( thinkTime > 0 ) {
+          dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, task)
+        } else {
+          dispatchQueue << task
         }
       }
       def onFailure(error:Throwable) = {
@@ -128,6 +144,8 @@ class StompRemoteProducer extends Remote
       var frame = command.asInstanceOf[StompFrame]
       frame match {
         case StompFrame(Responses.CONNECTED, headers, _) =>
+        case StompFrame(Responses.ERROR, headers, content) =>
+          onFailure(new Exception("Server reported an error: " + frame.content.utf8));
         case _ =>
           onFailure(new Exception("Unexpected stomp command: " + frame.action));
       }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul  7 03:42:44 2010
@@ -26,6 +26,7 @@ import org.fusesource.hawtdispatch.Dispa
 import org.fusesource.hawtdispatch.DispatchQueue;
 import org.fusesource.hawtdispatch.DispatchSource;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.net.*;
 import java.nio.ByteBuffer;
@@ -44,6 +45,7 @@ import static org.apache.activemq.transp
  */
 public class TcpTransport implements Transport {
     private Map<String, Object> socketOptions;
+    private WireFormat.UnmarshalSession unmarshalSession;
 
     enum SocketState {
         CONNECTING,
@@ -77,6 +79,10 @@ public class TcpTransport implements Tra
     ByteBuffer outbound_frame;
     protected boolean useLocalHost = true;
 
+    int READ_BUFFFER_SIZE = 1024*32;
+    ByteBuffer readBuffer = ByteBuffer.allocate(1024*32);
+
+
     static final class OneWay {
         final Buffer buffer;
         final CompletionCallback callback;
@@ -114,6 +120,8 @@ public class TcpTransport implements Tra
     }
 
     public void start() throws Exception {
+        assert Dispatch.getCurrentQueue() == dispatchQueue;
+
         if (dispatchQueue == null) {
             throw new IllegalArgumentException("dispatchQueue is not set");
         }
@@ -124,6 +132,8 @@ public class TcpTransport implements Tra
             throw new IllegalStateException("can only be started from the created stae");
         }
         transportState=RUNNING;
+        
+        unmarshalSession = wireformat.createUnmarshalSession();
 
         if( socketState == CONNECTING ) {
             channel = SocketChannel.open();
@@ -183,7 +193,11 @@ public class TcpTransport implements Tra
         readSource = Dispatch.createSource(channel, SelectionKey.OP_READ, dispatchQueue);
         readSource.setEventHandler(new Runnable(){
             public void run() {
-                drainInbound();
+                try {
+                    drainInbound();
+                } catch (IOException e) {
+                    listener.onException(e);
+                }
             }
         });
 
@@ -207,6 +221,8 @@ public class TcpTransport implements Tra
 
 
     public void stop() throws Exception {
+        assert Dispatch.getCurrentQueue() == dispatchQueue;
+
         if( readSource!=null ) {
             readSource.release();
             readSource = null;
@@ -225,6 +241,7 @@ public class TcpTransport implements Tra
     }
 
     public void oneway(Object command, CompletionCallback callback) {
+        assert Dispatch.getCurrentQueue() == dispatchQueue;
         try {
             if( socketState != CONNECTED ) {
                 throw new IllegalStateException("Not connected.");
@@ -307,11 +324,56 @@ public class TcpTransport implements Tra
         return outbound.isEmpty() && outbound_frame==null;
     }
 
-    private void drainInbound() {
-        Object command = null;
-        // the transport may be suspended after processing a command.
-        while( !readSource.isSuspended() && (command=wireformat.unmarshal(channel))!=null ) {
+
+    private void drainInbound() throws IOException {
+        if( transportState==DISPOSED || readSource.isSuspended() ) {
+            return;
+        }
+        while( true ) {
+
+            // do we need to read in more data???
+            if (unmarshalSession.getEndPos() == readBuffer.position()) {
+
+                // do we need a new data buffer to read data into??
+                if (readBuffer.remaining() == 0) {
+
+                    // pick the new capacity: READ_BUFFFER_SIZE if the session's start position is non-zero, otherwise 4x the current capacity (<< 2)...
+                    int new_capacity = unmarshalSession.getStartPos() != 0 ? READ_BUFFFER_SIZE : readBuffer.capacity() << 2;
+                    byte[] new_buffer = new byte[new_capacity];
+
+                    // If there was unconsumed data.. move it to the start of the new buffer.
+                    int size = unmarshalSession.getEndPos() - unmarshalSession.getStartPos();
+                    if (size > 0) {
+                        System.arraycopy(readBuffer.array(), unmarshalSession.getStartPos(), new_buffer, 0, size);
+                    }
+
+                    readBuffer = ByteBuffer.wrap(new_buffer);
+                    readBuffer.position(size);
+                    unmarshalSession.setStartPos(0);
+                    unmarshalSession.setEndPos(size);
+                }
+
+                // Try to fill the buffer with data from the socket..
+                int p = readBuffer.position();
+                int count = channel.read(readBuffer);
+                if (count == -1) {
+                    throw new EOFException();
+                } else if (count == 0) {
+                    return;
+                }
+            }
+
+            Object command=unmarshalSession.unmarshal(readBuffer);
+            if( command==null ) {
+                return;
+            }
+
             listener.onCommand(command);
+
+            // the transport may be suspended after processing a command.
+            if( transportState==DISPOSED || readSource.isSuspended() ) {
+                return;
+            }
         }
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Wed Jul  7 03:42:44 2010
@@ -21,8 +21,6 @@ import org.apache.activemq.transport.Tra
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.util.ServiceSupport;
-import org.apache.activemq.wireformat.WireFormat;
 import org.apache.activemq.wireformat.WireFormatFactory;
 import org.fusesource.hawtdispatch.Dispatch;
 import org.fusesource.hawtdispatch.DispatchQueue;
@@ -87,30 +85,6 @@ public class TcpTransportServer implemen
     }
 
     public void start() throws IOException {
-        bind();
-        acceptSource = Dispatch.createSource(channel, SelectionKey.OP_ACCEPT, dispatchQueue);
-        acceptSource.setEventHandler(new Runnable() {
-            public void run() {
-                try {
-                    SocketChannel client = channel.accept();
-                    handleSocket(client);
-                } catch (IOException e) {
-                    listener.onAcceptError(e);
-                }
-            }
-        });
-        acceptSource.setCancelHandler(new Runnable() {
-            public void run() {
-                try {
-                    channel.close();
-                } catch (IOException e) {
-                }
-            }
-        });
-        acceptSource.resume();
-    }
-
-    public void bind() throws IOException {
         URI bind = bindURI;
 
         String host = bind.getHost();
@@ -122,6 +96,7 @@ public class TcpTransportServer implemen
         InetAddress addr = InetAddress.getByName(host);
         try {
             channel = ServerSocketChannel.open();
+            channel.configureBlocking(false);
             channel.socket().bind(new InetSocketAddress(addr, bind.getPort()), backlog);
         } catch (IOException e) {
             throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
@@ -139,6 +114,27 @@ public class TcpTransportServer implemen
                 throw IOExceptionSupport.create(e2);
             }
         }
+        
+        acceptSource = Dispatch.createSource(channel, SelectionKey.OP_ACCEPT, dispatchQueue);
+        acceptSource.setEventHandler(new Runnable() {
+            public void run() {
+                try {
+                    SocketChannel client = channel.accept();
+                    handleSocket(client);
+                } catch (IOException e) {
+                    listener.onAcceptError(e);
+                }
+            }
+        });
+        acceptSource.setCancelHandler(new Runnable() {
+            public void run() {
+                try {
+                    channel.close();
+                } catch (IOException e) {
+                }
+            }
+        });
+        acceptSource.resume();
     }
 
     private URI connectURI(String hostname) throws URISyntaxException {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java Wed Jul  7 03:42:44 2010
@@ -22,6 +22,7 @@ import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.util.ArrayList;
 import java.util.List;
@@ -100,8 +101,50 @@ public class MultiWireFormatFactory impl
             return rc;
         }
 
-        public Object unmarshal(ReadableByteChannel channel) {
-            throw new UnsupportedOperationException();
+        public UnmarshalSession createUnmarshalSession() {
+            return new UnmarshalSession() {
+                int start=0;
+                int end=0;
+                UnmarshalSession session;
+
+                public int getStartPos() {
+                    return start;
+                }
+
+                public void setStartPos(int pos) {
+                    start=pos;
+                }
+
+                public int getEndPos() {
+                    return end;
+                }
+
+                public void setEndPos(int pos) {
+                    end = pos;
+                }
+
+                public Object unmarshal(ByteBuffer buffer) throws IOException {
+                    if( session!=null ) {
+                        return session.unmarshal(buffer);
+                    }
+
+                    Buffer b = new Buffer(buffer.array(), start, buffer.position());
+                    for (WireFormatFactory wff : wireFormatFactories) {
+                        if (wff.matchesWireformatHeader( b )) {
+                            wireFormat = wff.createWireFormat();
+                            session = wireFormat.createUnmarshalSession();
+                            session.setStartPos(start);
+                            session.setEndPos(end);
+                            return wireFormat;
+                        }
+                    }
+                    
+                    if( end >= maxHeaderLength ) {
+                        throw new IOException("Could not discriminate the protocol.");
+                    }
+                    return null;
+                }
+            };
         }
 
         public void marshal(Object command, DataOutput out) throws IOException {
@@ -109,11 +152,11 @@ public class MultiWireFormatFactory impl
         }
 
         public Buffer marshal(Object command) throws IOException {
-            throw new UnsupportedOperationException();
+            return wireFormat.marshal(command); // delegates to wireFormat; NOTE(review): presumably unset until a protocol header has been matched — confirm
         }
 
         public Object unmarshal(Buffer packet) throws IOException {
-            throw new UnsupportedOperationException();
+            return wireFormat.unmarshal(packet); // decoding must delegate to unmarshal(Buffer), not marshal(Object)
         }
 
         public ArrayList<WireFormatFactory> getWireFormatFactories() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java Wed Jul  7 03:42:44 2010
@@ -72,6 +72,10 @@ public class ObjectStreamWireFormat impl
         }
     }
 
+    public UnmarshalSession createUnmarshalSession() {
+        throw new UnsupportedOperationException(); // unmarshal sessions are not implemented by this wire format
+    }
+
     public Object unmarshal(ReadableByteChannel channel) {
         throw new UnsupportedOperationException();
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java Wed Jul  7 03:42:44 2010
@@ -19,6 +19,7 @@ package org.apache.activemq.wireformat;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SocketChannel;
 import java.util.Map;
@@ -49,13 +50,27 @@ public interface WireFormat {
      * Stream based marshaling 
      */
     void marshal(Object command, DataOutput out) throws IOException;
-    
+
     /**
-     * Packet based un-marshaling 
+     * Stream based un-marshaling
      */
     Object unmarshal(DataInput in) throws IOException;
 
-    Object unmarshal(ReadableByteChannel channel);
+    /**
+      * An unmarshal session holds the state used for non-blocking
+      * unmarshalling.
+      */
+     interface UnmarshalSession {
+        int getStartPos(); // NOTE(review): presumably an offset into the ByteBuffer passed to unmarshal — confirm
+        void setStartPos(int pos);
+
+        int getEndPos(); // NOTE(review): presumably the end offset of the data available so far — confirm
+        void setEndPos(int pos);
+
+        Object unmarshal(ByteBuffer buffer) throws IOException; // NOTE(review): presumably returns null while more data is needed — confirm
+    }
+
+    UnmarshalSession createUnmarshalSession(); // returns a session for non-blocking unmarshalling; an implementation may throw UnsupportedOperationException
 
     /**
      * @return The name of the wireformat

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java Wed Jul  7 03:42:44 2010
@@ -49,6 +49,10 @@ public class MockWireFormatFactory imple
 	        throw new UnsupportedOperationException();
 		}
 
+        public UnmarshalSession createUnmarshalSession() {
+            throw new UnsupportedOperationException(); // the mock wire format does not provide unmarshal sessions
+        }
+
         public Object unmarshal(ReadableByteChannel channel) {
             throw new UnsupportedOperationException();
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java Wed Jul  7 03:42:44 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.util.buffer;
 
+import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -333,7 +334,35 @@ public class Buffer implements Comparabl
         }
         return length - oLength;
     }
-    
+
+    /**
+     * Writes this buffer's bytes to out; same as out.write(data, offset, length);
+     */
+    public void writeTo(DataOutput out) throws IOException {
+        out.write(data, offset, length);
+    }
+
+    /**
+     * Writes this buffer's bytes to the stream; same as out.write(data, offset, length);
+     */
+    public void writeTo(OutputStream out) throws IOException {
+        out.write(data, offset, length);
+    }
+
+    /**
+     * Fills this buffer's byte range from in; same as in.readFully(data, offset, length);
+     */
+    public void readFrom(DataInput in) throws IOException {
+        in.readFully(data, offset, length); // readFully throws EOFException if the input ends before length bytes are read
+    }
+
+    /**
+     * Reads up to length bytes from in into this buffer; same as in.read(data, offset, length);
+     */
+    public int readFrom(InputStream in) throws IOException {
+        return in.read(data, offset, length); // may be fewer than length bytes, or -1 at end of stream
+    }
+
     ///////////////////////////////////////////////////////////////////
     // Statics
     ///////////////////////////////////////////////////////////////////