You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:47:37 UTC
svn commit: r961086 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/
activemq-broker/src/test/scala/org/apache/active...
Author: chirino
Date: Wed Jul 7 03:47:36 2010
New Revision: 961086
URL: http://svn.apache.org/viewvc?rev=961086&view=rev
Log:
mostly work related to lifecycle/cleanup
Added:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/CompletionTracker.scala
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/FunSuiteSupport.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/metric/MetricAggregator.java
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala?rev=961086&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BaseService.scala Wed Jul 7 03:47:36 2010
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import org.apache.activemq.Service
+import org.fusesource.hawtdispatch.DispatchQueue
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+
+/**
+ * <p>
+ * The BaseService provides helpers for dealing with async service state.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait BaseService extends Service {
+
+  /** Base type of the lifecycle states; prints as the simple class name. */
+  sealed class State {
+    override def toString = getClass.getSimpleName
+  }
+
+  /** Mixed into the transitional states to collect completion callbacks. */
+  trait CallbackSupport {
+    var callbacks:List[Runnable] = Nil
+    /** Registers a callback; null is ignored. */
+    def << (r:Runnable) = if(r!=null) { callbacks ::= r }
+    /** Runs the callbacks in registration order (the list is built by prepending). */
+    def done = callbacks.reverse.foreach(_.run)
+  }
+
+  case class CREATED extends State
+  case class STARTING extends State with CallbackSupport
+  case class STARTED extends State
+  case class STOPPING extends State with CallbackSupport
+  case class STOPPED extends State
+
+  /** Queue on which the start/stop transitions are executed. */
+  val dispatchQueue:DispatchQueue
+
+  final def start() = start(null)
+  final def stop() = stop(null)
+
+  protected var _serviceState:State = CREATED()
+  protected def serviceState = _serviceState
+
+  /**
+   * Reports an illegal transition by printing a stack trace; nothing is thrown
+   * because AssertionError is an Error that `case e:Exception` would not catch.
+   */
+  private def error(msg:String) {
+    new AssertionError(msg).printStackTrace
+  }
+
+  /** Runs an optional completion callback. */
+  private def runIfSet(r:Runnable) = if( r!=null ) { r.run }
+
+  /**
+   * Starts the service on dispatchQueue. onCompleted (may be null) runs when the
+   * start finishes, or at once if already started or the call is illegal.
+   */
+  final def start(onCompleted:Runnable) = ^{
+    def do_start = {
+      val state = STARTING()
+      state << onCompleted
+      _serviceState = state
+      _start(^{
+        _serviceState = STARTED()
+        state.done
+      })
+    }
+    _serviceState match {
+      case x:CREATED => do_start
+      case x:STOPPED => do_start
+      case state:STARTING => state << onCompleted
+      case state:STARTED => runIfSet(onCompleted)
+      case state =>
+        runIfSet(onCompleted)
+        error("start should not be called from state: "+state);
+    }
+  } ->: dispatchQueue
+
+  /**
+   * Stops the service on dispatchQueue. onCompleted (may be null) runs when the
+   * stop finishes, or at once if already stopped or the call is illegal.
+   */
+  final def stop(onCompleted:Runnable) = ^{
+    _serviceState match {
+      case x:STARTED =>
+        val state = STOPPING()
+        state << onCompleted
+        _serviceState = state
+        _stop(^{
+          _serviceState = STOPPED()
+          state.done
+        })
+      case state:STOPPING => state << onCompleted
+      case state:STOPPED => runIfSet(onCompleted)
+      case state =>
+        runIfSet(onCompleted)
+        error("stop should not be called from state: "+state);
+    }
+  } ->: dispatchQueue
+
+  /** Subclass hook: performs the actual start and runs onCompleted when done. */
+  protected def _start(onCompleted:Runnable)
+  /** Subclass hook: performs the actual stop and runs onCompleted when done. */
+  protected def _stop(onCompleted:Runnable)
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul 7 03:47:36 2010
@@ -29,6 +29,13 @@ import org.fusesource.hawtdispatch.{Disp
import java.util.{HashSet, LinkedList, LinkedHashMap, ArrayList}
import java.util.concurrent.{TimeUnit, CountDownLatch}
+/**
+ * <p>
+ * The BrokerFactory creates Broker objects from a URI.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
object BrokerFactory {
val BROKER_FACTORY_HANDLER_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/apollo/broker/");
@@ -66,62 +73,6 @@ object BrokerFactory {
}
-class CompletionTracker(val queue:DispatchQueue=getGlobalQueue) {
- private[this] val tasks = new HashSet[Runnable]()
- private[this] var _callback:Runnable = null
- queue.retain
-
- def task(name:String="unknown"):Runnable = {
- val rc = new Runnable() {
- def run():Unit = {
- tasks.synchronized {
- if( tasks.remove(this) ) {
- if( tasks.isEmpty && _callback!=null ) {
- _callback ->: queue
- queue.release
- }
- }
- }
- }
- override def toString = name
- }
- tasks.synchronized {
- if( _callback!=null ) {
- throw new IllegalStateException("all tasks should be created before setting the callback");
- }
- tasks.add(rc)
- }
- return rc
- }
-
- def callback(handler: =>Unit ) {
- tasks.synchronized {
- _callback = handler _
- if( tasks.isEmpty ) {
- _callback ->: queue
- queue.release
- }
- }
- }
-
- def await() = {
- val latch =new CountDownLatch(1)
- callback {
- latch.countDown
- }
- latch.await
- }
-
- def await(timeout:Long, unit:TimeUnit) = {
- val latch = new CountDownLatch(1)
- callback {
- latch.countDown
- }
- latch.await(timeout, unit)
- }
-
- override def toString = tasks.synchronized { "waiting on: "+tasks }
-}
object BufferConversions {
@@ -145,8 +96,19 @@ object BrokerConstants extends Log {
val UNKNOWN = "UNKNOWN"
val DEFAULT_VIRTUAL_HOST_NAME = new AsciiBuffer("default")
+
+ val STICK_ON_THREAD_QUEUES = true
}
+/**
+ * <p>
+ * A Broker is the parent object of all services associated with the server side of
+ * a message passing system. It keeps track of all running connections,
+ * virtual hosts and associated messaging destinations.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
class Broker() extends Service with DispatchLogging {
import BrokerConstants._
@@ -174,7 +136,10 @@ class Broker() extends Service with Disp
def stop(onComplete:Runnable) = runtime.stop(onComplete)
val dispatchQueue = createQueue("broker");
- dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
+
+ if( STICK_ON_THREAD_QUEUES ) {
+ dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
+ }
def addVirtualHost(host: VirtualHost) = {
if (host.names.isEmpty) {
@@ -210,6 +175,10 @@ class Broker() extends Service with Disp
var connection = new BrokerConnection(Broker.this)
connection.transport = transport
connection.dispatchQueue.retain
+ if( STICK_ON_THREAD_QUEUES ) {
+ connection.dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
+ }
+
clientConnections.add(connection)
try {
connection.start()
@@ -290,7 +259,7 @@ class Broker() extends Service with Disp
state = STARTING
- val tracker = new CompletionTracker(dispatchQueue)
+ val tracker = new CompletionTracker("broker startup", dispatchQueue)
for (virtualHost <- virtualHosts.values) {
virtualHost.start(tracker.task("virtual host: "+virtualHost))
}
@@ -315,45 +284,61 @@ class Broker() extends Service with Disp
def stop(onCompleted:Runnable): Unit = ^ {
if (state == RUNNING) {
state = STOPPING
- dispatchQueue.setDisposer(^{
- if( onCompleted!=null ) {
- state = STOPPED;
- onCompleted.run
- }
- })
+ val tracker = new CompletionTracker("broker shutdown", dispatchQueue)
+ // Stop accepting connections..
for (server <- transportServers) {
- stopService(server)
+ stopService(server,tracker)
}
+
+ // Kill client connections..
for (connection <- clientConnections) {
- stopService(connection)
+ stopService(connection, tracker)
}
+
+ // Shutdown the virtual host services
for (virtualHost <- virtualHosts.values) {
- stopService(virtualHost)
+ stopService(virtualHost, tracker)
}
- dispatchQueue.release
+
+ def stopped = {
+ state = STOPPED;
+
+ }
+
+ tracker.callback {
+ stopped
+ if( onCompleted!=null ) {
+ onCompleted.run
+ }
+ }
+
}
} ->: dispatchQueue
+
}
+
/**
* Helper method to help stop broker services and log an error if they fail to stop.
* @param service
*/
- private def stopService(server: Service): Unit = {
+ private def stopService(service: Service, tracker:CompletionTracker): Unit = {
try {
- server.stop
+ service.stop(tracker.task(service.toString))
} catch {
case e: Exception => {
- warn("Could not stop " + server + ": " + e)
- debug("Could not stop " + server + " due to: ", e)
+ warn(e, "Could not stop " + service + ": " + e)
}
}
}
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
trait QueueLifecyleListener {
/**
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/CompletionTracker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/CompletionTracker.scala?rev=961086&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/CompletionTracker.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/CompletionTracker.scala Wed Jul 7 03:47:36 2010
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.HashSet
+import org.fusesource.hawtdispatch.DispatchQueue
+import org.fusesource.hawtdispatch.ScalaDispatch._
+
+object CompletionTracker extends Log
+
+/**
+ * <p>
+ * A CompletionTracker is used to track multiple async processing tasks and
+ * call a callback once they all complete.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class CompletionTracker(val name:String, val parent:DispatchQueue=getGlobalQueue) extends Logging {
+
+  override protected def log = CompletionTracker
+
+  // tasks and _callback are only mutated from blocks dispatched on `queue`.
+  private[this] val tasks = new HashSet[Runnable]()
+  private[this] var _callback:Runnable = null
+  val queue = parent.createSerialQueue("tracker: "+name);
+
+  /** Registers a task (expected before callback is set); run the result to complete it. */
+  def task(name:String="unknown"):Runnable = {
+    val rc = new Runnable() {
+      def run = {
+        trace("completed task: %s", name)
+        remove(this)
+      }
+      override def toString = name
+    }
+    ^ {
+      assert(_callback==null)
+      tasks.add(rc)
+    } ->: queue
+    rc
+  }
+
+  /** Sets the handler run once all tasks complete; logs status every second until then. */
+  def callback(handler: =>Unit ) {
+    val start = System.currentTimeMillis
+    ^ {
+      _callback = handler _
+      checkDone()
+    } ->: queue
+
+    def displayStatus = {
+      if( _callback!=null ) {
+        val duration = (System.currentTimeMillis-start)/1000
+        info("%s is taking a long time (%d seconds). Waiting on %s", name, duration, tasks)
+        scheduleCheck
+      }
+    }
+    def scheduleCheck:Unit = queue.dispatchAfter(1, TimeUnit.SECONDS, ^{displayStatus})
+    scheduleCheck
+  }
+
+  private def remove(r:Runnable) = ^{
+    if( tasks.remove(r) ) { checkDone() }
+  } ->: queue
+
+  /** Fires the callback once no tasks remain; otherwise traces what is still pending. */
+  private def checkDone() = {
+    if( tasks.isEmpty && _callback!=null ) {
+      trace("executing callback for %s", name)
+      _callback ->: queue
+      _callback = null
+      queue.release
+    } else {
+      if( _callback==null ) {
+        trace("still waiting for callback to be set")
+      }
+      if( !tasks.isEmpty ) {
+        trace("still waiting for tasks %s", tasks)
+      }
+    }
+  }
+
+  /** Blocks the calling thread until the completion callback has run. */
+  def await() = {
+    val latch =new CountDownLatch(1)
+    callback { latch.countDown }
+    latch.await
+  }
+
+  /** Like await(), but waits at most the given timeout and returns the latch's result. */
+  def await(timeout:Long, unit:TimeUnit) = {
+    val latch = new CountDownLatch(1)
+    callback { latch.countDown }
+    latch.await(timeout, unit)
+  }
+
+  override def toString = tasks.synchronized { name+" waiting on: "+tasks }
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul 7 03:47:36 2010
@@ -28,43 +28,45 @@ import _root_.org.fusesource.hawtdispatc
import java.util.concurrent.atomic.AtomicLong
import org.fusesource.hawtdispatch.Dispatch
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
object Connection extends Log {
val id_generator = new AtomicLong()
def next_id = "connection:"+id_generator.incrementAndGet
}
-abstract class Connection() extends TransportListener with Service with DispatchLogging {
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract class Connection() extends TransportListener with BaseService with DispatchLogging {
override protected def log = Connection
import Connection._
val id = next_id
val dispatchQueue = createQueue(id)
- dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
- var name = "connection"
- var stopped = false;
+ def stopped = serviceState match {
+ case STOPPED() | STOPPING() => true
+ case _ => false
+ }
+
var transport:Transport = null
- def start() = start(null)
+ override def toString = id
- def start(onCompleted:Runnable) = {
+ override protected def _start(onCompleted:Runnable) = {
transport.setDispatchQueue(dispatchQueue);
transport.setTransportListener(Connection.this);
transport.start(onCompleted)
}
- def stop() = stop(null)
-
- def stop(onCompleted:Runnable) = {
- if( !stopped ) {
- stopped=true
- transport.stop()
- dispatchQueue.setDisposer(onCompleted)
- dispatchQueue.release
- }
+ override protected def _stop(onCompleted:Runnable) = {
+ transport.stop(onCompleted)
}
+
def onTransportFailure(error:IOException) = {
if (!stopped) {
onFailure(error);
@@ -84,23 +86,26 @@ abstract class Connection() extends Tran
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
class BrokerConnection(val broker: Broker) extends Connection {
var protocol = "stomp"
var protocolHandler: ProtocolHandler = null;
- override def start(onCompleted:Runnable) = {
+ override protected def _start(onCompleted:Runnable) = {
broker.dispatchQueue.retain
protocolHandler = ProtocolHandlerFactory.createProtocolHandler(protocol)
protocolHandler.setConnection(this);
- super.start(onCompleted)
+ super._start(onCompleted)
}
-  override def stop(onCompleted:Runnable) = {
-    if( !stopped ) {
-      broker.runtime.stopped(this)
-      broker.dispatchQueue.release
-      super.stop(onCompleted)
-    }
+  // BaseService invokes _stop only after switching to STOPPING, where `stopped`
+  // is already true; a `!stopped` guard here would skip the stop and its callback.
+  override protected def _stop(onCompleted:Runnable) = {
+    broker.runtime.stopped(this)
+    broker.dispatchQueue.release
+    super._stop(onCompleted)
   }
@@ -120,8 +125,14 @@ class BrokerConnection(val broker: Broke
override def onTransportFailure(error: IOException) = protocolHandler.onTransportFailure(error)
}
-class ProtocolException(message:String, e:Throwable=null) extends Exception(message, e)
-
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class ProtocolException(message:String, e:Throwable=null) extends Exception(message, e)
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
class MultiProtocolHandler extends ProtocolHandler {
var connected = false
@@ -156,6 +167,9 @@ class MultiProtocolHandler extends Proto
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
object ProtocolHandlerFactory {
val PROTOCOL_HANDLER_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/apollo/broker/protocol/");
@@ -164,6 +178,9 @@ object ProtocolHandlerFactory {
}
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
trait ProtocolHandler extends TransportListener {
var connection:BrokerConnection = null;
@@ -186,6 +203,9 @@ trait ProtocolHandler extends TransportL
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
trait ConsumerContext { // extends ClientContext, Subscription<MessageDelivery>, IFlowSink<MessageDelivery> {
def getConsumerId() : String
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 03:47:36 2010
@@ -24,16 +24,25 @@ import _root_.org.fusesource.hawtdispatc
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
import org.apache.activemq.transport.Transport
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
trait DeliveryProducer {
def collocate(queue:DispatchQueue):Unit
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
trait DeliverySession {
val consumer:DeliveryConsumer
def deliver(delivery:Delivery)
def close:Unit
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
trait DeliveryConsumer extends Retained {
def matches(message:Delivery)
val queue:DispatchQueue;
@@ -43,6 +52,8 @@ trait DeliveryConsumer extends Retained
/**
* Abstracts wire protocol message implementations. Each wire protocol
* will provide it's own type of Message.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
trait Message {
@@ -84,6 +95,9 @@ trait Message {
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
object Delivery {
def apply(o:Delivery) = new Delivery(o.message, o.size, o.encoded, o.encoding, o.ack, o.tx_id, o.store_id)
}
@@ -385,11 +399,17 @@ case class Delivery (
// }
//}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
trait DeliverySink {
def full:Boolean
def send(delivery:Delivery):Unit
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
class TransportDeliverySink(var transport:Transport) extends DeliverySink {
def full:Boolean = transport.isFull
def send(delivery:Delivery) = transport.oneway(delivery.message, delivery)
@@ -436,6 +456,9 @@ class DeliveryBuffer(var maxSize:Int=102
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
class DeliveryOverflowBuffer(val delivery_buffer:DeliverySink) extends DeliverySink {
private var overflow = new LinkedList[Delivery]()
@@ -472,6 +495,9 @@ class DeliveryOverflowBuffer(val deliver
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
class DeliverySessionManager(val sink:DeliverySink, val queue:DispatchQueue) extends BaseRetained {
var sessions = List[SessionServer]()
@@ -516,12 +542,12 @@ class DeliverySessionManager(val sink:De
class SessionClient() extends DeliveryOverflowBuffer(sink) {
- producer_queue.retain
val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
credit_adder.setEventHandler(^{
internal_credit(credit_adder.getData.intValue)
});
credit_adder.resume
+ source.retain
private var credits = 0;
@@ -530,16 +556,18 @@ class DeliverySessionManager(val sink:De
///////////////////////////////////////////////////
def close = {
credit_adder.release
- producer_queue.release
+ source.release
}
override def full = credits <= 0
override protected def send_to_delivery_buffer(value:Delivery) = {
var delivery = Delivery(value)
+ credit_adder.retain
delivery.setDisposer(^{
// This is called from the server/consumer thread
credit_adder.merge(delivery.size);
+ credit_adder.release
})
internal_credit(-delivery.size)
source.merge(delivery)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala Wed Jul 7 03:47:36 2010
@@ -19,6 +19,9 @@ package org.apache.activemq.apollo.broke
import _root_.org.apache.activemq.util.buffer.{AsciiBuffer}
import BufferConversions._
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
class ParserOptions {
var defaultDomain:AsciiBuffer = null
var queuePrefix:AsciiBuffer = null
@@ -27,6 +30,9 @@ class ParserOptions {
var tempTopicPrefix:AsciiBuffer = null
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
object DestinationParser {
/**
@@ -86,6 +92,9 @@ object DestinationParser {
}
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
case class SingleDestination(var domain:AsciiBuffer=null, var name:AsciiBuffer=null) extends Destination {
def getDestinations():Array[Destination] = null;
@@ -94,6 +103,10 @@ case class SingleDestination(var domain:
override def toString() = ""+domain+":"+name
}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
case class MultiDestination(var destinations:Array[Destination]) extends Destination {
def getDestinations():Array[Destination] = destinations;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala Wed Jul 7 03:47:36 2010
@@ -22,15 +22,25 @@ import _root_.org.apache.commons.logging
import _root_.org.apache.commons.logging.{Log => Logger}
import java.util.concurrent.atomic.AtomicLong
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
trait Log {
- val log = LogFactory.getLog(getClass.getName)
+ val log = LogFactory.getLog(getClass.getName.stripSuffix("$"))
+
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
class NamedLog(name:String) extends Log {
- def this(clazz:Class[_]) = this(clazz.getName)
+ def this(clazz:Class[_]) = this(clazz.getName.stripSuffix("$"))
override val log = LogFactory.getLog(name)
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
object Logging {
val exception_id_generator = new AtomicLong(System.currentTimeMillis)
def next_exception_id = exception_id_generator.incrementAndGet.toHexString
@@ -215,6 +225,9 @@ trait Logging {
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
trait DispatchLogging extends Logging {
import org.fusesource.hawtdispatch.ScalaDispatch._
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul 7 03:47:36 2010
@@ -25,6 +25,9 @@ import java.util.HashMap
import collection.JavaConversions
import path.PathMap
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
object Domain {
val TOPIC_DOMAIN = new AsciiBuffer("topic");
val QUEUE_DOMAIN = new AsciiBuffer("queue");
@@ -33,6 +36,9 @@ object Domain {
}
import Domain._
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
class Domain {
val targets = new PathMap[DeliveryConsumer]();
@@ -52,6 +58,9 @@ class Domain {
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
object Router extends Log {
}
@@ -67,6 +76,7 @@ object Router extends Log {
* get the current set of consumers that are bound
* to the destination.
*
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class Router(var queue:DispatchQueue) extends DispatchLogging {
@@ -182,6 +192,9 @@ class Router(var queue:DispatchQueue) ex
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
trait Route extends Retained {
val destination:Destination
@@ -195,6 +208,9 @@ trait Route extends Retained {
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
class DeliveryProducerRoute(val destination:Destination, val queue:DispatchQueue, val producer:DeliveryProducer) extends BaseRetained with Route with DispatchLogging {
override protected def log = Router
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Wed Jul 7 03:47:36 2010
@@ -21,6 +21,9 @@ import _root_.org.apache.activemq.filter
import _root_.scala.collection.JavaConversions._
import path.PathFilter
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
trait BrokerSubscription {
def connect(consumer:ConsumerContext)
@@ -32,6 +35,9 @@ trait BrokerSubscription {
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
class CompositeSubscription(val destination:Destination, val subscriptions:List[BrokerSubscription] ) extends BrokerSubscription {
@@ -51,12 +57,18 @@ class CompositeSubscription(val destinat
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
object WildcardQueueSubscription extends Log {
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
class WildcardQueueSubscription(val host:VirtualHost, val destination:Destination, val consumer:ConsumerContext) extends BrokerSubscription with QueueLifecyleListener with Logging {
- protected def log = WildcardQueueSubscription
+ override protected def log = WildcardQueueSubscription
var filter = PathFilter.parseFilter(destination.getName());
val childSubs = new ArrayList[BrokerSubscription]();
@@ -112,6 +124,9 @@ class WildcardQueueSubscription(val host
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
class TopicSubscription { // extends BrokerSubscription with DeliveryTarget {
def matches(message:Delivery) = false
def deliver(message:Delivery) = {}
@@ -246,6 +261,9 @@ class TopicSubscription { // extends Bro
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
class DurableSubscription(val host:VirtualHost, val destination:Destination, val selector:BooleanExpression) { // extends BrokerSubscription with DeliveryTarget {
// private final IQueue<Long, MessageDelivery> queue;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 03:47:36 2010
@@ -27,8 +27,14 @@ import _root_.scala.collection.JavaConve
import _root_.scala.reflect.BeanProperty
import path.PathFilter
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
object VirtualHost extends Log
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
class VirtualHost() extends Service with Logging {
override protected def log = VirtualHost
@@ -121,6 +127,10 @@ class VirtualHost() extends Service with
database.stop();
started = false;
+ if( onCompleted!=null ) {
+ onCompleted.run
+ }
+
}
def createQueue(dest:Destination) :Queue = {
@@ -218,6 +228,9 @@ class VirtualHost() extends Service with
}
}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
class BrokerDatabase() {
@BeanProperty
@@ -1558,9 +1571,15 @@ class BrokerDatabase() {
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
class UserAlreadyConnectedException extends Exception
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
class BrokerQueueStore { // implements QueueStore<Long, MessageDelivery> {
// TODO:
// private static final Log LOG = LogFactory.getLog(BrokerQueueStore.class);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala Wed Jul 7 03:47:36 2010
@@ -35,6 +35,9 @@ import _root_.org.apache.activemq.transp
import _root_.scala.collection.JavaConversions._
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
object VMTransportFactory extends Log {
val DEFAULT_PIPE_NAME = BrokerConstants.DEFAULT_VIRTUAL_HOST_NAME.toString();
}
@@ -43,8 +46,7 @@ object VMTransportFactory extends Log {
* Implements the vm transport which behaves like the pipe transport except that
* it can start embedded brokers up on demand.
*
- * @author chirino
- *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class VMTransportFactory extends PipeTransportFactory with Logging {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Jul 7 03:47:36 2010
@@ -29,14 +29,14 @@ import org.apache.activemq.util.buffer.A
import org.apache.activemq.broker.store.{Store, StoreFactory}
import java.io.{File, IOException}
import java.util.ArrayList
-import org.scalatest.Informer
import org.fusesource.hawtdispatch.BaseRetained
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.activemq.apollo.broker._
-
+import org.scalatest._
object BaseBrokerPerfSupport {
- var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "3"))
+ var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "1"))
+ var SAMPLE_PERIOD = java.lang.Long.parseLong(System.getProperty("SAMPLE_PERIOD", "1000"))
var IO_WORK_AMOUNT = 0
var FANIN_COUNT = 10
var FANOUT_COUNT = 10
@@ -48,21 +48,22 @@ object BaseBrokerPerfSupport {
var PERSISTENT = false;
var DURABLE = false;
-}
-
-abstract class BaseBrokerPerfSupport extends FunSuiteSupport {
- import BaseBrokerPerfSupport._
+ // Set to test against ptp queues instead of topics:
+ var PTP = false;
// Set to put senders and consumers on separate brokers.
- protected var multibroker = false;
-
- // Set to mockup up ptp:
- protected var ptp = false;
+ var MULTI_BROKER = false;
// Set to use tcp IO
- protected var tcp = true;
+ protected var TCP = true;
+
// set to force marshalling even in the NON tcp case.
- protected var forceMarshalling = true;
+ protected var FORCE_MARSHALLING = true;
+}
+
+abstract class BaseBrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach {
+ import BaseBrokerPerfSupport._
+
protected var sendBrokerBindURI: String = null
protected var receiveBrokerBindURI: String = null
@@ -85,10 +86,26 @@ abstract class BaseBrokerPerfSupport ext
val producers = new ArrayList[RemoteProducer]()
val consumers = new ArrayList[RemoteConsumer]()
+ var spread_sheet_stats:List[(String, Any)] = Nil
+
+
+ override protected def beforeEach() = {
+ totalProducerRate.removeAllMetrics
+ totalConsumerRate.removeAllMetrics
+ brokers.clear
+ producers.clear
+ consumers.clear
+ stopping.set(false)
+ rcvBroker=null
+ sendBroker=null
+ producerCount = 0
+ consumerCount = 0
+ destCount =0
+ }
override protected def beforeAll(configMap: Map[String, Any]) = {
super.beforeAll(configMap)
- if (tcp) {
+ if (TCP) {
sendBrokerBindURI = "tcp://localhost:10000?wireFormat=" + getBrokerWireFormat();
receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=" + getBrokerWireFormat();
@@ -97,7 +114,7 @@ abstract class BaseBrokerPerfSupport ext
} else {
sendBrokerConnectURI = "pipe://SendBroker";
receiveBrokerConnectURI = "pipe://ReceiveBroker";
- if (forceMarshalling) {
+ if (FORCE_MARSHALLING) {
sendBrokerBindURI = sendBrokerConnectURI + "?wireFormat=" + getBrokerWireFormat();
receiveBrokerBindURI = receiveBrokerConnectURI + "?wireFormat=" + getBrokerWireFormat();
} else {
@@ -107,12 +124,30 @@ abstract class BaseBrokerPerfSupport ext
}
}
+
+ override protected def afterEach() = {
+ println("Spread sheet stats:")
+ println(spread_sheet_stats.map(_._1).mkString(","))
+ println(spread_sheet_stats.map(_._2).mkString(","))
+ }
+
+ override protected def afterAll() = {
+ println("Spread sheet stats:")
+ println(spread_sheet_stats.map(_._1).mkString(","))
+ println(spread_sheet_stats.map(_._2).mkString(","))
+ }
+
def getBrokerWireFormat() = "multi"
def getRemoteWireFormat(): String
- if (!ptp) {
- test("1 producer -> 1 destination -> 0 consumers") {
+ /**
+ * Used to benchmark what is the raw speed of sending messages one way.
+ * Divide by 2 and compare against 1-1-1 to figure out what the broker dispatching
+ * overhead is.
+ */
+ if (!PTP) {
+ test("1->1->0") {
producerCount = 1;
destCount = 1;
@@ -128,14 +163,16 @@ abstract class BaseBrokerPerfSupport ext
}
}
- test("1 producer -> 1 destination -> 1 consumers") {
+ /**
+ * The baseline of the performance of going from 1 producer to 1 consumer.
+ */
+ test("1->1->1") {
+ println(testName)
producerCount = 1;
destCount = 1;
consumerCount = 1;
createConnections();
-// producers.get(0).thinkTime = 500000*1000;
-
// Start 'em up.
startClients();
try {
@@ -145,10 +182,14 @@ abstract class BaseBrokerPerfSupport ext
}
}
- test(format("%d producers -> 1 destination -> %d consumers", FANIN_COUNT, FANOUT_COUNT)) {
- producerCount = FANIN_COUNT;
- consumerCount = FANOUT_COUNT;
- destCount = 1;
+ /**
+ * To compare against the performance of the 1-1-1 case... If you have
+ * linear scalability then, this should be twice as fast.
+ */
+ test("2->2->2") {
+ producerCount = 2;
+ destCount = 2;
+ consumerCount = 2;
createConnections();
@@ -161,10 +202,13 @@ abstract class BaseBrokerPerfSupport ext
}
}
- test(format("%d producers -> 1 destination -> 1 consumer", FANIN_COUNT)) {
+ /**
+ * To see how high producer and consumer contention on a destination performs.
+ */
+ test(format("%d->1->%d", FANIN_COUNT, FANOUT_COUNT)) {
producerCount = FANIN_COUNT;
+ consumerCount = FANOUT_COUNT;
destCount = 1;
- consumerCount = 1;
createConnections();
@@ -177,10 +221,13 @@ abstract class BaseBrokerPerfSupport ext
}
}
- test(format("1 producer -> 1 destination -> %d consumers", FANOUT_COUNT)) {
- producerCount = 1;
+ /**
+ * To see how high producer contention on a destination performs.
+ */
+ test(format("%d->1->1", FANIN_COUNT)) {
+ producerCount = FANIN_COUNT;
destCount = 1;
- consumerCount = FANOUT_COUNT;
+ consumerCount = 1;
createConnections();
@@ -193,10 +240,13 @@ abstract class BaseBrokerPerfSupport ext
}
}
- test("2 producer -> 2 destination -> 2 consumers") {
- producerCount = 2;
- destCount = 2;
- consumerCount = 2;
+ /**
+ * To see how high consumer contention on a destination performs.
+ */
+ test(format("1->1->%d", FANOUT_COUNT)) {
+ producerCount = 1;
+ destCount = 1;
+ consumerCount = FANOUT_COUNT;
createConnections();
@@ -209,7 +259,11 @@ abstract class BaseBrokerPerfSupport ext
}
}
- test("10 producers -> 10 destinations -> 10 consumers") {
+ /**
+ * To test how an overload situation affects scalability. Compare to the
+ * scalability trend of 1-1-1 to 2-2-2
+ */
+ test("10->10->10") {
producerCount = 10;
destCount = 10;
consumerCount = 10;
@@ -226,15 +280,18 @@ abstract class BaseBrokerPerfSupport ext
}
/**
- * Tests 2 producers sending to 1 destination with 2 consumres, but with
- * consumers set to select only messages from each producer. 1 consumers is
- * set to slow, the other producer should be able to send quickly.
+ * Tests 1 producer sending to 1 destination with 1 slow and 1 fast consumer.
+ *
+ * queue case: the producer should not slow down since it can dispatch to the
+ * fast consumer
+ *
+ * topic case: the producer should slow down since it HAS to dispatch to the
+ * slow consumer.
*
- * @throws Exception
*/
- test("2 producer -> 2 destination -> 2 slow consumers") {
+ test("1->1->[1 slow,1 fast]") {
producerCount = 2;
- destCount = 2;
+ destCount = 1;
consumerCount = 2;
createConnections();
@@ -249,7 +306,7 @@ abstract class BaseBrokerPerfSupport ext
}
}
- test("2 producer -> 2 destination -> 2 selector consumers") {
+ test("2->2->[1,1 selecting]") {
producerCount = 2;
destCount = 2;
consumerCount = 2;
@@ -278,7 +335,7 @@ abstract class BaseBrokerPerfSupport ext
*
* @throws Exception
*/
- test("1 high and 1 normal priority producer -> 1 destination -> 1 consumer") {
+ test("[1 high, 1 normal]->1->1") {
producerCount = 2;
destCount = 1;
consumerCount = 1;
@@ -296,7 +353,7 @@ abstract class BaseBrokerPerfSupport ext
println("Checking rates...");
for (i <- 0 until PERFORMANCE_SAMPLES) {
var p = new Period();
- Thread.sleep(1000 * 5);
+ Thread.sleep(SAMPLE_PERIOD);
println(producer.rate.getRateSummary(p));
println(totalProducerRate.getRateSummary(p));
println(totalConsumerRate.getRateSummary(p));
@@ -315,7 +372,7 @@ abstract class BaseBrokerPerfSupport ext
*
* @throws Exception
*/
- test("1 high/mixed and 1 normal priority producer -> 1 destination -> 1 consumer") {
+ test("[1 high, 1 mixed, 1 normal]->1->1") {
producerCount = 2;
destCount = 1;
consumerCount = 1;
@@ -335,7 +392,7 @@ abstract class BaseBrokerPerfSupport ext
println("Checking rates...");
for (i <- 0 until PERFORMANCE_SAMPLES) {
var p = new Period();
- Thread.sleep(1000 * 5);
+ Thread.sleep(SAMPLE_PERIOD);
println(producer.rate.getRateSummary(p));
println(totalProducerRate.getRateSummary(p));
println(totalConsumerRate.getRateSummary(p));
@@ -349,20 +406,44 @@ abstract class BaseBrokerPerfSupport ext
}
def reportRates() = {
- println("Checking "+(if (ptp) "ptp" else "topic")+" rates...");
+ val best_sample = PERFORMANCE_SAMPLES/2
+
+ println("Checking "+(if (PTP) "ptp" else "topic")+" rates...");
for (i <- 0 until PERFORMANCE_SAMPLES) {
var p = new Period();
- Thread.sleep(1000 * 5);
- println(totalProducerRate.getRateSummary(p));
- println(totalConsumerRate.getRateSummary(p));
+ Thread.sleep(SAMPLE_PERIOD);
+ if( producerCount > 0 ) {
+ println(totalProducerRate.getRateSummary(p));
+ }
+ if( consumerCount > 0 ) {
+ println(totalConsumerRate.getRateSummary(p));
+ }
+
+ if( i == best_sample ) {
+ if( producerCount > 0 ) {
+ spread_sheet_stats = spread_sheet_stats ::: ( testName+" :: producer rate", totalProducerRate.total(p) ) :: Nil
+ if( producerCount > 1 ) {
+ spread_sheet_stats = spread_sheet_stats ::: ( testName+" :: producer deviation", totalProducerRate.deviation ) :: Nil
+ }
+ }
+ if( consumerCount > 0 ) {
+ spread_sheet_stats = spread_sheet_stats ::: ( testName+" :: consumer rate", totalConsumerRate.total(p) ) :: Nil
+ if( consumerCount > 1 ) {
+ spread_sheet_stats = spread_sheet_stats ::: ( testName+" :: consumer deviation", totalConsumerRate.deviation ) :: Nil
+ }
+ }
+ }
+
totalProducerRate.reset();
totalConsumerRate.reset();
}
+
+
}
def createConnections() = {
- if (multibroker) {
+ if (MULTI_BROKER) {
sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI);
rcvBroker = createBroker("RcvBroker", receiveBrokerBindURI, receiveBrokerConnectURI);
brokers.add(sendBroker);
@@ -378,13 +459,13 @@ abstract class BaseBrokerPerfSupport ext
var dests = new Array[Destination](destCount);
for (i <- 0 until destCount) {
- val domain = if (ptp) {Domain.QUEUE_DOMAIN} else {Domain.TOPIC_DOMAIN}
+ val domain = if (PTP) {Domain.QUEUE_DOMAIN} else {Domain.TOPIC_DOMAIN}
val name = new AsciiBuffer("dest" + (i + 1))
var bean = new SingleDestination(domain, name)
dests(i) = bean;
- if (ptp) {
+ if (PTP) {
sendBroker.defaultVirtualHost.createQueue(dests(i));
- if (multibroker) {
+ if (MULTI_BROKER) {
rcvBroker.defaultVirtualHost.createQueue(dests(i));
}
}
@@ -460,42 +541,38 @@ abstract class BaseBrokerPerfSupport ext
private def stopServices() = {
stopping.set(true);
- val tracker = new CompletionTracker
+ val tracker = new CompletionTracker("test shutdown")
for (broker <- brokers) {
- broker.stop(tracker.task());
+ broker.stop(tracker.task("broker"));
}
- brokers.clear
for (connection <- producers) {
- connection.stop(tracker.task());
+ connection.stop(tracker.task(connection.toString));
}
- producers.clear
for (connection <- consumers) {
- connection.stop(tracker.task());
+ connection.stop(tracker.task(connection.toString));
}
- consumers.clear
println("waiting for services to stop");
tracker.await
- stopping.set(false)
}
private def startBrokers() = {
- val tracker = new CompletionTracker
+ val tracker = new CompletionTracker("test broker startup")
for (broker <- brokers) {
- broker.start(tracker.task());
+ broker.start(tracker.task("broker"));
}
tracker.await
}
private def startClients() = {
- var tracker = new CompletionTracker
+ var tracker = new CompletionTracker("test consumer startup")
for (connection <- consumers) {
- connection.start(tracker.task());
+ connection.start(tracker.task(connection.toString));
}
tracker.await
- tracker = new CompletionTracker
+ tracker = new CompletionTracker("test producer startup")
for (connection <- producers) {
- connection.start(tracker.task());
+ connection.start(tracker.task(connection.toString));
}
tracker.await
}
@@ -510,13 +587,16 @@ abstract class RemoteConsumer extends Co
var selector: String = null;
var durable = false;
var uri: String = null
+ var name:String = null
var brokerPerfTest:BaseBrokerPerfSupport = null
- override def start(onComplete:Runnable) = {
- consumerRate.name("Consumer " + name + " Rate");
+ override protected def _start(onComplete:Runnable) = {
+ if( consumerRate.getName == null ) {
+ consumerRate.name("Consumer " + name + " Rate");
+ }
totalConsumerRate.add(consumerRate);
transport = TransportFactory.connect(uri);
- super.start(onComplete);
+ super._start(onComplete);
}
@@ -540,6 +620,7 @@ abstract class RemoteConsumer extends Co
abstract class RemoteProducer extends Connection {
val rate = new MetricCounter();
+ var name:String = null
var messageIdGenerator: AtomicLong = null
var priority = 0
var persistentDelivery = false
@@ -564,7 +645,7 @@ abstract class RemoteProducer extends Co
}
}
- override def start(onComplete:Runnable) = {
+ override protected def _start(onComplete:Runnable) = {
if (payloadSize > 0) {
var sb = new StringBuilder(payloadSize);
@@ -574,11 +655,13 @@ abstract class RemoteProducer extends Co
filler = sb.toString();
}
- rate.name("Producer " + name + " Rate");
+ if( rate.getName == null ) {
+ rate.name("Producer " + name + " Rate");
+ }
totalProducerRate.add(rate);
transport = TransportFactory.connect(uri);
- super.start(onComplete);
+ super._start(onComplete);
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/FunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/FunSuiteSupport.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/FunSuiteSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/FunSuiteSupport.scala Wed Jul 7 03:47:36 2010
@@ -2,11 +2,11 @@ package org.apache.activemq.apollo.broke
import _root_.org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
import java.io.File
import java.lang.String
import collection.immutable.Map
import org.apache.activemq.apollo.broker.Logging
+import org.scalatest._
/**
* @version $Revision : 1.1 $
@@ -15,6 +15,7 @@ import org.apache.activemq.apollo.broker
abstract class FunSuiteSupport extends FunSuite with Logging with BeforeAndAfterAll {
protected var _basedir = "."
+
/**
* Returns the base directory of the current project
*/
@@ -30,4 +31,22 @@ abstract class FunSuiteSupport extends F
}
debug("using basedir: " + _basedir)
}
+
+ //
+ // Allows us to get the current test name.
+ //
+
+ val _testName = new ThreadLocal[String]();
+
+ def testName = _testName.get
+
+ protected override def runTest(testName: String, reporter: Reporter, stopper: Stopper, configMap: Map[String, Any], tracker: Tracker) = {
+ _testName.set(testName)
+ try {
+ super.runTest(testName, reporter, stopper, configMap, tracker)
+ } finally {
+ _testName.remove
+ }
+ }
+
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties Wed Jul 7 03:47:36 2010
@@ -19,13 +19,13 @@
# The logging properties used during tests..
#
log4j.rootLogger=WARN, console, file
-log4j.logger.org.apache.activemq=DEBUG
+log4j.logger.org.apache.activemq=TRACE
# Console will only display warnings
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
-log4j.appender.console.threshold=DEBUG
+log4j.appender.console.threshold=TRACE
# File appender will contain all info messages
log4j.appender.file=org.apache.log4j.FileAppender
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:47:36 2010
@@ -99,7 +99,11 @@ class StompProtocolHandler extends Proto
override def onTransportConnected() = {
outboundChannel = new TransportDeliverySink(connection.transport) {
- override def send(delivery: Delivery) = transport.oneway(delivery.message.asInstanceOf[StompFrameMessage].frame, delivery)
+ override def send(delivery: Delivery) = {
+ if( !connection.stopped ) {
+ transport.oneway(delivery.message.asInstanceOf[StompFrameMessage].frame, delivery)
+ }
+ }
}
connection.broker.runtime.getDefaultVirtualHost(
queue.wrap { (host)=>
@@ -112,7 +116,6 @@ class StompProtocolHandler extends Proto
override def onTransportDisconnected() = {
if( !closed ) {
- info("cleaning up resources")
closed=true;
if( producerRoute!=null ) {
host.router.disconnect(producerRoute)
@@ -122,6 +125,7 @@ class StompProtocolHandler extends Proto
host.router.unbind(consumer.destination, consumer::Nil)
consumer=null
}
+ info("stomp protocol resources released")
}
}
@@ -202,9 +206,11 @@ class StompProtocolHandler extends Proto
connection.transport.suspendRead
host.router.connect(destiantion, queue, producer) {
(route) =>
- connection.transport.resumeRead
- producerRoute = route
- send_via_route(producerRoute, frame)
+ if( !connection.stopped ) {
+ connection.transport.resumeRead
+ producerRoute = route
+ send_via_route(producerRoute, frame)
+ }
}
} else {
// we can re-use the existing producer route
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul 7 03:47:36 2010
@@ -67,7 +67,7 @@ object StompWireFormat extends Log {
class StompWireFormat extends WireFormat with DispatchLogging {
import StompWireFormat._
- protected def log: Log = StompWireFormat
+ override protected def log: Log = StompWireFormat
implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
implicit def wrap(x: Byte) = {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul 7 03:47:36 2010
@@ -31,6 +31,7 @@ import _root_.org.fusesource.hawtdispatc
import org.fusesource.hawtdispatch.BaseRetained
class StompBrokerPerfTest extends BaseBrokerPerfSupport {
+ println(getClass.getClassLoader.getResource("log4j.properties"));
override def createProducer() = new StompRemoteProducer()
override def createConsumer() = new StompRemoteConsumer()
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul 7 03:47:36 2010
@@ -20,6 +20,8 @@ import org.apache.activemq.transport.Tra
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.buffer.DataByteArrayOutputStream;
import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.hawtdispatch.DispatchSource;
@@ -43,6 +45,12 @@ import static org.apache.activemq.transp
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class TcpTransport implements Transport {
+
+ static {
+ System.out.println(TcpTransport.class.getClassLoader().getResource("log4j.properties"));
+ }
+ private static final Log LOG = LogFactory.getLog(TcpTransport.class);
+
private Map<String, Object> socketOptions;
enum SocketState {
@@ -209,8 +217,9 @@ public class TcpTransport implements Tra
});
readSource.setCancelHandler(new Runnable() {
public void run() {
- readSource.release();
- releaseResources();
+ trace("Read canceled");
+ writeSource.cancel();
+ trace("Canceling write");
}
});
@@ -228,8 +237,10 @@ public class TcpTransport implements Tra
});
writeSource.setCancelHandler(new Runnable() {
public void run() {
- writeSource.release();
- releaseResources();
+ trace("Write canceled");
+ writeSource.cancel();
+ trace("Disposeing");
+ dispose();
}
});
@@ -241,35 +252,46 @@ public class TcpTransport implements Tra
public void stop() throws Exception {
stop(null);
}
- public void stop(Runnable onCompleted) throws Exception {
+ public void stop(final Runnable onCompleted) throws Exception {
if (transportState != RUNNING) {
- throw new IllegalStateException("stop can only be used from the started state");
+ throw new IllegalStateException("stop can only be used from the started state but was "+transportState);
}
+ trace("Canceling read");
transportState = DISPOSED;
+ writeSource.setDisposer(new Runnable(){
+ public void run() {
+ trace("running callback: "+onCompleted);
+ if( onCompleted!=null ) {
+ onCompleted.run();
+ }
+ }
+ });
readSource.cancel();
- writeSource.setDisposer(onCompleted);
- writeSource.cancel();
}
- private void releaseResources() {
- if( writeSource.isReleased() && writeSource.isReleased() ) {
- try {
- channel.close();
- } catch (IOException ignore) {
- }
- listener.onTransportDisconnected();
- OneWay oneWay = outbound.poll();
- while (oneWay != null) {
- if (oneWay.retained != null) {
- oneWay.retained.release();
- }
- oneWay = outbound.poll();
- }
- setDispatchQueue(null);
- next_outbound_buffer = null;
- outbound_buffer = null;
- this.wireformat = null;
+ private void dispose() {
+
+ assert dispatchQueue!=null;
+ assert Dispatch.getCurrentQueue() == dispatchQueue;
+
+ try {
+ channel.close();
+ } catch (IOException ignore) {
}
+ listener.onTransportDisconnected();
+// OneWay oneWay = outbound.poll();
+// while (oneWay != null) {
+// if (oneWay.retained != null) {
+// oneWay.retained.release();
+// }
+// oneWay = outbound.poll();
+// }
+ readSource.release();
+ writeSource.release();
+ dispatchQueue.release();
+ next_outbound_buffer = null;
+ outbound_buffer = null;
+ this.wireformat = null;
}
public void onTransportFailure(IOException error) {
@@ -323,6 +345,7 @@ public class TcpTransport implements Tra
* @return true if the outbound has been drained of all objects and there are no in progress writes.
*/
private boolean drainOutbound() {
+ assert Dispatch.getCurrentQueue() == dispatchQueue;
try {
while (socketState == CONNECTED) {
@@ -433,12 +456,29 @@ public class TcpTransport implements Tra
return null;
}
+ private boolean assertConnected() {
+ try {
+ if (socketState != CONNECTED) {
+ throw new IOException("Not connected.");
+ }
+ return true;
+ } catch (IOException e) {
+ onTransportFailure(e);
+ }
+ return false;
+ }
+
public void suspendRead() {
- readSource.suspend();
+ if( assertConnected() ) {
+ readSource.suspend();
+ }
}
+
public void resumeRead() {
- readSource.resume();
+ if( assertConnected() ) {
+ readSource.resume();
+ }
}
public void reconnect(URI uri) {
@@ -490,4 +530,16 @@ public class TcpTransport implements Tra
this.useLocalHost = useLocalHost;
}
+
+ private void trace(String message) {
+ if( LOG.isTraceEnabled() ) {
+ final String label = dispatchQueue.getLabel();
+ if( label !=null ) {
+ LOG.trace(label +" | "+message);
+ } else {
+ LOG.trace(message);
+ }
+ }
+ }
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/metric/MetricAggregator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/metric/MetricAggregator.java?rev=961086&r1=961085&r2=961086&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/metric/MetricAggregator.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/metric/MetricAggregator.java Wed Jul 7 03:47:36 2010
@@ -41,6 +41,10 @@ public class MetricAggregator extends Me
return metrics.remove(metric);
}
+ public void removeAllMetrics() {
+ metrics.clear();
+ }
+
public Float average() {
if (metrics.isEmpty()) {
return null;
@@ -54,6 +58,31 @@ public class MetricAggregator extends Me
return rc * 1.0f / count;
}
+ public Float deviation() {
+ if (metrics.isEmpty()) {
+ return null;
+ }
+ long values[] = new long[metrics.size()];
+
+ long sum=0;
+ for (int i=0; i < values.length; i++) {
+ values[i] = metrics.get(i).counter();
+ sum += values[i];
+ }
+
+ double mean = (1.0 * sum) / values.length;
+ double rc = 0;
+ for (long value : values) {
+ double v = value - mean;
+ rc += (v*v);
+ }
+ return (float)Math.sqrt(rc / values.length);
+ }
+
+ public Float total(Period p) {
+ return p.rate(total());
+ }
+
public long total() {
long rc = 0;
for (Metric metric : metrics) {