You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/09/29 19:35:09 UTC

svn commit: r1002772 - in /activemq/activemq-apollo/trunk: apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/

Author: chirino
Date: Wed Sep 29 17:35:08 2010
New Revision: 1002772

URL: http://svn.apache.org/viewvc?rev=1002772&view=rev
Log:
Added watchdog to test consumer so it dies off if it doesn't receive a message in 10 seconds; added assertions to persistence tests to check the number of messages sent/received.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=1002772&r1=1002771&r2=1002772&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Sep 29 17:35:08 2010
@@ -33,14 +33,12 @@ abstract class BaseBrokerPerfSupport ext
   for( ptp <- List(true,false) ; durable <- List(false) ; messageSize <- messageSizes ) {
 
     def benchmark(name:String)(func: =>Unit) {
-      /*
       test(name) {
         this.PTP = ptp
         this.DURABLE = durable
         this.MESSAGE_SIZE = messageSize
         func
       }
-      */
     }
 
     val prefix = (if( ptp ) "queue " else "topic ") + (if((messageSize%1024)==0) (messageSize/1024)+"k" else messageSize+"b" ) + " "

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala?rev=1002772&r1=1002771&r2=1002772&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala Wed Sep 29 17:35:08 2010
@@ -36,7 +36,8 @@ abstract class BasePersistentBrokerPerfS
 
   for ( load <- partitionedLoad ; messageSize <- messageSizes ) {
 
-    val numMessages = 100000 / load
+    val totalMessages = 100000
+    val numMessages = totalMessages / load
 
     def benchmark(name: String)(func: => Unit) {
       test(name) {
@@ -62,6 +63,7 @@ abstract class BasePersistentBrokerPerfS
       } finally {
         stopServices();
       }
+      this.assert(messagesSent == totalMessages, "Unexpected number of messages sent!")
     }
 
     benchmark("De" + info + "consumer(s)") {
@@ -76,6 +78,7 @@ abstract class BasePersistentBrokerPerfS
       } finally {
         stopServices();
       }
+      this.assert(messagesReceived == totalMessages, "Unexpected number of messages received!")
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala?rev=1002772&r1=1002771&r2=1002772&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala Wed Sep 29 17:35:08 2010
@@ -340,6 +340,18 @@ abstract class BrokerPerfSupport extends
     }
     tracker.await
   }
+
+  def messagesSent() : Long = {
+    var sum = 0
+    producers.foreach((producer:RemoteConnection) => sum += producer.messageCount)
+    sum
+  }
+
+  def messagesReceived() : Long = {
+    var sum = 0
+    consumers.foreach((consumer:RemoteConnection) => sum += consumer.messageCount)
+    sum
+  }
   
   def reportRates() = {
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=1002772&r1=1002771&r2=1002772&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Sep 29 17:35:08 2010
@@ -110,6 +110,18 @@ class StompBDBPersistentBrokerPerfTest e
 class StompRemoteConsumer extends RemoteConsumer with Logging {
   var outboundSink: OverflowSink[StompFrame] = null
 
+  def watchdog(lastMessageCount: Int) : Unit = {
+    val seconds = 10
+    dispatchQueue.dispatchAfter(seconds, TimeUnit.SECONDS, ^ {
+          if (messageCount == lastMessageCount) {
+            warn("Messages have stopped arriving after " + seconds + "s, stopping consumer")
+            stop
+          } else {
+            watchdog(messageCount)
+          }
+        })
+  }
+
   def onConnected() = {
     outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
     outboundSink.refiller = ^{}
@@ -133,6 +145,7 @@ class StompRemoteConsumer extends Remote
 
     frame = StompFrame(Stomp.Commands.SUBSCRIBE, headers);
     outboundSink.offer(frame);
+    watchdog(messageCount)
   }
 
   override def onTransportCommand(command: Object) = {