You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:43:29 UTC
svn commit: r961073 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/
activemq-broker/src/test/scala/org/apache/active...
Author: chirino
Date: Wed Jul 7 03:43:29 2010
New Revision: 961073
URL: http://svn.apache.org/viewvc?rev=961073&view=rev
Log:
- replaced use of URI in the transport factory methods with the more generic String
- added stomp broker main object
- added stomp load client
Added:
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala
- copied, changed from r961072, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/FactoryFinder.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul 7 03:43:29 2010
@@ -52,11 +52,11 @@ object BrokerFactory {
* @throws Exception
*/
def createBroker(brokerURI:String, startBroker:Boolean=false):Broker = {
- var split = brokerURI.split(":")
- if (split.length < 2 ) {
+ var scheme = FactoryFinder.getScheme(brokerURI)
+ if (scheme==null ) {
throw new IllegalArgumentException("Invalid broker URI, no scheme specified: " + brokerURI)
}
- var handler = createHandler(split(0))
+ var handler = createHandler(scheme)
var broker = handler.createBroker(brokerURI)
if (startBroker) {
broker.start();
@@ -300,14 +300,16 @@ class Queue(val destination:Destination)
override val queue:DispatchQueue = createQueue("queue:"+destination);
queue.setTargetQueue(getRandomThreadQueue)
- setDisposer(^{
- queue.release
- })
val delivery_buffer = new DeliveryBuffer
delivery_buffer.eventHandler = ^{ drain_delivery_buffer }
- val delivery_sessions = new DeliveryCreditBufferProtocol(delivery_buffer, queue)
+ val session_manager = new DeliverySessionManager(delivery_buffer, queue)
+
+ setDisposer(^{
+ queue.release
+ session_manager.release
+ })
class ConsumerState(val consumer:DeliverySession) {
var bound=true
@@ -376,7 +378,7 @@ class Queue(val destination:Destination)
def open_session(producer_queue:DispatchQueue) = new DeliverySession {
- val session = delivery_sessions.session(producer_queue)
+ val session = session_manager.session(producer_queue)
val consumer = Queue.this
retain
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 03:43:29 2010
@@ -461,9 +461,9 @@ class DeliveryOverflowBuffer(val deliver
}
-class DeliveryCreditBufferProtocol(val delivery_buffer:DeliveryBuffer, val queue:DispatchQueue) extends BaseRetained with Suspendable {
+class DeliverySessionManager(val delivery_buffer:DeliveryBuffer, val queue:DispatchQueue) extends BaseRetained {
- var sessions = List[CreditServer]()
+ var sessions = List[SessionServer]()
var session_min_credits = 1024*4;
var session_credit_capacity = 1024*32
@@ -478,10 +478,7 @@ class DeliveryCreditBufferProtocol(val d
// use a event aggregating source to coalesce multiple events from the same thread.
val source = createSource(new ListEventAggregator[Delivery](), queue)
source.setEventHandler(^{drain_source});
-
- def suspend() = source.suspend
- def resume() = source.resume
- def isSuspended() = source.isSuspended
+ source.resume
def drain_source = {
val deliveries = source.getData
@@ -491,7 +488,7 @@ class DeliveryCreditBufferProtocol(val d
}
}
- class CreditServer(val producer_queue:DispatchQueue) {
+ class SessionServer(val producer_queue:DispatchQueue) {
private var _capacity = 0
def capacity(value:Int) = {
@@ -504,9 +501,9 @@ class DeliveryCreditBufferProtocol(val d
client.drain(callback)
}
- val client = new CreditClient()
+ val client = new SessionClient()
- class CreditClient() extends DeliveryOverflowBuffer(delivery_buffer) {
+ class SessionClient() extends DeliveryOverflowBuffer(delivery_buffer) {
producer_queue.retain
val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
@@ -560,8 +557,8 @@ class DeliveryCreditBufferProtocol(val d
}
}
- def session(queue:DispatchQueue) = {
- val session = new CreditServer(queue)
+ def session(producer_queue:DispatchQueue) = {
+ val session = new SessionServer(producer_queue)
sessions = session :: sessions
session.capacity(session_max_credits)
session.client
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala Wed Jul 7 03:43:29 2010
@@ -96,21 +96,21 @@ class VMTransportFactory extends PipeTra
}
}
- override def bind(uri:URI):TransportServer = {
+ override def bind(uri:String):TransportServer = {
new VmTransportServer();
}
- override def connect(location:URI):Transport = {
+ override def connect(location:String):Transport = {
try {
-
+ var uri = new URI(location)
var brokerURI:String = null;
var create = true;
- var name = location.getHost();
+ var name = uri.getHost();
if (name == null) {
name = DEFAULT_PIPE_NAME;
}
- var options = URISupport.parseParamters(location);
+ var options = URISupport.parseParamters(uri);
var config = options.remove("broker").asInstanceOf[String]
if (config != null) {
brokerURI = config;
@@ -139,7 +139,7 @@ class VMTransportFactory extends PipeTra
}
// We want to use a vm transport server impl.
- var vmTransportServer = TransportFactory.bind(new URI("vm://" + name+"?wireFormat=null")).asInstanceOf[VmTransportServer]
+ var vmTransportServer = TransportFactory.bind("vm://" + name+"?wireFormat=null").asInstanceOf[VmTransportServer]
vmTransportServer.setBroker(broker);
broker.transportServers.add(vmTransportServer);
broker.start();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala Wed Jul 7 03:43:29 2010
@@ -42,7 +42,7 @@ abstract class RemoteConsumer extends Co
var destination: Destination = null
var selector: String = null;
var durable = false;
- var uri: URI = null
+ var uri: String = null
override def start() = {
consumerRate.name("Consumer " + name + " Rate");
@@ -78,7 +78,7 @@ abstract class RemoteProducer extends Co
var filler: String = null
var payloadSize = 20
- var uri: URI = null
+ var uri: String = null
override def start() = {
@@ -238,7 +238,7 @@ abstract class BaseBrokerPerfTest {
consumerCount = 1;
createConnections();
- producers.get(0).thinkTime = 50;
+ producers.get(0).thinkTime = 500000*1000;
// Start 'em up.
startClients();
@@ -549,7 +549,7 @@ abstract class BaseBrokerPerfTest {
}
}
- consumer.uri = new URI(rcvBroker.connectUris.head)
+ consumer.uri = rcvBroker.connectUris.head
consumer.destination = destination
consumer.name = "consumer" + (i + 1)
consumer.totalConsumerRate = totalConsumerRate
@@ -568,7 +568,7 @@ abstract class BaseBrokerPerfTest {
}
}
}
- producer.uri = new URI(sendBroker.connectUris.head)
+ producer.uri = sendBroker.connectUris.head
producer.producerId = id + 1
producer.name = "producer" + (id + 1)
producer.destination = destination
@@ -581,7 +581,7 @@ abstract class BaseBrokerPerfTest {
private def createBroker(name: String, bindURI: String, connectUri: String): Broker = {
val broker = new Broker()
- broker.transportServers.add(TransportFactory.bind(new URI(bindURI)))
+ broker.transportServers.add(TransportFactory.bind(bindURI))
broker.connectUris.add(connectUri)
broker
}
@@ -622,6 +622,8 @@ abstract class BaseBrokerPerfTest {
for (connection <- consumers) {
connection.start();
}
+ })
+ getGlobalQueue.dispatchAfter(400, TimeUnit.MILLISECONDS, ^{
for (connection <- producers) {
connection.start();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java Wed Jul 7 03:43:29 2010
@@ -36,7 +36,7 @@ public class VMTransportTest {
@Test()
public void autoCreateBroker() throws Exception {
- Transport connect = TransportFactory.connect(new URI("vm://test1?wireFormat=mock"));
+ Transport connect = TransportFactory.connect("vm://test1?wireFormat=mock");
connect.start();
assertNotNull(connect);
connect.stop();
@@ -44,12 +44,12 @@ public class VMTransportTest {
@Test(expected=IOException.class)
public void noAutoCreateBroker() throws Exception {
- TransportFactory.connect(new URI("vm://test2?create=false&wireFormat=mock"));
+ TransportFactory.connect("vm://test2?create=false&wireFormat=mock");
}
@Test(expected=IllegalArgumentException.class)
public void badOptions() throws Exception {
- TransportFactory.connect(new URI("vm://test3?crazy-option=false&wireFormat=mock"));
+ TransportFactory.connect("vm://test3?crazy-option=false&wireFormat=mock");
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java Wed Jul 7 03:43:29 2010
@@ -53,7 +53,7 @@ public class BrokerXml {
for (String element : transportServers) {
TransportServer server;
try {
- server = TransportFactory.bind(new URI(element));
+ server = TransportFactory.bind(element);
} catch (Exception e) {
throw new Exception("Unable to bind transport server '"+element+" due to: "+e.getMessage(), e);
}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala (from r961072, activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java&r1=961072&r2=961073&rev=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompBroker.scala Wed Jul 7 03:43:29 2010
@@ -14,12 +14,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.transport;
+package org.apache.activemq.apollo.stomp
+
+import org.apache.activemq.apollo.broker.Broker
+import org.apache.activemq.transport.TransportFactory
/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public interface CompletionCallback {
- void onCompletion();
- public void onFailure(Throwable caught);
+object StompBroker {
+
+  // Host part of the URI the transport server is bound to.
+  var address = "0.0.0.0"
+  // Port part of the URI the transport server is bound to.
+  var port = 61613
+
+  /**
+   * Command line entry point: binds a tcp transport server on address:port,
+   * attaches it to a new broker and starts the broker.  The broker keeps
+   * running until a byte can be read from stdin, then it is stopped.
+   */
+  def main(args:Array[String]) = {
+    println("Starting stomp broker...")
+
+    val bindUri = "tcp://"+address+":"+port+"?wireFormat=multi"
+    val stompBroker = new Broker()
+    stompBroker.transportServers.add(TransportFactory.bind(bindUri))
+    stompBroker.start
+    println("Startup complete.")
+
+    // Block here; any input on stdin triggers the shutdown sequence.
+    System.in.read
+
+    println("Shutting down...")
+    stompBroker.stop
+    println("Shutdown complete.")
+  }
+
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:43:29 2010
@@ -27,14 +27,15 @@ import _root_.org.fusesource.hawtdispatc
import AsciiBuffer._
import Stomp._
import BufferConversions._
-import StompFrameConstants._;
+import StompFrameConstants._
+import org.apache.activemq.transport.CompletionCallback;
class StompProtocolException(msg:String) extends Exception(msg)
object StompConstants {
- val QUEUE_PREFIX = new AsciiBuffer("/topic/")
- val TOPIC_PREFIX = new AsciiBuffer("/queue/")
+ val QUEUE_PREFIX = new AsciiBuffer("/queue/")
+ val TOPIC_PREFIX = new AsciiBuffer("/topic/")
implicit def toDestination(value:AsciiBuffer):Destination = {
if( value.startsWith(QUEUE_PREFIX) ) {
@@ -56,15 +57,18 @@ class StompProtocolHandler extends Proto
class SimpleConsumer(val dest:AsciiBuffer) extends BaseRetained with DeliveryConsumer {
val queue = StompProtocolHandler.this.dispatchQueue
- queue.retain
- setDisposer(^{ queue.release })
+ val session_manager = new DeliverySessionManager(outboundChannel, queue)
- val deliveryQueue = new DeliveryCreditBufferProtocol(outboundChannel, queue)
+ queue.retain
+ setDisposer(^{
+ session_manager.release
+ queue.release
+ })
def matches(message:Delivery) = true
def open_session(producer_queue:DispatchQueue) = new DeliverySession {
- val session = deliveryQueue.session(producer_queue)
+ val session = session_manager.session(producer_queue)
val consumer = SimpleConsumer.this
retain
@@ -79,7 +83,7 @@ class StompProtocolHandler extends Proto
}
def dispatchQueue = connection.dispatchQueue
- val outboundChannel = new DeliveryBuffer
+ val outboundChannel = new DeliveryBuffer
var closed = false
var consumer:SimpleConsumer = null
@@ -88,17 +92,30 @@ class StompProtocolHandler extends Proto
var producerRoute:DeliveryProducerRoute=null
var host:VirtualHost = null
+  // Drains the outbound channel: each pending delivery is handed to the
+  // transport and acked back to the channel once the transport reports
+  // completion; a transport failure is routed to onException.
+  outboundChannel.eventHandler = ^{
+    var delivery = outboundChannel.receive
+    while( delivery!=null ) {
+      // Pin the current delivery in a val: the callback closes over it and
+      // must not observe the loop variable after it has moved on.
+      val sent = delivery
+      connection.transport.oneway(sent.message, new CompletionCallback() {
+        def onCompletion() = {
+          outboundChannel.ack(sent)
+        }
+        def onFailure(e:Exception) = {
+          StompProtocolHandler.this.onException(e)
+        }
+      });
+      // Advance to the next delivery; without this the loop kept re-sending
+      // the first one forever.
+      // NOTE(review): assumes receive dequeues and yields null when the
+      // channel is empty -- confirm against DeliveryBuffer.
+      delivery = outboundChannel.receive
+    }
+  }
+
+
private def queue = connection.dispatchQueue
def setConnection(connection:BrokerConnection) = {
this.connection = connection
// We will be using the default virtual host
- println("waiting for host")
connection.transport.suspendRead
connection.broker.runtime.getDefaultVirtualHost(
queue.wrap { (host)=>
- println("got host")
this.host=host
connection.transport.resumeRead
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul 7 03:43:29 2010
@@ -125,7 +125,7 @@ class StompRemoteProducer extends Remote
dispatchQueue << task
}
}
- def onFailure(error:Throwable) = {
+ def onFailure(error:Exception) = {
println("stopping due to: "+error);
stop
}
Added: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=961073&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Wed Jul 7 03:43:29 2010
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp.perf
+
+import _root_.java.io._
+import _root_.java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import _root_.org.apache.activemq.util.buffer.AsciiBuffer
+import java.net.{ProtocolException, InetSocketAddress, URI, Socket}
+
+import java.lang.String._
+import java.util.concurrent.TimeUnit._
+import collection.mutable.Map
+
+/**
+ * Simulates load on a stomp broker.
+ *
+ * Starts `producers` sender threads and `consumers` subscriber threads
+ * against `uri`, prints the producer and consumer rates every
+ * `sampleInterval` milliseconds, and shuts everything down once a byte has
+ * been read from stdin.
+ */
+object StompLoadClient {
+
+  val NANOS_PER_SECOND = NANOSECONDS.convert(1, SECONDS);
+  import StompLoadClient._
+  implicit def toAsciiBuffer(value: String) = new AsciiBuffer(value)
+
+  // Milliseconds each producer / consumer sleeps after every frame.
+  var producerSleep = 0;
+  var consumerSleep = 0;
+  // Number of producer / consumer threads to start.
+  var producers = 1;
+  var consumers = 1;
+  // Milliseconds between two rate reports.
+  var sampleInterval = 5 * 1000;
+  // Only the host and port of this URI are used when connecting.
+  var uri = "stomp://127.0.0.1:61613";
+  // Size of the buffered socket streams (64k; was mistyped as 64*1204).
+  var bufferSize = 64*1024
+  // Length, in characters, of the generated message body.
+  var messageSize = 1024;
+  // When true a content-length header is added to every SEND frame.
+  var useContentLength=true
+
+  // Destinations are named "/<destinationType>/load-<n>", n < destinationCount.
+  var destinationType = "queue";
+  var destinationCount = 1;
+
+  // Frames sent / received since the last rate report.
+  val producerCounter = new AtomicLong();
+  val consumerCounter = new AtomicLong();
+  // Set once shutdown has been requested; every worker loop polls it.
+  val done = new AtomicBoolean()
+
+  def main(args:Array[String]): Unit = run
+
+  /**
+   * Starts the producer, consumer and sampling threads, blocks until a byte
+   * is read from stdin, then stops and joins all the threads.
+   */
+  def run() = {
+
+    println("=======================")
+    println("Press ENTER to shutdown");
+    println("=======================")
+    println("")
+
+
+    done.set(false)
+    var producerThreads = List[ProducerThread]()
+    for (i <- 0 until producers) {
+      val producerThread = new ProducerThread(i);
+      producerThreads = producerThread :: producerThreads
+      producerThread.start();
+    }
+
+    var consumerThreads = List[ConsumerThread]()
+    for (i <- 0 until consumers) {
+      val consumerThread = new ConsumerThread(i);
+      consumerThreads = consumerThread :: consumerThreads
+      consumerThread.start();
+    }
+
+    // start a sampling thread...
+    val sampleThread = new Thread() {
+      override def run() = {
+        try {
+          var start = System.nanoTime();
+          while( !done.get ) {
+            Thread.sleep(sampleInterval)
+            val end = System.nanoTime();
+            printRate("Producer", producerCounter, end - start);
+            printRate("Consumer", consumerCounter, end - start);
+            start = end;
+          }
+        } catch {
+          // interrupted by run() during shutdown: just let the thread end.
+          case e:InterruptedException =>
+        }
+      }
+    }
+    sampleThread.start()
+
+
+    System.in.read()
+    println("=======================")
+    done.set(true)
+
+    // wait for the threads to finish..
+    for( thread <- consumerThreads ) {
+      stopWorker(thread, thread.client)
+    }
+    for( thread <- producerThreads ) {
+      stopWorker(thread, thread.client)
+    }
+    sampleThread.interrupt
+    sampleThread.join
+
+    println("Shutdown");
+    println("=======================")
+
+  }
+
+  /**
+   * Closes the worker's current connection (if it ever got one), interrupts
+   * the worker and waits for it to end.  The client is null when the worker
+   * never managed to connect, so it has to be checked before closing.
+   */
+  private def stopWorker(worker: Thread, client: StompClient) = {
+    if( client!=null ) {
+      client.close
+    }
+    worker.interrupt
+    worker.join
+  }
+
+  override def toString() = {
+    "--------------------------------------\n"+
+    "StompLoadClient Properties\n"+
+    "--------------------------------------\n"+
+    "uri = "+uri+"\n"+
+    "producers = "+producers+"\n"+
+    "consumers = "+consumers+"\n"+
+    "destinationType = "+destinationType+"\n"+
+    "destinationCount = "+destinationCount+"\n" +
+    "messageSize = "+messageSize+"\n"+
+    "producerSleep = "+producerSleep+"\n"+
+    "consumerSleep = "+consumerSleep+"\n"+
+    "bufferSize = "+bufferSize+"\n"+
+    "useContentLength = "+useContentLength+"\n"+
+    "sampleInterval = "+sampleInterval+"\n"
+  }
+
+  /**
+   * Prints the per second rate for the events counted during the last
+   * `nanos` nanoseconds and resets the counter to zero.
+   */
+  def printRate(name: String, counter: AtomicLong, nanos: Long) = {
+    val c = counter.getAndSet(0);
+    val rate_per_second: java.lang.Float = ((1.0f * c / nanos) * NANOS_PER_SECOND);
+    println(format("%s rate: %,.3f per second", name, rate_per_second));
+  }
+
+  // Name of the destination used by the i-th producer / consumer.
+  def destination(i:Int) = "/"+destinationType+"/load-"+(i%destinationCount)
+
+
+  object StompClient {
+    /**
+     * Opens a connection to `uri`, performs the CONNECT / CONNECTED
+     * handshake and passes the connected client to `proc`.  The client is
+     * always closed afterwards.  Failures are reported (unless shutdown is
+     * in progress) and followed by a one second pause so that callers can
+     * simply retry in a loop.
+     */
+    def connect(proc: StompClient=>Unit ) = {
+      val client = new StompClient();
+      try {
+        val connectUri = new URI(uri);
+        client.open(connectUri.getHost(), connectUri.getPort());
+        client.send("CONNECT\n\n")
+        client.flush
+        client.receive("CONNECTED")
+        proc(client)
+      } catch {
+        // Exception rather than Throwable: JVM errors must not be swallowed.
+        case e: Exception =>
+          if(!done.get) {
+            println("failure occured: "+e);
+            Thread.sleep(1000);
+          }
+      } finally {
+        try {
+          client.close();
+        } catch {
+          case ignore: Exception =>
+        }
+      }
+    }
+  }
+
+  /**
+   * Minimal blocking stomp connection: frames are written as raw bytes
+   * followed by a NUL byte and a newline, and read up to the next NUL byte.
+   */
+  class StompClient {
+
+    var socket:Socket = null
+    var out:OutputStream = null;
+    var in:InputStream = null
+
+    def open(host: String, port: Int) = {
+      socket = new Socket
+      socket.connect(new InetSocketAddress(host, port))
+      socket.setSoLinger(true, 0);
+      out = new BufferedOutputStream(socket.getOutputStream, bufferSize)
+      in = new BufferedInputStream(socket.getInputStream, bufferSize)
+    }
+
+    // Called by the worker thread and, during shutdown, by the thread
+    // executing run(); synchronized so that the socket is closed only once.
+    def close() = synchronized {
+      val current = socket
+      if( current!=null ) {
+        socket = null
+        out = null
+        in = null
+        current.close
+      }
+    }
+
+    def flush() = {
+      out.flush
+    }
+
+    def send(frame:String) = {
+      out.write(frame.getBytes("UTF-8"))
+      out.write(0)
+      out.write('\n')
+    }
+
+    def send(frame:Array[Byte]) = {
+      out.write(frame)
+      out.write(0)
+      out.write('\n')
+    }
+
+    /**
+     * Discards input up to and including the next NUL byte.
+     * @throws EOFException if the stream ends before a NUL byte is read
+     */
+    def skip():Unit = {
+      var c = in.read;
+      while( c >= 0 ) {
+        if( c==0 ) {
+          return;
+        }
+        c = in.read()
+      }
+      throw new EOFException()
+    }
+
+    /**
+     * Reads the bytes up to the next NUL byte and returns them decoded as
+     * UTF-8 (the NUL byte itself is not part of the result).
+     * @throws EOFException if the stream ends before a NUL byte is read
+     */
+    def receive():String = {
+      val buffer = new ByteArrayOutputStream(500)
+      var c = in.read;
+      while( c >= 0 ) {
+        if( c==0 ) {
+          return new String(buffer.toByteArray, "UTF-8")
+        }
+        buffer.write(c);
+        c = in.read()
+      }
+      throw new EOFException()
+    }
+
+    /**
+     * Reads the next frame and checks that, once its front has been trimmed,
+     * it starts with `expect`.
+     * @throws ProtocolException if the frame starts with anything else
+     */
+    def receive(expect:String):String = {
+      val rc = receive()
+      if( !rc.trimFront.startsWith(expect) ) {
+        throw new ProtocolException("Expected "+expect)
+      }
+      rc
+    }
+
+  }
+
+  /**
+   * Sends the same pre-built SEND frame in a loop until shutdown,
+   * reconnecting whenever the connection fails.
+   */
+  class ProducerThread(val id: Int) extends Thread {
+    val name: String = "producer " + id;
+    // Written by this thread, read by run() during shutdown.
+    @volatile var client:StompClient=null
+    val content = ("SEND\n" +
+      "destination:"+destination(id)+"\n"+
+      { if(useContentLength) "content-length:"+messageSize+"\n" else "" } +
+      "\n"+message(name)).getBytes("UTF-8")
+
+    override def run() {
+      while (!done.get) {
+        StompClient.connect { client =>
+          this.client=client
+          while (!done.get) {
+            client.send(content)
+            producerCounter.incrementAndGet();
+            Thread.sleep(producerSleep);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds a message body of messageSize characters: a "Message from <name>"
+   * line padded with a repeating a..z sequence, or just the first
+   * messageSize characters of that line when it is longer than messageSize.
+   */
+  def message(name:String) = {
+    val text = new java.lang.StringBuilder("Message from " + name+"\n")
+    var i = text.length
+    while( i < messageSize ) {
+      text.append(('a'+(i%26)).toChar)
+      i += 1
+    }
+    if( text.length > messageSize ) {
+      text.substring(0, messageSize)
+    } else {
+      text.toString
+    }
+  }
+
+  /**
+   * Subscribes to its destination and counts every frame received until
+   * shutdown, reconnecting whenever the connection fails.
+   */
+  class ConsumerThread(val id: Int) extends Thread {
+    val name: String = "consumer " + id;
+    // Written by this thread, read by run() during shutdown.
+    @volatile var client:StompClient=null
+
+    override def run() {
+      while (!done.get) {
+        StompClient.connect { client =>
+          this.client=client
+          client.send("\nSUBSCRIBE\n"+
+            "destination:"+destination(id)+"\n\n")
+          client.flush
+
+          while (!done.get) {
+            client.skip
+            consumerCounter.incrementAndGet();
+            Thread.sleep(consumerSleep);
+          }
+        }
+      }
+    }
+  }
+
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul 7 03:43:29 2010
@@ -364,16 +364,15 @@ public class TcpTransport implements Tra
}
Object command=unmarshalSession.unmarshal(readBuffer);
- if( command==null ) {
- return;
- }
-
- listener.onCommand(command);
+ if( command!=null ) {
+ listener.onCommand(command);
- // the transport may be suspended after processing a command.
- if( transportState==DISPOSED || readSource.isSuspended() ) {
- return;
+ // the transport may be suspended after processing a command.
+ if( transportState==DISPOSED || readSource.isSuspended() ) {
+ return;
+ }
}
+
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Wed Jul 7 03:43:29 2010
@@ -48,9 +48,10 @@ import static org.apache.activemq.transp
public class TcpTransportFactory implements TransportFactory.TransportFactorySPI {
private static final Log LOG = LogFactory.getLog(TcpTransportFactory.class);
- public TransportServer bind(URI location) throws Exception {
- Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
- TcpTransportServer server = createTcpTransportServer(location);
+ public TransportServer bind(String location) throws Exception {
+ URI uri = new URI(location);
+ Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
+ TcpTransportServer server = createTcpTransportServer(uri);
server.setWireFormatFactory(TransportFactorySupport.createWireFormatFactory(options));
IntrospectionSupport.setProperties(server, options);
Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
@@ -67,12 +68,13 @@ public class TcpTransportFactory impleme
}
- public Transport connect(URI location) throws Exception {
- Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
- URI localLocation = getLocalLocation(location);
+ public Transport connect(String location) throws Exception {
+ URI uri = new URI(location);
+ Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
+ URI localLocation = getLocalLocation(uri);
TcpTransport transport = new TcpTransport();
- transport.connecting(location, localLocation);
+ transport.connecting(uri, localLocation);
Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
transport.setSocketOptions(socketOptions);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java Wed Jul 7 03:43:29 2010
@@ -21,5 +21,5 @@ package org.apache.activemq.transport;
*/
public interface CompletionCallback {
void onCompletion();
- public void onFailure(Throwable caught);
+ public void onFailure(Exception caught);
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java Wed Jul 7 03:43:29 2010
@@ -34,14 +34,14 @@ public class TransportFactory {
private static final ConcurrentHashMap<String, TransportFactorySPI> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactorySPI>();
public interface TransportFactorySPI {
- public TransportServer bind(URI location) throws Exception;
- public Transport connect(URI location) throws Exception;
+ public TransportServer bind(String location) throws Exception;
+ public Transport connect(String location) throws Exception;
}
/**
*/
- private static TransportFactorySPI factory(URI location) throws IOException {
- String scheme = location.getScheme();
+ private static TransportFactorySPI factory(String location) throws IOException {
+ String scheme = FactoryFinder.getScheme(location);
if (scheme == null) {
throw new IOException("Transport not scheme specified: [" + location + "]");
}
@@ -71,14 +71,14 @@ public class TransportFactory {
/**
* Creates a client transport.
*/
- public static Transport connect(URI location) throws Exception {
+ public static Transport connect(String location) throws Exception {
return factory(location).connect(location);
}
/**
* Creates a transport server.
*/
- public static TransportServer bind(URI location) throws Exception {
+ public static TransportServer bind(String location) throws Exception {
return factory(location).bind(location);
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Wed Jul 7 03:43:29 2010
@@ -35,8 +35,8 @@ public class PipeTransportFactory implem
public static final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
- public TransportServer bind(URI uri) throws URISyntaxException, IOException {
-
+ public TransportServer bind(String location) throws URISyntaxException, IOException {
+ URI uri = new URI(location);
Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
String node = uri.getHost();
synchronized(servers) {
@@ -58,8 +58,9 @@ public class PipeTransportFactory implem
}
- public Transport connect(URI location) throws IOException, URISyntaxException {
- String name = location.getHost();
+ public Transport connect(String location) throws IOException, URISyntaxException {
+ URI uri = new URI(location);
+ String name = uri.getHost();
synchronized(servers) {
PipeTransportServer server = lookup(name);
if (server == null) {
@@ -67,7 +68,7 @@ public class PipeTransportFactory implem
}
PipeTransport transport = server.connect();
- Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
+ Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
return verify( configure(transport, options), options);
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/FactoryFinder.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/FactoryFinder.java?rev=961073&r1=961072&r2=961073&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/FactoryFinder.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/FactoryFinder.java Wed Jul 7 03:43:29 2010
@@ -31,6 +31,14 @@ public class FactoryFinder {
this.path = path;
}
+ public static String getScheme(String uri) {
+ String split[] = uri.split(":");
+ if (split.length < 2 ) {
+ return null;
+ }
+ return split[0];
+ }
+
/**
* Creates a new instance of the given key
*