You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/09/29 19:35:09 UTC
svn commit: r1002772 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/
apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/
Author: chirino
Date: Wed Sep 29 17:35:08 2010
New Revision: 1002772
URL: http://svn.apache.org/viewvc?rev=1002772&view=rev
Log:
Added watchdog to test consumer so it dies off if it doesn't receive a message in 10 seconds, added assertions to persistence tests to check the number of messages sent/received.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=1002772&r1=1002771&r2=1002772&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Sep 29 17:35:08 2010
@@ -33,14 +33,12 @@ abstract class BaseBrokerPerfSupport ext
for( ptp <- List(true,false) ; durable <- List(false) ; messageSize <- messageSizes ) {
def benchmark(name:String)(func: =>Unit) {
- /*
test(name) {
this.PTP = ptp
this.DURABLE = durable
this.MESSAGE_SIZE = messageSize
func
}
- */
}
val prefix = (if( ptp ) "queue " else "topic ") + (if((messageSize%1024)==0) (messageSize/1024)+"k" else messageSize+"b" ) + " "
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala?rev=1002772&r1=1002771&r2=1002772&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala Wed Sep 29 17:35:08 2010
@@ -36,7 +36,8 @@ abstract class BasePersistentBrokerPerfS
for ( load <- partitionedLoad ; messageSize <- messageSizes ) {
- val numMessages = 100000 / load
+ val totalMessages = 100000
+ val numMessages = totalMessages / load
def benchmark(name: String)(func: => Unit) {
test(name) {
@@ -62,6 +63,7 @@ abstract class BasePersistentBrokerPerfS
} finally {
stopServices();
}
+ this.assert(messagesSent == totalMessages, "Unexpected number of messages sent!")
}
benchmark("De" + info + "consumer(s)") {
@@ -76,6 +78,7 @@ abstract class BasePersistentBrokerPerfS
} finally {
stopServices();
}
+ this.assert(messagesReceived == totalMessages, "Unexpected number of messages received!")
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala?rev=1002772&r1=1002771&r2=1002772&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala Wed Sep 29 17:35:08 2010
@@ -340,6 +340,18 @@ abstract class BrokerPerfSupport extends
}
tracker.await
}
+
+ def messagesSent() : Long = { // total of messageCount over every connection in producers
+ var sum = 0L // Long accumulator: an Int sum could wrap around before being widened to the Long return type
+ producers.foreach((producer:RemoteConnection) => sum += producer.messageCount)
+ sum
+ }
+
+ def messagesReceived() : Long = { // total of messageCount over every connection in consumers
+ var sum = 0L // Long accumulator: an Int sum could wrap around before being widened to the Long return type
+ consumers.foreach((consumer:RemoteConnection) => sum += consumer.messageCount)
+ sum
+ }
def reportRates() = {
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=1002772&r1=1002771&r2=1002772&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Sep 29 17:35:08 2010
@@ -110,6 +110,18 @@ class StompBDBPersistentBrokerPerfTest e
class StompRemoteConsumer extends RemoteConsumer with Logging {
var outboundSink: OverflowSink[StompFrame] = null
+ def watchdog(lastMessageCount: Int) : Unit = { // lastMessageCount: the messageCount value to compare against when the delayed check runs
+ val seconds = 10 // delay before the check runs, in seconds
+ dispatchQueue.dispatchAfter(seconds, TimeUnit.SECONDS, ^ {
+ if (messageCount == lastMessageCount) { // count is the same as when this check was scheduled
+ warn("Messages have stopped arriving after " + seconds + "s, stopping consumer")
+ stop
+ } else {
+ watchdog(messageCount) // count changed: schedule another check using the current count as the new baseline
+ }
+ })
+ }
+
def onConnected() = {
outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
outboundSink.refiller = ^{}
@@ -133,6 +145,7 @@ class StompRemoteConsumer extends Remote
frame = StompFrame(Stomp.Commands.SUBSCRIBE, headers);
outboundSink.offer(frame);
+ watchdog(messageCount)
}
override def onTransportCommand(command: Object) = {