You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:42:45 UTC
svn commit: r961071 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/
activemq-jaxb/src/main/java/org/apache/activemq/a...
Author: chirino
Date: Wed Jul 7 03:42:44 2010
New Revision: 961071
URL: http://svn.apache.org/viewvc?rev=961071&view=rev
Log:
working on getting a stimple use case running
Added:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
- copied, changed from r961070, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/StatefulWireFormat.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java
Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (from r961070, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala&r1=961070&r2=961071&rev=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ScalaBroker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul 7 03:42:44 2010
@@ -27,6 +27,7 @@ import _root_.org.fusesource.hawtdispatc
import _root_.org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
import _root_.scala.collection.JavaConversions._
+import _root_.scala.reflect.BeanProperty
object BrokerFactory {
@@ -94,56 +95,25 @@ class Broker() extends Service with Logg
import BrokerConstants._
override protected def log = BrokerConstants
- class BrokerAcceptListener extends TransportAcceptListener {
- def onAcceptError(error: Exception): Unit = {
- warn("Accept error: " + error)
- debug("Accept error details: ", error)
- }
-
- def onAccept(transport: Transport): Unit = {
- var connection = new BrokerConnection(Broker.this)
- connection.transport = transport
- clientConnections.add(connection)
- try {
- connection.start
- }
- catch {
- case e1: Exception => {
- onAcceptError(e1)
- }
- }
- }
- }
-
- val q = createQueue("broker");
-
- var connectUris: List[String] = Nil
+ // The configuration state of the broker... It can be modified directly until the broker
+ // is started.
+ @BeanProperty
+ val connectUris: ArrayList[String] = new ArrayList[String]
+ @BeanProperty
val virtualHosts: LinkedHashMap[AsciiBuffer, VirtualHost] = new LinkedHashMap[AsciiBuffer, VirtualHost]
+ @BeanProperty
val transportServers: ArrayList[TransportServer] = new ArrayList[TransportServer]
- val clientConnections: ArrayList[Connection] = new ArrayList[Connection]
+ @BeanProperty
var dataDirectory: File = null
- var state = CONFIGURATION
+ @BeanProperty
var name = "broker";
+ @BeanProperty
var defaultVirtualHost: VirtualHost = null
- def removeConnectUri(uri: String): Unit = ^ {
- this.connectUris = this.connectUris.filterNot(_==uri)
- } ->: q
-
- def getVirtualHost(name: AsciiBuffer, cb: (VirtualHost) => Unit) = callback(cb) {
- virtualHosts.get(name)
- } ->: q
-
- def getConnectUris(cb: (List[String]) => Unit) = callback(cb) {
- connectUris
- } ->: q
+ def start = runtime.start
+ def stop = runtime.stop
-
- def getDefaultVirtualHost(cb: (VirtualHost) => Unit) = callback(cb) {
- defaultVirtualHost
- } ->: q
-
- def addVirtualHost(host: VirtualHost) = ^ {
+ def addVirtualHost(host: VirtualHost) = {
if (host.names.isEmpty) {
throw new IllegalArgumentException("Virtual host must be configured with at least one host name.")
}
@@ -156,138 +126,127 @@ class Broker() extends Service with Logg
virtualHosts.put(name, host)
}
if (defaultVirtualHost == null) {
- setDefaultVirtualHost(host)
- }
- } ->: q
-
- def addTransportServer(server: TransportServer) = ^ {
- state match {
- case RUNNING =>
- start(server)
- case CONFIGURATION =>
- this.transportServers.add(server)
- case _ =>
- throw new IllegalStateException("Cannot add a transport server when broker is: " + state)
- }
- } ->: q
-
- def removeTransportServer(server: TransportServer) = ^ {
- state match {
- case RUNNING =>
- stopTransportServerWrapException(server)
- case STOPPED =>
- this.transportServers.remove(server)
- case CONFIGURATION =>
- this.transportServers.remove(server)
- case _ =>
- throw new IllegalStateException("Cannot add a transport server when broker is: " + state)
- }
- } ->: q
-
-
- def getState(cb: (String) => Unit) = callback(cb) {state} ->: q
-
-
- def addConnectUri(uri: String) = ^ {
- this.connectUris = this.connectUris ::: uri::Nil
- } ->: q
-
- def removeVirtualHost(host: VirtualHost) = ^ {
- for (name <- host.names) {
- virtualHosts.remove(name)
- }
- if (host == defaultVirtualHost) {
- if (virtualHosts.isEmpty) {
- defaultVirtualHost = null
- }
- else {
- defaultVirtualHost = virtualHosts.values.iterator.next
- }
+ defaultVirtualHost = host
}
- } ->: q
-
- def setDefaultVirtualHost(defaultVirtualHost: VirtualHost) = ^ {
- this.defaultVirtualHost = defaultVirtualHost
- } ->: q
-
- def getName(cb: (String) => Unit) = callback(cb) {
- name;
- } ->: q
-
-
- private def start(server: TransportServer): Unit = {
- server.setDispatchQueue(q)
- server.setAcceptListener(new BrokerAcceptListener)
- server.start
}
-
- final def stop: Unit = ^ {
- if (state == RUNNING) {
- state = STOPPING
-
- for (server <- transportServers) {
- stop(server)
- }
- for (connection <- clientConnections) {
- stop(connection)
- }
- for (virtualHost <- virtualHosts.values) {
- stop(virtualHost)
+ // Holds the runtime state of the broker all access should be serialized
+ // via a the dispatch queue and therefore all requests are setup to return
+ // results via callbacks.
+ object runtime {
+
+ class BrokerAcceptListener extends TransportAcceptListener {
+ def onAcceptError(error: Exception): Unit = {
+ error.printStackTrace
+ warn("Accept error: " + error)
+ debug("Accept error details: ", error)
+ }
+
+ def onAccept(transport: Transport): Unit = {
+ var connection = new BrokerConnection(Broker.this)
+ connection.transport = transport
+ clientConnections.add(connection)
+ try {
+ connection.start
+ }
+ catch {
+ case e1: Exception => {
+ onAcceptError(e1)
+ }
+ }
}
- state = STOPPED;
}
- } ->: q
-
- def getVirtualHosts(cb: (ArrayList[VirtualHost]) => Unit) = callback(cb) {
- new ArrayList[VirtualHost](virtualHosts.values)
- } ->: q
-
- def getTransportServers(cb: (ArrayList[TransportServer]) => Unit) = callback(cb) {
- new ArrayList[TransportServer](transportServers)
- } ->: q
-
-
-
+ var state = CONFIGURATION
+ val dispatchQueue = createQueue("broker");
+ val clientConnections: ArrayList[Connection] = new ArrayList[Connection]
+
+ def removeConnectUri(uri: String): Unit = ^ {
+ connectUris.remove(uri)
+ } ->: dispatchQueue
+
+ def getVirtualHost(name: AsciiBuffer, cb: (VirtualHost) => Unit) = callback(cb) {
+ virtualHosts.get(name)
+ } ->: dispatchQueue
+
+ def getConnectUris(cb: (ArrayList[String]) => Unit) = callback(cb) {
+ new ArrayList(connectUris)
+ } ->: dispatchQueue
+
+
+ def getDefaultVirtualHost(cb: (VirtualHost) => Unit) = callback(cb) {
+ defaultVirtualHost
+ } ->: dispatchQueue
+
+ def addVirtualHost(host: VirtualHost) = ^ {
+ Broker.this.addVirtualHost(host)
+ } ->: dispatchQueue
+
+ def getState(cb: (String) => Unit) = callback(cb) {state} ->: dispatchQueue
+
+ def addConnectUri(uri: String) = ^ {
+ connectUris.add(uri)
+ } ->: dispatchQueue
+
+ def getName(cb: (String) => Unit) = callback(cb) {
+ name;
+ } ->: dispatchQueue
+
+ def getVirtualHosts(cb: (ArrayList[VirtualHost]) => Unit) = callback(cb) {
+ new ArrayList[VirtualHost](virtualHosts.values)
+ } ->: dispatchQueue
+
+ def getTransportServers(cb: (ArrayList[TransportServer]) => Unit) = callback(cb) {
+ new ArrayList[TransportServer](transportServers)
+ } ->: dispatchQueue
+
+ def start = ^ {
+ if (state == CONFIGURATION) {
+ // We can apply defaults now
+ if (dataDirectory == null) {
+ dataDirectory = new File(IOHelper.getDefaultDataDirectory)
+ }
- def start = ^ {
- if (state == CONFIGURATION) {
- // We can apply defaults now
- if (dataDirectory == null) {
- dataDirectory = new File(IOHelper.getDefaultDataDirectory)
- }
+ if (defaultVirtualHost == null) {
+ defaultVirtualHost = new VirtualHost()
+ defaultVirtualHost.broker = Broker.this
+ defaultVirtualHost.names = DEFAULT_VIRTUAL_HOST_NAME.toString :: Nil
+ virtualHosts.put(DEFAULT_VIRTUAL_HOST_NAME, defaultVirtualHost)
+ }
- if (defaultVirtualHost == null) {
- defaultVirtualHost = new VirtualHost()
- defaultVirtualHost.broker = Broker.this
- defaultVirtualHost.names = DEFAULT_VIRTUAL_HOST_NAME.toString :: Nil
- virtualHosts.put(DEFAULT_VIRTUAL_HOST_NAME, defaultVirtualHost)
- }
+ state = STARTING
- state = STARTING
+ for (virtualHost <- virtualHosts.values) {
+ virtualHost.start
+ }
+ for (server <- transportServers) {
+ server.setDispatchQueue(dispatchQueue)
+ server.setAcceptListener(new BrokerAcceptListener)
+ server.start
+ }
+ state = RUNNING
+ } else {
+ warn("Can only start a broker that is in the " + CONFIGURATION + " state. Broker was " + state)
+ }
+ } ->: dispatchQueue
+
+ def stop: Unit = ^ {
+ if (state == RUNNING) {
+ state = STOPPING
- for (virtualHost <- virtualHosts.values) {
- virtualHost.start
- }
- for (server <- transportServers) {
- start(server)
+ for (server <- transportServers) {
+ stopService(server)
+ }
+ for (connection <- clientConnections) {
+ stopService(connection)
+ }
+ for (virtualHost <- virtualHosts.values) {
+ stopService(virtualHost)
+ }
+ state = STOPPED;
}
- state = RUNNING
- } else {
- warn("Can only start a broker that is in the " + CONFIGURATION + " state. Broker was " + state)
- }
- } ->: q
- private def stopTransportServerWrapException(server: TransportServer): Unit = {
- try {
- server.stop
- }
- catch {
- case e: Exception => {
- throw new RuntimeException(e)
- }
- }
+ } ->: dispatchQueue
}
@@ -295,7 +254,7 @@ class Broker() extends Service with Logg
* Helper method to help stop broker services and log error if they fail to start.
* @param server
*/
- private def stop(server: Service): Unit = {
+ private def stopService(server: Service): Unit = {
try {
server.stop
} catch {
@@ -339,16 +298,16 @@ object Queue {
*/
class Queue(val destination:Destination) extends BaseRetained with Route with DeliveryConsumer with DeliveryProducer {
-
-
override val queue:DispatchQueue = createQueue("queue:"+destination);
queue.setTargetQueue(getRandomThreadQueue)
setDisposer(^{
queue.release
})
-
val delivery_buffer = new DeliveryBuffer
+ delivery_buffer.eventHandler = ^{ drain_delivery_buffer }
+
+ val delivery_sessions = new DeliveryCreditBufferProtocol(delivery_buffer, queue)
class ConsumerState(val consumer:DeliverySession) {
var bound=true
@@ -381,7 +340,7 @@ class Queue(val destination:Destination)
allConsumers += consumer->cs
readyConsumers.addLast(cs)
}
- delivery_buffer.eventHandler.run
+ drain_delivery_buffer
} ->: queue
def unbind(consumers:List[DeliveryConsumer]) = releasing(consumers) {
@@ -407,22 +366,22 @@ class Queue(val destination:Destination)
}
- delivery_buffer.eventHandler = ^{
- while( !readyConsumers.isEmpty && !delivery_buffer.isEmpty ) {
+ def drain_delivery_buffer: Unit = {
+ while (!readyConsumers.isEmpty && !delivery_buffer.isEmpty) {
val cs = readyConsumers.removeFirst
val delivery = delivery_buffer.receive
cs.deliver(delivery)
}
}
-
- val deliveryQueue = new DeliveryCreditBufferProtocol(delivery_buffer, queue)
def open_session(producer_queue:DispatchQueue) = new DeliverySession {
- val session = deliveryQueue.session(producer_queue)
+
+ val session = delivery_sessions.session(producer_queue)
val consumer = Queue.this
retain
def deliver(delivery:Delivery) = session.send(delivery)
+
def close = {
session.close
release
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul 7 03:42:44 2010
@@ -38,29 +38,28 @@ abstract class Connection() extends Tran
var transport:Transport = null
var exceptionListener:ExceptionListener = null;
- def start() = {
+ def start() = ^{
transport.setDispatchQueue(dispatchQueue);
- transport.getDispatchQueue.release
- transport.setTransportListener(this);
+ transport.setTransportListener(Connection.this);
transport.start()
- }
+ } ->: dispatchQueue
- def stop() = {
+ def stop() = ^{
stopping=true
transport.stop()
dispatchQueue.release
- }
+ } ->: dispatchQueue
def onException(error:IOException) = {
- if (!stopping) {
- onFailure(error);
- }
+ if (!stopping) {
+ onFailure(error);
+ }
}
def onFailure(error:Exception) = {
- if (exceptionListener != null) {
- exceptionListener.exceptionThrown(error);
- }
+ if (exceptionListener != null) {
+ exceptionListener.exceptionThrown(error);
+ }
}
def onDisconnected() = {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destination.scala Wed Jul 7 03:42:44 2010
@@ -89,7 +89,7 @@ object Destination {
}
}
-class SingleDestination(var domain:AsciiBuffer=null, var name:AsciiBuffer=null) extends Destination {
+case class SingleDestination(var domain:AsciiBuffer=null, var name:AsciiBuffer=null) extends Destination {
def getDestinations():Seq[Destination] = null;
def getDomain():AsciiBuffer = domain
@@ -97,8 +97,7 @@ class SingleDestination(var domain:Ascii
override def toString() = ""+domain+":"+name
}
-
-class MultiDestination(var destinations:List[Destination]=Nil) extends Destination {
+case class MultiDestination(var destinations:List[Destination]=Nil) extends Destination {
def getDestinations():Seq[Destination] = destinations;
def getDomain():AsciiBuffer = null
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala Wed Jul 7 03:42:44 2010
@@ -18,15 +18,10 @@ package org.apache.activemq.apollo.broke
import _root_.java.util.{LinkedHashMap, HashMap}
-class TransactionManagerConfig {
-
- def apply(host:VirtualHost): TransactionManager = {
- new TransactionManager(host, this);
- }
-}
+class TransactionManager() {
+ var virtualHost:VirtualHost = null
-class TransactionManager(val virtualHost:VirtualHost, config:TransactionManagerConfig) {
// TODO:
// private static final Log LOG = LogFactory.getLog(TransactionManager.class);
// private static final String TX_QUEUE_PREFIX = "TX-";
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 03:42:44 2010
@@ -17,6 +17,7 @@
package org.apache.activemq.apollo.broker;
import _root_.java.util.{LinkedHashMap, ArrayList, HashMap}
+import _root_.org.apache.activemq.broker.store.memory.MemoryStore
import _root_.org.apache.activemq.broker.store.{Store}
import _root_.org.apache.activemq.Service
import _root_.java.lang.{String}
@@ -50,9 +51,9 @@ class VirtualHost() extends Service with
}
@BeanProperty
- var database:BrokerDatabase = null
+ var database:BrokerDatabase = new BrokerDatabase
@BeanProperty
- var txnManager:TransactionManager = null
+ var transactionManager:TransactionManager = new TransactionManager
def start():Unit = {
@@ -60,6 +61,7 @@ class VirtualHost() extends Service with
return;
}
+ database.virtualHost = this
database.start();
// router.setDatabase(database);
@@ -84,7 +86,8 @@ class VirtualHost() extends Service with
// }
//Recover transactions:
- txnManager.loadTransactions();
+ transactionManager.virtualHost = this
+ transactionManager.loadTransactions();
started = true;
}
@@ -212,7 +215,7 @@ class VirtualHost() extends Service with
class BrokerDatabase() extends Service {
@BeanProperty
- var store:Store=null;
+ var store:Store=new MemoryStore;
@BeanProperty
var virtualHost:VirtualHost=null;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala Wed Jul 7 03:42:44 2010
@@ -32,6 +32,7 @@ import _root_.org.junit.{Test, Before}
import org.apache.activemq.transport.TransportFactory
import _root_.scala.collection.JavaConversions._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
abstract class RemoteConsumer extends Connection {
@@ -48,6 +49,10 @@ abstract class RemoteConsumer extends Co
totalConsumerRate.add(consumerRate);
transport = TransportFactory.connect(uri);
super.start();
+ }
+
+
+ override def onConnected() = {
setupSubscription();
}
@@ -69,6 +74,7 @@ abstract class RemoteProducer extends Co
var property: String = null
var totalProducerRate: MetricAggregator = null
var next: Delivery = null
+ var thinkTime: Long = 0
var filler: String = null
var payloadSize = 20
@@ -89,10 +95,13 @@ abstract class RemoteProducer extends Co
transport = TransportFactory.connect(uri);
super.start();
- setupProducer();
}
+ override def onConnected() = {
+ setupProducer();
+ }
+
def setupProducer()
def createPayload(): String = {
@@ -119,7 +128,7 @@ def createPayload(): String = {
}
object BaseBrokerPerfTest {
- var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "3"))
+ var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "3000000"))
var IO_WORK_AMOUNT = 0
var FANIN_COUNT = 10
var FANOUT_COUNT = 10
@@ -229,6 +238,7 @@ abstract class BaseBrokerPerfTest {
consumerCount = 1;
createConnections();
+ producers.get(0).thinkTime = 50;
// Start 'em up.
startClients();
@@ -573,8 +583,6 @@ abstract class BaseBrokerPerfTest {
val broker = new Broker()
broker.transportServers.add(TransportFactory.bind(new URI(bindURI)))
broker.connectUris.add(connectUri)
- // TODO:
- // broker.defaultVirtualHost.setStore(createStore(broker))
broker
}
@@ -609,14 +617,15 @@ abstract class BaseBrokerPerfTest {
}
private def startClients() = {
-
- for (connection <- consumers) {
- connection.start();
- }
-
- for (connection <- producers) {
- connection.start();
- }
+ // Start the clients after a delay to give the server a chance to startup.
+ getGlobalQueue.dispatchAfter(200, TimeUnit.MILLISECONDS, ^{
+ for (connection <- consumers) {
+ connection.start();
+ }
+ for (connection <- producers) {
+ connection.start();
+ }
+ })
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java Wed Jul 7 03:42:44 2010
@@ -57,10 +57,10 @@ public class BrokerXml {
} catch (Exception e) {
throw new Exception("Unable to bind transport server '"+element+" due to: "+e.getMessage(), e);
}
- rc.addTransportServer(server);
+ rc.transportServers().add(server);
}
for (String element : connectUris) {
- rc.addConnectUri(element);
+ rc.connectUris().add(element);
}
return rc;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul 7 03:42:44 2010
@@ -20,6 +20,7 @@ import _root_.java.util.LinkedList
import _root_.org.apache.activemq.apollo.broker.{BufferConversions, Destination, Message}
import _root_.org.apache.activemq.filter.{Expression, MessageEvaluationContext}
import _root_.org.apache.activemq.util.buffer._
+import collection.mutable.ListBuffer
/**
*
@@ -27,6 +28,7 @@ import _root_.org.apache.activemq.util.b
*/
object StompFrameConstants {
type HeaderMap = List[(AsciiBuffer, AsciiBuffer)]
+ type HeaderMapBuffer = ListBuffer[(AsciiBuffer, AsciiBuffer)]
var NO_DATA = new Buffer(0);
}
@@ -141,6 +143,7 @@ case class StompFrame(action:AsciiBuffer
destination = value
case (Stomp.Headers.Message.EXPIRATION_TIME, value) =>
expiration = java.lang.Long.parseLong(value)
+ case _ =>
}
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:42:44 2010
@@ -1,5 +1,5 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
+ * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
@@ -16,14 +16,14 @@
*/
package org.apache.activemq.apollo.stomp
+import _root_.java.io.{DataOutput, DataInput, EOFException, IOException}
+import _root_.java.nio.channels.{ReadableByteChannel, SocketChannel}
import _root_.java.util.{LinkedList, ArrayList}
import _root_.org.apache.activemq.apollo.broker._
-import _root_.org.apache.activemq.wireformat.WireFormat
+import _root_.org.apache.activemq.wireformat.{WireFormatFactory, WireFormat}
import _root_.org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
-import java.nio.channels.{SocketChannel}
import java.nio.ByteBuffer
-import java.io.{EOFException, IOException}
import _root_.org.apache.activemq.util.buffer._
import collection.mutable.{ListBuffer, HashMap}
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
@@ -101,9 +101,11 @@ class StompProtocolHandler extends Proto
this.connection = connection
// We will be using the default virtual host
- connection.broker.getDefaultVirtualHost(
+ connection.transport.suspendRead
+ connection.broker.runtime.getDefaultVirtualHost(
queue.wrap { (host)=>
this.host=host
+ connection.transport.resumeRead
}
)
}
@@ -111,20 +113,24 @@ class StompProtocolHandler extends Proto
def setWireFormat(wireformat:WireFormat) = { this.wireformat = wireformat}
def onCommand(command:Any) = {
- val frame = command.asInstanceOf[StompFrame]
- frame match {
- case StompFrame(Commands.CONNECT, headers, _) =>
- on_stomp_connect(headers)
+ command match {
case StompFrame(Commands.SEND, headers, content) =>
- on_stomp_send(frame)
- case StompFrame(Commands.SUBSCRIBE, headers, content) =>
- on_stomp_subscribe(headers)
+ on_stomp_send(command.asInstanceOf[StompFrame])
case StompFrame(Commands.ACK, headers, content) =>
// TODO:
+ case StompFrame(Commands.SUBSCRIBE, headers, content) =>
+ on_stomp_subscribe(headers)
+ case StompFrame(Commands.CONNECT, headers, _) =>
+ on_stomp_connect(headers)
case StompFrame(Commands.DISCONNECT, headers, content) =>
stop
+ case s:StompWireFormat =>
+ // this is passed on to us by the protocol discriminator
+ // so we know which wire format is being used.
case StompFrame(unknown, _, _) =>
die("Unsupported STOMP command: "+unknown);
+ case _ =>
+ die("Unsupported command: "+command);
}
}
@@ -230,6 +236,7 @@ class StompProtocolHandler extends Proto
def onException(error:Exception) = {
println("Shutting connection down due to: "+error)
+ error.printStackTrace
stop
}
@@ -252,6 +259,28 @@ class StompProtocolHandler extends Proto
}
}
+/**
+ * Creates WireFormat objects that marshalls the <a href="http://activemq.apache.org/stomp/">Stomp</a> protocol.
+ */
+class StompWireFormatFactory extends WireFormatFactory {
+ import Stomp.Commands.CONNECT
+
+ def createWireFormat() = new StompWireFormat();
+
+ def isDiscriminatable() = true
+
+ def maxWireformatHeaderLength() = CONNECT.length+10;
+
+ def matchesWireformatHeader(header:Buffer) = {
+ if( header.length < CONNECT.length) {
+ false
+ } else {
+ // the magic can be preceded with newlines / whitespace..
+ header.trimFront.startsWith(CONNECT);
+ }
+ }
+}
+
object StompWireFormat {
val READ_BUFFFER_SIZE = 1024*64;
val MAX_COMMAND_LENGTH = 1024;
@@ -262,7 +291,7 @@ object StompWireFormat {
val SIZE_CHECK=false
}
-class StompWireFormat {
+class StompWireFormat extends WireFormat {
import StompWireFormat._
implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
@@ -270,48 +299,10 @@ class StompWireFormat {
ByteBuffer.wrap(Array(x));
}
- var outbound_frame: ByteBuffer = null
- /**
- * @retruns true if the source has been drained of StompFrame objects and they are fully written to the socket
- */
- def drain_source(socket:SocketChannel)(source: =>StompFrame ):Boolean = {
- while(true) {
- // if we have a pending frame that is being sent over the socket...
- if( outbound_frame!=null ) {
- socket.write(outbound_frame)
- if( outbound_frame.remaining != 0 ) {
- // non blocking socket returned before the buffers were fully written to disk..
- // we are not yet fully drained.. but need to quit now.
- return false
- } else {
- outbound_frame = null
- }
- } else {
-
- // marshall all the available frames..
- val buffer = new ByteArrayOutputStream()
- var frame = source
- while( frame!=null ) {
- marshall(buffer, frame)
- frame = source
- }
-
-
- if( buffer.size() ==0 ) {
- // the source is now drained...
- return true
- } else {
- val b = buffer.toBuffer;
- outbound_frame = ByteBuffer.wrap(b.data, b.offset, b.length)
- }
- }
- }
- true
- }
-
- def marshall(buffer:ByteArrayOutputStream, frame:StompFrame) = {
- buffer.write(frame.action)
- buffer.write(NEWLINE)
+ def marshal(command:Any, os:DataOutput) = {
+ val frame = command.asInstanceOf[StompFrame]
+ frame.action.writeTo(os)
+ os.write(NEWLINE)
// we can optimize a little if the headers and content are in the same buffer..
if( !frame.headers.isEmpty && !frame.content.isEmpty &&
@@ -321,224 +312,218 @@ class StompWireFormat {
val buffer1 = frame.headers.head._1;
val buffer2 = frame.content;
val length = (buffer2.offset-buffer1.offset)+buffer2.length
- buffer.write( buffer1.data, offset, length)
+ os.write( buffer1.data, offset, length)
} else {
for( (key, value) <- frame.headers ) {
- buffer.write(key)
- buffer.write(SEPERATOR)
- buffer.write(value)
- buffer.write(NEWLINE)
+ key.writeTo(os)
+ os.write(SEPERATOR)
+ value.writeTo(os)
+ os.write(NEWLINE)
}
-
- buffer.write(NEWLINE)
- buffer.write(frame.content)
+ os.write(NEWLINE)
+ frame.content.writeTo(os)
}
- buffer.write(END_OF_FRAME_BUFFER)
+ END_OF_FRAME_BUFFER.writeTo(os)
}
+ def marshal(command:Any):Buffer= {
+ val frame = command.asInstanceOf[StompFrame]
+ // make a little bigger since size can be an estimate and we want to avoid
+ // a capacity re-size.
+ val os = new DataByteArrayOutputStream(frame.size + 100);
+ marshal(frame, os)
+ os.toBuffer
+ }
- var read_pos = 0
- var read_offset = 0
- var read_data:Array[Byte] = new Array[Byte](READ_BUFFFER_SIZE)
- var read_bytebuffer:ByteBuffer = ByteBuffer.wrap(read_data)
+ def unmarshal(packet:Buffer) = {
+ throw new UnsupportedOperationException
+ }
+ def unmarshal(in: DataInput):Object = {
+ throw new UnsupportedOperationException
+ }
- def drain_socket(socket:SocketChannel)(handler:(StompFrame)=>Boolean) = {
- var done = false
+ def getName() = "stomp"
- // keep going until the socket buffer is drained.
- while( !done ) {
- val frame = unmarshall()
- if( frame!=null ) {
- // the handler might want us to stop looping..
- done = handler(frame)
- } else {
+ def getWireFormatFactory() = new StompWireFormatFactory
- // do we need to read in more data???
- if( read_pos==read_bytebuffer.position ) {
+ //
+ // state associated with un-marshalling stomp frames from
+ // a non-blocking NIO channel.
+ //
+ def createUnmarshalSession() = new StompUnmarshalSession
- // do we need a new data buffer to read data into??
- if(read_bytebuffer.remaining==0) {
+ class StompUnmarshalSession extends UnmarshalSession {
- // The capacity needed grows by powers of 2...
- val new_capacity = if( read_offset != 0 ) { READ_BUFFFER_SIZE } else { read_data.length << 2 }
- val tmp_buffer = new Array[Byte](new_capacity)
-
- // If there was un-consummed data.. copy it over...
- val size = read_pos - read_offset
- if( size > 0 ) {
- System.arraycopy(read_data, read_offset, tmp_buffer, 0, size)
- }
- read_data = tmp_buffer
- read_bytebuffer = ByteBuffer.wrap(read_data)
- read_bytebuffer.position(size)
- read_offset = 0;
- read_pos = size
+ type FrameReader = (ByteBuffer)=>StompFrame
- }
+ var next_action:FrameReader = read_action
+ var end = 0
+ var start = 0
- // Try to fill the buffer with data from the nio socket..
- var p = read_bytebuffer.position
- if( socket.read(read_bytebuffer) == -1 ) {
- throw new EOFException();
- }
- // we are done if there was no data on the socket.
- done = read_bytebuffer.position==p
- }
+ def getStartPos() = start
+ def setStartPos(pos:Int):Unit = {start=pos}
+
+ def getEndPos() = end
+ def setEndPos(pos:Int):Unit = { end = pos }
+
+ def unmarshal(buffer:ByteBuffer):Object = {
+ // keep running the next action until
+ // a frame is decoded or we run out of input
+ var rc:StompFrame = null
+ while( rc == null && end!=buffer.position ) {
+ rc = next_action(buffer)
}
+ rc
}
- }
+ def read_line(buffer:ByteBuffer, maxLength:Int, errorMessage:String):Buffer = {
+ val read_limit = buffer.position
+ while( end < read_limit ) {
+ if( buffer.array()(end) =='\n') {
+ var rc = new Buffer(buffer.array, start, end-start)
+ end += 1;
+ start = end;
+ return rc
+ }
+ if (SIZE_CHECK && end-start > maxLength) {
+ throw new IOException(errorMessage);
+ }
+ end += 1;
+ }
+ return null;
+ }
- type FrameReader = ()=>StompFrame
- var unmarshall:FrameReader = read_action
-
- def read_line( maxLength:Int, errorMessage:String):Buffer = {
- val read_limit = read_bytebuffer.position
- while( read_pos < read_limit ) {
- if( read_data(read_pos) =='\n') {
- var rc = new Buffer(read_data, read_offset, read_pos-read_offset)
- read_pos += 1;
- read_offset = read_pos;
- return rc
+ def read_action:FrameReader = (buffer)=> {
+ val line = read_line(buffer, MAX_COMMAND_LENGTH, "The maximum command length was exceeded")
+ if( line !=null ) {
+ var action = line
+ if( TRIM ) {
+ action = action.trim();
}
- if (SIZE_CHECK && read_pos-read_offset > maxLength) {
- throw new IOException(errorMessage);
+ if (action.length() > 0) {
+ next_action = read_headers(action)
}
- read_pos += 1;
- }
- return null;
- }
-
-
- def read_action:FrameReader = ()=> {
- val line = read_line(MAX_COMMAND_LENGTH, "The maximum command length was exceeded")
- if( line !=null ) {
- var action = line
- if( TRIM ) {
- action = action.trim();
- }
- if (action.length() > 0) {
- unmarshall = read_headers(action)
}
+ null
}
- null
- }
-
- def read_headers(action:Buffer, headers:HeaderMap=Nil):FrameReader = ()=> {
- val line = read_line(MAX_HEADER_LENGTH, "The maximum header length was exceeded")
- if( line !=null ) {
- if( line.trim().length() > 0 ) {
- if (SIZE_CHECK && headers.size > MAX_HEADERS) {
- throw new IOException("The maximum number of headers was exceeded");
- }
+ def read_headers(action:Buffer, headers:HeaderMapBuffer=new HeaderMapBuffer()):FrameReader = (buffer)=> {
+ val line = read_line(buffer, MAX_HEADER_LENGTH, "The maximum header length was exceeded")
+ if( line !=null ) {
+ if( line.trim().length() > 0 ) {
- try {
- val seperatorIndex = line.indexOf(SEPERATOR);
- if( seperatorIndex<0 ) {
- throw new IOException("Header line missing seperator [" + ascii(line) + "]");
- }
- var name = line.slice(0, seperatorIndex);
- if( TRIM ) {
- name = name.trim();
- }
- var value = line.slice(seperatorIndex + 1, line.length());
- if( TRIM ) {
- value = value.trim();
- }
- headers.add((ascii(name), ascii(value)));
- } catch {
- case e:Exception=>
- throw new IOException("Unable to parser header line [" + line + "]");
- }
+ if (SIZE_CHECK && headers.size > MAX_HEADERS) {
+ throw new IOException("The maximum number of headers was exceeded");
+ }
- } else {
- val contentLength = get(headers, CONTENT_LENGTH)
- if (contentLength.isDefined) {
- // Bless the client, he's telling us how much data to read in.
- var length=0;
try {
- length = Integer.parseInt(contentLength.get.trim().toString());
+ val seperatorIndex = line.indexOf(SEPERATOR);
+ if( seperatorIndex<0 ) {
+ throw new IOException("Header line missing seperator [" + ascii(line) + "]");
+ }
+ var name = line.slice(0, seperatorIndex);
+ if( TRIM ) {
+ name = name.trim();
+ }
+ var value = line.slice(seperatorIndex + 1, line.length());
+ if( TRIM ) {
+ value = value.trim();
+ }
+ headers.add((ascii(name), ascii(value)))
} catch {
- case e:NumberFormatException=>
- throw new IOException("Specified content-length is not a valid integer");
+ case e:Exception=>
+ e.printStackTrace
+ throw new IOException("Unable to parser header line [" + line + "]");
}
- if (SIZE_CHECK && length > MAX_DATA_LENGTH) {
- throw new IOException("The maximum data length was exceeded");
- }
- unmarshall = read_binary_body(action, headers, length)
-
} else {
- unmarshall = read_text_body(action, headers)
+ val contentLength = get(headers, CONTENT_LENGTH)
+ if (contentLength.isDefined) {
+ // Bless the client, he's telling us how much data to read in.
+ var length=0;
+ try {
+ length = Integer.parseInt(contentLength.get.trim().toString());
+ } catch {
+ case e:NumberFormatException=>
+ throw new IOException("Specified content-length is not a valid integer");
+ }
+
+ if (SIZE_CHECK && length > MAX_DATA_LENGTH) {
+ throw new IOException("The maximum data length was exceeded");
+ }
+ next_action = read_binary_body(action, headers, length)
+
+ } else {
+ next_action = read_text_body(action, headers)
+ }
}
}
+ null
}
- null
- }
- def get(headers:HeaderMap, name:AsciiBuffer):Option[AsciiBuffer] = {
- val i = headers.iterator
- while( i.hasNext ) {
- val entry = i.next
- if( entry._1 == name ) {
- return Some(entry._2)
+ def get(headers:HeaderMapBuffer, name:AsciiBuffer):Option[AsciiBuffer] = {
+ val i = headers.iterator
+ while( i.hasNext ) {
+ val entry = i.next
+ if( entry._1 == name ) {
+ return Some(entry._2)
+ }
}
+ None
}
- None
- }
- def read_binary_body(action:Buffer, headers:HeaderMap, contentLength:Int):FrameReader = ()=> {
- val content:Buffer=read_content(contentLength)
- if( content != null ) {
- unmarshall = read_action
- new StompFrame(ascii(action), headers, content)
- } else {
- null
+ def read_binary_body(action:Buffer, headers:HeaderMapBuffer, contentLength:Int):FrameReader = (buffer)=> {
+ val content:Buffer=read_content(buffer, contentLength)
+ if( content != null ) {
+ next_action = read_action
+ new StompFrame(ascii(action), headers.toList, content)
+ } else {
+ null
+ }
}
- }
- def read_content(contentLength:Int):Buffer = {
- val read_limit = read_bytebuffer.position
- if( (read_limit-read_offset) < contentLength+1 ) {
- read_pos = read_limit;
- null
- } else {
- if( read_data(read_offset+contentLength)!= 0 ) {
- throw new IOException("Exected null termintor after "+contentLength+" content bytes");
+ def read_content(buffer:ByteBuffer, contentLength:Int):Buffer = {
+ val read_limit = buffer.position
+ if( (read_limit-start) < contentLength+1 ) {
+ end = read_limit;
+ null
+ } else {
+ if( buffer.array()(start+contentLength)!= 0 ) {
+ throw new IOException("Exected null termintor after "+contentLength+" content bytes");
+ }
+ var rc = new Buffer(buffer.array, start, contentLength)
+ end = start+contentLength+1;
+ start = end;
+ rc;
}
- var rc = new Buffer(read_data, read_offset, contentLength)
- read_pos = read_offset+contentLength+1;
- read_offset = read_pos;
- rc;
- }
- }
+ }
- def read_to_null():Buffer = {
- val read_limit = read_bytebuffer.position
- while( read_pos < read_limit ) {
- if( read_data(read_pos) ==0) {
- var rc = new Buffer(read_data, read_offset, read_pos-read_offset)
- read_pos += 1;
- read_offset = read_pos;
- return rc;
+ def read_to_null(buffer:ByteBuffer):Buffer = {
+ val read_limit = buffer.position
+ while( end < read_limit ) {
+ if( buffer.array()(end) ==0) {
+ var rc = new Buffer(buffer.array, start, end-start)
+ end += 1;
+ start = end;
+ return rc;
+ }
+ end += 1;
}
- read_pos += 1;
- }
- return null;
- }
+ return null;
+ }
- def read_text_body(action:Buffer, headers:HeaderMap):FrameReader = ()=> {
- val content:Buffer=read_to_null
- if( content != null ) {
- unmarshall = read_action
- new StompFrame(ascii(action), headers, content)
- } else {
- null
+ def read_text_body(action:Buffer, headers:HeaderMapBuffer):FrameReader = (buffer)=> {
+ val content:Buffer=read_to_null(buffer)
+ if( content != null ) {
+ next_action = read_action
+ new StompFrame(ascii(action), headers.toList, content)
+ } else {
+ null
+ }
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul 7 03:42:44 2010
@@ -30,6 +30,13 @@ import Stomp._
import _root_.org.apache.activemq.apollo.stomp.StompFrame
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+object StompBrokerPerfTest {
+ def main(args:Array[String]) = {
+ val test = new StompBrokerPerfTest();
+ test.setUp
+ test.benchmark_1_1_1
+ }
+}
class StompBrokerPerfTest extends BaseBrokerPerfTest {
override def createProducer() = new StompRemoteProducer()
@@ -65,6 +72,8 @@ class StompRemoteConsumer extends Remote
case StompFrame(Responses.CONNECTED, headers, _) =>
case StompFrame(Responses.MESSAGE, headers, content) =>
messageReceived();
+ case StompFrame(Responses.ERROR, headers, content) =>
+ onFailure(new Exception("Server reported an error: " + frame.content));
case _ =>
onFailure(new Exception("Unexpected stomp command: " + frame.action));
}
@@ -93,20 +102,27 @@ class StompRemoteProducer extends Remote
val send_next:CompletionCallback = new CompletionCallback() {
def onCompletion() = {
rate.increment();
- if( !stopping ) {
+ val task = ^ {
+ if( !stopping ) {
- var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
- headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
- if (property != null) {
- headers ::= (ascii(property), ascii(property));
- }
+ var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
+ headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
+ if (property != null) {
+ headers ::= (ascii(property), ascii(property));
+ }
// var p = this.priority;
// if (priorityMod > 0) {
// p = if ((counter % priorityMod) == 0) { 0 } else { priority }
// }
- var content = ascii(createPayload());
- transport.oneway(StompFrame(Stomp.Commands.SEND, headers, content), send_next)
+ var content = ascii(createPayload());
+ transport.oneway(StompFrame(Stomp.Commands.SEND, headers, content), send_next)
+ }
+ }
+ if( thinkTime > 0 ) {
+ dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, task)
+ } else {
+ dispatchQueue << task
}
}
def onFailure(error:Throwable) = {
@@ -128,6 +144,8 @@ class StompRemoteProducer extends Remote
var frame = command.asInstanceOf[StompFrame]
frame match {
case StompFrame(Responses.CONNECTED, headers, _) =>
+ case StompFrame(Responses.ERROR, headers, content) =>
+ onFailure(new Exception("Server reported an error: " + frame.content.utf8));
case _ =>
onFailure(new Exception("Unexpected stomp command: " + frame.action));
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul 7 03:42:44 2010
@@ -26,6 +26,7 @@ import org.fusesource.hawtdispatch.Dispa
import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.hawtdispatch.DispatchSource;
+import java.io.EOFException;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
@@ -44,6 +45,7 @@ import static org.apache.activemq.transp
*/
public class TcpTransport implements Transport {
private Map<String, Object> socketOptions;
+ private WireFormat.UnmarshalSession unmarshalSession;
enum SocketState {
CONNECTING,
@@ -77,6 +79,10 @@ public class TcpTransport implements Tra
ByteBuffer outbound_frame;
protected boolean useLocalHost = true;
+ int READ_BUFFFER_SIZE = 1024*32;
+ ByteBuffer readBuffer = ByteBuffer.allocate(1024*32);
+
+
static final class OneWay {
final Buffer buffer;
final CompletionCallback callback;
@@ -114,6 +120,8 @@ public class TcpTransport implements Tra
}
public void start() throws Exception {
+ assert Dispatch.getCurrentQueue() == dispatchQueue;
+
if (dispatchQueue == null) {
throw new IllegalArgumentException("dispatchQueue is not set");
}
@@ -124,6 +132,8 @@ public class TcpTransport implements Tra
throw new IllegalStateException("can only be started from the created stae");
}
transportState=RUNNING;
+
+ unmarshalSession = wireformat.createUnmarshalSession();
if( socketState == CONNECTING ) {
channel = SocketChannel.open();
@@ -183,7 +193,11 @@ public class TcpTransport implements Tra
readSource = Dispatch.createSource(channel, SelectionKey.OP_READ, dispatchQueue);
readSource.setEventHandler(new Runnable(){
public void run() {
- drainInbound();
+ try {
+ drainInbound();
+ } catch (IOException e) {
+ listener.onException(e);
+ }
}
});
@@ -207,6 +221,8 @@ public class TcpTransport implements Tra
public void stop() throws Exception {
+ assert Dispatch.getCurrentQueue() == dispatchQueue;
+
if( readSource!=null ) {
readSource.release();
readSource = null;
@@ -225,6 +241,7 @@ public class TcpTransport implements Tra
}
public void oneway(Object command, CompletionCallback callback) {
+ assert Dispatch.getCurrentQueue() == dispatchQueue;
try {
if( socketState != CONNECTED ) {
throw new IllegalStateException("Not connected.");
@@ -307,11 +324,56 @@ public class TcpTransport implements Tra
return outbound.isEmpty() && outbound_frame==null;
}
- private void drainInbound() {
- Object command = null;
- // the transport may be suspended after processing a command.
- while( !readSource.isSuspended() && (command=wireformat.unmarshal(channel))!=null ) {
+
+ private void drainInbound() throws IOException {
+ if( transportState==DISPOSED || readSource.isSuspended() ) {
+ return;
+ }
+ while( true ) {
+
+ // do we need to read in more data???
+ if (unmarshalSession.getEndPos() == readBuffer.position()) {
+
+ // do we need a new data buffer to read data into??
+ if (readBuffer.remaining() == 0) {
+
+ // double the capacity size if needed...
+ int new_capacity = unmarshalSession.getStartPos() != 0 ? READ_BUFFFER_SIZE : readBuffer.capacity() << 2;
+ byte[] new_buffer = new byte[new_capacity];
+
+ // If there was un-consummed data.. move it to the start of the new buffer.
+ int size = unmarshalSession.getEndPos() - unmarshalSession.getStartPos();
+ if (size > 0) {
+ System.arraycopy(readBuffer.array(), unmarshalSession.getStartPos(), new_buffer, 0, size);
+ }
+
+ readBuffer = ByteBuffer.wrap(new_buffer);
+ readBuffer.position(size);
+ unmarshalSession.setStartPos(0);
+ unmarshalSession.setEndPos(size);
+ }
+
+ // Try to fill the buffer with data from the socket..
+ int p = readBuffer.position();
+ int count = channel.read(readBuffer);
+ if (count == -1) {
+ throw new EOFException();
+ } else if (count == 0) {
+ return;
+ }
+ }
+
+ Object command=unmarshalSession.unmarshal(readBuffer);
+ if( command==null ) {
+ return;
+ }
+
listener.onCommand(command);
+
+ // the transport may be suspended after processing a command.
+ if( transportState==DISPOSED || readSource.isSuspended() ) {
+ return;
+ }
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Wed Jul 7 03:42:44 2010
@@ -21,8 +21,6 @@ import org.apache.activemq.transport.Tra
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.util.ServiceSupport;
-import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;
@@ -87,30 +85,6 @@ public class TcpTransportServer implemen
}
public void start() throws IOException {
- bind();
- acceptSource = Dispatch.createSource(channel, SelectionKey.OP_ACCEPT, dispatchQueue);
- acceptSource.setEventHandler(new Runnable() {
- public void run() {
- try {
- SocketChannel client = channel.accept();
- handleSocket(client);
- } catch (IOException e) {
- listener.onAcceptError(e);
- }
- }
- });
- acceptSource.setCancelHandler(new Runnable() {
- public void run() {
- try {
- channel.close();
- } catch (IOException e) {
- }
- }
- });
- acceptSource.resume();
- }
-
- public void bind() throws IOException {
URI bind = bindURI;
String host = bind.getHost();
@@ -122,6 +96,7 @@ public class TcpTransportServer implemen
InetAddress addr = InetAddress.getByName(host);
try {
channel = ServerSocketChannel.open();
+ channel.configureBlocking(false);
channel.socket().bind(new InetSocketAddress(addr, bind.getPort()), backlog);
} catch (IOException e) {
throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
@@ -139,6 +114,27 @@ public class TcpTransportServer implemen
throw IOExceptionSupport.create(e2);
}
}
+
+ acceptSource = Dispatch.createSource(channel, SelectionKey.OP_ACCEPT, dispatchQueue);
+ acceptSource.setEventHandler(new Runnable() {
+ public void run() {
+ try {
+ SocketChannel client = channel.accept();
+ handleSocket(client);
+ } catch (IOException e) {
+ listener.onAcceptError(e);
+ }
+ }
+ });
+ acceptSource.setCancelHandler(new Runnable() {
+ public void run() {
+ try {
+ channel.close();
+ } catch (IOException e) {
+ }
+ }
+ });
+ acceptSource.resume();
}
private URI connectURI(String hostname) throws URISyntaxException {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java Wed Jul 7 03:42:44 2010
@@ -22,6 +22,7 @@ import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;
@@ -100,8 +101,50 @@ public class MultiWireFormatFactory impl
return rc;
}
- public Object unmarshal(ReadableByteChannel channel) {
- throw new UnsupportedOperationException();
+ public UnmarshalSession createUnmarshalSession() {
+ return new UnmarshalSession() {
+ int start=0;
+ int end=0;
+ UnmarshalSession session;
+
+ public int getStartPos() {
+ return start;
+ }
+
+ public void setStartPos(int pos) {
+ start=pos;
+ }
+
+ public int getEndPos() {
+ return end;
+ }
+
+ public void setEndPos(int pos) {
+ end = pos;
+ }
+
+ public Object unmarshal(ByteBuffer buffer) throws IOException {
+ if( session!=null ) {
+ return session.unmarshal(buffer);
+ }
+
+ Buffer b = new Buffer(buffer.array(), start, buffer.position());
+ for (WireFormatFactory wff : wireFormatFactories) {
+ if (wff.matchesWireformatHeader( b )) {
+ wireFormat = wff.createWireFormat();
+ session = wireFormat.createUnmarshalSession();
+ session.setStartPos(start);
+ session.setEndPos(end);
+ return wireFormat;
+ }
+ }
+
+ if( end >= maxHeaderLength ) {
+ throw new IOException("Could not discriminate the protocol.");
+ }
+ return null;
+ }
+ };
}
public void marshal(Object command, DataOutput out) throws IOException {
@@ -109,11 +152,11 @@ public class MultiWireFormatFactory impl
}
public Buffer marshal(Object command) throws IOException {
- throw new UnsupportedOperationException();
+ return wireFormat.marshal(command);
}
public Object unmarshal(Buffer packet) throws IOException {
- throw new UnsupportedOperationException();
+ return wireFormat.marshal(packet);
}
public ArrayList<WireFormatFactory> getWireFormatFactories() {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java Wed Jul 7 03:42:44 2010
@@ -72,6 +72,10 @@ public class ObjectStreamWireFormat impl
}
}
+ public UnmarshalSession createUnmarshalSession() {
+ throw new UnsupportedOperationException();
+ }
+
public Object unmarshal(ReadableByteChannel channel) {
throw new UnsupportedOperationException();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java Wed Jul 7 03:42:44 2010
@@ -19,6 +19,7 @@ package org.apache.activemq.wireformat;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SocketChannel;
import java.util.Map;
@@ -49,13 +50,27 @@ public interface WireFormat {
* Stream based marshaling
*/
void marshal(Object command, DataOutput out) throws IOException;
-
+
/**
- * Packet based un-marshaling
+ * Stream based un-marshaling
*/
Object unmarshal(DataInput in) throws IOException;
- Object unmarshal(ReadableByteChannel channel);
+ /**
+ * For a unmarshal session is used for non-blocking
+ * unmarshalling.
+ */
+ interface UnmarshalSession {
+ int getStartPos();
+ void setStartPos(int pos);
+
+ int getEndPos();
+ void setEndPos(int pos);
+
+ Object unmarshal(ByteBuffer buffer) throws IOException;
+ }
+
+ UnmarshalSession createUnmarshalSession();
/**
* @return The name of the wireformat
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/test/java/org/apache/activemq/wireformat/mock/MockWireFormatFactory.java Wed Jul 7 03:42:44 2010
@@ -49,6 +49,10 @@ public class MockWireFormatFactory imple
throw new UnsupportedOperationException();
}
+ public UnmarshalSession createUnmarshalSession() {
+ throw new UnsupportedOperationException();
+ }
+
public Object unmarshal(ReadableByteChannel channel) {
throw new UnsupportedOperationException();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java?rev=961071&r1=961070&r2=961071&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java Wed Jul 7 03:42:44 2010
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.util.buffer;
+import java.io.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -333,7 +334,35 @@ public class Buffer implements Comparabl
}
return length - oLength;
}
-
+
+ /**
+ * same as out.write(data, offset, length);
+ */
+ public void writeTo(DataOutput out) throws IOException {
+ out.write(data, offset, length);
+ }
+
+ /**
+ * same as out.write(data, offset, length);
+ */
+ public void writeTo(OutputStream out) throws IOException {
+ out.write(data, offset, length);
+ }
+
+ /**
+ * same as in.readFully(data, offset, length);
+ */
+ public void readFrom(DataInput in) throws IOException {
+ in.readFully(data, offset, length);
+ }
+
+ /**
+ * same as in.read(data, offset, length);
+ */
+ public int readFrom(InputStream in) throws IOException {
+ return in.read(data, offset, length);
+ }
+
///////////////////////////////////////////////////////////////////
// Statics
///////////////////////////////////////////////////////////////////