You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/09/29 19:34:08 UTC

svn commit: r1002766 - in /activemq/activemq-apollo/trunk: apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/

Author: chirino
Date: Wed Sep 29 17:34:07 2010
New Revision: 1002766

URL: http://svn.apache.org/viewvc?rev=1002766&view=rev
Log:
Initial stab at additional persistence tests, each across 1, 2, 4, 8 and 10 clients with 20b, 1k and 10k messages.

1)  Enqueue 1M messages
2)  Dequeue 1M messages

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=1002766&r1=1002765&r2=1002766&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Sep 29 17:34:07 2010
@@ -26,15 +26,13 @@ import java.net.URL
  */
 abstract class BaseBrokerPerfSupport extends BrokerPerfSupport {
 
-  PERSISTENT = false
-
   def reportResourceTemplate():URL = { classOf[BaseBrokerPerfSupport].getResource("report.html") }
   def partitionedLoad = List(1, 2, 4, 8, 10)
   def highContention = 10
   def messageSizes = List(20,1024,1024*256)
 
-  // benchmark( all the combinations
-  for( ptp<- List(true,false) ; durable <- List(false) ; messageSize <- messageSizes ) {
+  // benchmark all the combinations
+  for( ptp <- List(true,false) ; durable <- List(false,true) ; messageSize <- messageSizes ) {
 
     def benchmark(name:String)(func: =>Unit) {
       test(name) {
@@ -45,7 +43,7 @@ abstract class BaseBrokerPerfSupport ext
       }
     }
 
-    val prefix = (if( ptp ) "queue " else "topic ") +(if((messageSize%1024)==0) (messageSize/1024)+"k" else messageSize+"b" )+" "
+    val prefix = (if( ptp ) "queue " else "topic ") + (if((messageSize%1024)==0) (messageSize/1024)+"k" else messageSize+"b" ) + " "
     val suffix = (if( durable ) " durable" else "")
 
     if( ptp && durable ) {
@@ -119,7 +117,7 @@ abstract class BaseBrokerPerfSupport ext
 
 //    /**
 //     *  benchmark(s 1 producers sending to 1 destination with 1 slow and 1 fast consumer.
-//     *
+      //     *
 //     * queue case: the producer should not slow down since it can dispatch to the
 //     *             fast consumer
 //     *

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala?rev=1002766&r1=1002765&r2=1002766&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala Wed Sep 29 17:34:07 2010
@@ -24,15 +24,54 @@ import java.net.URL
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-abstract class BasePersistentBrokerPerfSupport  extends BaseBrokerPerfSupport {
+abstract class BasePersistentBrokerPerfSupport extends BaseBrokerPerfSupport {
 
   PERSISTENT = true
-  
-  override def reportResourceTemplate():URL = { classOf[BasePersistentBrokerPerfSupport].getResource("persistent-report.html") }
 
-  override def partitionedLoad = List(1, 10, 50, 100)
+  override def reportResourceTemplate():URL = { classOf[BasePersistentBrokerPerfSupport].getResource("persistent-report.html") }
 
   override def highContention = 100
 
+  for ( load <- partitionedLoad ; messageSize <- messageSizes ) {
+    val numMessages = 1000000 / load
+
+    val info = "queue " + numMessages + " " + (if((messageSize%1024)==0) (messageSize/1024)+"k" else messageSize+"b" ) + " with " + load + " "    
+
+    test("En" + info + "producer(s)") {
+      MAX_MESSAGES = numMessages
+      PTP = true
+      PURGE_STORE = true      
+      MESSAGE_SIZE = messageSize
+      producerCount = load;
+      destCount = 1;
+      createConnections();
+
+      // Start 'em up.
+      startClients();
+      try {
+        reportRates();
+      } finally {
+        stopServices();
+      }
+    }
+
+    test("De" + info + "consumer(s)") {
+      MAX_MESSAGES = numMessages
+      PTP = true
+      PURGE_STORE = false
+      MESSAGE_SIZE = messageSize
+      consumerCount = load;
+      destCount = 1;
+      createConnections();
+
+      // Start 'em up.
+      startClients();
+      try {
+        reportRates();
+      } finally {
+        stopServices();
+      }
+    }
+  }
 
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala?rev=1002766&r1=1002765&r2=1002766&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala Wed Sep 29 17:34:07 2010
@@ -40,6 +40,8 @@ abstract class BrokerPerfSupport extends
   var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "6"))
   var SAMPLE_PERIOD = java.lang.Long.parseLong(System.getProperty("SAMPLE_PERIOD", "1000"))
 
+  var MAX_MESSAGES = 0
+
   protected var TCP = true // Set to use tcp IO
 
   var USE_KAHA_DB = true
@@ -199,7 +201,7 @@ abstract class BrokerPerfSupport extends
     connector.protocol = getBrokerProtocolName
 
     val host = config.virtual_hosts.get(0)
-    host.purge_on_startup = true
+    host.purge_on_startup = PURGE_STORE
     config
   }
 
@@ -268,6 +270,7 @@ abstract class BrokerPerfSupport extends
     consumer.destination = destination
     consumer.name = "Consumer:" + (i + 1)
     consumer.rateAggregator = totalConsumerRate
+    consumer.maxMessages = MAX_MESSAGES    
     consumer.init
     
     return consumer
@@ -289,6 +292,7 @@ abstract class BrokerPerfSupport extends
     producer.messageIdGenerator = msgIdGenerator
     producer.rateAggregator = totalProducerRate
     producer.payloadSize = MESSAGE_SIZE
+    producer.maxMessages = MAX_MESSAGES
     producer.init
     producer
   }
@@ -336,7 +340,7 @@ abstract class BrokerPerfSupport extends
     }
     tracker.await
   }
-
+  
   def reportRates() = {
 
     println("Warming up...")
@@ -348,24 +352,27 @@ abstract class BrokerPerfSupport extends
 
     case class Summary(producer:java.lang.Float, pdev:java.lang.Float, consumer:java.lang.Float, cdev:java.lang.Float)
 
-    val sample_rates = new Array[Summary](PERFORMANCE_SAMPLES)
     var best = 0
 
-    for (i <- 0 until PERFORMANCE_SAMPLES) {
-      var p = new Period()
+    import scala.collection.mutable.ArrayBuffer
+
+    val sample_rates = new ArrayBuffer[Summary]()
+
+    def fillRateSummary(i: Int): Unit = {
+      val p = new Period()
       Thread.sleep(SAMPLE_PERIOD)
-      if( producerCount > 0 ) {
+      if (producerCount > 0) {
         println(totalProducerRate.getRateSummary(p))
       }
-      if( consumerCount > 0 ) {
+      if (consumerCount > 0) {
         println(totalConsumerRate.getRateSummary(p))
       }
 
-      sample_rates(i) = Summary(totalProducerRate.total(p), totalProducerRate.deviation, totalConsumerRate.total(p), totalConsumerRate.deviation)
+      sample_rates += Summary(totalProducerRate.total(p), totalProducerRate.deviation, totalConsumerRate.total(p), totalConsumerRate.deviation)
 
       val current_sum = sample_rates(i).producer.longValue + sample_rates(i).consumer.longValue
       val best_sum = sample_rates(best).producer.longValue + sample_rates(best).consumer.longValue
-      if( current_sum > best_sum ) {
+      if (current_sum > best_sum) {
         best = i
       }
 
@@ -373,6 +380,30 @@ abstract class BrokerPerfSupport extends
       totalConsumerRate.reset()
     }
 
+    // either we want to do x number of samples or sample over the course of x number of messages
+    if ( MAX_MESSAGES == 0 ) {
+      for (i <- 0 until PERFORMANCE_SAMPLES) {
+        fillRateSummary(i)
+      }
+    } else {
+      var clientsRunning = true
+      var i = 0
+      
+      while (clientsRunning) {
+        fillRateSummary(i)
+        i = i + 1
+        clientsRunning = false
+
+        def checkForRunningClients(connection: Connection) = {
+          if (connection.stopped == false) {
+            clientsRunning = true
+          }
+        }
+
+        producers.foreach(checkForRunningClients)
+        consumers.foreach(checkForRunningClients)
+      }
+    }
 
     if( producerCount > 0 ) {
       samples = samples ::: ( testName+" producer", sample_rates(best).producer ) :: Nil
@@ -386,8 +417,6 @@ abstract class BrokerPerfSupport extends
         samples = samples ::: ( testName+" consumer sd", sample_rates(best).cdev ) :: Nil
       }
     }
-
-
   }
 
 
@@ -402,6 +431,9 @@ abstract class RemoteConnection extends 
   var stopping:AtomicBoolean = null
   var destination: Destination = null
 
+  var messageCount = 0
+  var maxMessages = 0
+
   def init = {
     if( rate.getName == null ) {
       rate.name(name + " Rate")

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=1002766&r1=1002765&r2=1002766&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Sep 29 17:34:07 2010
@@ -20,6 +20,7 @@ import _root_.java.util.concurrent.TimeU
 import _root_.org.apache.activemq.apollo.broker._
 import _root_.org.apache.activemq.apollo.broker.perf._
 import _root_.org.apache.activemq.apollo.stomp._
+import _root_.org.apache.activemq.apollo.util._
 
 import _root_.org.fusesource.hawtbuf._
 import collection.mutable.{ListBuffer, HashMap}
@@ -30,6 +31,7 @@ import _root_.org.apache.activemq.apollo
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 import java.io.File
 import org.apache.activemq.apollo.dto.{BrokerDTO, HawtDBStoreDTO}
+import org.apache.activemq.apollo.store.bdb.dto.BDBStoreDTO
 
 
 class StompBrokerPerfTest extends BaseBrokerPerfSupport {
@@ -57,7 +59,7 @@ class StompPersistentBrokerPerfTest exte
 }
 
 class StompHawtDBPersistentBrokerPerfTest extends BasePersistentBrokerPerfSupport {
-
+  
   override def description = "Using the STOMP protocol over TCP persisting to the HawtDB store."
 
   println(getClass.getClassLoader.getResource("log4j.properties"))
@@ -65,7 +67,7 @@ class StompHawtDBPersistentBrokerPerfTes
   override def createProducer() = new StompRemoteProducer()
 
   override def createConsumer() = new StompRemoteConsumer()
-
+ 
   override def getRemoteProtocolName() = "stomp"
 
   override def createBrokerConfig(name: String, bindURI: String, connectUri: String): BrokerDTO = {
@@ -78,11 +80,34 @@ class StompHawtDBPersistentBrokerPerfTes
     rc
   }
 
+}
+
+class StompBDBPersistentBrokerPerfTest extends BasePersistentBrokerPerfSupport {
+
+  override def description = "Using the STOMP protocol over TCP persisting to the BerkleyDB store."
+
+  println(getClass.getClassLoader.getResource("log4j.properties"))
+
+  override def createProducer() = new StompRemoteProducer()
+
+  override def createConsumer() = new StompRemoteConsumer()
+
+  override def getRemoteProtocolName() = "stomp"
+
+  override def createBrokerConfig(name: String, bindURI: String, connectUri: String): BrokerDTO = {
+    val rc = super.createBrokerConfig(name, bindURI, connectUri)
+
+    val store = new BDBStoreDTO
+    store.directory = new File(new File(testDataDir, getClass.getName), name)
+
+    rc.virtual_hosts.get(0).store = store
+    rc
+  }
 
 }
 
 
-class StompRemoteConsumer extends RemoteConsumer {
+class StompRemoteConsumer extends RemoteConsumer with Logging {
   var outboundSink: OverflowSink[StompFrame] = null
 
   def onConnected() = {
@@ -115,12 +140,20 @@ class StompRemoteConsumer extends Remote
     frame match {
       case StompFrame(Responses.CONNECTED, headers, _, _) =>
       case StompFrame(Responses.MESSAGE, headers, content, _) =>
-        messageReceived();
+        if (maxMessages > 0 && messageCount < maxMessages - 1) {
+          messageReceived();
 
-        // we client ack if persistent messages are being used.
-        if( persistent ) {
-          var rc = List((Stomp.Headers.Ack.MESSAGE_ID, frame.header(Stomp.Headers.Message.MESSAGE_ID)))
-          outboundSink.offer(StompFrame(Stomp.Commands.ACK, rc));
+          // we client ack if persistent messages are being used.
+          if( persistent ) {
+            var rc = List((Stomp.Headers.Ack.MESSAGE_ID, frame.header(Stomp.Headers.Message.MESSAGE_ID)))
+            outboundSink.offer(StompFrame(Stomp.Commands.ACK, rc));
+          }
+          messageCount = messageCount + 1
+          if ( messageCount % 10000 == 0 ) {
+            trace("Received message count : " + messageCount)
+          }
+        } else {
+          stop()
         }
 
       case StompFrame(Responses.ERROR, headers, content, _) =>
@@ -131,43 +164,51 @@ class StompRemoteConsumer extends Remote
   }
 
   protected def messageReceived() {
-    if (thinkTime > 0) {
-      transport.suspendRead
-      dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
+      if (thinkTime > 0) {
+        transport.suspendRead
+        dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
+          rate.increment();
+          if (!stopped) {
+            transport.resumeRead
+          }
+        })
+      } else {
         rate.increment();
-        if (!stopped) {
-          transport.resumeRead
-        }
-      })
-    } else {
-      rate.increment();
-    }
+      }
   }
 
 }
 
-class StompRemoteProducer extends RemoteProducer {
+class StompRemoteProducer extends RemoteProducer with Logging {
   var outboundSink: OverflowSink[StompFrame] = null
   var stompDestination: AsciiBuffer = null
   var frame:StompFrame = null
 
   def send_next: Unit = {
-    var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
-    headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
-    if (property != null) {
-      headers ::= (ascii(property), ascii(property));
-    }
-    if( persistent ) {
-      headers ::= ((Stomp.Headers.RECEIPT_REQUESTED, ascii("x")));
+    if (maxMessages > 0 && messageCount < maxMessages) {
+      var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
+      headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
+      if (property != null) {
+        headers ::= (ascii(property), ascii(property));
+      }
+      if( persistent ) {
+        headers ::= ((Stomp.Headers.RECEIPT_REQUESTED, ascii("x")));
+      }
+      //    var p = this.priority;
+      //    if (priorityMod > 0) {
+      //        p = if ((counter % priorityMod) == 0) { 0 } else { priority }
+      //    }
+
+      var content = ascii(createPayload());
+      frame = StompFrame(Stomp.Commands.SEND, headers, BufferContent(content))
+      messageCount = messageCount + 1
+      if ( messageCount % 10000 == 0 ) {
+        trace("Sent message count : " + messageCount)
+      }
+      drain()
+    } else {
+      stop()
     }
-    //    var p = this.priority;
-    //    if (priorityMod > 0) {
-    //        p = if ((counter % priorityMod) == 0) { 0 } else { priority }
-    //    }
-
-    var content = ascii(createPayload());
-    frame = StompFrame(Stomp.Commands.SEND, headers, BufferContent(content))
-    drain()
   }
 
   def drain() = {