You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:43:29 UTC

svn commit: r961073 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/ activemq-broker/src/test/scala/org/apache/active...

Author: chirino
Date: Wed Jul  7 03:43:29 2010
New Revision: 961073

URL: http://svn.apache.org/viewvc?rev=961073&view=rev
Log:
- relaced used of uri in the tansport factory methods with the more generic string
- added stomp broker main object
- added stomp load client

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
      - copied, changed from r961072, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
    activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/FactoryFinder.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul  7 03:43:29 2010
@@ -52,11 +52,11 @@ object BrokerFactory {
      * @throws Exception
      */
     def createBroker(brokerURI:String, startBroker:Boolean=false):Broker = {
-      var split = brokerURI.split(":")
-      if (split.length < 2 ) {
+      var scheme = FactoryFinder.getScheme(brokerURI)
+      if (scheme==null ) {
           throw new IllegalArgumentException("Invalid broker URI, no scheme specified: " + brokerURI)
       }
-      var handler = createHandler(split(0))
+      var handler = createHandler(scheme)
       var broker = handler.createBroker(brokerURI)
       if (startBroker) {
           broker.start();
@@ -300,14 +300,16 @@ class Queue(val destination:Destination)
 
   override val queue:DispatchQueue = createQueue("queue:"+destination);
   queue.setTargetQueue(getRandomThreadQueue)
-  setDisposer(^{
-    queue.release
-  })
 
   val delivery_buffer  = new DeliveryBuffer
   delivery_buffer.eventHandler = ^{ drain_delivery_buffer }
 
-  val delivery_sessions = new DeliveryCreditBufferProtocol(delivery_buffer, queue)
+  val session_manager = new DeliverySessionManager(delivery_buffer, queue)
+
+  setDisposer(^{
+    queue.release
+    session_manager.release
+  })
 
   class ConsumerState(val consumer:DeliverySession) {
     var bound=true
@@ -376,7 +378,7 @@ class Queue(val destination:Destination)
 
   def open_session(producer_queue:DispatchQueue) = new DeliverySession {
 
-    val session = delivery_sessions.session(producer_queue)
+    val session = session_manager.session(producer_queue)
     val consumer = Queue.this
     retain
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 03:43:29 2010
@@ -461,9 +461,9 @@ class DeliveryOverflowBuffer(val deliver
 
 }
 
-class DeliveryCreditBufferProtocol(val delivery_buffer:DeliveryBuffer, val queue:DispatchQueue) extends BaseRetained with Suspendable {
+class DeliverySessionManager(val delivery_buffer:DeliveryBuffer, val queue:DispatchQueue) extends BaseRetained {
 
-  var sessions = List[CreditServer]()
+  var sessions = List[SessionServer]()
 
   var session_min_credits = 1024*4;
   var session_credit_capacity = 1024*32
@@ -478,10 +478,7 @@ class DeliveryCreditBufferProtocol(val d
   // use a event aggregating source to coalesce multiple events from the same thread.
   val source = createSource(new ListEventAggregator[Delivery](), queue)
   source.setEventHandler(^{drain_source});
-
-  def suspend() = source.suspend
-  def resume() = source.resume
-  def isSuspended() = source.isSuspended
+  source.resume
 
   def drain_source = {
     val deliveries = source.getData
@@ -491,7 +488,7 @@ class DeliveryCreditBufferProtocol(val d
     }
   }
 
-  class CreditServer(val producer_queue:DispatchQueue) {
+  class SessionServer(val producer_queue:DispatchQueue) {
     private var _capacity = 0
 
     def capacity(value:Int) = {
@@ -504,9 +501,9 @@ class DeliveryCreditBufferProtocol(val d
       client.drain(callback)
     }
 
-    val client = new CreditClient()
+    val client = new SessionClient()
 
-    class CreditClient() extends DeliveryOverflowBuffer(delivery_buffer) {
+    class SessionClient() extends DeliveryOverflowBuffer(delivery_buffer) {
 
       producer_queue.retain
       val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
@@ -560,8 +557,8 @@ class DeliveryCreditBufferProtocol(val d
     }
   }
 
-  def session(queue:DispatchQueue) = {
-    val session = new CreditServer(queue)
+  def session(producer_queue:DispatchQueue) = {
+    val session = new SessionServer(producer_queue)
     sessions = session :: sessions
     session.capacity(session_max_credits)
     session.client

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala Wed Jul  7 03:43:29 2010
@@ -96,21 +96,21 @@ class VMTransportFactory extends PipeTra
 		}
 	}
 
-  override def bind(uri:URI):TransportServer = {
+  override def bind(uri:String):TransportServer = {
     new VmTransportServer();
   }
 
-  override def connect(location:URI):Transport = {
+  override def connect(location:String):Transport = {
 		try {
-
+      var uri = new URI(location)
 			var brokerURI:String = null;
 			var create = true;
-			var name = location.getHost();
+			var name = uri.getHost();
 			if (name == null) {
 				name = DEFAULT_PIPE_NAME;
 			}
 
-			var options = URISupport.parseParamters(location);
+			var options = URISupport.parseParamters(uri);
 			var config = options.remove("broker").asInstanceOf[String]
 			if (config != null) {
 				brokerURI = config;
@@ -139,7 +139,7 @@ class VMTransportFactory extends PipeTra
 				}
 
 				// We want to use a vm transport server impl.
-				var vmTransportServer = TransportFactory.bind(new URI("vm://" + name+"?wireFormat=null")).asInstanceOf[VmTransportServer]
+				var vmTransportServer = TransportFactory.bind("vm://" + name+"?wireFormat=null").asInstanceOf[VmTransportServer]
 				vmTransportServer.setBroker(broker);
 				broker.transportServers.add(vmTransportServer);
 				broker.start();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala Wed Jul  7 03:43:29 2010
@@ -42,7 +42,7 @@ abstract class RemoteConsumer extends Co
   var destination: Destination = null
   var selector: String = null;
   var durable = false;
-  var uri: URI = null
+  var uri: String = null
 
   override def start() = {
     consumerRate.name("Consumer " + name + " Rate");
@@ -78,7 +78,7 @@ abstract class RemoteProducer extends Co
 
   var filler: String = null
   var payloadSize = 20
-  var uri: URI = null
+  var uri: String = null
 
   override def start() = {
 
@@ -238,7 +238,7 @@ abstract class BaseBrokerPerfTest {
     consumerCount = 1;
 
     createConnections();
-    producers.get(0).thinkTime = 50;
+    producers.get(0).thinkTime = 500000*1000;
 
     // Start 'em up.
     startClients();
@@ -549,7 +549,7 @@ abstract class BaseBrokerPerfTest {
       }
     }
 
-    consumer.uri = new URI(rcvBroker.connectUris.head)
+    consumer.uri = rcvBroker.connectUris.head
     consumer.destination = destination
     consumer.name = "consumer" + (i + 1)
     consumer.totalConsumerRate = totalConsumerRate
@@ -568,7 +568,7 @@ abstract class BaseBrokerPerfTest {
         }
       }
     }
-    producer.uri = new URI(sendBroker.connectUris.head)
+    producer.uri = sendBroker.connectUris.head
     producer.producerId = id + 1
     producer.name = "producer" + (id + 1)
     producer.destination = destination
@@ -581,7 +581,7 @@ abstract class BaseBrokerPerfTest {
 
   private def createBroker(name: String, bindURI: String, connectUri: String): Broker = {
     val broker = new Broker()
-    broker.transportServers.add(TransportFactory.bind(new URI(bindURI)))
+    broker.transportServers.add(TransportFactory.bind(bindURI))
     broker.connectUris.add(connectUri)
     broker
   }
@@ -622,6 +622,8 @@ abstract class BaseBrokerPerfTest {
       for (connection <- consumers) {
         connection.start();
       }
+    })
+    getGlobalQueue.dispatchAfter(400, TimeUnit.MILLISECONDS, ^{
       for (connection <- producers) {
         connection.start();
       }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java Wed Jul  7 03:43:29 2010
@@ -36,7 +36,7 @@ public class VMTransportTest {
 	
 	@Test()
 	public void autoCreateBroker() throws Exception {
-		Transport connect = TransportFactory.connect(new URI("vm://test1?wireFormat=mock"));
+		Transport connect = TransportFactory.connect("vm://test1?wireFormat=mock");
 		connect.start();
 		assertNotNull(connect);
 		connect.stop();
@@ -44,12 +44,12 @@ public class VMTransportTest {
 	
 	@Test(expected=IOException.class)
 	public void noAutoCreateBroker() throws Exception {
-		TransportFactory.connect(new URI("vm://test2?create=false&wireFormat=mock"));
+		TransportFactory.connect("vm://test2?create=false&wireFormat=mock");
 	}
 	
 	@Test(expected=IllegalArgumentException.class)
 	public void badOptions() throws Exception {
-		TransportFactory.connect(new URI("vm://test3?crazy-option=false&wireFormat=mock"));
+		TransportFactory.connect("vm://test3?crazy-option=false&wireFormat=mock");
 	}
 	
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java Wed Jul  7 03:43:29 2010
@@ -53,7 +53,7 @@ public class BrokerXml {
 		for (String element : transportServers) {
 			TransportServer server;
 			try {
-				server = TransportFactory.bind(new URI(element));
+				server = TransportFactory.bind(element);
 			} catch (Exception e) {
 				throw new Exception("Unable to bind transport server '"+element+" due to: "+e.getMessage(), e);
 			}

Copied: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala (from r961072, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java&r1=961072&r2=961073&rev=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala Wed Jul  7 03:43:29 2010
@@ -14,12 +14,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport;
+package org.apache.activemq.apollo.stomp
+
+import org.apache.activemq.apollo.broker.Broker
+import org.apache.activemq.transport.TransportFactory
 
 /**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public interface CompletionCallback {
-    void onCompletion();
-    public void onFailure(Throwable caught);
+object StompBroker {
+
+  var address = "0.0.0.0"
+  var port = 61613
+
+  def main(args:Array[String]) = {
+    println("Starting stomp broker...")
+
+    val broker = new Broker()
+
+    val uri = "tcp://"+address+":"+port+"?wireFormat=multi"
+    val server = TransportFactory.bind(uri)
+    broker.transportServers.add(server)
+    broker.start
+
+    println("Startup complete.")
+    System.in.read
+    println("Shutting down...")
+    broker.stop
+    println("Shutdown complete.")
+  }
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 03:43:29 2010
@@ -27,14 +27,15 @@ import _root_.org.fusesource.hawtdispatc
 import AsciiBuffer._
 import Stomp._
 import BufferConversions._
-import StompFrameConstants._;
+import StompFrameConstants._
+import org.apache.activemq.transport.CompletionCallback;
 
 
 class StompProtocolException(msg:String) extends Exception(msg)
 
 object StompConstants {
-  val QUEUE_PREFIX = new AsciiBuffer("/topic/")
-  val TOPIC_PREFIX = new AsciiBuffer("/queue/")
+  val QUEUE_PREFIX = new AsciiBuffer("/queue/")
+  val TOPIC_PREFIX = new AsciiBuffer("/topic/")
 
   implicit def toDestination(value:AsciiBuffer):Destination = {
     if( value.startsWith(QUEUE_PREFIX) ) {
@@ -56,15 +57,18 @@ class StompProtocolHandler extends Proto
   class SimpleConsumer(val dest:AsciiBuffer) extends BaseRetained with DeliveryConsumer {
 
     val queue = StompProtocolHandler.this.dispatchQueue
-    queue.retain
-    setDisposer(^{ queue.release  })
+    val session_manager = new DeliverySessionManager(outboundChannel, queue)
 
-    val deliveryQueue = new DeliveryCreditBufferProtocol(outboundChannel, queue)
+    queue.retain
+    setDisposer(^{
+      session_manager.release
+      queue.release
+    })
 
     def matches(message:Delivery) = true
 
     def open_session(producer_queue:DispatchQueue) = new DeliverySession {
-      val session = deliveryQueue.session(producer_queue)
+      val session = session_manager.session(producer_queue)
 
       val consumer = SimpleConsumer.this
       retain
@@ -79,7 +83,7 @@ class StompProtocolHandler extends Proto
   }
 
   def dispatchQueue = connection.dispatchQueue
-  val outboundChannel  = new DeliveryBuffer
+  val outboundChannel = new DeliveryBuffer
   var closed = false
   var consumer:SimpleConsumer = null
 
@@ -88,17 +92,30 @@ class StompProtocolHandler extends Proto
   var producerRoute:DeliveryProducerRoute=null
   var host:VirtualHost = null
 
+  outboundChannel.eventHandler = ^{
+    var delivery = outboundChannel.receive
+    while( delivery!=null ) {
+      connection.transport.oneway(delivery.message, new CompletionCallback() {
+        def onCompletion() = {
+          outboundChannel.ack(delivery)
+        }
+        def onFailure(e:Exception) = {
+          StompProtocolHandler.this.onException(e)
+        }
+      });
+    }
+  }
+
+
   private def queue = connection.dispatchQueue
 
   def setConnection(connection:BrokerConnection) = {
     this.connection = connection
 
     // We will be using the default virtual host
-    println("waiting for host")
     connection.transport.suspendRead
     connection.broker.runtime.getDefaultVirtualHost(
       queue.wrap { (host)=>
-        println("got host")
         this.host=host
         connection.transport.resumeRead
       }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul  7 03:43:29 2010
@@ -125,7 +125,7 @@ class StompRemoteProducer extends Remote
           dispatchQueue << task
         }
       }
-      def onFailure(error:Throwable) = {
+      def onFailure(error:Exception) = {
         println("stopping due to: "+error);
         stop
       }

Added: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=961073&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Wed Jul  7 03:43:29 2010
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp.perf
+
+import _root_.java.io._
+import _root_.java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import _root_.org.apache.activemq.util.buffer.AsciiBuffer
+import java.net.{ProtocolException, InetSocketAddress, URI, Socket}
+
+import java.lang.String._
+import java.util.concurrent.TimeUnit._
+import collection.mutable.Map
+
+/**
+ *
+ * Simulates load on the a stomp broker.
+ *
+ */
+object StompLoadClient {
+
+  val NANOS_PER_SECOND = NANOSECONDS.convert(1, SECONDS);
+  import StompLoadClient._
+  implicit def toAsciiBuffer(value: String) = new AsciiBuffer(value)
+
+  var producerSleep = 0;
+  var consumerSleep = 0;
+  var producers = 1;
+  var consumers = 1;
+  var sampleInterval = 5 * 1000;
+  var uri = "stomp://127.0.0.1:61613";
+  var bufferSize = 64*1204
+  var messageSize = 1024;
+  var useContentLength=true
+
+  var destinationType = "queue";
+  var destinationCount = 1;
+
+  val producerCounter = new AtomicLong();
+  val consumerCounter = new AtomicLong();
+  val done = new AtomicBoolean()
+
+  def main(args:Array[String]) = run
+
+  def run() = {
+
+    println("=======================")
+    println("Press ENTER to shutdown");
+    println("=======================")
+    println("")
+
+
+    done.set(false)
+    var producerThreads = List[ProducerThread]()
+    for (i <- 0 until producers) {
+      val producerThread = new ProducerThread(i);
+      producerThreads = producerThread :: producerThreads
+      producerThread.start();
+    }
+
+    var consumerThreads = List[ConsumerThread]()
+    for (i <- 0 until consumers) {
+      val consumerThread = new ConsumerThread(i);
+      consumerThreads = consumerThread :: consumerThreads
+      consumerThread.start();
+    }
+
+    // start a sampling thread...
+    val sampleThread = new Thread() {
+      override def run() = {
+        try {
+          var start = System.nanoTime();
+          while( !done.get ) {
+            Thread.sleep(sampleInterval)
+            val end = System.nanoTime();
+            printRate("Producer", producerCounter, end - start);
+            printRate("Consumer", consumerCounter, end - start);
+            start = end;
+          }
+        } catch {
+          case e:InterruptedException =>
+        }
+      }
+    }
+    sampleThread.start()
+
+
+    System.in.read()
+    println("=======================")
+    done.set(true)
+
+    // wait for the threads to finish..
+    for( thread <- consumerThreads ) {
+      thread.client.close
+      thread.interrupt
+      thread.join
+    }
+    for( thread <- producerThreads ) {
+      thread.client.close
+      thread.interrupt
+      thread.join
+    }
+    sampleThread.interrupt
+    sampleThread.join
+
+    println("Shutdown");
+    println("=======================")
+
+  }
+
+  override def toString() = {
+    "--------------------------------------\n"+
+    "StompLoadClient Properties\n"+
+    "--------------------------------------\n"+
+    "uri              = "+uri+"\n"+
+    "producers        = "+producers+"\n"+
+    "consumers        = "+consumers+"\n"+
+    "destinationType  = "+destinationType+"\n"+
+    "destinationCount = "+destinationCount+"\n" +
+    "messageSize      = "+messageSize+"\n"+
+    "producerSleep    = "+producerSleep+"\n"+
+    "consumerSleep    = "+consumerSleep+"\n"+
+    "bufferSize       = "+bufferSize+"\n"+
+    "useContentLength = "+useContentLength+"\n"+
+    "sampleInterval   = "+sampleInterval+"\n"
+  }
+
+  def printRate(name: String, counter: AtomicLong, nanos: Long) = {
+    val c = counter.getAndSet(0);
+    val rate_per_second: java.lang.Float = ((1.0f * c / nanos) * NANOS_PER_SECOND);
+    println(format("%s rate: %,.3f per second", name, rate_per_second));
+  }
+
+  def destination(i:Int) = "/"+destinationType+"/load-"+(i%destinationCount)
+
+
+  object StompClient {
+    def connect(proc: StompClient=>Unit ) = {
+      val client = new StompClient();
+      try {
+        val connectUri = new URI(uri);
+        client.open(connectUri.getHost(), connectUri.getPort());
+        client.send("""CONNECT
+
+""")
+        client.flush
+        client.receive("CONNECTED")
+        proc(client)
+      } catch {
+        case e: Throwable =>
+          if(!done.get) {
+            println("failure occured: "+e);
+            Thread.sleep(1000);
+          }
+      } finally {
+        try {
+          client.close();
+        } catch {
+          case ignore: Throwable =>
+        }
+      }
+    }
+  }
+
+  class StompClient {
+
+    var socket:Socket = null
+    var out:OutputStream = null;
+    var in:InputStream = null
+
+    def open(host: String, port: Int) = {
+      socket = new Socket
+      socket.connect(new InetSocketAddress(host, port))
+      socket.setSoLinger(true, 0);
+      out = new BufferedOutputStream(socket.getOutputStream, bufferSize)
+      in = new BufferedInputStream(socket.getInputStream, bufferSize)
+    }
+
+    def close() = {
+      if( socket!=null ) {
+        socket.close
+        socket = null
+        out = null
+        in = null
+      }
+    }
+
+    def flush() = {
+      out.flush
+    }
+
+    def send(frame:String) = {
+      out.write(frame.getBytes("UTF-8"))
+      out.write(0)
+      out.write('\n')
+    }
+
+    def send(frame:Array[Byte]) = {
+      out.write(frame)
+      out.write(0)
+      out.write('\n')
+    }
+
+    def skip():Unit = {
+      var c = in.read;
+      while( c >= 0 ) {
+        if( c==0 ) {
+          return;
+        }
+        c = in.read()
+      }
+      throw new EOFException()
+    }
+
+    def receive():String = {
+      val buffer = new ByteArrayOutputStream(500)
+      var c = in.read;
+      while( c >= 0 ) {
+        if( c==0 ) {
+          return new String(buffer.toByteArray, "UTF-8")
+        }
+        buffer.write(c);
+        c = in.read()
+      }
+      throw new EOFException()
+    }
+
+    def receive(expect:String):String = {
+      val rc = receive()
+      if( !rc.trimFront.startsWith(expect) ) {
+        throw new ProtocolException("Expected "+expect)
+      }
+      rc
+    }
+
+  }
+
+  class ProducerThread(val id: Int) extends Thread {
+    val name: String = "producer " + id;
+    var client:StompClient=null
+    val content = ("SEND\n" +
+              "destination:"+destination(id)+"\n"+
+               { if(useContentLength) "content-length:"+messageSize+"\n" else "" } +
+              "\n"+message(name)).getBytes("UTF-8")
+
+    override def run() {
+      while (!done.get) {
+        StompClient.connect { client =>
+          this.client=client
+          var i =0;
+          while (!done.get) {
+            client.send(content)
+            producerCounter.incrementAndGet();
+            Thread.sleep(producerSleep);
+            i += 1
+          }
+        }
+      }
+    }
+  }
+
+  def message(name:String) = {
+    val buffer = new StringBuffer(messageSize)
+    buffer.append("Message from " + name+"\n");
+    for( i <- buffer.length to messageSize ) {
+      buffer.append(('a'+(i%26)).toChar)
+    }
+    var rc = buffer.toString
+    if( rc.length > messageSize ) {
+      rc.substring(0, messageSize)
+    } else {
+      rc
+    }
+  }
+
+  class ConsumerThread(val id: Int) extends Thread {
+    val name: String = "producer " + id;
+    var client:StompClient=null
+
+    override def run() {
+      while (!done.get) {
+        StompClient.connect { client =>
+          this.client=client
+          val headers = Map[AsciiBuffer, AsciiBuffer]();
+          client.send("""
+SUBSCRIBE
+destination:"""+destination(id)+"""
+
+""")
+          client.flush
+
+          while (!done.get) {
+            client.skip
+            consumerCounter.incrementAndGet();
+            Thread.sleep(consumerSleep);
+          }
+        }
+      }
+    }
+  }
+
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul  7 03:43:29 2010
@@ -364,16 +364,15 @@ public class TcpTransport implements Tra
             }
 
             Object command=unmarshalSession.unmarshal(readBuffer);
-            if( command==null ) {
-                return;
-            }
-
-            listener.onCommand(command);
+            if( command!=null ) {
+                listener.onCommand(command);
 
-            // the transport may be suspended after processing a command.
-            if( transportState==DISPOSED || readSource.isSuspended() ) {
-                return;
+                // the transport may be suspended after processing a command.
+                if( transportState==DISPOSED || readSource.isSuspended() ) {
+                    return;
+                }
             }
+
         }
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Wed Jul  7 03:43:29 2010
@@ -48,9 +48,10 @@ import static org.apache.activemq.transp
 public class TcpTransportFactory implements TransportFactory.TransportFactorySPI {
     private static final Log LOG = LogFactory.getLog(TcpTransportFactory.class);
 
-    public TransportServer bind(URI location) throws Exception {
-        Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
-        TcpTransportServer server = createTcpTransportServer(location);
+    public TransportServer bind(String location) throws Exception {
+        URI uri = new URI(location);
+        Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
+        TcpTransportServer server = createTcpTransportServer(uri);
         server.setWireFormatFactory(TransportFactorySupport.createWireFormatFactory(options));
         IntrospectionSupport.setProperties(server, options);
         Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
@@ -67,12 +68,13 @@ public class TcpTransportFactory impleme
     }
 
 
-    public Transport connect(URI location) throws Exception {
-        Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
-        URI localLocation = getLocalLocation(location);
+    public Transport connect(String location) throws Exception {
+        URI uri = new URI(location);
+        Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
+        URI localLocation = getLocalLocation(uri);
 
         TcpTransport transport = new TcpTransport();
-        transport.connecting(location, localLocation);
+        transport.connecting(uri, localLocation);
 
         Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
         transport.setSocketOptions(socketOptions);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java Wed Jul  7 03:43:29 2010
@@ -21,5 +21,5 @@ package org.apache.activemq.transport;
  */
 public interface CompletionCallback {
     void onCompletion();
-    public void onFailure(Throwable caught);
+    public void onFailure(Exception caught);
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java Wed Jul  7 03:43:29 2010
@@ -34,14 +34,14 @@ public class TransportFactory {
     private static final ConcurrentHashMap<String, TransportFactorySPI> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactorySPI>();
 
     public interface TransportFactorySPI {
-        public TransportServer bind(URI location) throws Exception;
-        public Transport connect(URI location) throws Exception;
+        public TransportServer bind(String location) throws Exception;
+        public Transport connect(String location) throws Exception;
     }
     
     /**
      */
-    private static TransportFactorySPI factory(URI location) throws IOException {
-        String scheme = location.getScheme();
+    private static TransportFactorySPI factory(String location) throws IOException {
+        String scheme = FactoryFinder.getScheme(location);
         if (scheme == null) {
             throw new IOException("Transport not scheme specified: [" + location + "]");
         }
@@ -71,14 +71,14 @@ public class TransportFactory {
     /**
      * Creates a client transport.
      */
-    public static Transport connect(URI location) throws Exception {
+    public static Transport connect(String location) throws Exception {
         return factory(location).connect(location);
     }
 
     /**
      * Creates a transport server.
      */
-    public static TransportServer bind(URI location) throws Exception {
+    public static TransportServer bind(String location) throws Exception {
         return factory(location).bind(location);
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Wed Jul  7 03:43:29 2010
@@ -35,8 +35,8 @@ public class PipeTransportFactory implem
 
     public static final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
 
-    public TransportServer bind(URI uri) throws URISyntaxException, IOException {
-
+    public TransportServer bind(String location) throws URISyntaxException, IOException {
+        URI uri = new URI(location);
         Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
         String node = uri.getHost();
         synchronized(servers) {
@@ -58,8 +58,9 @@ public class PipeTransportFactory implem
 
     }
 
-    public Transport connect(URI location) throws IOException, URISyntaxException {
-        String name = location.getHost();
+    public Transport connect(String location) throws IOException, URISyntaxException {
+        URI uri = new URI(location);
+        String name = uri.getHost();
         synchronized(servers) {
             PipeTransportServer server = lookup(name);
             if (server == null) {
@@ -67,7 +68,7 @@ public class PipeTransportFactory implem
             }
             PipeTransport transport = server.connect();
 
-            Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
+            Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
             return verify( configure(transport, options), options);
         }
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/FactoryFinder.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/FactoryFinder.java?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/FactoryFinder.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/FactoryFinder.java Wed Jul  7 03:43:29 2010
@@ -31,6 +31,14 @@ public class FactoryFinder {
         this.path = path;
     }
 
+    public static String getScheme(String uri) {
+        String split[] = uri.split(":");
+        if (split.length < 2 ) {
+            return null;
+        }
+        return split[0];
+    }
+
     /**
      * Creates a new instance of the given key
      *