You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/09/29 19:34:08 UTC
svn commit: r1002766 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/
apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/
Author: chirino
Date: Wed Sep 29 17:34:07 2010
New Revision: 1002766
URL: http://svn.apache.org/viewvc?rev=1002766&view=rev
Log:
Initial stab at additional persistence tests, each run across 1, 2, 4, 8 and 10 clients with 20b, 1k and 10k messages:
1) Enqueue 1M messages
2) Dequeue 1M messages
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=1002766&r1=1002765&r2=1002766&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Sep 29 17:34:07 2010
@@ -26,15 +26,13 @@ import java.net.URL
*/
abstract class BaseBrokerPerfSupport extends BrokerPerfSupport {
- PERSISTENT = false
-
def reportResourceTemplate():URL = { classOf[BaseBrokerPerfSupport].getResource("report.html") }
def partitionedLoad = List(1, 2, 4, 8, 10)
def highContention = 10
def messageSizes = List(20,1024,1024*256)
- // benchmark( all the combinations
- for( ptp<- List(true,false) ; durable <- List(false) ; messageSize <- messageSizes ) {
+ // benchmark all the combinations
+ for( ptp <- List(true,false) ; durable <- List(false,true) ; messageSize <- messageSizes ) {
def benchmark(name:String)(func: =>Unit) {
test(name) {
@@ -45,7 +43,7 @@ abstract class BaseBrokerPerfSupport ext
}
}
- val prefix = (if( ptp ) "queue " else "topic ") +(if((messageSize%1024)==0) (messageSize/1024)+"k" else messageSize+"b" )+" "
+ val prefix = (if( ptp ) "queue " else "topic ") + (if((messageSize%1024)==0) (messageSize/1024)+"k" else messageSize+"b" ) + " "
val suffix = (if( durable ) " durable" else "")
if( ptp && durable ) {
@@ -119,7 +117,7 @@ abstract class BaseBrokerPerfSupport ext
// /**
// * benchmark(s 1 producers sending to 1 destination with 1 slow and 1 fast consumer.
-// *
+ // *
// * queue case: the producer should not slow down since it can dispatch to the
// * fast consumer
// *
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala?rev=1002766&r1=1002765&r2=1002766&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala Wed Sep 29 17:34:07 2010
@@ -24,15 +24,54 @@ import java.net.URL
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-abstract class BasePersistentBrokerPerfSupport extends BaseBrokerPerfSupport {
+abstract class BasePersistentBrokerPerfSupport extends BaseBrokerPerfSupport {
PERSISTENT = true
-
- override def reportResourceTemplate():URL = { classOf[BasePersistentBrokerPerfSupport].getResource("persistent-report.html") }
- override def partitionedLoad = List(1, 10, 50, 100)
+ override def reportResourceTemplate():URL = { classOf[BasePersistentBrokerPerfSupport].getResource("persistent-report.html") }
override def highContention = 100
+ for ( load <- partitionedLoad ; messageSize <- messageSizes ) {
+ val numMessages = 1000000 / load
+
+ val info = "queue " + numMessages + " " + (if((messageSize%1024)==0) (messageSize/1024)+"k" else messageSize+"b" ) + " with " + load + " "
+
+ test("En" + info + "producer(s)") {
+ MAX_MESSAGES = numMessages
+ PTP = true
+ PURGE_STORE = true
+ MESSAGE_SIZE = messageSize
+ producerCount = load;
+ destCount = 1;
+ createConnections();
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ test("De" + info + "consumer(s)") {
+ MAX_MESSAGES = numMessages
+ PTP = true
+ PURGE_STORE = false
+ MESSAGE_SIZE = messageSize
+ consumerCount = load;
+ destCount = 1;
+ createConnections();
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+ }
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala?rev=1002766&r1=1002765&r2=1002766&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala Wed Sep 29 17:34:07 2010
@@ -40,6 +40,8 @@ abstract class BrokerPerfSupport extends
var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "6"))
var SAMPLE_PERIOD = java.lang.Long.parseLong(System.getProperty("SAMPLE_PERIOD", "1000"))
+ var MAX_MESSAGES = 0
+
protected var TCP = true // Set to use tcp IO
var USE_KAHA_DB = true
@@ -199,7 +201,7 @@ abstract class BrokerPerfSupport extends
connector.protocol = getBrokerProtocolName
val host = config.virtual_hosts.get(0)
- host.purge_on_startup = true
+ host.purge_on_startup = PURGE_STORE
config
}
@@ -268,6 +270,7 @@ abstract class BrokerPerfSupport extends
consumer.destination = destination
consumer.name = "Consumer:" + (i + 1)
consumer.rateAggregator = totalConsumerRate
+ consumer.maxMessages = MAX_MESSAGES
consumer.init
return consumer
@@ -289,6 +292,7 @@ abstract class BrokerPerfSupport extends
producer.messageIdGenerator = msgIdGenerator
producer.rateAggregator = totalProducerRate
producer.payloadSize = MESSAGE_SIZE
+ producer.maxMessages = MAX_MESSAGES
producer.init
producer
}
@@ -336,7 +340,7 @@ abstract class BrokerPerfSupport extends
}
tracker.await
}
-
+
def reportRates() = {
println("Warming up...")
@@ -348,24 +352,27 @@ abstract class BrokerPerfSupport extends
case class Summary(producer:java.lang.Float, pdev:java.lang.Float, consumer:java.lang.Float, cdev:java.lang.Float)
- val sample_rates = new Array[Summary](PERFORMANCE_SAMPLES)
var best = 0
- for (i <- 0 until PERFORMANCE_SAMPLES) {
- var p = new Period()
+ import scala.collection.mutable.ArrayBuffer
+
+ val sample_rates = new ArrayBuffer[Summary]()
+
+ def fillRateSummary(i: Int): Unit = {
+ val p = new Period()
Thread.sleep(SAMPLE_PERIOD)
- if( producerCount > 0 ) {
+ if (producerCount > 0) {
println(totalProducerRate.getRateSummary(p))
}
- if( consumerCount > 0 ) {
+ if (consumerCount > 0) {
println(totalConsumerRate.getRateSummary(p))
}
- sample_rates(i) = Summary(totalProducerRate.total(p), totalProducerRate.deviation, totalConsumerRate.total(p), totalConsumerRate.deviation)
+ sample_rates += Summary(totalProducerRate.total(p), totalProducerRate.deviation, totalConsumerRate.total(p), totalConsumerRate.deviation)
val current_sum = sample_rates(i).producer.longValue + sample_rates(i).consumer.longValue
val best_sum = sample_rates(best).producer.longValue + sample_rates(best).consumer.longValue
- if( current_sum > best_sum ) {
+ if (current_sum > best_sum) {
best = i
}
@@ -373,6 +380,30 @@ abstract class BrokerPerfSupport extends
totalConsumerRate.reset()
}
+ // either we want to do x number of samples or sample over the course of x number of messages
+ if ( MAX_MESSAGES == 0 ) {
+ for (i <- 0 until PERFORMANCE_SAMPLES) {
+ fillRateSummary(i)
+ }
+ } else {
+ var clientsRunning = true
+ var i = 0
+
+ while (clientsRunning) {
+ fillRateSummary(i)
+ i = i + 1
+ clientsRunning = false
+
+ def checkForRunningClients(connection: Connection) = {
+ if (connection.stopped == false) {
+ clientsRunning = true
+ }
+ }
+
+ producers.foreach(checkForRunningClients)
+ consumers.foreach(checkForRunningClients)
+ }
+ }
if( producerCount > 0 ) {
samples = samples ::: ( testName+" producer", sample_rates(best).producer ) :: Nil
@@ -386,8 +417,6 @@ abstract class BrokerPerfSupport extends
samples = samples ::: ( testName+" consumer sd", sample_rates(best).cdev ) :: Nil
}
}
-
-
}
@@ -402,6 +431,9 @@ abstract class RemoteConnection extends
var stopping:AtomicBoolean = null
var destination: Destination = null
+ var messageCount = 0
+ var maxMessages = 0
+
def init = {
if( rate.getName == null ) {
rate.name(name + " Rate")
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=1002766&r1=1002765&r2=1002766&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Sep 29 17:34:07 2010
@@ -20,6 +20,7 @@ import _root_.java.util.concurrent.TimeU
import _root_.org.apache.activemq.apollo.broker._
import _root_.org.apache.activemq.apollo.broker.perf._
import _root_.org.apache.activemq.apollo.stomp._
+import _root_.org.apache.activemq.apollo.util._
import _root_.org.fusesource.hawtbuf._
import collection.mutable.{ListBuffer, HashMap}
@@ -30,6 +31,7 @@ import _root_.org.apache.activemq.apollo
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
import java.io.File
import org.apache.activemq.apollo.dto.{BrokerDTO, HawtDBStoreDTO}
+import org.apache.activemq.apollo.store.bdb.dto.BDBStoreDTO
class StompBrokerPerfTest extends BaseBrokerPerfSupport {
@@ -57,7 +59,7 @@ class StompPersistentBrokerPerfTest exte
}
class StompHawtDBPersistentBrokerPerfTest extends BasePersistentBrokerPerfSupport {
-
+
override def description = "Using the STOMP protocol over TCP persisting to the HawtDB store."
println(getClass.getClassLoader.getResource("log4j.properties"))
@@ -65,7 +67,7 @@ class StompHawtDBPersistentBrokerPerfTes
override def createProducer() = new StompRemoteProducer()
override def createConsumer() = new StompRemoteConsumer()
-
+
override def getRemoteProtocolName() = "stomp"
override def createBrokerConfig(name: String, bindURI: String, connectUri: String): BrokerDTO = {
@@ -78,11 +80,34 @@ class StompHawtDBPersistentBrokerPerfTes
rc
}
+}
+
+class StompBDBPersistentBrokerPerfTest extends BasePersistentBrokerPerfSupport {
+
+ override def description = "Using the STOMP protocol over TCP persisting to the BerkleyDB store."
+
+ println(getClass.getClassLoader.getResource("log4j.properties"))
+
+ override def createProducer() = new StompRemoteProducer()
+
+ override def createConsumer() = new StompRemoteConsumer()
+
+ override def getRemoteProtocolName() = "stomp"
+
+ override def createBrokerConfig(name: String, bindURI: String, connectUri: String): BrokerDTO = {
+ val rc = super.createBrokerConfig(name, bindURI, connectUri)
+
+ val store = new BDBStoreDTO
+ store.directory = new File(new File(testDataDir, getClass.getName), name)
+
+ rc.virtual_hosts.get(0).store = store
+ rc
+ }
}
-class StompRemoteConsumer extends RemoteConsumer {
+class StompRemoteConsumer extends RemoteConsumer with Logging {
var outboundSink: OverflowSink[StompFrame] = null
def onConnected() = {
@@ -115,12 +140,20 @@ class StompRemoteConsumer extends Remote
frame match {
case StompFrame(Responses.CONNECTED, headers, _, _) =>
case StompFrame(Responses.MESSAGE, headers, content, _) =>
- messageReceived();
+ if (maxMessages > 0 && messageCount < maxMessages - 1) {
+ messageReceived();
- // we client ack if persistent messages are being used.
- if( persistent ) {
- var rc = List((Stomp.Headers.Ack.MESSAGE_ID, frame.header(Stomp.Headers.Message.MESSAGE_ID)))
- outboundSink.offer(StompFrame(Stomp.Commands.ACK, rc));
+ // we client ack if persistent messages are being used.
+ if( persistent ) {
+ var rc = List((Stomp.Headers.Ack.MESSAGE_ID, frame.header(Stomp.Headers.Message.MESSAGE_ID)))
+ outboundSink.offer(StompFrame(Stomp.Commands.ACK, rc));
+ }
+ messageCount = messageCount + 1
+ if ( messageCount % 10000 == 0 ) {
+ trace("Received message count : " + messageCount)
+ }
+ } else {
+ stop()
}
case StompFrame(Responses.ERROR, headers, content, _) =>
@@ -131,43 +164,51 @@ class StompRemoteConsumer extends Remote
}
protected def messageReceived() {
- if (thinkTime > 0) {
- transport.suspendRead
- dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
+ if (thinkTime > 0) {
+ transport.suspendRead
+ dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
+ rate.increment();
+ if (!stopped) {
+ transport.resumeRead
+ }
+ })
+ } else {
rate.increment();
- if (!stopped) {
- transport.resumeRead
- }
- })
- } else {
- rate.increment();
- }
+ }
}
}
-class StompRemoteProducer extends RemoteProducer {
+class StompRemoteProducer extends RemoteProducer with Logging {
var outboundSink: OverflowSink[StompFrame] = null
var stompDestination: AsciiBuffer = null
var frame:StompFrame = null
def send_next: Unit = {
- var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
- headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
- if (property != null) {
- headers ::= (ascii(property), ascii(property));
- }
- if( persistent ) {
- headers ::= ((Stomp.Headers.RECEIPT_REQUESTED, ascii("x")));
+ if (maxMessages > 0 && messageCount < maxMessages) {
+ var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
+ headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
+ if (property != null) {
+ headers ::= (ascii(property), ascii(property));
+ }
+ if( persistent ) {
+ headers ::= ((Stomp.Headers.RECEIPT_REQUESTED, ascii("x")));
+ }
+ // var p = this.priority;
+ // if (priorityMod > 0) {
+ // p = if ((counter % priorityMod) == 0) { 0 } else { priority }
+ // }
+
+ var content = ascii(createPayload());
+ frame = StompFrame(Stomp.Commands.SEND, headers, BufferContent(content))
+ messageCount = messageCount + 1
+ if ( messageCount % 10000 == 0 ) {
+ trace("Sent message count : " + messageCount)
+ }
+ drain()
+ } else {
+ stop()
}
- // var p = this.priority;
- // if (priorityMod > 0) {
- // p = if ((counter % priorityMod) == 0) { 0 } else { priority }
- // }
-
- var content = ascii(createPayload());
- frame = StompFrame(Stomp.Commands.SEND, headers, BufferContent(content))
- drain()
}
def drain() = {