Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:09:37 UTC

svn commit: r961139 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/test/resources/org/apache/activemq/apollo/broker/perf/ activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/ activemq-stomp/src/test/scala/org/apache...

Author: chirino
Date: Wed Jul  7 04:09:36 2010
New Revision: 961139

URL: http://svn.apache.org/viewvc?rev=961139&view=rev
Log:
refactoring the perf test base to make it more extensible.  Added a new variation to test the perf of persistent messages.

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/resources/org/apache/activemq/apollo/broker/perf/persistent-report.html
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/resources/org/apache/activemq/apollo/broker/perf/report.html
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
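
As a quick orientation, the refactoring splits the old BaseBrokerPerfSupport into a shared BrokerPerfSupport harness plus thin subclasses that only override the load levels and the report template. A minimal, self-contained sketch of that shape (the println-based benchmark() runner is a stand-in for the real ScalaTest machinery, and these class names are illustrative only):

    // Sketch of the extension points introduced by this commit; the real base
    // class (BrokerPerfSupport, below) also owns the brokers, the load clients
    // and the report generation.  Only the overridable knobs are shown here.
    abstract class PerfSketch {
      var PERSISTENT = false                 // persistent delivery on/off
      def partitionedLoad: List[Int]         // client counts for the scaling tests
      def highContention: Int                // clients piled onto a single destination
      def benchmark(name: String)(body: => Unit) { println("running " + name); body }
    }

    class NonPersistentSketch extends PerfSketch {   // mirrors BaseBrokerPerfSupport
      def partitionedLoad = List(1, 2, 4, 8, 10)
      def highContention = 10
    }

    class PersistentSketch extends PerfSketch {      // mirrors BasePersistentBrokerPerfSupport
      PERSISTENT = true
      def partitionedLoad = List(1, 10, 50, 100)
      def highContention = 100
    }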

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/resources/org/apache/activemq/apollo/broker/perf/persistent-report.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/resources/org/apache/activemq/apollo/broker/perf/persistent-report.html?rev=961139&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/resources/org/apache/activemq/apollo/broker/perf/persistent-report.html (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/resources/org/apache/activemq/apollo/broker/perf/persistent-report.html Wed Jul  7 04:09:36 2010
@@ -0,0 +1,475 @@
+<html>
+  <head>
+    <style type='text/css'>
+      body { font-family:Verdana; font-size:12px; color:#666666; }
+      #header {
+        margin: 0; padding: 2em; text-align:center; background: #baccd8;
+      }
+      #header h1 {
+        margin: 0; padding: 0; font-size: 180%; line-height: 1em; color: #333;
+      }
+      #wrap {
+        width: 60em; margin: 0 auto; padding: 2em;
+        background: #dae3e9;
+      }
+      #content {
+        width: 60em;
+        padding: 2em 0;
+      }
+      .chart-graph {
+        float:right; width: 66%; height: 20em;  margin: 1em 0 1em 1em;
+      }
+      .med-chart {
+        height: 30em;
+      }
+      .big-chart {
+        height: 40em;
+      }
+      .chart-section { clear:both; margin-top: 1em; padding-left:2em; }
+      .clear { clear:both; }
+    </style>
+    <script type='text/javascript' src='http://www.google.com/jsapi'></script>
+    <script type='text/javascript'>
+      var cols_index_map = {}
+      var data = null;
+      var data_funcs = null;
+      google.load('visualization', '1', {'packages':['linechart']});
+      google.setOnLoadCallback(function() {
+        data = new google.visualization.DataTable();
+        data.addColumn('string', 'version');
+          data.addColumn('number', 'queue 20b 1->1->1 producer');
+          data.addColumn('number', 'queue 20b 1->1->1 consumer');
+          data.addColumn('number', 'queue 20b 10->10->10 producer');
+          data.addColumn('number', 'queue 20b 10->10->10 producer sd');
+          data.addColumn('number', 'queue 20b 10->10->10 consumer');
+          data.addColumn('number', 'queue 20b 10->10->10 consumer sd');
+          data.addColumn('number', 'queue 20b 100->100->100 producer');
+          data.addColumn('number', 'queue 20b 100->100->100 producer sd');
+          data.addColumn('number', 'queue 20b 100->100->100 consumer');
+          data.addColumn('number', 'queue 20b 100->100->100 consumer sd');
+          data.addColumn('number', 'queue 20b 1000->1000->1000 producer');
+          data.addColumn('number', 'queue 20b 1000->1000->1000 producer sd');
+          data.addColumn('number', 'queue 20b 1000->1000->1000 consumer');
+          data.addColumn('number', 'queue 20b 1000->1000->1000 consumer sd');
+          data.addColumn('number', 'queue 20b 100->100->100 producer');
+          data.addColumn('number', 'queue 20b 100->100->100 producer sd');
+          data.addColumn('number', 'queue 20b 100->100->100 consumer');
+          data.addColumn('number', 'queue 20b 100->100->100 consumer sd');
+          data.addColumn('number', 'queue 20b 100->1->1 producer');
+          data.addColumn('number', 'queue 20b 100->1->1 producer sd');
+          data.addColumn('number', 'queue 20b 100->1->1 consumer');
+          data.addColumn('number', 'queue 20b 1->1->100 producer');
+          data.addColumn('number', 'queue 20b 1->1->100 consumer');
+          data.addColumn('number', 'queue 20b 1->1->100 consumer sd');
+          data.addColumn('number', 'queue 20b 100->1->100 producer');
+          data.addColumn('number', 'queue 20b 100->1->100 producer sd');
+          data.addColumn('number', 'queue 20b 100->1->100 consumer');
+          data.addColumn('number', 'queue 20b 100->1->100 consumer sd');
+          data.addColumn('number', 'queue 1k 1->1->1 producer');
+          data.addColumn('number', 'queue 1k 1->1->1 consumer');
+          data.addColumn('number', 'queue 1k 10->10->10 producer');
+          data.addColumn('number', 'queue 1k 10->10->10 producer sd');
+          data.addColumn('number', 'queue 1k 10->10->10 consumer');
+          data.addColumn('number', 'queue 1k 10->10->10 consumer sd');
+          data.addColumn('number', 'queue 1k 100->100->100 producer');
+          data.addColumn('number', 'queue 1k 100->100->100 producer sd');
+          data.addColumn('number', 'queue 1k 100->100->100 consumer');
+          data.addColumn('number', 'queue 1k 100->100->100 consumer sd');
+          data.addColumn('number', 'queue 1k 1000->1000->1000 producer');
+          data.addColumn('number', 'queue 1k 1000->1000->1000 producer sd');
+          data.addColumn('number', 'queue 1k 1000->1000->1000 consumer');
+          data.addColumn('number', 'queue 1k 1000->1000->1000 consumer sd');
+          data.addColumn('number', 'queue 1k 100->100->100 producer');
+          data.addColumn('number', 'queue 1k 100->100->100 producer sd');
+          data.addColumn('number', 'queue 1k 100->100->100 consumer');
+          data.addColumn('number', 'queue 1k 100->100->100 consumer sd');
+          data.addColumn('number', 'queue 1k 100->1->1 producer');
+          data.addColumn('number', 'queue 1k 100->1->1 producer sd');
+          data.addColumn('number', 'queue 1k 100->1->1 consumer');
+          data.addColumn('number', 'queue 1k 1->1->100 producer');
+          data.addColumn('number', 'queue 1k 1->1->100 consumer');
+          data.addColumn('number', 'queue 1k 1->1->100 consumer sd');
+          data.addColumn('number', 'queue 1k 100->1->100 producer');
+          data.addColumn('number', 'queue 1k 100->1->100 producer sd');
+          data.addColumn('number', 'queue 1k 100->1->100 consumer');
+          data.addColumn('number', 'queue 1k 100->1->100 consumer sd');
+          data.addColumn('number', 'queue 256k 1->1->1 producer');
+          data.addColumn('number', 'queue 256k 1->1->1 consumer');
+          data.addColumn('number', 'queue 256k 10->10->10 producer');
+          data.addColumn('number', 'queue 256k 10->10->10 producer sd');
+          data.addColumn('number', 'queue 256k 10->10->10 consumer');
+          data.addColumn('number', 'queue 256k 10->10->10 consumer sd');
+          data.addColumn('number', 'queue 256k 100->100->100 producer');
+          data.addColumn('number', 'queue 256k 100->100->100 producer sd');
+          data.addColumn('number', 'queue 256k 100->100->100 consumer');
+          data.addColumn('number', 'queue 256k 100->100->100 consumer sd');
+          data.addColumn('number', 'queue 256k 1000->1000->1000 producer');
+          data.addColumn('number', 'queue 256k 1000->1000->1000 producer sd');
+          data.addColumn('number', 'queue 256k 1000->1000->1000 consumer');
+          data.addColumn('number', 'queue 256k 1000->1000->1000 consumer sd');
+          data.addColumn('number', 'queue 256k 100->100->100 producer');
+          data.addColumn('number', 'queue 256k 100->100->100 producer sd');
+          data.addColumn('number', 'queue 256k 100->100->100 consumer');
+          data.addColumn('number', 'queue 256k 100->100->100 consumer sd');
+          data.addColumn('number', 'queue 256k 100->1->1 producer');
+          data.addColumn('number', 'queue 256k 100->1->1 producer sd');
+          data.addColumn('number', 'queue 256k 100->1->1 consumer');
+          data.addColumn('number', 'queue 256k 1->1->100 producer');
+          data.addColumn('number', 'queue 256k 1->1->100 consumer');
+          data.addColumn('number', 'queue 256k 1->1->100 consumer sd');
+          data.addColumn('number', 'queue 256k 100->1->100 producer');
+          data.addColumn('number', 'queue 256k 100->1->100 producer sd');
+          data.addColumn('number', 'queue 256k 100->1->100 consumer');
+          data.addColumn('number', 'queue 256k 100->1->100 consumer sd');
+          data.addColumn('number', 'topic 20b 1->1->0 producer');
+          data.addColumn('number', 'topic 20b 1->1->1 producer');
+          data.addColumn('number', 'topic 20b 1->1->1 consumer');
+          data.addColumn('number', 'topic 20b 10->10->10 producer');
+          data.addColumn('number', 'topic 20b 10->10->10 producer sd');
+          data.addColumn('number', 'topic 20b 10->10->10 consumer');
+          data.addColumn('number', 'topic 20b 10->10->10 consumer sd');
+          data.addColumn('number', 'topic 20b 100->100->100 producer');
+          data.addColumn('number', 'topic 20b 100->100->100 producer sd');
+          data.addColumn('number', 'topic 20b 100->100->100 consumer');
+          data.addColumn('number', 'topic 20b 100->100->100 consumer sd');
+          data.addColumn('number', 'topic 20b 1000->1000->1000 producer');
+          data.addColumn('number', 'topic 20b 1000->1000->1000 producer sd');
+          data.addColumn('number', 'topic 20b 1000->1000->1000 consumer');
+          data.addColumn('number', 'topic 20b 1000->1000->1000 consumer sd');
+          data.addColumn('number', 'topic 20b 100->100->100 producer');
+          data.addColumn('number', 'topic 20b 100->100->100 producer sd');
+          data.addColumn('number', 'topic 20b 100->100->100 consumer');
+          data.addColumn('number', 'topic 20b 100->100->100 consumer sd');
+          data.addColumn('number', 'topic 20b 100->1->1 producer');
+          data.addColumn('number', 'topic 20b 100->1->1 producer sd');
+          data.addColumn('number', 'topic 20b 100->1->1 consumer');
+          data.addColumn('number', 'topic 20b 1->1->100 producer');
+          data.addColumn('number', 'topic 20b 1->1->100 consumer');
+          data.addColumn('number', 'topic 20b 1->1->100 consumer sd');
+          data.addColumn('number', 'topic 20b 100->1->100 producer');
+          data.addColumn('number', 'topic 20b 100->1->100 producer sd');
+          data.addColumn('number', 'topic 20b 100->1->100 consumer');
+          data.addColumn('number', 'topic 20b 100->1->100 consumer sd');
+          data.addColumn('number', 'topic 1k 1->1->0 producer');
+          data.addColumn('number', 'topic 1k 1->1->1 producer');
+          data.addColumn('number', 'topic 1k 1->1->1 consumer');
+          data.addColumn('number', 'topic 1k 10->10->10 producer');
+          data.addColumn('number', 'topic 1k 10->10->10 producer sd');
+          data.addColumn('number', 'topic 1k 10->10->10 consumer');
+          data.addColumn('number', 'topic 1k 10->10->10 consumer sd');
+          data.addColumn('number', 'topic 1k 100->100->100 producer');
+          data.addColumn('number', 'topic 1k 100->100->100 producer sd');
+          data.addColumn('number', 'topic 1k 100->100->100 consumer');
+          data.addColumn('number', 'topic 1k 100->100->100 consumer sd');
+          data.addColumn('number', 'topic 1k 1000->1000->1000 producer');
+          data.addColumn('number', 'topic 1k 1000->1000->1000 producer sd');
+          data.addColumn('number', 'topic 1k 1000->1000->1000 consumer');
+          data.addColumn('number', 'topic 1k 1000->1000->1000 consumer sd');
+          data.addColumn('number', 'topic 1k 100->100->100 producer');
+          data.addColumn('number', 'topic 1k 100->100->100 producer sd');
+          data.addColumn('number', 'topic 1k 100->100->100 consumer');
+          data.addColumn('number', 'topic 1k 100->100->100 consumer sd');
+          data.addColumn('number', 'topic 1k 100->1->1 producer');
+          data.addColumn('number', 'topic 1k 100->1->1 producer sd');
+          data.addColumn('number', 'topic 1k 100->1->1 consumer');
+          data.addColumn('number', 'topic 1k 1->1->100 producer');
+          data.addColumn('number', 'topic 1k 1->1->100 consumer');
+          data.addColumn('number', 'topic 1k 1->1->100 consumer sd');
+          data.addColumn('number', 'topic 1k 100->1->100 producer');
+          data.addColumn('number', 'topic 1k 100->1->100 producer sd');
+          data.addColumn('number', 'topic 1k 100->1->100 consumer');
+          data.addColumn('number', 'topic 1k 100->1->100 consumer sd');
+          data.addColumn('number', 'topic 256k 1->1->0 producer');
+          data.addColumn('number', 'topic 256k 1->1->1 producer');
+          data.addColumn('number', 'topic 256k 1->1->1 consumer');
+          data.addColumn('number', 'topic 256k 10->10->10 producer');
+          data.addColumn('number', 'topic 256k 10->10->10 producer sd');
+          data.addColumn('number', 'topic 256k 10->10->10 consumer');
+          data.addColumn('number', 'topic 256k 10->10->10 consumer sd');
+          data.addColumn('number', 'topic 256k 100->100->100 producer');
+          data.addColumn('number', 'topic 256k 100->100->100 producer sd');
+          data.addColumn('number', 'topic 256k 100->100->100 consumer');
+          data.addColumn('number', 'topic 256k 100->100->100 consumer sd');
+          data.addColumn('number', 'topic 256k 1000->1000->1000 producer');
+          data.addColumn('number', 'topic 256k 1000->1000->1000 producer sd');
+          data.addColumn('number', 'topic 256k 1000->1000->1000 consumer');
+          data.addColumn('number', 'topic 256k 1000->1000->1000 consumer sd');
+          data.addColumn('number', 'topic 256k 100->100->100 producer');
+          data.addColumn('number', 'topic 256k 100->100->100 producer sd');
+          data.addColumn('number', 'topic 256k 100->100->100 consumer');
+          data.addColumn('number', 'topic 256k 100->100->100 consumer sd');
+          data.addColumn('number', 'topic 256k 100->1->1 producer');
+          data.addColumn('number', 'topic 256k 100->1->1 producer sd');
+          data.addColumn('number', 'topic 256k 100->1->1 consumer');
+          data.addColumn('number', 'topic 256k 1->1->100 producer');
+          data.addColumn('number', 'topic 256k 1->1->100 consumer');
+          data.addColumn('number', 'topic 256k 1->1->100 consumer sd');
+          data.addColumn('number', 'topic 256k 100->1->100 producer');
+          data.addColumn('number', 'topic 256k 100->1->100 producer sd');
+          data.addColumn('number', 'topic 256k 100->1->100 consumer');
+          data.addColumn('number', 'topic 256k 100->1->100 consumer sd');
+
+        data_funcs = new Array(data.getNumberOfColumns());
+        for( var i=0; i <  data.getNumberOfColumns(); i ++) {
+          cols_index_map[data.getColumnLabel(i)] = i;
+          eval("function lookup(dt, row) { return dt.getValue(row, "+i+"); }; data_funcs[i]=lookup;");
+        }
+
+        var data_array = [
+// DATA-START
+// DATA-END
+        ];
+        try {
+          data.addRows(data_array);
+        } catch (er) {
+          alert(er);
+        }
+
+      });
+      // Helpers
+      function chart(id, cols, options) {
+        var c = new google.visualization.LineChart(document.getElementById(id));
+        var view = new google.visualization.DataView(data);
+        var col_indexes = new Array(cols.length);
+        for (var i = 0; i < cols.length; i++) {
+          if( typeof(cols[i]) == "string" ) {
+            col_indexes[i] = cols_index_map[cols[i]];
+          } else {
+            var col = cols_index_map[cols[i][0]];
+            if( col> 0 ) {
+              col_indexes[i] = {type:'number', label:cols[i][1], calc:data_funcs[col]};
+            } else {
+              col_indexes[i] = col;
+            }
+          }
+        }
+        view.setColumns(col_indexes);
+        c.draw(view, options);
+      }
+      function chart_opts() {
+        return {tooltipWidth:300, tooltipHeight:120, showCategories:false, legend: 'bottom', smoothLine:true, titleX:'changes over time', titleY:'messages/sec', enableTooltip:true }
+      }
+    </script>
+  </head>
+  <body>
+    <div id='wrap'>
+      <div id='header'>
+        <h1>Broker Performance Evolution</h1>
+      </div>
+
+      <div id='content' >
+        <p>
+          This report is used to visualize how the performance of the broker
+          changes over time as new code changes are introduced. Click on any data
+          point in the charts to get the data point's exact value and git
+          commit version.
+        </p>
+
+        <p>
+          Producers send persistent messages and synchronously wait for a
+          broker ack before sending the next message.  Consumers use client
+          ack for each message.
+        </p>
+
+        <h2>General</h2>
+
+        <div class='chart-section'>
+          <div id='raw_producer_rate' class='chart-graph'></div>
+          <script type='text/javascript'>
+            google.setOnLoadCallback(function() {
+              var opts = chart_opts();
+              opts.logScale = true;
+              chart('raw_producer_rate',
+                ['version',
+                  ['topic 20b 1->1->0 producer', '20 bytes'],
+                  ['topic 1k 1->1->0 producer', '1 k'],
+                  ['topic 256k 1->1->0 producer', '256 k']
+                ],
+                opts
+              );
+            });
+          </script>
+          <h3>Max Producer Rate</h3>
+          The maximum rate at which a single producer can send to a broker.  The producer is set up
+          on a topic with no consumers.  This rate correlates closely with your processor
+          speed.  The rate is shown at 3 different message sizes.
+        </div>
+
+        <div class="clear"></div>
+        <h2>Queue Performance</h2>
+        <p>
+          This section examines the performance of queues, also known as the point-to-point messaging domain.
+        </p>
+
+        <div class='chart-section'>
+          <div id='queue_partioned_scaling' class='chart-graph med-chart'></div>
+          <script type='text/javascript'>
+            google.setOnLoadCallback(function() {
+              chart('queue_partioned_scaling',
+                ['version',
+                  ['queue 20b 1->1->1 consumer','1->1->1'],
+                  ['queue 20b 10->10->10 consumer', '10->10->10'],
+                  ['queue 20b 50->50->50 consumer', '50->50->50'],
+                  ['queue 20b 100->100->100 consumer', '100->100->100']
+                ],
+                chart_opts()
+              );
+            });
+          </script>
+          <h3>Partitioned Scaling</h3>
+          Compares how well the broker scales as partitioned load is increased on it.  Each destination
+          has only one producer and one consumer attached using small 20 byte messages.  This should
+          scale up on machines with many processing cores.  Keep in mind that the load test runs the
+          load clients and the broker on one machine, so about 1/10 of the CPU resources are used by
+          the load clients.
+        </div>
+
+        <div class='chart-section'>
+          <div id='queue_contention' class='chart-graph big-chart'></div>
+          <script type='text/javascript'>
+            google.setOnLoadCallback(function() {
+              chart('queue_contention',
+                ['version',
+                  ['queue 20b 100->1->1 consumer',  '20 byte 100->1->1'],
+                  ['queue 20b 1->1->100 consumer',  '20 byte 1->1->100'],
+                  ['queue 20b 100->1->100 consumer', '20 byte 100->1->100'],
+                  ['queue 1k 100->1->1 consumer',   '1k byte 100->1->1'],
+                  ['queue 1k 1->1->100 consumer',   '1k byte 1->1->100'],
+                  ['queue 1k 100->1->100 consumer',  '1k byte 100->1->100'],
+                  ['queue 256k 100->1->1 consumer',   '256k byte 100->1->1'],
+                  ['queue 256k 1->1->100 consumer',   '256k byte 1->1->100'],
+                  ['queue 256k 100->1->100 consumer',  '256k byte 100->1->100']
+                ],
+                chart_opts()
+              );
+            });
+          </script>
+          <h3>Destination Contention</h3>
+          When there are many consumers and/or producers on one destination, it can become a bottleneck.  The data points shown are the total consumer
+          rate for the scenario.
+        </div>
+
+
+        <div class='chart-section'>
+          <div id='queue_contention_sd' class='chart-graph big-chart'></div>
+          <script type='text/javascript'>
+            google.setOnLoadCallback(function() {
+              chart('queue_contention_sd',
+                ['version',
+                  ['queue 20b 100->1->1 producer sd',  '20 byte 100->1->1'],
+                  ['queue 20b 1->1->100 consumer sd',  '20 byte 1->1->100'],
+                  ['queue 20b 100->1->100 producer sd', '20 byte 100->1->100 p'],
+                  ['queue 20b 100->1->100 consumer sd', '20 byte 100->1->100 c'],
+                  ['queue 1k 100->1->1 producer sd',   '1k byte 100->1->1'],
+                  ['queue 1k 1->1->100 consumer sd',   '1k byte 1->1->100'],
+                  ['queue 1k 100->1->100 producer sd',  '1k byte 100->1->100 p'],
+                  ['queue 1k 100->1->100 consumer sd',  '1k byte 100->1->100 c'],
+                  ['queue 256k 100->1->1 producer sd',   '256k byte 100->1->1'],
+                  ['queue 256k 1->1->100 consumer sd',   '256k byte 1->1->100'],
+                  ['queue 256k 100->1->100 producer sd',  '256k byte 100->1->100 p'],
+                  ['queue 256k 100->1->100 consumer sd',  '256k byte 100->1->100 c']
+                ],
+                chart_opts()
+              );
+            });
+          </script>
+          <h3>Fairness</h3>
+          When you have multiple homogeneous consumers or producers, it's ideal if the broker treats them all fairly
+          and sends or accepts the same number of messages from each.  This chart shows the standard deviation of the number
+          of messages produced or consumed.  The lower the number, the better.
+        </div>
+
+
+        <div class="clear"></div>
+        <h2>Topic Performance</h2>
+        <p>
+          This section examines the performance of topics, also known as the publish/subscribe messaging domain.
+        </p>
+
+        <div class='chart-section'>
+          <div id='topic_partioned_scaling' class='chart-graph med-chart'></div>
+          <script type='text/javascript'>
+            google.setOnLoadCallback(function() {
+              chart('topic_partioned_scaling',
+                ['version',
+                  ['topic 20b 1->1->1 consumer','1->1->1'],
+                  ['topic 20b 10->10->10 consumer', '10->10->10'],
+                  ['topic 20b 100->100->100 consumer', '100->100->100'],
+                  ['topic 20b 1000->1000->1000 consumer', '1000->1000->1000'],
+                  ['topic 20b 100->100->100 consumer', '100->100->100']
+                ],
+                chart_opts()
+              );
+            });
+          </script>
+          <h3>Partitioned Scaling</h3>
+          Compares how well the broker scales as partitioned load is increased on it.  Each destination
+          has only one producer and one consumer attached using small 20 byte messages.  This should
+          scale up on machines with many processing cores.  Keep in mind that the load test runs the
+          load clients and the broker on one machine, so about 1/10 of the CPU resources are used by
+          the load clients.
+        </div>
+
+        <div class='chart-section'>
+          <div id='topic_contention' class='chart-graph big-chart'></div>
+          <script type='text/javascript'>
+            google.setOnLoadCallback(function() {
+              chart('topic_contention',
+                ['version',
+                  ['topic 20b 100->1->1 consumer',  '20 byte 100->1->1'],
+                  ['topic 20b 1->1->100 consumer',  '20 byte 1->1->100'],
+                  ['topic 20b 100->1->100 consumer', '20 byte 100->1->100'],
+                  ['topic 1k 100->1->1 consumer',   '1k byte 100->1->1'],
+                  ['topic 1k 1->1->100 consumer',   '1k byte 1->1->100'],
+                  ['topic 1k 100->1->100 consumer',  '1k byte 100->1->100'],
+                  ['topic 256k 100->1->1 consumer',   '256k byte 100->1->1'],
+                  ['topic 256k 1->1->100 consumer',   '256k byte 1->1->100'],
+                  ['topic 256k 100->1->100 consumer',  '256k byte 100->1->100']
+                ],
+                chart_opts()
+              );
+            });
+          </script>
+          <h3>Destination Contention</h3>
+          When there are many consumers and/or producers on one destination, it can become a bottleneck.  The data points shown are the total consumer
+          rate for the scenario.  When looking at the numbers, keep in mind that topics broadcast every message to every consumer, so the message load is
+          much higher when you have many consumers attached.
+        </div>
+
+
+        <div class='chart-section'>
+          <div id='topic_contention_sd' class='chart-graph big-chart'></div>
+          <script type='text/javascript'>
+            google.setOnLoadCallback(function() {
+              chart('topic_contention_sd',
+                ['version',
+                  ['topic 20b 100->1->1 producer sd',  '20 byte 100->1->1'],
+                  ['topic 20b 1->1->100 consumer sd',  '20 byte 1->1->100'],
+                  ['topic 20b 100->1->100 producer sd', '20 byte 100->1->100 p'],
+                  ['topic 20b 100->1->100 consumer sd', '20 byte 100->1->100 c'],
+                  ['topic 1k 100->1->1 producer sd',   '1k byte 100->1->1'],
+                  ['topic 1k 1->1->100 consumer sd',   '1k byte 1->1->100'],
+                  ['topic 1k 100->1->100 producer sd',  '1k byte 100->1->100 p'],
+                  ['topic 1k 100->1->100 consumer sd',  '1k byte 100->1->100 c'],
+                  ['topic 256k 100->1->1 producer sd',   '256k byte 100->1->1'],
+                  ['topic 256k 1->1->100 consumer sd',   '256k byte 1->1->100'],
+                  ['topic 256k 100->1->100 producer sd',  '256k byte 100->1->100 p'],
+                  ['topic 256k 100->1->100 consumer sd',  '256k byte 100->1->100 c']
+                ],
+                chart_opts()
+              );
+            });
+          </script>
+          <h3>Fairness</h3>
+          When you have multiple homogeneous consumers or producers, it's ideal if the broker treats them all fairly
+          and sends or accepts the same number of messages from each.  This chart shows the standard deviation of the number
+          of messages produced or consumed.  The lower the number, the better.
+        </div>
+
+
+        <div class="clear"></div>
+      </div>
+    </div>
+  </body>
+</html>
+
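
A note on the DATA-START / DATA-END markers above: the test harness splices one result row per run between them, using the regex visible in the (pre-refactoring) afterAll code further down in this diff. A self-contained sketch of that splice, with illustrative object and method names:

    // Sketch: append one result row between the DATA-START/DATA-END markers of a
    // report template, following the same regex idea as the harness.
    object ReportSplice {
      // (?s) lets .* span newlines; groups = header, existing rows, footer
      private val marker = """(?s)(.*// DATA-START\r?\n)(.*)(// DATA-END.*)""".r

      def appendRow(report: String, row: String): String = report match {
        case marker(header, rows, footer) =>
          val existing = rows.stripLineEnd
          val joined = if (existing.isEmpty) row else existing + ",\n" + row
          header + joined + "\n" + footer
        case _ => report // markers not found, leave the report untouched
      }

      def main(args: Array[String]) {
        val template = "var data_array = [\n// DATA-START\n// DATA-END\n];"
        println(appendRow(template, "          ['commit abc123', 1.0, 2.0]"))
      }
    }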

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/resources/org/apache/activemq/apollo/broker/perf/report.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/resources/org/apache/activemq/apollo/broker/perf/report.html?rev=961139&r1=961138&r2=961139&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/resources/org/apache/activemq/apollo/broker/perf/report.html (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/resources/org/apache/activemq/apollo/broker/perf/report.html Wed Jul  7 04:09:36 2010
@@ -217,9 +217,6 @@
 
         var data_array = [
 // DATA-START
-          ['commit 5debd92cefd2bcf937e47c9ee976b2af32945f60', 59065.00, 59441.66, 70182.34, 3887.50, 72572.33, 757.50, 62200.00, 538.67, 62480.00, 220.00, 70112.23, 7379.19, 70281.72, 7189.64, 72965.00, 8394.70, 72088.66, 8294.84, 70078.66, 870.80, 69376.33, 39911.67, 39731.00, 19.24, 56291.00, 5925.43, 57305.00, 2639.61, 69184.27, 69201.94, 66716.00, 2196.50, 66701.66, 2241.50, 79044.33, 14573.06, 79044.66, 14559.77, 78756.33, 7028.45, 78688.66, 7031.83, 76033.00, 6864.67, 76108.33, 6859.86, 79458.00, 377.52, 79477.34, 29308.00, 29312.00, 1.83, 74880.66, 9897.01, 74813.66, 212.97, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 979118.63, 98587.00, 98292.67, 127095.34, 622.00, 127084.00, 780.00, 68731.00, 32725.55, 68745.66, 32358.16, 108295.57, 11656.21, 108666.77, 11415.79, 84809.00, 11805.28, 83921.34, 11965.85, 104807.00, 20218.56, 103406.66, 11184.82,
  108604.05, 3.60, 19379.21, 835.08, 192433.52, 1081.44, 117028.67, 64440.00, 64433.00, 78744.00, 9146.00, 78777.00, 9213.50, 81101.34, 18720.47, 81130.00, 18700.15, 78234.59, 11514.27, 78224.92, 11519.75, 65135.91, 5318.66, 72323.78, 5041.96, 92432.19, 3435.90, 92447.85, 13498.33, 121418.66, 10.76, 532.67, 35.82, 5495.00, 160.48, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00],
-          ['commit 5debd92cefd2bcf937e47c9ee976b2af32945f60', 61267.00, 59611.00, 60624.79, 6686.50, 60543.82, 214.00, 70055.31, 9330.00, 69929.36, 9344.52, 71298.90, 9368.79, 71207.26, 9553.43, 79910.66, 6680.80, 80626.66, 6388.69, 70533.47, 99.60, 69225.44, 34210.00, 34220.33, 0.30, 72685.66, 8502.73, 71019.00, 4675.33, 57079.00, 57064.33, 72505.34, 15955.00, 72531.00, 15878.50, 97753.34, 7321.49, 97777.66, 7325.72, 89062.61, 12391.65, 89098.57, 12395.12, 86131.00, 10021.77, 86075.00, 10016.81, 73761.66, 384.01, 73744.00, 32560.67, 32565.33, 1.62, 86183.00, 9438.27, 86277.66, 151.25, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 930666.81, 93507.33, 93466.34, 76817.00, 37786.50, 76706.00, 38061.00, 104668.45, 22475.53, 104771.74, 22266.80, 66243.34, 8777.47, 72715.34, 8319.42, 93526.66, 11530.64, 93987.00, 11725.14, 84073.66, 11149.64, 82883.34, 13580.3
 3, 135017.33, 2.40, 20932.67, 793.95, 201999.00, 1980.45, 115204.59, 65177.66, 65151.00, 89512.16, 2750.50, 89508.83, 2744.00, 94051.65, 10450.63, 94048.32, 10463.70, 95777.08, 10451.45, 95738.42, 10453.78, 101276.67, 2741.30, 101311.00, 2748.79, 105001.34, 12699.42, 105032.66, 12325.00, 123264.00, 5.86, 28614.92, 885.62, 286094.25, 96.55, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00],
-          ['commit 5debd92cefd2bcf937e47c9ee976b2af32945f60', 59608.33, 59614.34, 66346.66, 5287.00, 64701.33, 218.00, 79616.00, 34577.69, 79193.34, 34549.07, 67212.45, 6835.70, 67838.83, 6825.10, 85929.71, 6246.85, 84848.44, 6088.69, 83057.61, 477.77, 83104.23, 54303.23, 54067.31, 0.49, 59712.00, 6983.59, 59913.66, 742.49, 58908.04, 58921.03, 62854.67, 187.00, 62854.34, 192.50, 81215.92, 6981.34, 81187.27, 6963.82, 95586.66, 6235.62, 95523.00, 6228.51, 98756.00, 15490.63, 98744.66, 15490.48, 91450.01, 2210.74, 91473.00, 32481.00, 32480.00, 0.87, 82071.31, 5277.88, 82031.32, 374.24, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 931205.06, 98172.33, 98509.34, 77836.33, 37457.00, 76933.34, 37722.00, 92677.99, 30757.18, 92988.00, 30728.10, 118740.67, 26473.27, 119305.66, 26914.20, 97088.97, 15287.81, 104049.99, 15320.30, 131258.16, 9485.30, 131338.78, 9845.0
 5, 102355.88, 0.00, 20318.67, 1033.34, 205720.00, 941.34, 117389.00, 67642.00, 67638.66, 83265.34, 5237.00, 83254.00, 5229.00, 85652.12, 26512.84, 85689.77, 26505.52, 90013.66, 5384.91, 89982.66, 5392.21, 99430.34, 5523.67, 99461.99, 5535.22, 99003.33, 11299.03, 98966.01, 10825.39, 108187.27, 7.16, 29818.00, 226.14, 297692.00, 58.31, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00]
 // DATA-END
         ];
         try {
@@ -268,6 +265,11 @@
           commit version.
         </p>
 
+        <p>
+          Producers send non-persistent messages and do NOT wait for
+          a broker ack before sending the next message.  Consumers use auto ack.
+        </p>
+
         <h2>General</h2>
 
         <div class='chart-section'>
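
For contrast with the persistent report added above: producers in this report fire and forget (no broker ack) and consumers auto-ack, while the persistent report's producers wait for a broker ack on every send and its consumers client-ack. Roughly what that difference looks like for a STOMP client; the persistent/receipt header names follow common STOMP/ActiveMQ conventions and are purely illustrative here, not a description of the perf clients in this commit:

    // Sketch of the two send styles at the STOMP wire level (illustrative only).
    object SendStyles {
      def sendFrame(dest: String, body: String, persistent: Boolean, receiptId: Option[String]): String = {
        val headers = List(
          Some("destination:" + dest),
          if (persistent) Some("persistent:true") else None,  // ask the broker to store the message
          receiptId.map("receipt:" + _)                       // ask for a RECEIPT frame before the next send
        ).flatten
        "SEND\n" + headers.mkString("\n") + "\n\n" + body + "\u0000"
      }

      def main(args: Array[String]) {
        // persistent-report.html scenario: persistent, wait for the receipt before sending again
        println(sendFrame("/queue/dest1", "hello", persistent = true, receiptId = Some("msg-1")))
        // report.html scenario: non-persistent, fire and forget
        println(sendFrame("/queue/dest1", "hello", persistent = false, receiptId = None))
      }
    }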

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=961139&r1=961138&r2=961139&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Jul  7 04:09:36 2010
@@ -16,169 +16,40 @@
  */
 package org.apache.activemq.apollo.broker.perf
 
-import _root_.java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
-import _root_.org.apache.activemq.metric.{Period, MetricAggregator, MetricCounter}
-import _root_.java.lang.{String}
-import _root_.org.junit.{Test, Before}
-
-import org.apache.activemq.transport.TransportFactory
-
-import _root_.scala.collection.JavaConversions._
-import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import org.apache.activemq.broker.store.Store
-import java.util.ArrayList
-import org.fusesource.hawtdispatch.BaseRetained
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import _root_.org.apache.activemq.metric.{Period}
 import org.apache.activemq.apollo.broker._
-import org.scalatest._
 import _root_.org.fusesource.hawtbuf._
-import java.io.{PrintStream, FileOutputStream, File, IOException}
-import org.apache.activemq.util.{IOHelper, ProcessSupport}
-import scala.util.matching.Regex
-import org.apache.activemq.apollo.dto.BrokerDTO
+import java.net.URL
 
-object BaseBrokerPerfSupport {
-  var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "6"))
-  var SAMPLE_PERIOD = java.lang.Long.parseLong(System.getProperty("SAMPLE_PERIOD", "1000"))
-
-  // Set to use tcp IO
-  protected var TCP = true;
-
-  var USE_KAHA_DB = true;
-  var PURGE_STORE = true;
-
-  // Set to put senders and consumers on separate brokers.
-  var MULTI_BROKER = false;
-
-  var DUMP_REPORT_COLS = true;
-}
-
-abstract class BaseBrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach {
-  import BaseBrokerPerfSupport._
-
-
-  protected var sendBrokerBindURI: String = null
-  protected var receiveBrokerBindURI: String = null
-  protected var sendBrokerConnectURI: String = null
-  protected var receiveBrokerConnectURI: String = null
-
-  protected var producerCount = 0
-  protected var consumerCount = 0
-  protected var destCount = 0
-
-  protected val totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items")
-  protected val totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items")
-
-  protected var sendBroker: Broker = null
-  protected var rcvBroker: Broker = null
-  protected val brokers = new ArrayList[Broker]()
-  protected val msgIdGenerator = new AtomicLong()
-  val stopping = new AtomicBoolean()
-
-  val producers = new ArrayList[RemoteProducer]()
-  val consumers = new ArrayList[RemoteConsumer]()
-
-  var samples:List[(String, AnyRef)] = Nil
-
-  override protected def beforeEach() = {
-    totalProducerRate.removeAllMetrics
-    totalConsumerRate.removeAllMetrics
-    brokers.clear
-    producers.clear
-    consumers.clear
-    stopping.set(false)
-    rcvBroker=null
-    sendBroker=null
-    producerCount = 0
-    consumerCount = 0
-    destCount =0
-  }
-
-  override protected def beforeAll(configMap: Map[String, Any]) = {
-    super.beforeAll(configMap)
-    if (TCP) {
-      sendBrokerBindURI = "tcp://localhost:10000";
-      receiveBrokerBindURI = "tcp://localhost:20000";
-
-      sendBrokerConnectURI = "tcp://localhost:10000?wireFormat=" + getRemoteWireFormat();
-      receiveBrokerConnectURI = "tcp://localhost:20000?wireFormat=" + getRemoteWireFormat();
-    } else {
-      sendBrokerConnectURI = "pipe://SendBroker";
-      receiveBrokerConnectURI = "pipe://ReceiveBroker";
-      
-      sendBrokerBindURI = sendBrokerConnectURI;
-      receiveBrokerBindURI = receiveBrokerConnectURI;
-    }
-  }
-
-  override protected def afterAll() = {
-    val basedir = new File(System.getProperty("user.home", "."))
-    val csvfile = new File(basedir, "perf-"+getClass.getName+".html");
-
-    val report_parser = """(?s)(.*// DATA-START\r?\n)(.*)(// DATA-END.*)""".r
-
-    // Load the previous dataset if the file exists
-    var report_data = ""
-    if( csvfile.exists ) {
-      IOHelper.readText(csvfile) match {
-        case report_parser(_, data, _) =>
-          report_data = data.stripLineEnd
-        case _ =>
-          println("could not parse existing report file: "+csvfile);
-      }
-    }
+/**
+ * 
+ */
+abstract class BaseBrokerPerfSupport extends BrokerPerfSupport {
 
-    // Load the report template and parse it..
-    val template = IOHelper.readText(classOf[BaseBrokerPerfSupport].getResourceAsStream("report.html"))
-    template match {
-      case report_parser(report_header, _, report_footer) =>
-        var notes = System.getProperty("notes")
-        if( notes==null ) {
-          val version = new String(ProcessSupport.system("git", "rev-list", "--max-count=1", "HEAD").toByteArray).trim
-          notes = "commit "+version
-        }
+  PERSISTENT = false
 
-        if( !report_data.isEmpty ) {
-          report_data += ",\n"
-        }
-        report_data += "            ['"+jsescape(notes)+"', "+samples.map(x=>String.format("%.2f",x._2)).mkString(", ")+"]\n"
-        IOHelper.writeText(csvfile, report_header+report_data+report_footer)
-      case _ =>
-        println("could not parse template report file");
-    }
+  def reportResourceTemplate():URL = { classOf[BaseBrokerPerfSupport].getResource("report.html") }
+  def partitionedLoad = List(1, 2, 4, 8, 10)
+  def highContention = 10
+  def messageSizes = List(20,1024,1024*256)
 
-    println("Updated: "+csvfile);
-    
-    if( DUMP_REPORT_COLS ) {
-      samples.map(_._1).foreach{x=>
-        println("          data.addColumn('number', '"+x+"');");
-      }
-    }
-  }
+  // Benchmark all the combinations
+  for( ptp<- List(true,false) ; durable <- List(false) ; messageSize <- messageSizes ) {
 
-  def jsescape(value:String) = {
-    var rc = ""
-    value.foreach{ c=>
-      c match {
-        case '\n'=> rc+="\\n"
-        case '\r'=> rc+="\\r"
-        case '\t'=> rc+="\\t"
-        case '\''=> rc+="\\\'"
-        case '\"'=> rc+="\\\""
-        case _ => rc+=c
+    def benchmark(name:String)(func: =>Unit) {
+      test(name) {
+        this.PTP = ptp
+        this.DURABLE = durable
+        this.MESSAGE_SIZE = messageSize
+        func
       }
     }
-    rc
-  }
-
-  // Test all the combinations
-  for( PTP<- List(true,false) ; PERSISTENT <- List(false); DURABLE <- List(false) ; size <- List(20,1024,1024*256)) {
 
-    val prefix = (if( PTP ) "queue " else "topic ") +(if( PERSISTENT ) "persistent " else "")+(if((size%1024)==0) (size/1024)+"k" else size+"b" )+" "
-    val suffix = (if( DURABLE ) " durable" else "")
+    val prefix = (if( ptp ) "queue " else "topic ") +(if((messageSize%1024)==0) (messageSize/1024)+"k" else messageSize+"b" )+" "
+    val suffix = (if( durable ) " durable" else "")
 
-    if( PTP && DURABLE ) {
-      // skip this iteration since queues and durable subs don't mix.
+    if( ptp && durable ) {
+      // skip this combination since queues and durable subs don't mix.
     } else {
 
       /**
@@ -186,8 +57,8 @@ abstract class BaseBrokerPerfSupport ext
        * Divide by 2 and compare against 1-1-1 to figure out what the broker dispatching
        * overhead is.
        */
-      if (!PTP) {
-        test(prefix+"1->1->0"+suffix) {
+      if (!ptp) {
+        benchmark(prefix+"1->1->0"+suffix) {
           producerCount = 1;
           destCount = 1;
 
@@ -204,11 +75,10 @@ abstract class BaseBrokerPerfSupport ext
       }
 
       /**
-       * Test increasing partitioned load.  Should linearly scale up on multi-core
-       * machines until CPU usage is maxed.
+       * Benchmark increasing partitioned load.
        */
-      for( count <- List(1, 2, 4, 8, 10) ) {
-        test(format("%s%d->%d->%d%s", prefix, count, count, count, suffix)) {
+      for( count <- partitionedLoad ) {
+        benchmark(format("%s%d->%d->%d%s", prefix, count, count, count, suffix)) {
           println(testName)
           producerCount = count;
           destCount = count;
@@ -226,11 +96,11 @@ abstract class BaseBrokerPerfSupport ext
       }
 
       /**
-       * Test the effects of high producer and consumer contention on a single
+       * Benchmark the effects of high producer and consumer contention on a single
        * destination.
        */
-      for( (producers, consumers) <- List((10, 1), (1, 10), (10, 10)) ) {
-        test(format("%s%d->1->%d%s", prefix, producers, consumers, suffix)) {
+      for( (producers, consumers) <- List((highContention, 1), (1, highContention), (highContention, highContention)) ) {
+        benchmark(format("%s%d->1->%d%s", prefix, producers, consumers, suffix)) {
           producerCount = producers;
           consumerCount = consumers;
           destCount = 1;
@@ -248,7 +118,7 @@ abstract class BaseBrokerPerfSupport ext
       }
 
 //    /**
-//     *  Tests 1 producers sending to 1 destination with 1 slow and 1 fast consumer.
+//     *  Benchmarks 1 producer sending to 1 destination with 1 slow and 1 fast consumer.
 //     *
 //     * queue case: the producer should not slow down since it can dispatch to the
 //     *             fast consumer
@@ -257,7 +127,7 @@ abstract class BaseBrokerPerfSupport ext
 //     *             slow consumer.
 //     *
 //     */
-//    test("1->1->[1 slow,1 fast]") {
+//    benchmark("1->1->[1 slow,1 fast]") {
 //      producerCount = 2;
 //      destCount = 1;
 //      consumerCount = 2;
@@ -274,7 +144,7 @@ abstract class BaseBrokerPerfSupport ext
 //      }
 //    }
 //
-//    test("2->2->[1,1 selecting]") {
+//    benchmark("2->2->[1,1 selecting]") {
 //      producerCount = 2;
 //      destCount = 2;
 //      consumerCount = 2;
@@ -298,12 +168,12 @@ abstract class BaseBrokerPerfSupport ext
 //    }
 
 //    /**
-//     * Test sending with 1 high priority sender. The high priority sender should
+//     * Benchmarks sending with 1 high priority sender. The high priority sender should
 //     * have higher throughput than the other low priority senders.
 //     *
 //     * @throws Exception
 //     */
-//    test("[1 high, 1 normal]->1->1") {
+//    benchmark("[1 high, 1 normal]->1->1") {
 //      producerCount = 2;
 //      destCount = 1;
 //      consumerCount = 1;
@@ -335,12 +205,12 @@ abstract class BaseBrokerPerfSupport ext
 //    }
 
 //    /**
-//     * Test sending with 1 high priority sender. The high priority sender should
+//     * Benchmarks sending with 1 high priority sender. The high priority sender should
 //     * have higher throughput than the other low priority senders.
 //     *
 //     * @throws Exception
 //     */
-//    test("[1 high, 1 mixed, 1 normal]->1->1") {
+//    benchmark("[1 high, 1 mixed, 1 normal]->1->1") {
 //      producerCount = 2;
 //      destCount = 1;
 //      consumerCount = 1;
@@ -375,336 +245,6 @@ abstract class BaseBrokerPerfSupport ext
 
     }
 
-    def createConnections() = {
-
-      if (MULTI_BROKER) {
-        sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI);
-        rcvBroker = createBroker("RcvBroker", receiveBrokerBindURI, receiveBrokerConnectURI);
-        brokers.add(sendBroker);
-        brokers.add(rcvBroker);
-      } else {
-        sendBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI);
-        rcvBroker = sendBroker
-        brokers.add(sendBroker);
-      }
-
-      startBrokers();
-
-      var dests = new Array[Destination](destCount);
-
-      for (i <- 0 until destCount) {
-        val domain = if (PTP) {Domain.QUEUE_DOMAIN} else {Domain.TOPIC_DOMAIN}
-        val name = new AsciiBuffer("dest" + (i + 1))
-        var bean = new SingleDestination(domain, name)
-        dests(i) = bean;
-//        if (PTP) {
-//          sendBroker.defaultVirtualHost.createQueue(dests(i));
-//          if (MULTI_BROKER) {
-//            rcvBroker.defaultVirtualHost.createQueue(dests(i));
-//          }
-//        }
-      }
-
-      for (i <- 0 until producerCount) {
-        var destination = dests(i % destCount);
-        var producer = _createProducer(i, destination);
-        producer.persistentDelivery = PERSISTENT;
-        producers.add(producer);
-      }
-
-      for (i <- 0 until consumerCount) {
-        var destination = dests(i % destCount);
-        var consumer = _createConsumer(i, destination);
-        consumer.durable = DURABLE;
-        consumers.add(consumer);
-      }
-
-      // Create MultiBroker connections:
-      // if (multibroker) {
-      // Pipe<Message> pipe = new Pipe<Message>();
-      // sendBroker.createBrokerConnection(rcvBroker, pipe);
-      // rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
-      // }
-    }
-
-    def _createConsumer(i: Int, destination: Destination): RemoteConsumer = {
-
-      var consumer = createConsumer();
-      consumer.brokerPerfTest = this
-
-      consumer.uri = connectUri(rcvBroker)
-      consumer.destination = destination
-      consumer.name = "consumer" + (i + 1)
-      consumer.totalConsumerRate = totalConsumerRate
-      return consumer;
-    }
-
-    def connectUri(broker:Broker) = {
-      broker.config.connectors.get(0).advertise
-    }
-
-
-    def _createProducer(id: Int, destination: Destination): RemoteProducer = {
-      var producer = createProducer();
-      producer.brokerPerfTest = this
-      producer.uri = connectUri(sendBroker)
-      producer.producerId = id + 1
-      producer.name = "producer" + (id + 1)
-      producer.destination = destination
-      producer.messageIdGenerator = msgIdGenerator
-      producer.totalProducerRate = totalProducerRate
-      producer.payloadSize = size;
-      producer
-    }
-
-    def stopServices() = {
-      println("waiting for services to stop");
-      stopping.set(true);
-      var tracker = new LoggingTracker("broker shutdown")
-      for (broker <- brokers) {
-        tracker.stop(broker)
-      }
-      tracker.await
-      tracker = new LoggingTracker("producer shutdown")
-      for (connection <- producers) {
-        tracker.stop(connection)
-      }
-      tracker.await
-      tracker = new LoggingTracker("consumer shutdown")
-      for (connection <- consumers) {
-        tracker.stop(connection)
-      }
-      tracker.await
-    }
-
-    def startBrokers() = {
-      val tracker = new LoggingTracker("test broker startup")
-      for (broker <- brokers) {
-        tracker.start(broker)
-      }
-      tracker.await
-    }
-
-
-    def startClients() = {
-      var tracker = new LoggingTracker("test consumer startup")
-      for (connection <- consumers) {
-        tracker.start(connection)
-      }
-      tracker.await
-      // let the consumers drain the destination for a bit...
-      Thread.sleep(1000)
-      tracker = new LoggingTracker("test producer startup")
-      for (connection <- producers) {
-        tracker.start(connection)
-      }
-      tracker.await
-    }
-
-    def reportRates() = {
-
-      println("Warming up...");
-      Thread.sleep(SAMPLE_PERIOD);
-      totalProducerRate.reset();
-      totalConsumerRate.reset();
-
-      println("Sampling rates");
-
-      case class Summary(producer:java.lang.Float, pdev:java.lang.Float, consumer:java.lang.Float, cdev:java.lang.Float)
-
-      val sample_rates = new Array[Summary](PERFORMANCE_SAMPLES)
-      var best = 0;
-
-      for (i <- 0 until PERFORMANCE_SAMPLES) {
-        var p = new Period();
-        Thread.sleep(SAMPLE_PERIOD);
-        if( producerCount > 0 ) {
-          println(totalProducerRate.getRateSummary(p));
-        }
-        if( consumerCount > 0 ) {
-          println(totalConsumerRate.getRateSummary(p));
-        }
-
-        sample_rates(i) = Summary(totalProducerRate.total(p), totalProducerRate.deviation, totalConsumerRate.total(p), totalConsumerRate.deviation)
-
-        val current_sum = sample_rates(i).producer.longValue + sample_rates(i).consumer.longValue
-        val best_sum = sample_rates(i).producer.longValue + sample_rates(i).consumer.longValue
-        if( current_sum > best_sum ) {
-          best = i
-        }
-
-        totalProducerRate.reset();
-        totalConsumerRate.reset();
-      }
-
-
-      if( producerCount > 0 ) {
-        samples = samples ::: ( testName+" producer", sample_rates(best).producer ) :: Nil
-        if( producerCount > 1 ) {
-          samples = samples ::: ( testName+" producer sd", sample_rates(best).pdev ) :: Nil
-        }
-      }
-      if( consumerCount > 0 ) {
-        samples = samples ::: ( testName+" consumer", sample_rates(best).consumer ) :: Nil
-        if( consumerCount > 1 ) {
-          samples = samples ::: ( testName+" consumer sd", sample_rates(best).cdev ) :: Nil
-        }
-      }
-
-
-    }
-  }
-  
-  protected def createConsumer(): RemoteConsumer
-  protected def createProducer(): RemoteProducer
-
-  def getBrokerWireFormat() = "multi"
-  def getRemoteWireFormat(): String
-
-  def createBroker(name: String, bindURI: String, connectUri: String): Broker = {
-
-    val config = Broker.defaultConfig
-    val connector = config.connectors.get(0)
-    connector.bind = bindURI
-    connector.advertise = connectUri
-    connector.protocol = getBrokerWireFormat
-
-    val host = config.virtualHosts.get(0)
-    host.purgeOnStartup = true
-
-    val broker = new Broker()
-    broker.config = config
-    broker
-  }
-
-//  def createStore(broker: Broker): Store = {
-//    val store = if (USE_KAHA_DB) {
-//      StoreFactory.createStore("hawtdb");
-//    } else {
-//      StoreFactory.createStore("memory");
-//    }
-//    store.setStoreDirectory(new File("target/test-data/broker-test/" + broker.id));
-//    store.setDeleteAllMessages(PURGE_STORE);
-//    store
-//  }
-
-}
-
-abstract class RemoteConsumer extends Connection {
-  val consumerRate = new MetricCounter();
-  var totalConsumerRate: MetricAggregator = null
-  var thinkTime: Long = 0
-  var destination: Destination = null
-  var selector: String = null;
-  var durable = false;
-  var uri: String = null
-  var name:String = null
-  var brokerPerfTest:BaseBrokerPerfSupport = null
-
-  override protected def _start(onComplete:Runnable) = {
-    if( consumerRate.getName == null ) {
-      consumerRate.name("Consumer " + name + " Rate");
-    }
-    totalConsumerRate.add(consumerRate);
-    transport = TransportFactory.connect(uri);
-    super._start(onComplete);
-  }
-
-
-  override def onTransportConnected() = {
-    setupSubscription();
-    transport.resumeRead
-  }
-
-  override def onTransportFailure(error: IOException) = {
-    if (!stopped) {
-      if(brokerPerfTest.stopping.get()) {
-        transport.stop
-      } else {
-        onFailure(error);
-      }
-    }
   }
 
-  protected def setupSubscription()
-
 }
-
-
-abstract class RemoteProducer extends Connection {
-  val rate = new MetricCounter();
-
-  var name:String = null
-  var messageIdGenerator: AtomicLong = null
-  var priority = 0
-  var persistentDelivery = false
-  var priorityMod = 0
-  var counter = 0
-  var producerId = 0
-  var destination: Destination = null
-  var property: String = null
-  var totalProducerRate: MetricAggregator = null
-  var next: Delivery = null
-  var thinkTime: Long = 0
-
-  var filler: String = null
-  var payloadSize = 20
-  var uri: String = null
-  var brokerPerfTest:BaseBrokerPerfSupport = null
-
-  override def onTransportFailure(error: IOException) = {
-    if (!brokerPerfTest.stopping.get()) {
-      System.err.println("Client Async Error:");
-      error.printStackTrace();
-    }
-  }
-
-  override protected def _start(onComplete:Runnable) = {
-
-    if (payloadSize > 0) {
-      var sb = new StringBuilder(payloadSize);
-      for (i <- 0 until payloadSize) {
-        sb.append(('a' + (i % 26)).toChar);
-      }
-      filler = sb.toString();
-    }
-
-    if( rate.getName == null ) {
-      rate.name("Producer " + name + " Rate");
-    }
-    totalProducerRate.add(rate);
-
-    transport = TransportFactory.connect(uri);
-    super._start(onComplete);
-
-  }
-
-  override def onTransportConnected() = {
-    setupProducer();
-    transport.resumeRead
-  }
-
-  def setupProducer()
-
-def createPayload(): String = {
-    if (payloadSize >= 0) {
-      var sb = new StringBuilder(payloadSize);
-      sb.append(name);
-      sb.append(':');
-      counter += 1
-      sb.append(counter);
-      sb.append(':');
-      var length = sb.length;
-      if (length <= payloadSize) {
-        sb.append(filler.subSequence(0, payloadSize - length));
-        return sb.toString();
-      } else {
-        return sb.substring(0, payloadSize);
-      }
-    } else {
-      counter += 1
-      return name + ":" + (counter);
-    }
-  }
-
-}
\ No newline at end of file
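
One detail worth noting about the combination loop above: the generated test names double as the column labels in the report templates, so the prefix/suffix logic has to stay in sync with the data.addColumn calls. A small self-contained sketch of that naming scheme, mirroring the prefix computation in the diff:

    // Sketch: derive test/column names such as "queue 20b 10->10->10 consumer"
    // from the PTP flag, message size and client count, as the loop above does.
    object PerfTestNames {
      def prefix(ptp: Boolean, messageSize: Int): String =
        (if (ptp) "queue " else "topic ") +
          (if (messageSize % 1024 == 0) (messageSize / 1024) + "k" else messageSize + "b") + " "

      def main(args: Array[String]) {
        for (ptp <- List(true, false); size <- List(20, 1024, 1024 * 256); count <- List(1, 10))
          println("%s%d->%d->%d consumer".format(prefix(ptp, size), count, count, count))
      }
    }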

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala?rev=961139&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BasePersistentBrokerPerfSupport.scala Wed Jul  7 04:09:36 2010
@@ -0,0 +1,38 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.perf
+
+import java.net.URL
+
+/**
+ * <p>
+ * Broker performance test support that runs the scenarios with persistent message delivery enabled.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract class BasePersistentBrokerPerfSupport  extends BaseBrokerPerfSupport {
+
+  PERSISTENT = true
+  
+  override def reportResourceTemplate():URL = { classOf[BasePersistentBrokerPerfSupport].getResource("persistent-report.html") }
+
+  override def partitionedLoad = List(1, 10, 50, 100)
+
+  override def highContention = 100
+
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala?rev=961139&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BrokerPerfSupport.scala Wed Jul  7 04:09:36 2010
@@ -0,0 +1,502 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.perf
+
+import _root_.java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import _root_.java.lang.{String}
+import org.apache.activemq.transport.TransportFactory
+
+import java.util.ArrayList
+import org.apache.activemq.apollo.broker._
+import org.scalatest._
+import java.io.{File, IOException}
+import org.apache.activemq.util.{IOHelper, ProcessSupport}
+import org.apache.activemq.metric.{Period, MetricAggregator, MetricCounter}
+import org.fusesource.hawtbuf.AsciiBuffer
+import collection.mutable.ListBuffer
+import java.net.URL
+import java.util.concurrent.TimeUnit
+import org.fusesource.hawtdispatch.ScalaDispatch._
+
+/**
+ * Base support for the broker performance tests: sets up brokers, producers, and consumers
+ * and samples their message rates.
+ */
+abstract class BrokerPerfSupport extends FunSuiteSupport with BeforeAndAfterEach {
+
+  var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "6"))
+  var SAMPLE_PERIOD = java.lang.Long.parseLong(System.getProperty("SAMPLE_PERIOD", "1000"))
+
+  protected var TCP = true // Set to use tcp IO
+
+  var USE_KAHA_DB = true
+  var PURGE_STORE = true
+
+  // Set to put senders and consumers on separate brokers.
+  var MULTI_BROKER = false
+
+  var DUMP_REPORT_COLS = true
+
+
+  var PTP = false
+  var PERSISTENT = false
+  var DURABLE = false
+  var MESSAGE_SIZE = 20
+
+
+  protected var sendBrokerBindURI: String = null
+  protected var receiveBrokerBindURI: String = null
+  protected var sendBrokerConnectURI: String = null
+  protected var receiveBrokerConnectURI: String = null
+
+  protected var producerCount = 0
+  protected var consumerCount = 0
+  protected var destCount = 0
+
+  protected var totalProducerRate:MetricAggregator = null
+  protected var totalConsumerRate:MetricAggregator = null 
+
+  protected var sendBroker: Broker = null
+  protected var rcvBroker: Broker = null
+  protected val brokers = ListBuffer[Broker]()
+  protected val msgIdGenerator = new AtomicLong()
+  val stopping = new AtomicBoolean()
+
+  val producers = ListBuffer[RemoteProducer]()
+  val consumers = ListBuffer[RemoteConsumer]()
+
+  var samples:List[(String, AnyRef)] = Nil
+
+  override protected def beforeEach() = {
+    totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items")
+    totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items")
+    brokers.clear
+    producers.clear
+    consumers.clear
+    stopping.set(false)
+    rcvBroker=null
+    sendBroker=null
+    producerCount = 0
+    consumerCount = 0
+    destCount =0
+  }
+
+  override protected def beforeAll(configMap: Map[String, Any]) = {
+    super.beforeAll(configMap)
+    if (TCP) {
+      sendBrokerBindURI = "tcp://localhost:10000"
+      receiveBrokerBindURI = "tcp://localhost:20000"
+
+      sendBrokerConnectURI = "tcp://localhost:10000?wireFormat=" + getRemoteWireFormat()
+      receiveBrokerConnectURI = "tcp://localhost:20000?wireFormat=" + getRemoteWireFormat()
+    } else {
+      sendBrokerConnectURI = "pipe://SendBroker"
+      receiveBrokerConnectURI = "pipe://ReceiveBroker"
+
+      sendBrokerBindURI = sendBrokerConnectURI
+      receiveBrokerBindURI = receiveBrokerConnectURI
+    }
+  }
+
+  def reportResourceTemplate:URL
+
+  def reportTargetName = "perf-"+getClass.getName+".html"
+
+  override protected def afterAll() = {
+    val basedir = new File(System.getProperty("user.home", "."))
+    val htmlFile = new File(basedir, reportTargetName)
+
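+    // the report HTML keeps its dataset between the "// DATA-START" and "// DATA-END" markers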
+    val report_parser = """(?s)(.*// DATA-START\r?\n)(.*)(// DATA-END.*)""".r
+
+    // Load the previous dataset if the file exists
+    var report_data = ""
+    if( htmlFile.exists ) {
+      IOHelper.readText(htmlFile) match {
+        case report_parser(_, data, _) =>
+          report_data = data.stripLineEnd
+        case _ =>
+          println("could not parse existing report file: "+htmlFile)
+      }
+    }
+
+    // Load the report template and parse it..
+    val template = IOHelper.readText(reportResourceTemplate.openStream)
+    template match {
+      case report_parser(report_header, _, report_footer) =>
+        var notes = System.getProperty("notes")
+        if( notes==null ) {
+          val version = new String(ProcessSupport.system("git", "rev-list", "--max-count=1", "HEAD").toByteArray).trim
+          notes = "commit "+version
+        }
+
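+        // append this run as a new data row: ['<notes>', sample values...]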
+        if( !report_data.isEmpty ) {
+          report_data += ",\n"
+        }
+        report_data += "            ['"+jsescape(notes)+"', "+samples.map(x=>String.format("%.2f",x._2)).mkString(", ")+"]\n"
+        IOHelper.writeText(htmlFile, report_header+report_data+report_footer)
+      case _ =>
+        println("could not parse template report file")
+    }
+
+    println("Updated: "+htmlFile)
+
+    if( DUMP_REPORT_COLS ) {
+      samples.map(_._1).foreach{x=>
+        println("          data.addColumn('number', '"+x+"')")
+      }
+    }
+  }
+
+  def jsescape(value:String) = {
+    var rc = ""
+    value.foreach{ c=>
+      c match {
+        case '\n'=> rc+="\\n"
+        case '\r'=> rc+="\\r"
+        case '\t'=> rc+="\\t"
+        case '\''=> rc+="\\\'"
+        case '\"'=> rc+="\\\""
+        case _ => rc+=c
+      }
+    }
+    rc
+  }
+
+
+
+  protected def createConsumer(): RemoteConsumer
+  protected def createProducer(): RemoteProducer
+
+  def getBrokerWireFormat() = "multi"
+  def getRemoteWireFormat(): String
+
+  def createBroker(name: String, bindURI: String, connectUri: String): Broker = {
+
+    val config = Broker.defaultConfig
+    val connector = config.connectors.get(0)
+    connector.bind = bindURI
+    connector.advertise = connectUri
+    connector.protocol = getBrokerWireFormat
+
+    val host = config.virtualHosts.get(0)
+    host.purgeOnStartup = true
+
+    val broker = new Broker()
+    broker.config = config
+    broker
+  }
+
+  def createConnections() = {
+
+    if (MULTI_BROKER) {
+      sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI)
+      rcvBroker = createBroker("RcvBroker", receiveBrokerBindURI, receiveBrokerConnectURI)
+      brokers += (sendBroker)
+      brokers += (rcvBroker)
+    } else {
+      sendBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI)
+      rcvBroker = sendBroker
+      brokers += (sendBroker)
+    }
+
+    startBrokers()
+
+    var dests = new Array[Destination](destCount)
+
+    for (i <- 0 until destCount) {
+      val domain = if (PTP) {Domain.QUEUE_DOMAIN} else {Domain.TOPIC_DOMAIN}
+      val name = new AsciiBuffer("dest" + (i + 1))
+      var bean = new SingleDestination(domain, name)
+      dests(i) = bean
+//        if (PTP) {
+//          sendBroker.defaultVirtualHost.createQueue(dests(i))
+//          if (MULTI_BROKER) {
+//            rcvBroker.defaultVirtualHost.createQueue(dests(i))
+//          }
+//        }
+    }
+
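+    // assign producers and consumers to the destinations round-robin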
+    for (i <- 0 until producerCount) {
+      var destination = dests(i % destCount)
+      var producer = _createProducer(i, destination)
+      producer.persistent = PERSISTENT
+      producers += (producer)
+    }
+
+    for (i <- 0 until consumerCount) {
+      var destination = dests(i % destCount)
+      var consumer = _createConsumer(i, destination)
+      consumer.persistent = PERSISTENT
+      consumer.durable = DURABLE
+      consumers += (consumer)
+    }
+
+    // Create MultiBroker connections:
+    // if (multibroker) {
+    // Pipe<Message> pipe = new Pipe<Message>()
+    // sendBroker.createBrokerConnection(rcvBroker, pipe)
+    // rcvBroker.createBrokerConnection(sendBroker, pipe.connect())
+    // }
+  }
+
+  def _createConsumer(i: Int, destination: Destination): RemoteConsumer = {
+
+    var consumer = createConsumer()
+    consumer.stopping = stopping
+
+    consumer.uri = connectUri(rcvBroker)
+    consumer.destination = destination
+    consumer.name = "Consumer:" + (i + 1)
+    consumer.rateAggregator = totalConsumerRate
+    consumer.init
+    
+    return consumer
+  }
+
+  def connectUri(broker:Broker) = {
+    broker.config.connectors.get(0).advertise
+  }
+
+
+  def _createProducer(id: Int, destination: Destination): RemoteProducer = {
+    var producer = createProducer()
+    producer.stopping = stopping
+
+    producer.uri = connectUri(sendBroker)
+    producer.producerId = id + 1
+    producer.name = "Producer:" + (id + 1)
+    producer.destination = destination
+    producer.messageIdGenerator = msgIdGenerator
+    producer.rateAggregator = totalProducerRate
+    producer.payloadSize = MESSAGE_SIZE
+    producer.init
+    producer
+  }
+
+  def stopServices() = {
+    println("waiting for services to stop")
+    stopping.set(true)
+    var tracker = new LoggingTracker("broker shutdown")
+    for (broker <- brokers) {
+      tracker.stop(broker)
+    }
+    tracker.await
+    tracker = new LoggingTracker("producer shutdown")
+    for (connection <- producers) {
+      tracker.stop(connection)
+    }
+    tracker.await
+    tracker = new LoggingTracker("consumer shutdown")
+    for (connection <- consumers) {
+      tracker.stop(connection)
+    }
+    tracker.await
+  }
+
+  def startBrokers() = {
+    val tracker = new LoggingTracker("test broker startup")
+    for (broker <- brokers) {
+      tracker.start(broker)
+    }
+    tracker.await
+  }
+
+
+  def startClients() = {
+    var tracker = new LoggingTracker("test consumer startup")
+    for (connection <- consumers) {
+      tracker.start(connection)
+    }
+    tracker.await
+    // let the consumers drain the destination for a bit...
+    Thread.sleep(1000)
+    tracker = new LoggingTracker("test producer startup")
+    for (connection <- producers) {
+      tracker.start(connection)
+    }
+    tracker.await
+  }
+
+  def reportRates() = {
+
+    println("Warming up...")
+    Thread.sleep(SAMPLE_PERIOD)
+    totalProducerRate.reset()
+    totalConsumerRate.reset()
+
+    println("Sampling rates")
+
+    case class Summary(producer:java.lang.Float, pdev:java.lang.Float, consumer:java.lang.Float, cdev:java.lang.Float)
+
+    val sample_rates = new Array[Summary](PERFORMANCE_SAMPLES)
+    var best = 0
+
+    for (i <- 0 until PERFORMANCE_SAMPLES) {
+      var p = new Period()
+      Thread.sleep(SAMPLE_PERIOD)
+      if( producerCount > 0 ) {
+        println(totalProducerRate.getRateSummary(p))
+      }
+      if( consumerCount > 0 ) {
+        println(totalConsumerRate.getRateSummary(p))
+      }
+
+      sample_rates(i) = Summary(totalProducerRate.total(p), totalProducerRate.deviation, totalConsumerRate.total(p), totalConsumerRate.deviation)
+
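+      // keep track of the sample with the highest combined producer + consumer rate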
+      val current_sum = sample_rates(i).producer.longValue + sample_rates(i).consumer.longValue
+      val best_sum = sample_rates(best).producer.longValue + sample_rates(best).consumer.longValue
+      if( current_sum > best_sum ) {
+        best = i
+      }
+
+      totalProducerRate.reset()
+      totalConsumerRate.reset()
+    }
+
+
+    if( producerCount > 0 ) {
+      samples = samples ::: ( testName+" producer", sample_rates(best).producer ) :: Nil
+      if( producerCount > 1 ) {
+        samples = samples ::: ( testName+" producer sd", sample_rates(best).pdev ) :: Nil
+      }
+    }
+    if( consumerCount > 0 ) {
+      samples = samples ::: ( testName+" consumer", sample_rates(best).consumer ) :: Nil
+      if( consumerCount > 1 ) {
+        samples = samples ::: ( testName+" consumer sd", sample_rates(best).cdev ) :: Nil
+      }
+    }
+
+
+  }
+
+
+}
+abstract class RemoteConnection extends Connection {
+  var uri: String = null
+  var name:String = null
+
+  val rate = new MetricCounter()
+  var rateAggregator: MetricAggregator = null
+
+  var stopping:AtomicBoolean = null
+  var destination: Destination = null
+
+  def init = {
+    if( rate.getName == null ) {
+      rate.name(name + " Rate")
+    }
+    rateAggregator.add(rate)
+  }
+
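+  // set by _start and run once the transport reports it is connected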
+  var callbackWhenConnected:Runnable = null
+
+  override protected def _start(onComplete:Runnable) = {
+    callbackWhenConnected = onComplete
+    transport = TransportFactory.connect(uri)
+    super._start(^{ })
+  }
+
+  override def onTransportConnected() = {
+    onConnected()
+    transport.resumeRead
+    callbackWhenConnected.run
+    callbackWhenConnected = null
+  }
+
+  protected def onConnected()
+
+  override def onTransportFailure(error: IOException) = {
+    if (!stopped) {
+      if(stopping.get()) {
+        transport.stop
+      } else {
+        onFailure(error)
+        if( callbackWhenConnected!=null ) {
+          warn("connect attempt failed. wil retry connection..")
+          dispatchQueue.dispatchAfter(50, TimeUnit.MILLISECONDS, ^{
+            if(stopping.get()) {
+              callbackWhenConnected.run
+            } else {
+              // try to connect again...
+              transport = TransportFactory.connect(uri)
+              super._start(^{ })
+            }
+          })
+        }
+      }
+    }
+  }
+
+}
+
+abstract class RemoteConsumer extends RemoteConnection {
+  var thinkTime: Long = 0
+  var selector: String = null
+  var durable = false
+  var persistent = false
+}
+
+
+abstract class RemoteProducer extends RemoteConnection {
+
+  var messageIdGenerator: AtomicLong = null
+  var priority = 0
+  var persistent = false
+  var priorityMod = 0
+  var counter = 0
+  var producerId = 0
+  var property: String = null
+  var next: Delivery = null
+  var thinkTime: Long = 0
+
+  var filler: String = null
+  var payloadSize = 20
+
+  override def init = {
+    super.init
+
+    if (payloadSize > 0) {
+      var sb = new StringBuilder(payloadSize)
+      for (i <- 0 until payloadSize) {
+        sb.append(('a' + (i % 26)).toChar)
+      }
+      filler = sb.toString()
+    }
+  }
+
+  def createPayload(): String = {
+    if (payloadSize >= 0) {
+      var sb = new StringBuilder(payloadSize)
+      sb.append(name)
+      sb.append(':')
+      counter += 1
+      sb.append(counter)
+      sb.append(':')
+      var length = sb.length
+      if (length <= payloadSize) {
+        sb.append(filler.subSequence(0, payloadSize - length))
+        return sb.toString()
+      } else {
+        return sb.substring(0, payloadSize)
+      }
+    } else {
+      counter += 1
+      return name + ":" + (counter)
+    }
+  }
+
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961139&r1=961138&r2=961139&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul  7 04:09:36 2010
@@ -28,10 +28,9 @@ import AsciiBuffer._
 import Stomp._
 import _root_.org.apache.activemq.apollo.stomp.StompFrame
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import org.fusesource.hawtdispatch.BaseRetained
+
 
 class StompBrokerPerfTest extends BaseBrokerPerfSupport {
-  println(getClass.getClassLoader.getResource("log4j.properties"));
 
   override def createProducer() = new StompRemoteProducer()
 
@@ -41,10 +40,21 @@ class StompBrokerPerfTest extends BaseBr
 
 }
 
+class StompPersistentBrokerPerfTest extends BasePersistentBrokerPerfSupport {
+  
+  override def createProducer() = new StompRemoteProducer()
+
+  override def createConsumer() = new StompRemoteConsumer()
+
+  override def getRemoteWireFormat() = "stomp"
+
+}
+
+
 class StompRemoteConsumer extends RemoteConsumer {
   var outboundSink: OverflowSink[StompFrame] = null
 
-  def setupSubscription() = {
+  def onConnected() = {
     outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
     outboundSink.refiller = ^{}
 
@@ -60,7 +70,10 @@ class StompRemoteConsumer extends Remote
     var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
     headers ::= (Stomp.Headers.Subscribe.DESTINATION, stompDestination)
     headers ::= (Stomp.Headers.Subscribe.ID, ascii("stomp-sub-" + name))
-    headers ::= (Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO)
+
+    if( persistent ) {
+      headers ::= (Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.CLIENT)
+    }
 
     frame = StompFrame(Stomp.Commands.SUBSCRIBE, headers);
     outboundSink.offer(frame);
@@ -72,6 +85,13 @@ class StompRemoteConsumer extends Remote
       case StompFrame(Responses.CONNECTED, headers, _, _) =>
       case StompFrame(Responses.MESSAGE, headers, content, _) =>
         messageReceived();
+
+        // we client-ack when persistent messages are being used.
+        if( persistent ) {
+          var rc = List((Stomp.Headers.Ack.MESSAGE_ID, frame.header(Stomp.Headers.Message.MESSAGE_ID)))
+          outboundSink.offer(StompFrame(Stomp.Commands.ACK, rc));
+        }
+
       case StompFrame(Responses.ERROR, headers, content, _) =>
         onFailure(new Exception("Server reported an error: " + frame.content));
       case _ =>
@@ -83,13 +103,13 @@ class StompRemoteConsumer extends Remote
     if (thinkTime > 0) {
       transport.suspendRead
       dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, ^ {
-        consumerRate.increment();
+        rate.increment();
         if (!stopped) {
           transport.resumeRead
         }
       })
     } else {
-      consumerRate.increment();
+      rate.increment();
     }
   }
 
@@ -106,6 +126,9 @@ class StompRemoteProducer extends Remote
     if (property != null) {
       headers ::= (ascii(property), ascii(property));
     }
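+    // request a receipt so that only one persistent message is kept in flight at a time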
+    if( persistent ) {
+      headers ::= ((Stomp.Headers.RECEIPT_REQUESTED, ascii("x")));
+    }
     //    var p = this.priority;
     //    if (priorityMod > 0) {
     //        p = if ((counter % priorityMod) == 0) { 0 } else { priority }
@@ -127,16 +150,21 @@ class StompRemoteProducer extends Remote
             send_next
           }
         }
-        if (thinkTime > 0) {
-          dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, task)
-        } else {
-          dispatchQueue << task
+
+        if( !persistent ) {
+          // if we are not going to wait for an ack back from the server,
+          // then just send the next one...
+          if (thinkTime > 0) {
+            dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, task)
+          } else {
+            dispatchQueue << task
+          }
         }
       }
     }
   }
 
-  override def setupProducer() = {
+  override def onConnected() = {
     outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
     outboundSink.refiller = ^ { drain }
 
@@ -152,6 +180,11 @@ class StompRemoteProducer extends Remote
   override def onTransportCommand(command: Object) = {
     var frame = command.asInstanceOf[StompFrame]
     frame match {
+      case StompFrame(Responses.RECEIPT, headers, _, _) =>
+        assert( persistent )
+        // we got the receipt for the previous message we sent... now send the next one.
+        send_next
+
       case StompFrame(Responses.CONNECTED, headers, _, _) =>
       case StompFrame(Responses.ERROR, headers, content, _) =>
         onFailure(new Exception("Server reported an error: " + frame.content.utf8));