You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/08/18 15:11:08 UTC

svn commit: r805404 - /qpid/trunk/qpid/cpp/src/tests/perftest.cpp

Author: gsim
Date: Tue Aug 18 13:11:08 2009
New Revision: 805404

URL: http://svn.apache.org/viewvc?rev=805404&view=rev
Log:
QPID-2053: Allow queue names to be controlled for perftest (this allows multiple concurrent instances to be run). Based on a proposal and patch from Frantisek Reznicek.


Modified:
    qpid/trunk/qpid/cpp/src/tests/perftest.cpp

Modified: qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=805404&r1=805403&r2=805404&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/perftest.cpp Tue Aug 18 13:11:08 2009
@@ -75,6 +75,7 @@
     // Queue policy
     uint32_t queueMaxCount;
     uint64_t queueMaxSize;
+    std::string baseName;
     bool queueDurable;
 
     // Publisher
@@ -106,8 +107,8 @@
     static const std::string helpText;
     
     Opts() :
-        TestOptions(helpText),
-        setup(false), control(false), publish(false), subscribe(false),
+        TestOptions(helpText), 
+        setup(false), control(false), publish(false), subscribe(false), baseName("perftest"),
         pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false),
         subs(1), ack(0),
         qt(1),singleConnect(false), iterations(1), mode(SHARED), summary(false),
@@ -144,6 +145,7 @@
 
             ("queue-max-count", optValue(queueMaxCount, "N"), "queue policy: count to trigger 'flow to disk'")
             ("queue-max-size", optValue(queueMaxSize, "N"), "queue policy: accumulated size to trigger 'flow to disk'")
+            ("base-name", optValue(baseName, "NAME"), "base name used for queues or topics") 
             ("queue-durable", optValue(queueDurable, "N"), "Make queue durable (implied if durable set)")
 
             ("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume")
@@ -219,6 +221,13 @@
 Opts opts;
 Connection globalConnection;
 
+std::string fqn(const std::string& name)
+{
+    ostringstream fqn;
+    fqn << opts.baseName << "_" << name;
+    return fqn.str();
+}
+
 struct Client : public Runnable {
     Connection* connection;
     Connection localConnection;
@@ -257,18 +266,18 @@
     }
 
     void run() {
-        queueInit("pub_start");
-        queueInit("pub_done");
-        queueInit("sub_ready");
-        queueInit("sub_done");
-        if (opts.iterations > 1) queueInit("sub_iteration");
+        queueInit(fqn("pub_start"));
+        queueInit(fqn("pub_done"));
+        queueInit(fqn("sub_ready"));
+        queueInit(fqn("sub_done"));
+        if (opts.iterations > 1) queueInit(fqn("sub_iteration"));
         if (opts.mode==SHARED) {
             framing::FieldTable settings;//queue policy settings
             settings.setInt("qpid.max_count", opts.queueMaxCount);
             settings.setInt("qpid.max_size", opts.queueMaxSize);
             for (size_t i = 0; i < opts.qt; ++i) {
                 ostringstream qname;
-                qname << "perftest" << i;
+                qname << opts.baseName << i;
                 queueInit(qname.str(), opts.durable || opts.queueDurable, settings); 
             }
         }
@@ -384,13 +393,13 @@
     void run() {                // Controller
         try {
             // Wait for subscribers to be ready.
-            process(opts.totalSubs, "sub_ready", bind(expect, _1, "ready"));
+            process(opts.totalSubs, fqn("sub_ready"), bind(expect, _1, "ready"));
 
             LocalQueue pubDone;
             LocalQueue subDone;
             subs.setFlowControl(0, SubscriptionManager::UNLIMITED, false);
-            subs.subscribe(pubDone, "pub_done");
-            subs.subscribe(subDone, "sub_done");
+            subs.subscribe(pubDone, fqn("pub_done"));
+            subs.subscribe(subDone, fqn("sub_done"));
 
             double txrateTotal(0);
             double mbytesTotal(0);
@@ -399,16 +408,16 @@
 
             for (size_t j = 0; j < opts.iterations; ++j) {
                 AbsTime start=now();
-                send(opts.totalPubs, "pub_start", "start"); // Start publishers
+                send(opts.totalPubs, fqn("pub_start"), "start"); // Start publishers
                 if (j) {
-                    send(opts.totalPubs, "sub_iteration", "next"); // Start subscribers on next iteration
+		    send(opts.totalPubs, fqn("sub_iteration"), "next"); // Start subscribers on next iteration
                 }
 
                 Stats pubRates;
                 Stats subRates;
 
-                process(opts.totalPubs, pubDone, "pub_done", boost::ref(pubRates));
-                process(opts.totalSubs, subDone, "sub_done", boost::ref(subRates));
+                process(opts.totalPubs, pubDone, fqn("pub_done"), boost::ref(pubRates));
+                process(opts.totalSubs, subDone, fqn("sub_done"), boost::ref(subRates));
 
                 AbsTime end=now(); 
 
@@ -497,7 +506,7 @@
             SubscriptionManager subs(session);
             LocalQueue lq;
             subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true); 
-            subs.subscribe(lq, "pub_start"); 
+            subs.subscribe(lq, fqn("pub_start")); 
             
             for (size_t j = 0; j < opts.iterations; ++j) {
                 expect(lq.pop().getData(), "start");
@@ -533,7 +542,7 @@
                 double time=secs(start,end);
                 
                 // Send result to controller.
-                Message report(lexical_cast<string>(opts.count/time), "pub_done");
+                Message report(lexical_cast<string>(opts.count/time), fqn("pub_done"));
                 session.messageTransfer(arg::content=report, arg::acceptMode=1);
                 if (opts.txPub){
                     sync(session).txCommit();
@@ -587,7 +596,7 @@
             LocalQueue lq;
             Subscription subscription = subs.subscribe(lq, queue, settings);
             // Notify controller we are ready.
-            session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1);
+            session.messageTransfer(arg::content=Message("ready", fqn("sub_ready")), arg::acceptMode=1);
             if (opts.txSub) {
                 if (opts.commitAsync) session.txCommit();
                 else sync(session).txCommit();
@@ -595,13 +604,13 @@
 
             LocalQueue iterationControl;
             if (opts.iterations > 1) {
-                subs.subscribe(iterationControl, "sub_iteration", SubscriptionSettings(FlowControl::messageCredit(0)));
+                subs.subscribe(iterationControl, fqn("sub_iteration"), SubscriptionSettings(FlowControl::messageCredit(0)));
             }
             
             for (size_t j = 0; j < opts.iterations; ++j) {
                 if (j > 0) {
                     //need to wait here until all subs are done
-                    session.messageFlow("sub_iteration", 0, 1); 
+                    session.messageFlow(fqn("sub_iteration"), 0, 1); 
                     iterationControl.pop();
 
                     //need to allocate some more credit for subscription
@@ -643,7 +652,7 @@
 
                 // Report to publisher.
                 Message result(lexical_cast<string>(opts.subQuota/secs(start,end)),
-                               "sub_done");
+                               fqn("sub_done"));
                 session.messageTransfer(arg::content=result, arg::acceptMode=1);
                 if (opts.txSub) sync(session).txCommit();
             }
@@ -680,7 +689,7 @@
         // Start pubs/subs for each queue/topic.
         for (size_t i = 0; i < opts.qt; ++i) {
             ostringstream key;
-            key << "perftest" << i; // Queue or topic name.
+            key << opts.baseName << i; // Queue or topic name.
             if (opts.publish) {
                 size_t n = singleProcess ? opts.pubs : 1;
                 for (size_t j = 0; j < n; ++j)  {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org