You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:42:14 UTC

svn commit: r961070 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/ activemq-broker/src/test/scala/org/apache/activemq/apo...

Author: chirino
Date: Wed Jul  7 03:42:13 2010
New Revision: 961070

URL: http://svn.apache.org/viewvc?rev=961070&view=rev
Log:
bringing back the stomp module.. converting it to scala.

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/SharedQueueTest.java
      - copied, changed from r961069, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/Stomp.java
      - copied, changed from r961069, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/Stomp.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
      - copied, changed from r961069, activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/package.html
      - copied, changed from r961069, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/package.html
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/DefaultFrameTranslator.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/FrameTranslator.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/Stomp.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompConnection.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompFrame.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompMessageDelivery.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompMessageEvaluationContext.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompProtocolHandler.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompSslTransportFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompTransportFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormat.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormatFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/XStreamFrameTranslator.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/package.html
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompBrokerTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteConsumer.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteProducer.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompConnection.scala
    activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961070&r1=961069&r2=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul  7 03:42:13 2010
@@ -31,7 +31,7 @@ class ConnectionConfig {
 }
 abstract class Connection() extends TransportListener with Service {
 
-  val q = createQueue("connection")
+  val dispatchQueue = createQueue("connection")
   var name = "connection"
   var stopping = false;
 
@@ -39,7 +39,7 @@ abstract class Connection() extends Tran
   var exceptionListener:ExceptionListener = null;
 
   def start() = {
-    transport.setDispatchQueue(q);
+    transport.setDispatchQueue(dispatchQueue);
     transport.getDispatchQueue.release
     transport.setTransportListener(this);
     transport.start()
@@ -48,7 +48,7 @@ abstract class Connection() extends Tran
   def stop() = {
     stopping=true
     transport.stop()
-    q.release
+    dispatchQueue.release
   }
 
   def onException(error:IOException) = {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961070&r1=961069&r2=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 03:42:13 2010
@@ -84,41 +84,41 @@ trait Message {
 }
 
 object Delivery {
-  def apply(o:Delivery) = new Delivery(o.message, o.encoded, o.encoding, o.size, o.ack, o.tx_id, o.store_id)
+  def apply(o:Delivery) = new Delivery(o.message, o.size, o.encoded, o.encoding, o.ack, o.tx_id, o.store_id)
 }
 
 case class Delivery (
 
-        /**
-         *  the message being delivered
-         */
+  /**
+   *  the message being delivered
+   */
   message: Message,
 
   /**
-   * the encoded form of the message being delivered.
+   * memory size of the delivery.  Used for resource allocation tracking
    */
-  encoded: Buffer,
+  size:Int,
 
   /**
-   * the encoding format of the message
+   * the encoded form of the message being delivered.
    */
-  encoding: String,
+  encoded: Buffer = null,
 
   /**
-   * memory size of the delivery.  Used for resource allocation tracking
+   * the encoding format of the message
    */
-  size:Int,
+  encoding: String = null,
 
   /**
    *  true if this delivery requires acknowledgment.
    */
-  ack:Boolean,
+  ack:Boolean = false,
 
   /**
    * The id used to identify the transaction that the message
    * belongs to.
    */
-  tx_id:Long,
+  tx_id:Long = -1,
 
   /**
    * The id used to identify this message in the message
@@ -126,7 +126,7 @@ case class Delivery (
    *
    * @return The store tracking or -1 if not set.
    */
-  store_id: Long
+  store_id: Long = -1
 
 ) extends BaseRetained {
 

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/SharedQueueTest.java (from r961069, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/SharedQueueTest.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/SharedQueueTest.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java&r1=961069&r2=961070&rev=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/SharedQueueTest.java Wed Jul  7 03:42:13 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker;
+package org.apache.activemq.apollo.broker;
 
 import junit.framework.TestCase;
 

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala?rev=961070&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala Wed Jul  7 03:42:13 2010
@@ -0,0 +1,622 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.perf
+
+import _root_.java.beans.ExceptionListener
+import _root_.java.io.{File}
+import _root_.java.net.URI
+import _root_.java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import _root_.java.util.concurrent.TimeUnit
+import _root_.java.util.{LinkedHashMap, ArrayList, HashMap}
+import _root_.org.apache.activemq.apollo.broker._
+import _root_.org.apache.activemq.broker.store.{StoreFactory, Store}
+import _root_.org.apache.activemq.metric.{Period, MetricAggregator, MetricCounter}
+import _root_.java.lang.{String}
+import _root_.org.apache.activemq.util.buffer.{AsciiBuffer}
+import _root_.org.junit.{Test, Before}
+
+import org.apache.activemq.transport.TransportFactory
+
+import _root_.scala.collection.JavaConversions._
+
+
+abstract class RemoteConsumer extends Connection {
+  val consumerRate = new MetricCounter();
+  var totalConsumerRate: MetricAggregator = null
+  var thinkTime: Long = 0
+  var destination: Destination = null
+  var selector: String = null;
+  var durable = false;
+  var uri: URI = null
+
+  override def start() = {
+    consumerRate.name("Consumer " + name + " Rate");
+    totalConsumerRate.add(consumerRate);
+    transport = TransportFactory.connect(uri);
+    super.start();
+    setupSubscription();
+  }
+
+  protected def setupSubscription()
+
+}
+
+
+abstract class RemoteProducer extends Connection {
+  val rate = new MetricCounter();
+
+  var messageIdGenerator: AtomicLong = null
+  var priority = 0
+  var persistentDelivery = false
+  var priorityMod = 0
+  var counter = 0
+  var producerId = 0
+  var destination: Destination = null
+  var property: String = null
+  var totalProducerRate: MetricAggregator = null
+  var next: Delivery = null
+
+  var filler: String = null
+  var payloadSize = 20
+  var uri: URI = null
+
+  override def start() = {
+
+    if (payloadSize > 0) {
+      var sb = new StringBuilder(payloadSize);
+      for (i <- 0 until payloadSize) {
+        sb.append(('a' + (i % 26)).toChar);
+      }
+      filler = sb.toString();
+    }
+
+    rate.name("Producer " + name + " Rate");
+    totalProducerRate.add(rate);
+
+    transport = TransportFactory.connect(uri);
+    super.start();
+    setupProducer();
+
+  }
+
+  def setupProducer()
+
+def createPayload(): String = {
+    if (payloadSize >= 0) {
+      var sb = new StringBuilder(payloadSize);
+      sb.append(name);
+      sb.append(':');
+      counter += 1
+      sb.append(counter);
+      sb.append(':');
+      var length = sb.length;
+      if (length <= payloadSize) {
+        sb.append(filler.subSequence(0, payloadSize - length));
+        return sb.toString();
+      } else {
+        return sb.substring(0, payloadSize);
+      }
+    } else {
+      counter += 1
+      return name + ":" + (counter);
+    }
+  }
+
+}
+
+object BaseBrokerPerfTest {
+  var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "3"))
+  var IO_WORK_AMOUNT = 0
+  var FANIN_COUNT = 10
+  var FANOUT_COUNT = 10
+  var PRIORITY_LEVELS = 10
+  var USE_INPUT_QUEUES = true
+
+  var USE_KAHA_DB = true;
+  var PURGE_STORE = true;
+  var PERSISTENT = false;
+  var DURABLE = false;
+
+}
+abstract class BaseBrokerPerfTest {
+  import BaseBrokerPerfTest._
+
+  // Set to put senders and consumers on separate brokers.
+  protected var multibroker = false;
+
+  // Set to mock up ptp (point-to-point: queue destinations instead of topics):
+  protected var ptp = false;
+
+  // Set to use tcp IO
+  protected var tcp = true;
+  // set to force marshalling even in the NON tcp case.
+  protected var forceMarshalling = true;
+
+  protected var sendBrokerBindURI: String = null
+  protected var receiveBrokerBindURI: String = null
+  protected var sendBrokerConnectURI: String = null
+  protected var receiveBrokerConnectURI: String = null
+
+  protected var producerCount = 0
+  protected var consumerCount = 0
+  protected var destCount = 0
+
+  protected val totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items")
+  protected val totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items")
+
+  protected var sendBroker: Broker = null
+  protected var rcvBroker: Broker = null
+  protected val brokers = new ArrayList[Broker]()
+  protected val msgIdGenerator = new AtomicLong()
+  protected val stopping = new AtomicBoolean()
+
+  val producers = new ArrayList[RemoteProducer]()
+  val consumers = new ArrayList[RemoteConsumer]()
+  var name: String = null;
+
+  @Before
+  def setUp() = {
+    if (tcp) {
+      sendBrokerBindURI = "tcp://localhost:10000?wireFormat=" + getBrokerWireFormat();
+      receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=" + getBrokerWireFormat();
+
+      sendBrokerConnectURI = "tcp://localhost:10000?wireFormat=" + getRemoteWireFormat();
+      receiveBrokerConnectURI = "tcp://localhost:20000?wireFormat=" + getRemoteWireFormat();
+    } else {
+      sendBrokerConnectURI = "pipe://SendBroker";
+      receiveBrokerConnectURI = "pipe://ReceiveBroker";
+      if (forceMarshalling) {
+        sendBrokerBindURI = sendBrokerConnectURI + "?wireFormat=" + getBrokerWireFormat();
+        receiveBrokerBindURI = receiveBrokerConnectURI + "?wireFormat=" + getBrokerWireFormat();
+      } else {
+        sendBrokerBindURI = sendBrokerConnectURI;
+        receiveBrokerBindURI = receiveBrokerConnectURI;
+      }
+    }
+  }
+
+  def setName(name: String) = {
+    if (this.name == null) {
+      this.name = name;
+    }
+  }
+
+  def getName() = name
+
+  def getBrokerWireFormat() = "multi"
+
+  def getRemoteWireFormat(): String
+
+  @Test
+  def benchmark_1_1_0(): Unit = {
+    setName("1 producer -> 1 destination -> 0 consumers");
+    if (ptp) {
+      return;
+    }
+    producerCount = 1;
+    destCount = 1;
+
+    createConnections();
+
+    // Start 'em up.
+    startClients();
+    try {
+      reportRates();
+    } finally {
+      stopServices();
+    }
+  }
+
+  @Test
+  def benchmark_1_1_1() = {
+    setName("1 producer -> 1 destination -> 1 consumers");
+    producerCount = 1;
+    destCount = 1;
+    consumerCount = 1;
+
+    createConnections();
+
+    // Start 'em up.
+    startClients();
+    try {
+      reportRates();
+    } finally {
+      stopServices();
+    }
+  }
+
+  @Test
+  def benchmark_10_1_10() = {
+    setName(format("%d producers -> 1 destination -> %d consumers", FANIN_COUNT, FANOUT_COUNT));
+    producerCount = FANIN_COUNT;
+    consumerCount = FANOUT_COUNT;
+    destCount = 1;
+
+    createConnections();
+
+    // Start 'em up.
+    startClients();
+    try {
+      reportRates();
+    } finally {
+      stopServices();
+    }
+  }
+
+  @Test
+  def benchmark_10_1_1() = {
+    setName(format("%d producers -> 1 destination -> 1 consumer", FANIN_COUNT));
+    producerCount = FANIN_COUNT;
+    destCount = 1;
+    consumerCount = 1;
+
+    createConnections();
+
+    // Start 'em up.
+    startClients();
+    try {
+      reportRates();
+    } finally {
+      stopServices();
+    }
+  }
+
+  @Test
+  def benchmark_1_1_10() = {
+    setName(format("1 producer -> 1 destination -> %d consumers", FANOUT_COUNT));
+    producerCount = 1;
+    destCount = 1;
+    consumerCount = FANOUT_COUNT;
+
+    createConnections();
+
+    // Start 'em up.
+    startClients();
+    try {
+      reportRates();
+    } finally {
+      stopServices();
+    }
+  }
+
+  @Test
+  def benchmark_2_2_2() = {
+    setName(format("2 producer -> 2 destination -> 2 consumers"));
+    producerCount = 2;
+    destCount = 2;
+    consumerCount = 2;
+
+    createConnections();
+
+    // Start 'em up.
+    startClients();
+    try {
+      reportRates();
+    } finally {
+      stopServices();
+    }
+  }
+
+  @Test
+  def benchmark_10_10_10() = {
+    setName(format("10 producers -> 10 destinations -> 10 consumers"));
+    producerCount = 10;
+    destCount = 10;
+    consumerCount = 10;
+
+    createConnections();
+
+    // Start 'em up.
+    startClients();
+    try {
+      reportRates();
+    } finally {
+      stopServices();
+    }
+  }
+
+  /**
+   * Tests 2 producers sending to 2 destinations with 2 consumers, so that
+   * each consumer only receives messages from one producer. 1 consumer is
+   * set to be slow; the other producer should still be able to send quickly.
+   *
+   * @throws Exception
+   */
+  @Test
+  def benchmark_2_2_2_SlowConsumer() = {
+    setName(format("2 producer -> 2 destination -> 2 slow consumers"));
+    producerCount = 2;
+    destCount = 2;
+    consumerCount = 2;
+
+    createConnections();
+    consumers.get(0).thinkTime = 50;
+
+    // Start 'em up.
+    startClients();
+    try {
+      reportRates();
+    } finally {
+      stopServices();
+    }
+  }
+
+  @Test
+  def benchmark_2_2_2_Selector() = {
+    setName(format("2 producer -> 2 destination -> 2 selector consumers"));
+    producerCount = 2;
+    destCount = 2;
+    consumerCount = 2;
+
+    createConnections();
+
+    // Add properties to match producers to their consumers
+    for (i <- 0 until consumerCount) {
+      var property = "match" + i;
+      consumers.get(i).selector = property;
+      producers.get(i).property = property;
+    }
+
+    // Start 'em up.
+    startClients();
+    try {
+      reportRates();
+    } finally {
+      stopServices();
+    }
+  }
+
+  /**
+   * Test sending with 1 high priority sender. The high priority sender should
+   * have higher throughput than the other, normal priority sender.
+   *
+   * @throws Exception
+   */
+  @Test
+  def benchmark_2_1_1_HighPriorityProducer() = {
+
+    setName(format("1 high and 1 normal priority producer -> 1 destination -> 1 consumer"));
+    producerCount = 2;
+    destCount = 1;
+    consumerCount = 1;
+
+    createConnections();
+    var producer = producers.get(0);
+    producer.priority = 1
+    producer.rate.setName("High Priority Producer Rate");
+
+    consumers.get(0).thinkTime = 1;
+
+    // Start 'em up.
+    startClients();
+    try {
+
+      System.out.println("Checking rates for test: " + getName());
+      for (i <- 0 until PERFORMANCE_SAMPLES) {
+        var p = new Period();
+        Thread.sleep(1000 * 5);
+        System.out.println(producer.rate.getRateSummary(p));
+        System.out.println(totalProducerRate.getRateSummary(p));
+        System.out.println(totalConsumerRate.getRateSummary(p));
+        totalProducerRate.reset();
+        totalConsumerRate.reset();
+      }
+
+    } finally {
+      stopServices();
+    }
+  }
+
+  /**
+   * Test sending with 1 high/mixed priority sender (priorityMod = 3). It should
+   * have higher throughput than the other, normal priority sender.
+   *
+   * @throws Exception
+   */
+  @Test
+  def benchmark_2_1_1_MixedHighPriorityProducer() = {
+
+    setName(format("1 high/mixed and 1 normal priority producer -> 1 destination -> 1 consumer"));
+    producerCount = 2;
+    destCount = 1;
+    consumerCount = 1;
+
+    createConnections();
+    var producer = producers.get(0);
+    producer.priority = 1;
+    producer.priorityMod = 3;
+    producer.rate.setName("High Priority Producer Rate");
+
+    consumers.get(0).thinkTime = 1
+
+    // Start 'em up.
+    startClients();
+    try {
+
+      System.out.println("Checking rates for test: " + getName());
+      for (i <- 0 until PERFORMANCE_SAMPLES) {
+        var p = new Period();
+        Thread.sleep(1000 * 5);
+        System.out.println(producer.rate.getRateSummary(p));
+        System.out.println(totalProducerRate.getRateSummary(p));
+        System.out.println(totalConsumerRate.getRateSummary(p));
+        totalProducerRate.reset();
+        totalConsumerRate.reset();
+      }
+
+    } finally {
+      stopServices();
+    }
+  }
+
+  def reportRates() = {
+    System.out.println("Checking rates for test: " + getName() + ", " + (if (ptp) {"ptp"} else {"topic"}));
+    for (i <- 0 until PERFORMANCE_SAMPLES) {
+      var p = new Period();
+      Thread.sleep(1000 * 5);
+      System.out.println(totalProducerRate.getRateSummary(p));
+      System.out.println(totalConsumerRate.getRateSummary(p));
+      totalProducerRate.reset();
+      totalConsumerRate.reset();
+    }
+  }
+
+  def createConnections() = {
+
+    if (multibroker) {
+      sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI);
+      rcvBroker = createBroker("RcvBroker", receiveBrokerBindURI, receiveBrokerConnectURI);
+      brokers.add(sendBroker);
+      brokers.add(rcvBroker);
+    } else {
+      sendBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI);
+      rcvBroker = sendBroker
+      brokers.add(sendBroker);
+    }
+
+    startBrokers();
+
+    var dests = new Array[Destination](destCount);
+
+    for (i <- 0 until destCount) {
+      val domain = if (ptp) {Domain.QUEUE_DOMAIN} else {Domain.TOPIC_DOMAIN}
+      val name = new AsciiBuffer("dest" + (i + 1))
+      var bean = new SingleDestination(domain, name)
+      dests(i) = bean;
+      if (ptp) {
+        sendBroker.defaultVirtualHost.createQueue(dests(i));
+        if (multibroker) {
+          rcvBroker.defaultVirtualHost.createQueue(dests(i));
+        }
+      }
+    }
+
+    for (i <- 0 until producerCount) {
+      var destination = dests(i % destCount);
+      var producer = createProducer(i, destination);
+      producer.persistentDelivery = PERSISTENT;
+      producers.add(producer);
+    }
+
+    for (i <- 0 until consumerCount) {
+      var destination = dests(i % destCount);
+      var consumer = createConsumer(i, destination);
+      consumer.durable = DURABLE;
+      consumers.add(consumer);
+    }
+
+    // Create MultiBroker connections:
+    // if (multibroker) {
+    // Pipe<Message> pipe = new Pipe<Message>();
+    // sendBroker.createBrokerConnection(rcvBroker, pipe);
+    // rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
+    // }
+  }
+
+  def createConsumer(i: Int, destination: Destination): RemoteConsumer = {
+
+    var consumer = createConsumer();
+    consumer.exceptionListener = new ExceptionListener() {
+      def exceptionThrown(error: Exception) = {
+        if (!stopping.get()) {
+          System.err.println("Consumer Async Error:");
+          error.printStackTrace();
+        }
+      }
+    }
+
+    consumer.uri = new URI(rcvBroker.connectUris.head)
+    consumer.destination = destination
+    consumer.name = "consumer" + (i + 1)
+    consumer.totalConsumerRate = totalConsumerRate
+    return consumer;
+  }
+
+  protected def createConsumer(): RemoteConsumer
+
+  private def createProducer(id: Int, destination: Destination): RemoteProducer = {
+    var producer = createProducer();
+    producer.exceptionListener = new ExceptionListener() {
+      def exceptionThrown(error: Exception) = {
+        if (!stopping.get()) {
+          System.err.println("Producer Async Error:");
+          error.printStackTrace();
+        }
+      }
+    }
+    producer.uri = new URI(sendBroker.connectUris.head)
+    producer.producerId = id + 1
+    producer.name = "producer" + (id + 1)
+    producer.destination = destination
+    producer.messageIdGenerator = msgIdGenerator
+    producer.totalProducerRate = totalProducerRate
+    producer
+  }
+
+  protected def createProducer(): RemoteProducer
+
+  private def createBroker(name: String, bindURI: String, connectUri: String): Broker = {
+    val broker = new Broker()
+    broker.transportServers.add(TransportFactory.bind(new URI(bindURI)))
+    broker.connectUris.add(connectUri)
+    //     TODO:
+    //    broker.defaultVirtualHost.setStore(createStore(broker))
+    broker
+  }
+
+  protected def createStore(broker: Broker): Store = {
+    val store = if (USE_KAHA_DB) {
+      StoreFactory.createStore("kaha-db");
+    } else {
+      StoreFactory.createStore("memory");
+    }
+    store.setStoreDirectory(new File("target/test-data/broker-test/" + broker.name));
+    store.setDeleteAllMessages(PURGE_STORE);
+    store
+  }
+
+  private def stopServices() = {
+    stopping.set(true);
+    for (broker <- brokers) {
+      broker.stop();
+    }
+    for (connection <- producers) {
+      connection.stop();
+    }
+    for (connection <- consumers) {
+      connection.stop();
+    }
+  }
+
+  private def startBrokers() = {
+    for (broker <- brokers) {
+      broker.start();
+    }
+  }
+
+  private def startClients() = {
+
+    for (connection <- consumers) {
+      connection.start();
+    }
+
+    for (connection <- producers) {
+      connection.start();
+    }
+  }
+
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompConnection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompConnection.scala?rev=961070&r1=961069&r2=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompConnection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompConnection.scala Wed Jul  7 03:42:13 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.ng
 
 import _root_.java.util.{LinkedList}
-import _root_.org.apache.activemq.ng.Stomp
 import java.nio.channels.SelectionKey._
 import _root_.org.apache.activemq.util.buffer._
 import _root_.org.fusesource.hawtdispatch._

Modified: activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala?rev=961070&r1=961069&r2=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala Wed Jul  7 03:42:13 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.ng
 
 import _root_.java.util.{LinkedList, ArrayList}
-import _root_.org.apache.activemq.ng.Stomp
 import java.nio.channels.{SocketChannel}
 import java.nio.ByteBuffer
 import java.io.{EOFException, IOException}

Copied: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/Stomp.java (from r961069, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/Stomp.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/Stomp.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/Stomp.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/Stomp.java&r1=961069&r2=961070&rev=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/Stomp.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/Stomp.java Wed Jul  7 03:42:13 2010
@@ -19,15 +19,16 @@ package org.apache.activemq.apollo.stomp
 import org.apache.activemq.util.buffer.AsciiBuffer;
 import org.apache.activemq.util.buffer.Buffer;
 
+
 public interface Stomp {
-    
+
     Buffer EMPTY_BUFFER = new Buffer(0);
     byte NULL = 0;
     Buffer NULL_BUFFER = new Buffer(new byte[]{NULL});
     byte NEWLINE = '\n';
     Buffer NEWLINE_BUFFER = new Buffer(new byte[]{NEWLINE});
     Buffer END_OF_FRAME_BUFFER = new Buffer(new byte[]{NULL, NEWLINE});
-    
+
     AsciiBuffer TRUE = new AsciiBuffer("true");
     AsciiBuffer FALSE = new AsciiBuffer("false");
 
@@ -35,8 +36,8 @@ public interface Stomp {
         AsciiBuffer CONNECT = new AsciiBuffer("CONNECT");
         AsciiBuffer SEND = new AsciiBuffer("SEND");
         AsciiBuffer DISCONNECT = new AsciiBuffer("DISCONNECT");
-        AsciiBuffer SUBSCRIBE = new AsciiBuffer("SUB");
-        AsciiBuffer UNSUBSCRIBE = new AsciiBuffer("UNSUB");
+        AsciiBuffer SUBSCRIBE = new AsciiBuffer("SUBSCRIBE");
+        AsciiBuffer UNSUBSCRIBE = new AsciiBuffer("UNSUBSCRIBE");
 
         AsciiBuffer BEGIN_TRANSACTION = new AsciiBuffer("BEGIN");
         AsciiBuffer COMMIT_TRANSACTION = new AsciiBuffer("COMMIT");
@@ -57,7 +58,7 @@ public interface Stomp {
     public interface Headers {
         byte SEPERATOR = ':';
         Buffer SEPERATOR_BUFFER = new Buffer(new byte[]{SEPERATOR});
-        
+
         AsciiBuffer RECEIPT_REQUESTED = new AsciiBuffer("receipt");
         AsciiBuffer TRANSACTION = new AsciiBuffer("transaction");
         AsciiBuffer CONTENT_LENGTH = new AsciiBuffer("content-length");
@@ -129,16 +130,16 @@ public interface Stomp {
             AsciiBuffer MESSAGE_ID = new AsciiBuffer("message-id");
         }
     }
-    
+
 	public enum Transformations {
 		JMS_BYTE, JMS_OBJECT_XML, JMS_OBJECT_JSON, JMS_MAP_XML, JMS_MAP_JSON;
-		
+
 		public String toString() {
 			return name().replaceAll("_", "-").toLowerCase();
 		}
-		
+
 		public static Transformations getValue(String value) {
 			return valueOf(value.replaceAll("-", "_").toUpperCase());
 		}
-	}    
-}
+	}
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961070&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul  7 03:42:13 2010
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp
+
+import _root_.java.util.LinkedList
+import _root_.org.apache.activemq.apollo.broker.{BufferConversions, Destination, Message}
+import _root_.org.apache.activemq.filter.{Expression, MessageEvaluationContext}
+import _root_.org.apache.activemq.util.buffer._
+
+/**
+ * Shared definitions for STOMP frames: the HeaderMap alias and the default empty frame body.
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+object StompFrameConstants {
+  type HeaderMap = List[(AsciiBuffer, AsciiBuffer)]
+  val NO_DATA = new Buffer(0); // val, not var: this shared default must not be reassignable
+}
+
+import StompFrameConstants._
+import StompConstants._;
+import BufferConversions._
+  
+/**
+ * Represents all the data in a STOMP frame.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+case class StompFrame(action:AsciiBuffer, headers:HeaderMap=Nil, content:Buffer=NO_DATA) extends Message {
+  // Byte length of the header section (0 when there are no headers).
+  def headerSize = {
+    if( headers.isEmpty ) {
+      0
+    } else {
+      // if all the headers were part of the same input buffer.. size can be calculated by
+      // subtracting positions in the buffer.
+      val firstBuffer = headers.head._1
+      val lastBuffer =  headers.last._2
+      if( firstBuffer.data eq lastBuffer.data ) {
+        (lastBuffer.offset-firstBuffer.offset)+lastBuffer.length+1
+      } else {
+        // otherwise sum key + value + 2 extra bytes per header (presumably ':' and newline)
+        var rc = 0;
+        val i = headers.iterator
+        while( i.hasNext ) {
+          val (key, value) = i.next
+          rc += (key.length + value.length+2)
+        }
+        rc
+      }
+    }
+  }
+  // Frame size: a position difference when action and content share one backing array, else a sum of parts.
+  def size = {
+     if( action.data eq content.data ) {
+        (content.offset-action.offset)+content.length
+     } else {
+       action.length + 1 +
+       headerSize + 1 + content.length
+     }
+  }
+
+  /**
+   * the globally unique id of the message
+   */
+  var id: AsciiBuffer = null
+
+  /**
+   * the globally unique id of the producer
+   */
+  var producer: AsciiBuffer = null
+
+  /**
+   *  the message priority.
+   */
+  var priority:Byte = 4;
+
+  /**
+   * a positive value indicates that the delivery has an expiration
+   * time.
+   */
+  var expiration: Long = -1;
+
+  /**
+   * true if the delivery is persistent
+   */
+  var persistent = false
+
+  /**
+   * where the message was sent to.
+   */
+  var destination: Destination = null
+
+  /**
+   * used to apply a selector against the message.
+   */
+  lazy val messageEvaluationContext = new MessageEvaluationContext() {
+
+    def getBodyAs[T](clazz:Class[T]) = {
+      throw new UnsupportedOperationException
+    }
+
+    def getPropertyExpression(name:String):Expression = {
+      throw new UnsupportedOperationException
+    }
+
+    @deprecated("this should go away.")
+    def getLocalConnectionId() = {
+      throw new UnsupportedOperationException
+    }
+
+    def getDestination[T]():T = {
+      throw new UnsupportedOperationException
+    }
+
+    def setDestination(destination:Any):Unit = {
+      throw new UnsupportedOperationException
+    }
+  }
+
+  // Copy the delivery metadata carried in well-known headers into the fields above.
+  headers.foreach {
+      case (Stomp.Headers.Message.MESSAGE_ID, value) =>
+        id = value
+      case (Stomp.Headers.Message.PRORITY, value) =>
+        priority = java.lang.Integer.parseInt(value).toByte
+      case (Stomp.Headers.Message.DESTINATION, value) =>
+        destination = value
+      case (Stomp.Headers.Message.EXPIRATION_TIME, value) =>
+        expiration = java.lang.Long.parseLong(value)
+      case _ => // any other header is legal; without this case the constructor threw MatchError
+  }
+}

Copied: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (from r961069, activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala&r1=961069&r2=961070&rev=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 03:42:13 2010
@@ -14,20 +14,244 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.ng
+package org.apache.activemq.apollo.stomp
 
 import _root_.java.util.{LinkedList, ArrayList}
-import _root_.org.apache.activemq.ng.Stomp
+import _root_.org.apache.activemq.apollo.broker._
+
+import _root_.org.apache.activemq.wireformat.WireFormat
+import _root_.org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
 import java.nio.channels.{SocketChannel}
 import java.nio.ByteBuffer
 import java.io.{EOFException, IOException}
 import _root_.org.apache.activemq.util.buffer._
 import collection.mutable.{ListBuffer, HashMap}
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 
 import AsciiBuffer._
 import Stomp._
 import Stomp.Headers._
 
+import BufferConversions._
+import _root_.scala.collection.JavaConversions._
+import StompFrameConstants._;
+
+
+class StompProtocolException(msg:String) extends Exception(msg) // raised for STOMP-level errors, e.g. an invalid destination name
+
+object StompConstants {
+  val QUEUE_PREFIX = new AsciiBuffer("/queue/")
+  val TOPIC_PREFIX = new AsciiBuffer("/topic/")
+  // Maps a STOMP destination name to a broker Destination by its prefix; throws StompProtocolException otherwise.
+  implicit def toDestination(value:AsciiBuffer):Destination = {
+    if( value.startsWith(QUEUE_PREFIX) ) {
+      new SingleDestination(Domain.QUEUE_DOMAIN, value.slice(QUEUE_PREFIX.length, -QUEUE_PREFIX.length))
+    } else if( value.startsWith(TOPIC_PREFIX) ) {
+      new SingleDestination(Domain.TOPIC_DOMAIN, value.slice(TOPIC_PREFIX.length, -TOPIC_PREFIX.length))
+    } else {
+      throw new StompProtocolException("Invalid stomp destiantion name: "+value);
+    }
+  }
+  // NOTE(review): slice(...) is given a negative end index; confirm Buffer.slice does not also trim the tail of the name.
+}
+
+import StompConstants._
+
+class StompProtocolHandler extends ProtocolHandler {
+  // Handles the STOMP frames of one broker connection: CONNECT, SEND, SUBSCRIBE, ACK (ignored) and DISCONNECT.
+  // SimpleConsumer: the consumer bound for this connection's subscription; sessions send through deliveryQueue.
+  class SimpleConsumer(val dest:AsciiBuffer) extends BaseRetained with DeliveryConsumer {
+
+    val queue = StompProtocolHandler.this.dispatchQueue
+    queue.retain
+    setDisposer(^{ queue.release  })
+
+    val deliveryQueue = new DeliveryCreditBufferProtocol(outboundChannel, queue)
+    // matches every delivery: no filtering is applied here
+    def matches(message:Delivery) = true
+
+    def open_session(producer_queue:DispatchQueue) = new DeliverySession {
+      val session = deliveryQueue.session(producer_queue)
+
+      val consumer = SimpleConsumer.this
+      retain
+
+      def deliver(delivery:Delivery) = session.send(delivery)
+
+      def close = {
+        session.close
+        release
+      }
+    }
+  }
+
+  def dispatchQueue = connection.dispatchQueue
+  val outboundChannel  = new DeliveryBuffer
+  var closed = false
+  var consumer:SimpleConsumer = null
+
+  var connection:BrokerConnection = null
+  var wireformat:WireFormat = null
+  var producerRoute:DeliveryProducerRoute=null
+  var host:VirtualHost = null
+  // NOTE(review): returns the same value as dispatchQueue above; one of the two accessors looks redundant.
+  private def queue = connection.dispatchQueue
+  // Stores the connection; host is assigned later from a callback wrapped on the connection's queue.
+  def setConnection(connection:BrokerConnection) = {
+    this.connection = connection
+
+    // We will be using the default virtual host
+    connection.broker.getDefaultVirtualHost(
+      queue.wrap { (host)=>
+        this.host=host
+      }
+    )
+  }
+
+  def setWireFormat(wireformat:WireFormat) = { this.wireformat = wireformat}
+  // Dispatches an incoming frame on its action; an unknown action ends the connection through die().
+  def onCommand(command:Any) = {
+    val frame = command.asInstanceOf[StompFrame]
+    frame match {
+      case StompFrame(Commands.CONNECT, headers, _) =>
+        on_stomp_connect(headers)
+      case StompFrame(Commands.SEND, headers, content) =>
+        on_stomp_send(frame)
+      case StompFrame(Commands.SUBSCRIBE, headers, content) =>
+        on_stomp_subscribe(headers)
+      case StompFrame(Commands.ACK, headers, content) =>
+        // TODO: ACK frames are currently accepted and ignored.
+      case StompFrame(Commands.DISCONNECT, headers, content) =>
+        stop
+      case StompFrame(unknown, _, _) =>
+        die("Unsupported STOMP command: "+unknown);
+    }
+  }
+
+  // Replies to CONNECT with a CONNECTED frame; the request headers are not inspected.
+  def on_stomp_connect(headers:HeaderMap) = {
+    println("connected on: "+Thread.currentThread.getName);
+    connection.transport.oneway(StompFrame(Responses.CONNECTED))
+  }
+  // Returns the value of the first header whose key equals name, or None when absent.
+  def get(headers:HeaderMap, name:AsciiBuffer):Option[AsciiBuffer] = {
+    val i = headers.iterator
+    while( i.hasNext ) {
+      val entry = i.next
+      if( entry._1 == name ) {
+        return Some(entry._2)
+      }
+    }
+    None
+  }
+  // Routes a SEND frame; connects a new producer route when none exists or the destination check below fails.
+  def on_stomp_send(frame:StompFrame) = {
+    get(frame.headers, Headers.Send.DESTINATION) match {
+      case Some(dest)=>
+        // create the producer route...
+        if( producerRoute==null || producerRoute.destination!= dest ) {
+          // NOTE(review): dest is an AsciiBuffer here; confirm it can ever compare equal to producerRoute.destination.
+          // clean up the previous producer..
+          if( producerRoute!=null ) {
+            host.router.disconnect(producerRoute)
+            producerRoute=null
+          }
+
+          val producer = new DeliveryProducer() {
+            override def collocate(value:DispatchQueue):Unit = ^{
+//              TODO:
+//              if( value.getTargetQueue ne queue.getTargetQueue ) {
+//                println("sender on "+queue.getLabel+" co-locating with: "+value.getLabel);
+//                queue.setTargetQueue(value.getTargetQueue)
+//                write_source.setTargetQueue(queue);
+//                read_source.setTargetQueue(queue)
+//              }
+
+            } ->: queue
+          }
+
+          // don't process frames until we are connected..
+          connection.transport.suspendRead
+          host.router.connect(dest, queue, producer) {
+            (route) =>
+              connection.transport.resumeRead
+              producerRoute = route
+              send_via_route(producerRoute, frame)
+          }
+        } else {
+          // we can re-use the existing producer route
+          send_via_route(producerRoute, frame)
+        }
+      case None=>
+        die("destination not set.")
+    }
+  }
+  // Delivers the frame to every route target; reads stay suspended until the delivery's disposer runs. No-op without targets.
+  def send_via_route(route:DeliveryProducerRoute, frame:StompFrame) = {
+    if( !route.targets.isEmpty ) {
+      val delivery = Delivery(frame, frame.size)
+      connection.transport.suspendRead
+      delivery.setDisposer(^{
+        connection.transport.resumeRead
+      })
+      route.targets.foreach(consumer=>{
+        consumer.deliver(delivery)
+      })
+      delivery.release;
+    }
+  }
+  // Binds one SimpleConsumer to the requested destination; a second SUBSCRIBE ends the connection through die().
+  def on_stomp_subscribe(headers:HeaderMap) = {
+    println("Consumer on "+Thread.currentThread.getName)
+    get(headers, Headers.Subscribe.DESTINATION) match {
+      case Some(dest)=>
+        if( consumer !=null ) {
+          die("Only one subscription supported.")
+
+        } else {
+          consumer = new SimpleConsumer(dest);
+          host.router.bind(dest, consumer :: Nil)
+          consumer.release
+        }
+      case None=>
+        die("destination not set.")
+    }
+
+  }
+  // Suspends reads, sends an ERROR frame carrying msg, then enqueues stop on the connection queue.
+  private def die(msg:String) = {
+    println("Shutting connection down due to: "+msg)
+    connection.transport.suspendRead
+    connection.transport.oneway(StompFrame(Responses.ERROR, Nil, ascii(msg)))
+    ^ {
+      stop
+    } ->: queue
+  }
+  // Prints the error and stops the handler.
+  def onException(error:Exception) = {
+    println("Shutting connection down due to: "+error)
+    stop
+  }
+
+  def start = {
+  }
+  // Runs at most once (guarded by closed): disconnects the producer route, unbinds the consumer, stops the connection.
+  def stop = {
+    if( !closed ) {
+      closed=true;
+      if( producerRoute!=null ) {
+        host.router.disconnect(producerRoute)
+        producerRoute=null
+      }
+      if( consumer!=null ) {
+        host.router.unbind(consumer.dest, consumer::Nil)
+        consumer=null
+      }
+      connection.stop
+    }
+  }
+}
+
 object StompWireFormat {
     val READ_BUFFFER_SIZE = 1024*64;
     val MAX_COMMAND_LENGTH = 1024;
@@ -46,78 +270,6 @@ class StompWireFormat {
     ByteBuffer.wrap(Array(x));
   }
 
-//  var outbound_pos=0
-//  var outbound_limit=0
-//  var outbound_buffers: ListBuffer[ByteBuffer] = new ListBuffer[ByteBuffer]()
-//
-//  /**
-//   * @retruns true if the source has been drained of StompFrame objects and they are fully written to the socket
-//   */
-//  def drain_source(socket:SocketChannel)(source: =>StompFrame ):Boolean = {
-//    while(true) {
-//      // if we have a pending frame that is being sent over the socket...
-//      if( !outbound_buffers.isEmpty ) {
-//
-//        val data = outbound_buffers.toArray
-//
-//        socket.write(data)
-//
-//        // remove all the written buffers...
-//        while( !outbound_buffers.isEmpty && outbound_buffers.head.remaining==0 ) {
-//          outbound_buffers.remove(0)
-//        }
-//
-//        if( !outbound_buffers.isEmpty ) {
-//          // non blocking socket returned before the buffers were fully written to disk..
-//          // we are not yet fully drained.. but need to quit now.
-//          return false
-//        }
-//
-//      } else {
-//
-//        var frame = source
-//        while( frame!=null ) {
-//          marshall(outbound_buffers, frame)
-//          frame = source
-//        }
-//
-//        if( outbound_buffers.size == 0 ) {
-//          // the source is now drained...
-//          return true
-//        }
-//      }
-//    }
-//    true
-//  }
-//
-//  implicit def toByteBuffer(data:AsciiBuffer) = ByteBuffer.wrap(data.data, data.offset, data.length)
-//  implicit def toByteBuffer(data:Buffer) = ByteBuffer.wrap(data.data, data.offset, data.length)
-//
-//  def marshall(buffer:ListBuffer[ByteBuffer], frame:StompFrame) = {
-//    buffer.append(frame.action)
-//    buffer.append(NEWLINE)
-//
-//    // we can optimize a little if the headers and content are in the same buffer..
-//    if( !frame.headers.isEmpty && !frame.content.isEmpty &&
-//            ( frame.headers.getFirst._1.data eq frame.content.data ) ) {
-//      buffer.append(  ByteBuffer.wrap(frame.content.data, frame.headers.getFirst._1.offset, (frame.content.offset-frame.headers.getFirst._1.offset)+ frame.content.length) )
-//
-//    } else {
-//      val i = frame.headers.iterator
-//      while( i.hasNext ) {
-//        val (key, value) = i.next
-//        buffer.append(key)
-//        buffer.append(SEPERATOR)
-//        buffer.append(value)
-//        buffer.append(NEWLINE)
-//      }
-//
-//      buffer.append(NEWLINE)
-//      buffer.append(toByteBuffer(frame.content))
-//    }
-//    buffer.append(toByteBuffer(END_OF_FRAME_BUFFER))
-//  }
-
   var outbound_frame: ByteBuffer = null
   /**
    * @retruns true if the source has been drained of StompFrame objects and they are fully written to the socket
@@ -163,18 +315,16 @@ class StompWireFormat {
 
     // we can optimize a little if the headers and content are in the same buffer..
     if( !frame.headers.isEmpty && !frame.content.isEmpty &&
-            ( frame.headers.getFirst._1.data eq frame.content.data ) ) {
+            ( frame.headers.head._1.data eq frame.content.data ) ) {
 
-      val offset = frame.headers.getFirst._1.offset;
-      val buffer1 = frame.headers.getFirst._1;
+      val offset = frame.headers.head._1.offset;
+      val buffer1 = frame.headers.head._1;
       val buffer2 = frame.content;
       val length = (buffer2.offset-buffer1.offset)+buffer2.length
       buffer.write( buffer1.data, offset, length)
 
     } else {
-      val i = frame.headers.iterator
-      while( i.hasNext ) {
-        val (key, value) = i.next
+      for( (key, value) <- frame.headers ) {
         buffer.write(key)
         buffer.write(SEPERATOR)
         buffer.write(value)
@@ -275,9 +425,7 @@ class StompWireFormat {
     null
   }
 
-  type HeaderMap = LinkedList[(AsciiBuffer, AsciiBuffer)]
-
-  def read_headers(action:Buffer, headers:HeaderMap=new LinkedList()):FrameReader = ()=> {
+  def read_headers(action:Buffer, headers:HeaderMap=Nil):FrameReader = ()=> {
     val line = read_line(MAX_HEADER_LENGTH, "The maximum header length was exceeded")
     if( line !=null ) {
       if( line.trim().length() > 0 ) {
@@ -340,7 +488,7 @@ class StompWireFormat {
     }
     None
   }
-  
+
 
   def read_binary_body(action:Buffer, headers:HeaderMap, contentLength:Int):FrameReader = ()=> {
     val content:Buffer=read_content(contentLength)
@@ -394,6 +542,4 @@ class StompWireFormat {
     }
   }
 
-
-
 }

Copied: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/package.html (from r961069, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/package.html)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/package.html?p2=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/package.html&p1=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/package.html&r1=961069&r2=961070&rev=961070&view=diff
==============================================================================
    (empty)

Added: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961070&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul  7 03:42:13 2010
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp.perf
+
+import _root_.java.util.concurrent.TimeUnit
+import _root_.org.apache.activemq.apollo.broker._
+import _root_.org.apache.activemq.apollo.broker.perf._
+import _root_.org.apache.activemq.apollo.stomp._
+
+import _root_.org.apache.activemq.transport.CompletionCallback
+import _root_.org.apache.activemq.util.buffer._
+import collection.mutable.{ListBuffer, HashMap}
+
+import AsciiBuffer._
+import Stomp._
+import _root_.org.apache.activemq.apollo.stomp.StompFrame
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+
+class StompBrokerPerfTest extends BaseBrokerPerfTest {
+    // Plugs STOMP producers/consumers and the "stomp" wire format into BaseBrokerPerfTest.
+    override def createProducer() =  new StompRemoteProducer()
+    override def createConsumer() = new StompRemoteConsumer()
+    override def getRemoteWireFormat() = "stomp"
+
+}
+
+class StompRemoteConsumer extends RemoteConsumer {
+    // Sends CONNECT, then SUBSCRIBE (auto ack) for the "/queue/" or "/topic/" name derived from destination.
+    def setupSubscription() = {
+        val stompDestination = if( destination.getDomain() == Domain.QUEUE_DOMAIN ) {
+            ascii("/queue/"+destination.getName().toString());
+        } else {
+            ascii("/topic/"+destination.getName().toString());
+        }
+
+        var frame = StompFrame(Stomp.Commands.CONNECT);
+        transport.oneway(frame);
+
+        var headers:List[(AsciiBuffer, AsciiBuffer)] = Nil
+        headers ::= (Stomp.Headers.Subscribe.DESTINATION, stompDestination)
+        headers ::= (Stomp.Headers.Subscribe.ID, ascii("stomp-sub-"+name))
+        headers ::= (Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO)
+
+        frame = StompFrame(Stomp.Commands.SUBSCRIBE, headers);
+        transport.oneway(frame);
+    }
+    // CONNECTED is ignored, MESSAGE is counted via messageReceived, anything else goes to onFailure.
+    def onCommand(command:Object) = {
+      var frame = command.asInstanceOf[StompFrame]
+      frame match {
+        case StompFrame(Responses.CONNECTED, headers, _) =>
+        case StompFrame(Responses.MESSAGE, headers, content) =>
+          messageReceived();
+        case _ =>
+          onFailure(new Exception("Unexpected stomp command: " + frame.action));
+      }
+    }
+  // Increments consumerRate; when thinkTime > 0, reads are suspended and the increment is delayed by thinkTime ms.
+  protected def messageReceived() {
+    if (thinkTime > 0) {
+      transport.suspendRead
+      dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
+        consumerRate.increment();
+        if (!stopping) {
+          transport.resumeRead
+        }
+      })
+    } else {
+      consumerRate.increment();
+    }
+  }
+
+}
+
+class StompRemoteProducer extends RemoteProducer {
+
+    var stompDestination:AsciiBuffer = null
+    // Completion callback that sends the next SEND frame, passing itself as the callback for that write.
+    val send_next:CompletionCallback = new CompletionCallback() {
+      def onCompletion() = {
+        rate.increment(); // NOTE(review): also runs when the initial CONNECT completes (see setupProducer)
+        if( !stopping ) {
+
+          var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
+          headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
+          if (property != null) {
+              headers ::= (ascii(property), ascii(property));
+          }
+//          var p = this.priority;
+//          if (priorityMod > 0) {
+//              p = if ((counter % priorityMod) == 0) { 0 } else { priority }
+//          }
+
+          var content = ascii(createPayload());
+          transport.oneway(StompFrame(Stomp.Commands.SEND, headers, content), send_next)
+        }
+      }
+      def onFailure(error:Throwable) = {
+        println("stopping due to: "+error);
+        stop
+      }
+    }
+    // Derives the STOMP destination name, then starts the send loop by sending CONNECT with send_next.
+    override def setupProducer() = {
+      if( destination.getDomain() == Domain.QUEUE_DOMAIN  ) {
+          stompDestination = ascii("/queue/"+destination.getName().toString());
+      } else {
+          stompDestination = ascii("/topic/"+destination.getName().toString());
+      }
+      transport.oneway(StompFrame(Stomp.Commands.CONNECT), send_next);
+    }
+    // CONNECTED is ignored; any other frame from the broker is reported through onFailure.
+    def onCommand(command:Object) = {
+      var frame = command.asInstanceOf[StompFrame]
+      frame match {
+        case StompFrame(Responses.CONNECTED, headers, _) =>
+        case _ =>
+          onFailure(new Exception("Unexpected stomp command: " + frame.action));
+      }
+    }
+
+}
+

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961070&r1=961069&r2=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul  7 03:42:13 2010
@@ -327,11 +327,11 @@ public class TcpTransport implements Tra
         return null;
     }
 
-    public void suspend() {
+    public void suspendRead() {
         readSource.suspend();
     }
 
-    public void resume() {
+    public void resumeRead() {
         readSource.resume();
     }
     

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java?rev=961070&r1=961069&r2=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java Wed Jul  7 03:42:13 2010
@@ -74,12 +74,12 @@ public interface Transport extends Servi
     /**
      * suspend delivery of commands.
      */
-    void suspend();
+    void suspendRead();
 
     /**
      * resume delivery of commands.
      */
-    void resume();
+    void resumeRead();
 
     /**
      * @param target

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=961070&r1=961069&r2=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java Wed Jul  7 03:42:13 2010
@@ -66,12 +66,12 @@ public class TransportFilter implements 
         next.setDispatchQueue(queue);
     }
 
-    public void suspend() {
-        next.suspend();
+    public void suspendRead() {
+        next.suspendRead();
     }
 
-    public void resume() {
-        next.resume();
+    public void resumeRead() {
+        next.resumeRead();
     }
 
     /**

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java?rev=961070&r1=961069&r2=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java Wed Jul  7 03:42:13 2010
@@ -193,11 +193,11 @@ public class PipeTransport implements Tr
         return null;
     }
 
-    public void suspend() {
+    public void suspendRead() {
         dispatchSource.suspend();
     }
 
-    public void resume() {
+    public void resumeRead() {
         dispatchSource.resume();
     }
     public void reconnect(URI uri, CompletionCallback callback) {