You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/11/08 00:07:52 UTC
svn commit: r592941 - /incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
Author: aconway
Date: Wed Nov 7 15:07:51 2007
New Revision: 592941
URL: http://svn.apache.org/viewvc?rev=592941&view=rev
Log:
Fix race condition in perftest.
Modified:
incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=592941&r1=592940&r2=592941&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Wed Nov 7 15:07:51 2007
@@ -68,7 +68,24 @@
struct ListenThread : public Runnable { Thread thread; void run(); };
struct PublishThread : public Runnable { Thread thread; void run(); };
-
+
+// Create and purge the shared queues
+void setup() {
+ cout << "Create shared queues" << endl;
+ Connection connection;
+ opts.open(connection);
+ Session_0_10 session = connection.newSession();
+ session.setSynchronous(true); // Make sure this is all completed.
+ session.queueDeclare(arg::queue="control"); // Control queue
+ session.queuePurge(arg::queue="control");
+ if (mode==SHARED) {
+ session.queueDeclare(arg::queue="perftest"); // Shared data queue
+ session.queuePurge(arg::queue="perftest");
+ }
+ session.close();
+ connection.close();
+}
+
int main(int argc, char** argv) {
try {
opts.parse(argc, argv);
@@ -78,6 +95,7 @@
else throw Exception("Invalid mode");
if (!opts.listen && !opts.publish)
opts.listen = opts.publish = true;
+ setup();
std::vector<ListenThread> listen(opts.consumers);
PublishThread publish;
if (opts.listen)
@@ -122,19 +140,16 @@
opts.open(connection);
Session_0_10 session = connection.newSession();
- session.queueDeclare(arg::queue="control"); // Control queue
- session.queuePurge(arg::queue="control");
- if (mode==SHARED) {
- session.queueDeclare(arg::queue="perftest"); // Shared data queue
- session.queuePurge(arg::queue="perftest");
- }
-
// Wait for consumers.
+ cout << "Publisher wating for consumers " << flush;
SubscriptionManager subs(session);
LocalQueue control;
subs.subscribe(control, "control");
- for (int i = 0; i < opts.consumers; ++i)
+ for (int i = 0; i < opts.consumers; ++i) {
+ cout << "." << flush;
expect(control.pop().getData(), "ready");
+ }
+ cout << endl;
// Create test message
size_t msgSize=max(opts.size, 32);
@@ -167,6 +182,7 @@
cout << "publish rate:" << (opts.count)/secs(start,end) << endl;
// Wait for consumer(s) to finish.
+ cout << "Publisher wating for consumer reports. " << endl;
for (int i = 0; i < opts.consumers; ++i) {
string report=control.pop().getData();
if (report.find("consume") != 0)
@@ -201,14 +217,11 @@
Session_0_10 session = connection.newSession();
string consumeQueue;
- switch (mode) {
- case SHARED:
+ if (mode == SHARED) {
consumeQueue="perftest";
- session.queueDeclare(arg::queue="perftest");
- break;
- case FANOUT:
- case TOPIC:
- consumeQueue=session.getId().str(); // Unique
+ }
+ else {
+ consumeQueue=session.getId().str(); // Unique name.
session.queueDeclare(arg::queue=consumeQueue,
arg::exclusive=true,
arg::autoDelete=true);
@@ -217,7 +230,6 @@
arg::routingKey="perftest");
}
// Notify publisher we are ready.
- session.queueDeclare(arg::queue="control"); // Control queue
session.messageTransfer(arg::content=Message("ready", "control"));
SubscriptionManager subs(session);
@@ -226,8 +238,15 @@
int consumed=0;
AbsTime start=now();
Message msg;
- while ((msg=consume.pop()).getData() != "done")
+ if (!opts.publish)
+ cout << "Consuming " << flush;
+ while ((msg=consume.pop()).getData() != "done") {
++consumed;
+ if (!opts.publish && (consumed%10000) == 0)
+ cout << "." << flush;
+ }
+ if (!opts.publish)
+ cout << endl;
msg.acknowledge(); // Ack all outstanding messages.
AbsTime end=now();