You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/11/08 00:07:52 UTC

svn commit: r592941 - /incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp

Author: aconway
Date: Wed Nov  7 15:07:51 2007
New Revision: 592941

URL: http://svn.apache.org/viewvc?rev=592941&view=rev
Log:
Fix race condition in perftest.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=592941&r1=592940&r2=592941&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Wed Nov  7 15:07:51 2007
@@ -68,7 +68,24 @@
 
 struct ListenThread : public Runnable { Thread thread; void run(); };
 struct PublishThread : public Runnable { Thread thread; void run(); };
-    
+
+// Declare and purge the queues shared by publisher and consumers; main() calls this once before creating its worker objects.
+void setup() {
+    cout << "Create shared queues" << endl;
+    Connection connection;
+    opts.open(connection);
+    Session_0_10 session = connection.newSession();
+    session.setSynchronous(true); // Synchronous session: make sure every declare/purge below has completed.
+    session.queueDeclare(arg::queue="control"); // Control queue: declared regardless of mode.
+    session.queuePurge(arg::queue="control"); // NOTE(review): presumably discards messages left over from an earlier run - confirm.
+    if (mode==SHARED) {
+        session.queueDeclare(arg::queue="perftest"); // Shared data queue: declared only in SHARED mode.
+        session.queuePurge(arg::queue="perftest");
+    }
+    session.close();
+    connection.close();
+}
+
 int main(int argc, char** argv) {
     try {
         opts.parse(argc, argv);
@@ -78,6 +95,7 @@
         else throw Exception("Invalid mode");
         if (!opts.listen && !opts.publish)
             opts.listen = opts.publish = true;
+        setup();
         std::vector<ListenThread> listen(opts.consumers);
         PublishThread publish;
         if (opts.listen) 
@@ -122,19 +140,16 @@
         opts.open(connection);
         Session_0_10 session = connection.newSession();
 
-        session.queueDeclare(arg::queue="control"); // Control queue
-        session.queuePurge(arg::queue="control");
-        if (mode==SHARED) {
-            session.queueDeclare(arg::queue="perftest"); // Shared data queue
-            session.queuePurge(arg::queue="perftest");
-        }
-        
         // Wait for consumers.
+        cout << "Publisher wating for consumers " << flush;
         SubscriptionManager subs(session);
         LocalQueue control;
         subs.subscribe(control, "control");
-        for (int i = 0; i < opts.consumers; ++i) 
+        for (int i = 0; i < opts.consumers; ++i) {
+            cout << "." << flush;
             expect(control.pop().getData(), "ready");
+        }
+        cout << endl;
 
         // Create test message
         size_t msgSize=max(opts.size, 32);
@@ -167,6 +182,7 @@
         cout << "publish rate:" << (opts.count)/secs(start,end) << endl;
 
         //  Wait for consumer(s) to finish.
+        cout << "Publisher wating for consumer reports. " << endl;
         for (int i = 0; i < opts.consumers; ++i) {
             string report=control.pop().getData();
             if (report.find("consume") != 0)
@@ -201,14 +217,11 @@
         Session_0_10 session = connection.newSession();
 
         string consumeQueue;
-        switch (mode) {
-          case SHARED:
+        if (mode == SHARED) {
             consumeQueue="perftest";
-            session.queueDeclare(arg::queue="perftest"); 
-            break;
-          case FANOUT:
-          case TOPIC:
-            consumeQueue=session.getId().str(); // Unique
+        }
+        else {
+            consumeQueue=session.getId().str(); // Unique name.
             session.queueDeclare(arg::queue=consumeQueue,
                                  arg::exclusive=true,
                                  arg::autoDelete=true);
@@ -217,7 +230,6 @@
                               arg::routingKey="perftest");
         }
         // Notify publisher we are ready.
-        session.queueDeclare(arg::queue="control"); // Control queue
         session.messageTransfer(arg::content=Message("ready", "control"));
 
         SubscriptionManager subs(session);
@@ -226,8 +238,15 @@
         int consumed=0;
         AbsTime start=now();
         Message msg;
-        while ((msg=consume.pop()).getData() != "done")
+        if (!opts.publish)
+            cout << "Consuming " << flush;
+        while ((msg=consume.pop()).getData() != "done") {
             ++consumed;
+            if (!opts.publish && (consumed%10000) == 0)
+                cout << "." << flush;
+        }
+        if (!opts.publish)
+            cout << endl;
         msg.acknowledge();      // Ack all outstanding messages.
         AbsTime end=now();