You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/07/25 09:36:08 UTC

svn commit: r679689 - in /incubator/qpid/branches/qpid.0-10/cpp/src/tests: topic_listener.cpp topic_publisher.cpp

Author: gsim
Date: Fri Jul 25 00:36:07 2008
New Revision: 679689

URL: http://svn.apache.org/viewvc?rev=679689&view=rev
Log:
QPID-447: Optional mechanism to avoid race when automating topic tests. Patch from David Sommerseth.


Modified:
    incubator/qpid/branches/qpid.0-10/cpp/src/tests/topic_listener.cpp
    incubator/qpid/branches/qpid.0-10/cpp/src/tests/topic_publisher.cpp

Modified: incubator/qpid/branches/qpid.0-10/cpp/src/tests/topic_listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/cpp/src/tests/topic_listener.cpp?rev=679689&r1=679688&r2=679689&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/cpp/src/tests/topic_listener.cpp (original)
+++ incubator/qpid/branches/qpid.0-10/cpp/src/tests/topic_listener.cpp Fri Jul 25 00:36:07 2008
@@ -76,13 +76,15 @@
     bool transactional;
     bool durable;
     int prefetch;
+    string statusqueue;
 
     Args() : ack(0), transactional(false), durable(false), prefetch(0) {
         addOptions()
             ("ack", optValue(ack, "MODE"), "Ack frequency in messages (defaults to half the prefetch value)")
             ("transactional", optValue(transactional), "Use transactions")
             ("durable", optValue(durable), "subscribers should use durable queues")
-            ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)");
+            ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)")
+            ("status-queue", optValue(statusqueue, "QUEUE-NAME"), "Message queue to put status messages on");
     }
 };
 
@@ -102,9 +104,6 @@
             Connection connection;
             args.open(connection);
             AsyncSession session = connection.newSession();
-            if (args.transactional) {
-                session.txSelect();
-            }
 
             //declare exchange, queue and bind them:
             session.queueDeclare(arg::queue="response");
@@ -128,6 +127,17 @@
             }
             mgr.subscribe(listener, control);
             session.sync();
+
+            if( args.statusqueue.length() > 0 ) {
+                stringstream msg_str;
+                msg_str << "topic_listener: " << (int)getpid();
+                session.messageTransfer(arg::content=Message(msg_str.str(), args.statusqueue));
+                cout << "Ready status put on queue '" << args.statusqueue << "'" << endl;
+            }
+
+            if (args.transactional) {
+                session.txSelect();
+            }
             
             cout << "topic_listener: listening..." << endl;
             mgr.run();

Modified: incubator/qpid/branches/qpid.0-10/cpp/src/tests/topic_publisher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/cpp/src/tests/topic_publisher.cpp?rev=679689&r1=679688&r2=679689&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/cpp/src/tests/topic_publisher.cpp (original)
+++ incubator/qpid/branches/qpid.0-10/cpp/src/tests/topic_publisher.cpp Fri Jul 25 00:36:07 2008
@@ -82,6 +82,7 @@
     int batches;
     int delay;
     int size;
+    string statusqueue;
 
     Args() : messages(1000), subscribers(1),
              transactional(false), durable(false),
@@ -94,7 +95,8 @@
             ("durable", optValue(durable), "messages should be durable")
             ("batches", optValue(batches, "N"), "how many batches to run")
             ("delay", optValue(delay, "SECONDS"), "Causes a delay between each batch")
-            ("size", optValue(size, "BYTES"), "size of the published messages");
+            ("size", optValue(size, "BYTES"), "size of the published messages")
+            ("status-queue", optValue(statusqueue, "QUEUE-NAME"), "Message queue to read status messages from");
     }
 };
 
@@ -108,11 +110,28 @@
             Connection connection;
             args.open(connection);
             AsyncSession session = connection.newSession();
+
+            // If status-queue is defined, wait for all expected listeners to join in before we start
+            if( args.statusqueue.length() > 0 ) {
+                cout << "Waiting for " << args.subscribers << " listeners..." << endl;
+                SubscriptionManager statusSubs(session);
+                LocalQueue statusQ;
+                statusSubs.subscribe(statusQ, args.statusqueue);
+                for (int i = 0; i < args.subscribers; i++) {
+                    Message m = statusQ.get();
+                    if( m.getData().find("topic_listener: ", 0) == 0 ) {
+                        cout << "Listener " << (i+1) << " of " << args.subscribers
+                             << " is ready (pid " << m.getData().substr(16, m.getData().length() - 16)
+                             << ")" << endl;                        
+                    } else {
+                        throw Exception(QPID_MSG("Unexpected message received on status queue: " << m.getData()));
+                    }                    
+                }
+            }
+
             if (args.transactional) {
                 session.txSelect();
             }
-
-
             session.queueDeclare(arg::queue="response");
             session.exchangeBind(arg::exchange="amq.direct", arg::queue="response", arg::bindingKey="response");
 
@@ -203,4 +222,3 @@
         session.txCommit();
     }
 }
-