You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/09/29 19:34:48 UTC

svn commit: r1002770 - in /activemq/activemq-apollo/trunk: apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/

Author: chirino
Date: Wed Sep 29 17:34:48 2010
New Revision: 1002770

URL: http://svn.apache.org/viewvc?rev=1002770&view=rev
Log:
Tidy up message count handling

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=1002770&r1=1002769&r2=1002770&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Sep 29 17:34:48 2010
@@ -33,12 +33,14 @@ abstract class BaseBrokerPerfSupport ext
   for( ptp <- List(true,false) ; durable <- List(false) ; messageSize <- messageSizes ) {
 
     def benchmark(name:String)(func: =>Unit) {
+      /*
       test(name) {
         this.PTP = ptp
         this.DURABLE = durable
         this.MESSAGE_SIZE = messageSize
         func
       }
+      */
     }
 
     val prefix = (if( ptp ) "queue " else "topic ") + (if((messageSize%1024)==0) (messageSize/1024)+"k" else messageSize+"b" ) + " "

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala?rev=1002770&r1=1002769&r2=1002770&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala Wed Sep 29 17:34:48 2010
@@ -30,11 +30,13 @@ abstract class BasePersistentBrokerPerfS
 
   override def reportResourceTemplate():URL = { classOf[BasePersistentBrokerPerfSupport].getResource("persistent-report.html") }
 
+  //override def partitionedLoad = List(1, 2, 4, 8, 10)
   override def highContention = 100
+  //override def messageSizes = List(20, 1024, 1024*256)
 
   for ( load <- partitionedLoad ; messageSize <- messageSizes ) {
 
-    val numMessages = 1000000 / load
+    val numMessages = 100000 / load
 
     def benchmark(name: String)(func: => Unit) {
       test(name) {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala?rev=1002770&r1=1002769&r2=1002770&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala Wed Sep 29 17:34:48 2010
@@ -465,7 +465,7 @@ abstract class RemoteConnection extends 
       } else {
         onFailure(error)
         if( callbackWhenConnected!=null ) {
-          warn("connect attempt failed. wil retry connection..")
+          warn("connect attempt failed. will retry connection..")
           dispatchQueue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^{
             if(stopping.get()) {
               callbackWhenConnected.run
@@ -480,6 +480,19 @@ abstract class RemoteConnection extends 
     }
   }
 
+  protected def doStop()
+
+  protected def incrementMessageCount() = {
+    messageCount = messageCount + 1
+    //if ( messageCount % (maxMessages / 1000) == 0 ) {
+      trace(name + " message count : " + messageCount)
+    //}
+    if (maxMessages > 0 && messageCount  == maxMessages) {
+      trace(name + " message count (" + messageCount + ") max (" + maxMessages + ") reached, stopping connection")
+      doStop
+    }
+  }
+
 }
 
 abstract class RemoteConsumer extends RemoteConnection {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=1002770&r1=1002769&r2=1002770&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Sep 29 17:34:48 2010
@@ -147,13 +147,6 @@ class StompRemoteConsumer extends Remote
             var rc = List((Stomp.Headers.Ack.MESSAGE_ID, frame.header(Stomp.Headers.Message.MESSAGE_ID)))
             outboundSink.offer(StompFrame(Stomp.Commands.ACK, rc));
           }
-          messageCount = messageCount + 1
-          if ( messageCount % 10000 == 0 ) {
-            trace("Received message count : " + messageCount)
-          }
-          if (maxMessages > 0 && messageCount >= maxMessages - 1) {
-            stop()
-          }
 
       case StompFrame(Responses.ERROR, headers, content, _) =>
         onFailure(new Exception("Server reported an error: " + frame.content));
@@ -166,16 +159,25 @@ class StompRemoteConsumer extends Remote
       if (thinkTime > 0) {
         transport.suspendRead
         dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
+          incrementMessageCount          
           rate.increment();
           if (!stopped) {
             transport.resumeRead
           }
         })
       } else {
-        rate.increment();
+        incrementMessageCount
+        rate.increment
       }
   }
 
+  override def doStop() = {
+    outboundSink.offer(StompFrame(Stomp.Commands.DISCONNECT));
+    dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ {
+        transport.stop
+        stop
+      })
+  }
 }
 
 class StompRemoteProducer extends RemoteProducer with Logging {
@@ -199,13 +201,6 @@ class StompRemoteProducer extends Remote
 
       var content = ascii(createPayload());
       frame = StompFrame(Stomp.Commands.SEND, headers, BufferContent(content))
-      messageCount = messageCount + 1
-      if ( messageCount % 10000 == 0 ) {
-        trace("Sent message count : " + messageCount)
-      }
-      if (maxMessages > 0 && messageCount >= maxMessages) {
-        stop()
-      }    
       drain()
   }
 
@@ -214,9 +209,10 @@ class StompRemoteProducer extends Remote
       if( !outboundSink.full ) {
         outboundSink.offer(frame)
         frame = null
-        rate.increment();
+        rate.increment
         val task = ^ {
           if (!stopped) {
+            incrementMessageCount
             send_next
           }
         }
@@ -253,6 +249,7 @@ class StompRemoteProducer extends Remote
       case StompFrame(Responses.RECEIPT, headers, _, _) =>
         assert( persistent )
         // we got the ack for the previous message we sent.. now send the next one.
+        incrementMessageCount
         send_next
 
       case StompFrame(Responses.CONNECTED, headers, _, _) =>
@@ -263,5 +260,13 @@ class StompRemoteProducer extends Remote
     }
   }
 
+  override def doStop() = {
+    outboundSink.offer(StompFrame(Stomp.Commands.DISCONNECT));
+    dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ {
+        transport.stop
+        stop
+      })
+  }  
+
 }