You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:46:58 UTC
svn commit: r961083 - in /activemq/sandbox/activemq-apollo-actor: ./
activemq-broker/
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/
activemq-hawtdb/src/main/java...
Author: chirino
Date: Wed Jul 7 03:46:57 2010
New Revision: 961083
URL: http://svn.apache.org/viewvc?rev=961083&view=rev
Log:
- added async aware version of start/stop to the service interface
- cleaned up startup/shutdown logic in the broker and test case.
Added:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
- copied, changed from r961082, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/FunSuiteSupport.scala
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportServer.java
activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/Service.java
activemq/sandbox/activemq-apollo-actor/pom.xml
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/pom.xml Wed Jul 7 03:46:57 2010
@@ -91,6 +91,13 @@
<version>${junit-version}</version>
</dependency>
<dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest</artifactId>
+ <version>${scalatest-version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>test</scope>
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul 7 03:46:57 2010
@@ -17,7 +17,6 @@
package org.apache.activemq.apollo.broker
import _root_.java.io.{File}
-import _root_.java.util.{LinkedList, LinkedHashMap, ArrayList}
import _root_.org.apache.activemq.transport._
import _root_.org.apache.activemq.Service
import _root_.java.lang.{String}
@@ -27,6 +26,8 @@ import _root_.org.fusesource.hawtdispatc
import _root_.scala.collection.JavaConversions._
import _root_.scala.reflect.BeanProperty
import org.fusesource.hawtdispatch.{Dispatch, DispatchQueue, BaseRetained}
+import java.util.{HashSet, LinkedList, LinkedHashMap, ArrayList}
+import java.util.concurrent.{TimeUnit, CountDownLatch}
object BrokerFactory {
@@ -65,6 +66,63 @@ object BrokerFactory {
}
+class CompletionTracker(val queue:DispatchQueue=getGlobalQueue) {
+ private[this] val tasks = new HashSet[Runnable]()
+ private[this] var _callback:Runnable = null
+ queue.retain
+
+ def task(name:String="unknown"):Runnable = {
+ val rc = new Runnable() {
+ def run():Unit = {
+ tasks.synchronized {
+ if( tasks.remove(this) ) {
+ if( tasks.isEmpty && _callback!=null ) {
+ _callback ->: queue
+ queue.release
+ }
+ }
+ }
+ }
+ override def toString = name
+ }
+ tasks.synchronized {
+ if( _callback!=null ) {
+ throw new IllegalStateException("all tasks should be created before setting the callback");
+ }
+ tasks.add(rc)
+ }
+ return rc
+ }
+
+ def callback(handler: =>Unit ) {
+ tasks.synchronized {
+ _callback = handler _
+ if( tasks.isEmpty ) {
+ _callback ->: queue
+ queue.release
+ }
+ }
+ }
+
+ def await() = {
+ val latch =new CountDownLatch(1)
+ callback {
+ latch.countDown
+ }
+ latch.await
+ }
+
+ def await(timeout:Long, unit:TimeUnit) = {
+ val latch = new CountDownLatch(1)
+ callback {
+ latch.countDown
+ }
+ latch.await(timeout, unit)
+ }
+
+ override def toString = tasks.synchronized { "waiting on: "+tasks }
+}
+
object BufferConversions {
implicit def toAsciiBuffer(value:String) = new AsciiBuffer(value)
@@ -109,8 +167,11 @@ class Broker() extends Service with Disp
@BeanProperty
var defaultVirtualHost: VirtualHost = null
- def start = runtime.start
- def stop = runtime.stop
+ def start = runtime.start(null)
+ def start(onComplete:Runnable) = runtime.start(onComplete)
+
+ def stop = runtime.stop(null)
+ def stop(onComplete:Runnable) = runtime.stop(onComplete)
val dispatchQueue = createQueue("broker");
dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
@@ -148,9 +209,10 @@ class Broker() extends Service with Disp
debug("Accepted connection from: %s", transport.getRemoteAddress)
var connection = new BrokerConnection(Broker.this)
connection.transport = transport
+ connection.dispatchQueue.retain
clientConnections.add(connection)
try {
- connection.start
+ connection.start()
}
catch {
case e1: Exception => {
@@ -161,7 +223,13 @@ class Broker() extends Service with Disp
}
var state = CONFIGURATION
- val clientConnections: ArrayList[Connection] = new ArrayList[Connection]
+ val clientConnections: HashSet[Connection] = new HashSet[Connection]
+
+ def stopped(connection:Connection) = ^{
+ if( clientConnections.remove(connection) ) {
+ connection.dispatchQueue.release
+ }
+ } ->: dispatchQueue
def removeConnectUri(uri: String): Unit = ^ {
connectUris.remove(uri)
@@ -202,7 +270,11 @@ class Broker() extends Service with Disp
new ArrayList[TransportServer](transportServers)
} ->: dispatchQueue
- def start = ^ {
+ def start(onCompleted:Runnable) = ^ {
+ _start(onCompleted)
+ } ->: dispatchQueue
+
+ def _start(onCompleted:Runnable) = {
if (state == CONFIGURATION) {
// We can apply defaults now
if (dataDirectory == null) {
@@ -218,23 +290,37 @@ class Broker() extends Service with Disp
state = STARTING
+ val tracker = new CompletionTracker(dispatchQueue)
for (virtualHost <- virtualHosts.values) {
- virtualHost.start
+ virtualHost.start(tracker.task("virtual host: "+virtualHost))
}
for (server <- transportServers) {
server.setDispatchQueue(dispatchQueue)
server.setAcceptListener(new BrokerAcceptListener)
- server.start
+ server.start(tracker.task("transport server: "+server))
}
- state = RUNNING
+ tracker.callback {
+ state = RUNNING
+ if( onCompleted!=null ) {
+ onCompleted.run
+ }
+ }
+
} else {
warn("Can only start a broker that is in the " + CONFIGURATION + " state. Broker was " + state)
}
- } ->: dispatchQueue
+ }
+
- def stop: Unit = ^ {
+ def stop(onCompleted:Runnable): Unit = ^ {
if (state == RUNNING) {
state = STOPPING
+ dispatchQueue.setDisposer(^{
+ if( onCompleted!=null ) {
+ state = STOPPED;
+ onCompleted.run
+ }
+ })
for (server <- transportServers) {
stopService(server)
@@ -245,9 +331,8 @@ class Broker() extends Service with Disp
for (virtualHost <- virtualHosts.values) {
stopService(virtualHost)
}
- state = STOPPED;
+ dispatchQueue.release
}
-
} ->: dispatchQueue
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul 7 03:46:57 2010
@@ -44,19 +44,25 @@ abstract class Connection() extends Tran
var name = "connection"
var stopped = false;
-
var transport:Transport = null
- def start() = {
+ def start() = start(null)
+
+ def start(onCompleted:Runnable) = {
transport.setDispatchQueue(dispatchQueue);
transport.setTransportListener(Connection.this);
- transport.start()
+ transport.start(onCompleted)
}
- def stop() = {
- stopped=true
- transport.stop()
- dispatchQueue.release
+ def stop() = stop(null)
+
+ def stop(onCompleted:Runnable) = {
+ if( !stopped ) {
+ stopped=true
+ transport.stop()
+ dispatchQueue.setDisposer(onCompleted)
+ dispatchQueue.release
+ }
}
def onTransportFailure(error:IOException) = {
@@ -83,10 +89,19 @@ class BrokerConnection(val broker: Broke
var protocol = "stomp"
var protocolHandler: ProtocolHandler = null;
- override def start() = {
+ override def start(onCompleted:Runnable) = {
+ broker.dispatchQueue.retain
protocolHandler = ProtocolHandlerFactory.createProtocolHandler(protocol)
protocolHandler.setConnection(this);
- super.start
+ super.start(onCompleted)
+ }
+
+ override def stop(onCompleted:Runnable) = {
+ if( !stopped ) {
+ broker.runtime.stopped(this)
+ broker.dispatchQueue.release
+ super.stop(onCompleted)
+ }
}
override def onTransportConnected() = protocolHandler.onTransportConnected
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala Wed Jul 7 03:46:57 2010
@@ -26,6 +26,11 @@ trait Log {
val log = LogFactory.getLog(getClass.getName)
}
+class NamedLog(name:String) extends Log {
+ def this(clazz:Class[_]) = this(clazz.getName)
+ override val log = LogFactory.getLog(name)
+}
+
object Logging {
val exception_id_generator = new AtomicLong(System.currentTimeMillis)
def next_exception_id = exception_id_generator.incrementAndGet.toHexString
@@ -37,7 +42,7 @@ object Logging {
trait Logging {
import Logging._
- protected def log: Log
+ protected def log: Log = new NamedLog(getClass)
protected def log_map(message:String) = message
protected def error(message: => String, args:Any*): Unit = {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 03:46:57 2010
@@ -54,23 +54,26 @@ class VirtualHost() extends Service with
@BeanProperty
var transactionManager:TransactionManager = new TransactionManager
+ override def toString = names.head
- def start():Unit = {
- if (started) {
- return;
- }
- database.virtualHost = this
- database.start();
+ def start() = start(null)
+ def start(onCompleted:Runnable):Unit = {
+ if (started) {
+ return;
+ }
+
+ database.virtualHost = this
+ database.start();
// router.setDatabase(database);
- //Recover queues:
- queueStore.setDatabase(database);
- queueStore.setDispatchQueue(q);
- queueStore.loadQueues();
+ //Recover queues:
+ queueStore.setDatabase(database);
+ queueStore.setDispatchQueue(q);
+ queueStore.loadQueues();
- // Create Queue instances
+ // Create Queue instances
// TODO:
// for (IQueue<Long, MessageDelivery> iQueue : queueStore.getSharedQueues()) {
// Queue queue = new Queue(iQueue);
@@ -84,15 +87,19 @@ class VirtualHost() extends Service with
// queue.start();
// }
- //Recover transactions:
- transactionManager.virtualHost = this
- transactionManager.loadTransactions();
- started = true;
+ //Recover transactions:
+ transactionManager.virtualHost = this
+ transactionManager.loadTransactions();
+ started = true;
+
+ if( onCompleted!=null ) {
+ onCompleted.run
+ }
}
-
- def stop():Unit = {
+ def stop() = start(null)
+ def stop(onCompleted:Runnable):Unit = {
if (!started) {
return;
@@ -211,7 +218,7 @@ class VirtualHost() extends Service with
}
}
-class BrokerDatabase() extends Service {
+class BrokerDatabase() {
@BeanProperty
var store:Store=new MemoryStore;
Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (from r961082, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala&r1=961082&r2=961083&rev=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Jul 7 03:46:57 2010
@@ -25,126 +25,18 @@ import org.apache.activemq.transport.Tra
import _root_.scala.collection.JavaConversions._
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import org.apache.activemq.apollo.broker._
import org.apache.activemq.util.buffer.AsciiBuffer
import org.apache.activemq.broker.store.{Store, StoreFactory}
import java.io.{File, IOException}
-import java.util.concurrent.TimeUnit
import java.util.ArrayList
+import org.scalatest.Informer
+import org.fusesource.hawtdispatch.BaseRetained
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import org.apache.activemq.apollo.broker._
-abstract class RemoteConsumer extends Connection {
- val consumerRate = new MetricCounter();
- var totalConsumerRate: MetricAggregator = null
- var thinkTime: Long = 0
- var destination: Destination = null
- var selector: String = null;
- var durable = false;
- var uri: String = null
- var brokerPerfTest:BaseBrokerPerfTest = null
-
- override def start() = {
- consumerRate.name("Consumer " + name + " Rate");
- totalConsumerRate.add(consumerRate);
- transport = TransportFactory.connect(uri);
- super.start();
- }
-
-
- override def onTransportConnected() = {
- setupSubscription();
- transport.resumeRead
- }
-
- override def onTransportFailure(error: IOException) = {
- if (!brokerPerfTest.stopping.get()) {
- System.err.println("Client Async Error:");
- error.printStackTrace();
- }
- }
-
- protected def setupSubscription()
-
-}
-
-
-abstract class RemoteProducer extends Connection {
- val rate = new MetricCounter();
-
- var messageIdGenerator: AtomicLong = null
- var priority = 0
- var persistentDelivery = false
- var priorityMod = 0
- var counter = 0
- var producerId = 0
- var destination: Destination = null
- var property: String = null
- var totalProducerRate: MetricAggregator = null
- var next: Delivery = null
- var thinkTime: Long = 0
-
- var filler: String = null
- var payloadSize = 20
- var uri: String = null
- var brokerPerfTest:BaseBrokerPerfTest = null
-
- override def onTransportFailure(error: IOException) = {
- if (!brokerPerfTest.stopping.get()) {
- System.err.println("Client Async Error:");
- error.printStackTrace();
- }
- }
-
- override def start() = {
-
- if (payloadSize > 0) {
- var sb = new StringBuilder(payloadSize);
- for (i <- 0 until payloadSize) {
- sb.append(('a' + (i % 26)).toChar);
- }
- filler = sb.toString();
- }
-
- rate.name("Producer " + name + " Rate");
- totalProducerRate.add(rate);
-
- transport = TransportFactory.connect(uri);
- super.start();
-
- }
-
- override def onTransportConnected() = {
- setupProducer();
- transport.resumeRead
- }
-
- def setupProducer()
-
-def createPayload(): String = {
- if (payloadSize >= 0) {
- var sb = new StringBuilder(payloadSize);
- sb.append(name);
- sb.append(':');
- counter += 1
- sb.append(counter);
- sb.append(':');
- var length = sb.length;
- if (length <= payloadSize) {
- sb.append(filler.subSequence(0, payloadSize - length));
- return sb.toString();
- } else {
- return sb.substring(0, payloadSize);
- }
- } else {
- counter += 1
- return name + ":" + (counter);
- }
- }
-
-}
-
-object BaseBrokerPerfTest {
- var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "3000000"))
+object BaseBrokerPerfSupport {
+ var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "3"))
var IO_WORK_AMOUNT = 0
var FANIN_COUNT = 10
var FANOUT_COUNT = 10
@@ -157,8 +49,9 @@ object BaseBrokerPerfTest {
var DURABLE = false;
}
-abstract class BaseBrokerPerfTest {
- import BaseBrokerPerfTest._
+
+abstract class BaseBrokerPerfSupport extends FunSuiteSupport {
+ import BaseBrokerPerfSupport._
// Set to put senders and consumers on separate brokers.
protected var multibroker = false;
@@ -191,10 +84,10 @@ abstract class BaseBrokerPerfTest {
val producers = new ArrayList[RemoteProducer]()
val consumers = new ArrayList[RemoteConsumer]()
- var name: String = null;
- @Before
- def setUp() = {
+
+ override protected def beforeAll(configMap: Map[String, Any]) = {
+ super.beforeAll(configMap)
if (tcp) {
sendBrokerBindURI = "tcp://localhost:10000?wireFormat=" + getBrokerWireFormat();
receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=" + getBrokerWireFormat();
@@ -214,41 +107,28 @@ abstract class BaseBrokerPerfTest {
}
}
- def setName(name: String) = {
- if (this.name == null) {
- this.name = name;
- }
- }
-
- def getName() = name
-
def getBrokerWireFormat() = "multi"
def getRemoteWireFormat(): String
- @Test
- def benchmark_1_1_0(): Unit = {
- setName("1 producer -> 1 destination -> 0 consumers");
- if (ptp) {
- return;
- }
- producerCount = 1;
- destCount = 1;
-
- createConnections();
-
- // Start 'em up.
- startClients();
- try {
- reportRates();
- } finally {
- stopServices();
+ if (!ptp) {
+ test("1 producer -> 1 destination -> 0 consumers") {
+ producerCount = 1;
+ destCount = 1;
+
+ createConnections();
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
}
}
- @Test
- def benchmark_1_1_1() = {
- setName("1 producer -> 1 destination -> 1 consumers");
+ test("1 producer -> 1 destination -> 1 consumers") {
producerCount = 1;
destCount = 1;
consumerCount = 1;
@@ -265,9 +145,7 @@ abstract class BaseBrokerPerfTest {
}
}
- @Test
- def benchmark_10_1_10() = {
- setName(format("%d producers -> 1 destination -> %d consumers", FANIN_COUNT, FANOUT_COUNT));
+ test(format("%d producers -> 1 destination -> %d consumers", FANIN_COUNT, FANOUT_COUNT)) {
producerCount = FANIN_COUNT;
consumerCount = FANOUT_COUNT;
destCount = 1;
@@ -283,9 +161,7 @@ abstract class BaseBrokerPerfTest {
}
}
- @Test
- def benchmark_10_1_1() = {
- setName(format("%d producers -> 1 destination -> 1 consumer", FANIN_COUNT));
+ test(format("%d producers -> 1 destination -> 1 consumer", FANIN_COUNT)) {
producerCount = FANIN_COUNT;
destCount = 1;
consumerCount = 1;
@@ -301,9 +177,7 @@ abstract class BaseBrokerPerfTest {
}
}
- @Test
- def benchmark_1_1_10() = {
- setName(format("1 producer -> 1 destination -> %d consumers", FANOUT_COUNT));
+ test(format("1 producer -> 1 destination -> %d consumers", FANOUT_COUNT)) {
producerCount = 1;
destCount = 1;
consumerCount = FANOUT_COUNT;
@@ -319,9 +193,7 @@ abstract class BaseBrokerPerfTest {
}
}
- @Test
- def benchmark_2_2_2() = {
- setName(format("2 producer -> 2 destination -> 2 consumers"));
+ test("2 producer -> 2 destination -> 2 consumers") {
producerCount = 2;
destCount = 2;
consumerCount = 2;
@@ -337,9 +209,7 @@ abstract class BaseBrokerPerfTest {
}
}
- @Test
- def benchmark_10_10_10() = {
- setName(format("10 producers -> 10 destinations -> 10 consumers"));
+ test("10 producers -> 10 destinations -> 10 consumers") {
producerCount = 10;
destCount = 10;
consumerCount = 10;
@@ -362,9 +232,7 @@ abstract class BaseBrokerPerfTest {
*
* @throws Exception
*/
- @Test
- def benchmark_2_2_2_SlowConsumer() = {
- setName(format("2 producer -> 2 destination -> 2 slow consumers"));
+ test("2 producer -> 2 destination -> 2 slow consumers") {
producerCount = 2;
destCount = 2;
consumerCount = 2;
@@ -381,9 +249,7 @@ abstract class BaseBrokerPerfTest {
}
}
- @Test
- def benchmark_2_2_2_Selector() = {
- setName(format("2 producer -> 2 destination -> 2 selector consumers"));
+ test("2 producer -> 2 destination -> 2 selector consumers") {
producerCount = 2;
destCount = 2;
consumerCount = 2;
@@ -412,10 +278,7 @@ abstract class BaseBrokerPerfTest {
*
* @throws Exception
*/
- @Test
- def benchmark_2_1_1_HighPriorityProducer() = {
-
- setName(format("1 high and 1 normal priority producer -> 1 destination -> 1 consumer"));
+ test("1 high and 1 normal priority producer -> 1 destination -> 1 consumer") {
producerCount = 2;
destCount = 1;
consumerCount = 1;
@@ -430,14 +293,13 @@ abstract class BaseBrokerPerfTest {
// Start 'em up.
startClients();
try {
-
- System.out.println("Checking rates for test: " + getName());
+ println("Checking rates...");
for (i <- 0 until PERFORMANCE_SAMPLES) {
var p = new Period();
Thread.sleep(1000 * 5);
- System.out.println(producer.rate.getRateSummary(p));
- System.out.println(totalProducerRate.getRateSummary(p));
- System.out.println(totalConsumerRate.getRateSummary(p));
+ println(producer.rate.getRateSummary(p));
+ println(totalProducerRate.getRateSummary(p));
+ println(totalConsumerRate.getRateSummary(p));
totalProducerRate.reset();
totalConsumerRate.reset();
}
@@ -453,10 +315,7 @@ abstract class BaseBrokerPerfTest {
*
* @throws Exception
*/
- @Test
- def benchmark_2_1_1_MixedHighPriorityProducer() = {
-
- setName(format("1 high/mixed and 1 normal priority producer -> 1 destination -> 1 consumer"));
+ test("1 high/mixed and 1 normal priority producer -> 1 destination -> 1 consumer") {
producerCount = 2;
destCount = 1;
consumerCount = 1;
@@ -473,13 +332,13 @@ abstract class BaseBrokerPerfTest {
startClients();
try {
- System.out.println("Checking rates for test: " + getName());
+ println("Checking rates...");
for (i <- 0 until PERFORMANCE_SAMPLES) {
var p = new Period();
Thread.sleep(1000 * 5);
- System.out.println(producer.rate.getRateSummary(p));
- System.out.println(totalProducerRate.getRateSummary(p));
- System.out.println(totalConsumerRate.getRateSummary(p));
+ println(producer.rate.getRateSummary(p));
+ println(totalProducerRate.getRateSummary(p));
+ println(totalConsumerRate.getRateSummary(p));
totalProducerRate.reset();
totalConsumerRate.reset();
}
@@ -490,12 +349,12 @@ abstract class BaseBrokerPerfTest {
}
def reportRates() = {
- System.out.println("Checking rates for test: " + getName() + ", " + (if (ptp) {"ptp"} else {"topic"}));
+ println("Checking "+(if (ptp) "ptp" else "topic")+" rates...");
for (i <- 0 until PERFORMANCE_SAMPLES) {
var p = new Period();
Thread.sleep(1000 * 5);
- System.out.println(totalProducerRate.getRateSummary(p));
- System.out.println(totalConsumerRate.getRateSummary(p));
+ println(totalProducerRate.getRateSummary(p));
+ println(totalConsumerRate.getRateSummary(p));
totalProducerRate.reset();
totalConsumerRate.reset();
}
@@ -601,35 +460,154 @@ abstract class BaseBrokerPerfTest {
private def stopServices() = {
stopping.set(true);
+ val tracker = new CompletionTracker
for (broker <- brokers) {
- broker.stop();
+ broker.stop(tracker.task());
}
+ brokers.clear
for (connection <- producers) {
- connection.stop();
+ connection.stop(tracker.task());
}
+ producers.clear
for (connection <- consumers) {
- connection.stop();
+ connection.stop(tracker.task());
}
+ consumers.clear
+ println("waiting for services to stop");
+ tracker.await
+ stopping.set(false)
}
private def startBrokers() = {
+ val tracker = new CompletionTracker
for (broker <- brokers) {
- broker.start();
+ broker.start(tracker.task());
}
+ tracker.await
}
+
private def startClients() = {
- // Start the clients after a delay to give the server a chance to startup.
- getGlobalQueue.dispatchAfter(200, TimeUnit.MILLISECONDS, ^{
- for (connection <- consumers) {
- connection.start();
+ var tracker = new CompletionTracker
+ for (connection <- consumers) {
+ connection.start(tracker.task());
+ }
+ tracker.await
+ tracker = new CompletionTracker
+ for (connection <- producers) {
+ connection.start(tracker.task());
+ }
+ tracker.await
+ }
+
+}
+
+abstract class RemoteConsumer extends Connection {
+ val consumerRate = new MetricCounter();
+ var totalConsumerRate: MetricAggregator = null
+ var thinkTime: Long = 0
+ var destination: Destination = null
+ var selector: String = null;
+ var durable = false;
+ var uri: String = null
+ var brokerPerfTest:BaseBrokerPerfSupport = null
+
+ override def start(onComplete:Runnable) = {
+ consumerRate.name("Consumer " + name + " Rate");
+ totalConsumerRate.add(consumerRate);
+ transport = TransportFactory.connect(uri);
+ super.start(onComplete);
+ }
+
+
+ override def onTransportConnected() = {
+ setupSubscription();
+ transport.resumeRead
+ }
+
+ override def onTransportFailure(error: IOException) = {
+ if (!brokerPerfTest.stopping.get()) {
+ System.err.println("Client Async Error:");
+ error.printStackTrace();
+ }
+ }
+
+ protected def setupSubscription()
+
+}
+
+
+abstract class RemoteProducer extends Connection {
+ val rate = new MetricCounter();
+
+ var messageIdGenerator: AtomicLong = null
+ var priority = 0
+ var persistentDelivery = false
+ var priorityMod = 0
+ var counter = 0
+ var producerId = 0
+ var destination: Destination = null
+ var property: String = null
+ var totalProducerRate: MetricAggregator = null
+ var next: Delivery = null
+ var thinkTime: Long = 0
+
+ var filler: String = null
+ var payloadSize = 20
+ var uri: String = null
+ var brokerPerfTest:BaseBrokerPerfSupport = null
+
+ override def onTransportFailure(error: IOException) = {
+ if (!brokerPerfTest.stopping.get()) {
+ System.err.println("Client Async Error:");
+ error.printStackTrace();
+ }
+ }
+
+ override def start(onComplete:Runnable) = {
+
+ if (payloadSize > 0) {
+ var sb = new StringBuilder(payloadSize);
+ for (i <- 0 until payloadSize) {
+ sb.append(('a' + (i % 26)).toChar);
}
- })
- getGlobalQueue.dispatchAfter(400, TimeUnit.MILLISECONDS, ^{
- for (connection <- producers) {
- connection.start();
+ filler = sb.toString();
+ }
+
+ rate.name("Producer " + name + " Rate");
+ totalProducerRate.add(rate);
+
+ transport = TransportFactory.connect(uri);
+ super.start(onComplete);
+
+ }
+
+ override def onTransportConnected() = {
+ setupProducer();
+ transport.resumeRead
+ }
+
+ def setupProducer()
+
+def createPayload(): String = {
+ if (payloadSize >= 0) {
+ var sb = new StringBuilder(payloadSize);
+ sb.append(name);
+ sb.append(':');
+ counter += 1
+ sb.append(counter);
+ sb.append(':');
+ var length = sb.length;
+ if (length <= payloadSize) {
+ sb.append(filler.subSequence(0, payloadSize - length));
+ return sb.toString();
+ } else {
+ return sb.substring(0, payloadSize);
}
- })
+ } else {
+ counter += 1
+ return name + ":" + (counter);
+ }
}
-}
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/FunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/FunSuiteSupport.scala?rev=961083&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/FunSuiteSupport.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/FunSuiteSupport.scala Wed Jul 7 03:46:57 2010
@@ -0,0 +1,33 @@
+package org.apache.activemq.apollo.broker.perf
+
+import _root_.org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+import java.io.File
+import java.lang.String
+import collection.immutable.Map
+import org.apache.activemq.apollo.broker.Logging
+
+/**
+ * @version $Revision : 1.1 $
+ */
+@RunWith(classOf[JUnitRunner])
+abstract class FunSuiteSupport extends FunSuite with Logging with BeforeAndAfterAll {
+ protected var _basedir = "."
+
+ /**
+ * Returns the base directory of the current project
+ */
+ def baseDir = {
+ new File(_basedir)
+ }
+
+
+ override protected def beforeAll(map: Map[String, Any]): Unit = {
+ _basedir = map.get("basedir") match {
+ case Some(basedir) => basedir.toString
+ case _ => System.getProperty("basedir", ".")
+ }
+ debug("using basedir: " + _basedir)
+ }
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Wed Jul 7 03:46:57 2010
@@ -128,12 +128,20 @@ public class KahaDBStore implements Stor
if (started.compareAndSet(false, true)) {
try {
load();
+
} catch (Exception e) {
LOG.error("Error loading store", e);
}
}
}
+ public void start(Runnable onComplete) throws Exception {
+ start();
+ if( onComplete!=null ) {
+ onComplete.run();
+ }
+ }
+
public void stop() throws Exception {
if (started.compareAndSet(true, false)) {
flush();
@@ -141,6 +149,13 @@ public class KahaDBStore implements Stor
}
}
+ public void stop(Runnable onComplete) throws Exception {
+ stop();
+ if( onComplete!=null ) {
+ onComplete.run();
+ }
+ }
+
/**
* @return a unique sequential store tracking number.
*/
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/pom.xml Wed Jul 7 03:46:57 2010
@@ -87,6 +87,12 @@
<version>${junit-version}</version>
</dependency>
<dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest</artifactId>
+ <version>${scalatest-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<!--<scope>test</scope>-->
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul 7 03:46:57 2010
@@ -30,14 +30,7 @@ import _root_.org.apache.activemq.apollo
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
import org.fusesource.hawtdispatch.BaseRetained
-object StompBrokerPerfTest {
- def main(args:Array[String]) = {
- val test = new StompBrokerPerfTest();
- test.setUp
- test.benchmark_1_1_1
- }
-}
-class StompBrokerPerfTest extends BaseBrokerPerfTest {
+class StompBrokerPerfTest extends BaseBrokerPerfSupport {
override def createProducer() = new StompRemoteProducer()
override def createConsumer() = new StompRemoteConsumer()
Modified: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Wed Jul 7 03:46:57 2010
@@ -641,9 +641,21 @@ public class MemoryStore implements Stor
public void start() throws Exception {
}
+ public void start(Runnable onComplete) throws Exception {
+ if( onComplete!=null ) {
+ onComplete.run();
+ }
+ }
+
public void stop() throws Exception {
}
+ public void stop(Runnable onComplete) throws Exception {
+ if( onComplete!=null ) {
+ onComplete.run();
+ }
+ }
+
public <R, T extends Exception> R execute(Callback<R, T> callback, Runnable runnable) throws T {
R rc = callback.execute(session);
if (runnable != null) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul 7 03:46:57 2010
@@ -119,6 +119,9 @@ public class TcpTransport implements Tra
}
public void start() throws Exception {
+ start(null);
+ }
+ public void start(Runnable onCompleted) throws Exception {
if (dispatchQueue == null) {
throw new IllegalArgumentException("dispatchQueue is not set");
}
@@ -169,6 +172,10 @@ public class TcpTransport implements Tra
} else {
fireConnected();
}
+ if( onCompleted!=null ) {
+ dispatchQueue.execute(onCompleted);
+ }
+
}
@@ -232,11 +239,15 @@ public class TcpTransport implements Tra
public void stop() throws Exception {
+ stop(null);
+ }
+ public void stop(Runnable onCompleted) throws Exception {
if (transportState != RUNNING) {
throw new IllegalStateException("stop can only be used from the started state");
}
transportState = DISPOSED;
readSource.cancel();
+ writeSource.setDisposer(onCompleted);
writeSource.cancel();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Wed Jul 7 03:46:57 2010
@@ -84,7 +84,10 @@ public class TcpTransportServer implemen
acceptSource.resume();
}
- public void start() throws IOException {
+ public void start() throws Exception {
+ start(null);
+ }
+ public void start(Runnable onCompleted) throws IOException {
URI bind = bindURI;
String host = bind.getHost();
@@ -138,6 +141,9 @@ public class TcpTransportServer implemen
}
});
acceptSource.resume();
+ if( onCompleted!=null ) {
+ dispatchQueue.execute(onCompleted);
+ }
}
private URI connectURI(String hostname) throws URISyntaxException {
@@ -160,6 +166,10 @@ public class TcpTransportServer implemen
}
public void stop() throws Exception {
+ stop(null);
+ }
+ public void stop(Runnable onCompleted) throws Exception {
+ acceptSource.setDisposer(onCompleted);
acceptSource.release();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java Wed Jul 7 03:46:57 2010
@@ -90,6 +90,10 @@ public class TransportFilter implements
next.start();
}
+ public void start(Runnable onComplete) throws Exception {
+ next.start(onComplete);
+ }
+
/**
* @see org.apache.activemq.Service#stop()
*/
@@ -97,6 +101,10 @@ public class TransportFilter implements
next.stop();
}
+ public void stop(Runnable onComplete) throws Exception {
+ next.stop(onComplete);
+ }
+
public void onTransportCommand(Object command) {
transportListener.onTransportCommand(command);
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java Wed Jul 7 03:46:57 2010
@@ -67,6 +67,9 @@ public class PipeTransport implements Tr
}
public void start() throws Exception {
+ start(null);
+ }
+ public void start(final Runnable onCompleted) throws Exception {
if (dispatchQueue == null) {
throw new IllegalArgumentException("dispatchQueue is not set");
}
@@ -107,6 +110,10 @@ public class PipeTransport implements Tr
fireConnected();
peer.fireConnected();
}
+ if( onCompleted!=null ) {
+ onCompleted.run();
+ }
+
}
});
}
@@ -123,10 +130,14 @@ public class PipeTransport implements Tr
}
public void stop() throws Exception {
+ stop(null);
+ }
+ public void stop(Runnable onCompleted) throws Exception {
if( connected ) {
peer.dispatchSource.merge(EOF_TOKEN);
}
if( dispatchSource!=null ) {
+ dispatchSource.setDisposer(onCompleted);
dispatchSource.release();
dispatchSource = null;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportServer.java?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportServer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportServer.java Wed Jul 7 03:46:57 2010
@@ -75,6 +75,9 @@ public class PipeTransportServer impleme
}
public void start() throws Exception {
+ start(null);
+ }
+ public void start(Runnable onCompleted) throws Exception {
acceptSource = Dispatch.createSource(EventAggregators.<PipeTransport>linkedList(), dispatchQueue);
acceptSource.setEventHandler(new Runnable() {
public void run() {
@@ -84,11 +87,18 @@ public class PipeTransportServer impleme
}
}
});
+ if( onCompleted!=null ) {
+ dispatchQueue.execute(onCompleted);
+ }
}
public void stop() throws Exception {
- acceptSource.release();
+ stop(null);
+ }
+ public void stop(Runnable onCompleted) throws Exception {
PipeTransportFactory.unbind(this);
+ acceptSource.setDisposer(onCompleted);
+ acceptSource.release();
}
public void setConnectURI(URI connectURI) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/Service.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/Service.java?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/Service.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/Service.java Wed Jul 7 03:46:57 2010
@@ -19,18 +19,35 @@ package org.apache.activemq;
/**
* The core lifecyle interface for ActiveMQ components.
- *
- * If there was a standard way to do so, it'd be good to register this
- * interface with Spring so it treats the start/stop methods as those of
- * {@link org.springframework.beans.factory.InitializingBean}
- * and {@link org.springframework.beans.factory.DisposableBean}
- *
+ *
* @version $Revision: 1.1 $
*/
public interface Service {
+ /**
+ * Starts the service. No guarantee is given that the service has fully started
+ * by the time this method returns.
+ */
void start() throws Exception;
-
+
+ /**
+ * Starts the service. Executes the onComplete runnable once the service has fully started up.
+ *
+ * @param onComplete my be set to null if not interested in a callback.
+ */
+ void start(Runnable onComplete) throws Exception;
+
+ /**
+ * Stops the service. No guarantee is given that the service has fully stopped
+ * by the time this method returns.
+ */
void stop() throws Exception;
-
+
+ /**
+ * Stops the service. Executes the onComplete runnable once the service has fully stopped.
+ *
+ * @param onComplete my be set to null if not interested in a callback.
+ */
+ void stop(Runnable onComplete) throws Exception;
+
}
Modified: activemq/sandbox/activemq-apollo-actor/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/pom.xml?rev=961083&r1=961082&r2=961083&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/pom.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/pom.xml Wed Jul 7 03:46:57 2010
@@ -135,7 +135,6 @@
<module>activemq-tcp</module>
<module>activemq-hawtdb</module>
<module>activemq-jaxb</module>
- <module>activemq-scala</module>
<module>activemq-stomp</module>
</modules>