You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/09/29 19:34:48 UTC
svn commit: r1002770 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/
apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/
Author: chirino
Date: Wed Sep 29 17:34:48 2010
New Revision: 1002770
URL: http://svn.apache.org/viewvc?rev=1002770&view=rev
Log:
Tidy up message count handling
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=1002770&r1=1002769&r2=1002770&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Sep 29 17:34:48 2010
@@ -33,12 +33,14 @@ abstract class BaseBrokerPerfSupport ext
for( ptp <- List(true,false) ; durable <- List(false) ; messageSize <- messageSizes ) {
def benchmark(name:String)(func: =>Unit) {
+ /*
test(name) {
this.PTP = ptp
this.DURABLE = durable
this.MESSAGE_SIZE = messageSize
func
}
+ */
}
val prefix = (if( ptp ) "queue " else "topic ") + (if((messageSize%1024)==0) (messageSize/1024)+"k" else messageSize+"b" ) + " "
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala?rev=1002770&r1=1002769&r2=1002770&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala Wed Sep 29 17:34:48 2010
@@ -30,11 +30,13 @@ abstract class BasePersistentBrokerPerfS
override def reportResourceTemplate():URL = { classOf[BasePersistentBrokerPerfSupport].getResource("persistent-report.html") }
+ //override def partitionedLoad = List(1, 2, 4, 8, 10)
override def highContention = 100
+ //override def messageSizes = List(20, 1024, 1024*256)
for ( load <- partitionedLoad ; messageSize <- messageSizes ) {
- val numMessages = 1000000 / load
+ val numMessages = 100000 / load
def benchmark(name: String)(func: => Unit) {
test(name) {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala?rev=1002770&r1=1002769&r2=1002770&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala Wed Sep 29 17:34:48 2010
@@ -465,7 +465,7 @@ abstract class RemoteConnection extends
} else {
onFailure(error)
if( callbackWhenConnected!=null ) {
- warn("connect attempt failed. wil retry connection..")
+ warn("connect attempt failed. will retry connection..")
dispatchQueue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^{
if(stopping.get()) {
callbackWhenConnected.run
@@ -480,6 +480,19 @@ abstract class RemoteConnection extends
}
}
+ protected def doStop()
+
+ protected def incrementMessageCount() = {
+ messageCount = messageCount + 1
+ //if ( messageCount % (maxMessages / 1000) == 0 ) {
+ trace(name + " message count : " + messageCount)
+ //}
+ if (maxMessages > 0 && messageCount == maxMessages) {
+ trace(name + " message count (" + messageCount + ") max (" + maxMessages + ") reached, stopping connection")
+ doStop
+ }
+ }
+
}
abstract class RemoteConsumer extends RemoteConnection {
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=1002770&r1=1002769&r2=1002770&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Sep 29 17:34:48 2010
@@ -147,13 +147,6 @@ class StompRemoteConsumer extends Remote
var rc = List((Stomp.Headers.Ack.MESSAGE_ID, frame.header(Stomp.Headers.Message.MESSAGE_ID)))
outboundSink.offer(StompFrame(Stomp.Commands.ACK, rc));
}
- messageCount = messageCount + 1
- if ( messageCount % 10000 == 0 ) {
- trace("Received message count : " + messageCount)
- }
- if (maxMessages > 0 && messageCount >= maxMessages - 1) {
- stop()
- }
case StompFrame(Responses.ERROR, headers, content, _) =>
onFailure(new Exception("Server reported an error: " + frame.content));
@@ -166,16 +159,25 @@ class StompRemoteConsumer extends Remote
if (thinkTime > 0) {
transport.suspendRead
dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
+ incrementMessageCount
rate.increment();
if (!stopped) {
transport.resumeRead
}
})
} else {
- rate.increment();
+ incrementMessageCount
+ rate.increment
}
}
+ override def doStop() = {
+ outboundSink.offer(StompFrame(Stomp.Commands.DISCONNECT));
+ dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ {
+ transport.stop
+ stop
+ })
+ }
}
class StompRemoteProducer extends RemoteProducer with Logging {
@@ -199,13 +201,6 @@ class StompRemoteProducer extends Remote
var content = ascii(createPayload());
frame = StompFrame(Stomp.Commands.SEND, headers, BufferContent(content))
- messageCount = messageCount + 1
- if ( messageCount % 10000 == 0 ) {
- trace("Sent message count : " + messageCount)
- }
- if (maxMessages > 0 && messageCount >= maxMessages) {
- stop()
- }
drain()
}
@@ -214,9 +209,10 @@ class StompRemoteProducer extends Remote
if( !outboundSink.full ) {
outboundSink.offer(frame)
frame = null
- rate.increment();
+ rate.increment
val task = ^ {
if (!stopped) {
+ incrementMessageCount
send_next
}
}
@@ -253,6 +249,7 @@ class StompRemoteProducer extends Remote
case StompFrame(Responses.RECEIPT, headers, _, _) =>
assert( persistent )
// we got the ack for the previous message we sent.. now send the next one.
+ incrementMessageCount
send_next
case StompFrame(Responses.CONNECTED, headers, _, _) =>
@@ -263,5 +260,13 @@ class StompRemoteProducer extends Remote
}
}
+ override def doStop() = {
+ outboundSink.offer(StompFrame(Stomp.Commands.DISCONNECT));
+ dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ {
+ transport.stop
+ stop
+ })
+ }
+
}