You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/08/18 15:11:08 UTC
svn commit: r805404 - /qpid/trunk/qpid/cpp/src/tests/perftest.cpp
Author: gsim
Date: Tue Aug 18 13:11:08 2009
New Revision: 805404
URL: http://svn.apache.org/viewvc?rev=805404&view=rev
Log:
QPID-2053: Allow queue names to be controlled for perftest (this allows multiple concurrent instances to be run). Based on a proposal and patch from Frantisek Reznicek.
Modified:
qpid/trunk/qpid/cpp/src/tests/perftest.cpp
Modified: qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=805404&r1=805403&r2=805404&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/perftest.cpp Tue Aug 18 13:11:08 2009
@@ -75,6 +75,7 @@
// Queue policy
uint32_t queueMaxCount;
uint64_t queueMaxSize;
+ std::string baseName;
bool queueDurable;
// Publisher
@@ -106,8 +107,8 @@
static const std::string helpText;
Opts() :
- TestOptions(helpText),
- setup(false), control(false), publish(false), subscribe(false),
+ TestOptions(helpText),
+ setup(false), control(false), publish(false), subscribe(false), baseName("perftest"),
pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false),
subs(1), ack(0),
qt(1),singleConnect(false), iterations(1), mode(SHARED), summary(false),
@@ -144,6 +145,7 @@
("queue-max-count", optValue(queueMaxCount, "N"), "queue policy: count to trigger 'flow to disk'")
("queue-max-size", optValue(queueMaxSize, "N"), "queue policy: accumulated size to trigger 'flow to disk'")
+ ("base-name", optValue(baseName, "NAME"), "base name used for queues or topics")
("queue-durable", optValue(queueDurable, "N"), "Make queue durable (implied if durable set)")
("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume")
@@ -219,6 +221,13 @@
Opts opts;
Connection globalConnection;
+std::string fqn(const std::string& name) // Returns "<opts.baseName>_<name>", i.e. the given name prefixed with the configured base name.
+{
+ ostringstream fqn; // Local stream; it shares the function's name and hides it inside this body.
+ fqn << opts.baseName << "_" << name; // opts.baseName defaults to "perftest" and is settable via the base-name option.
+ return fqn.str();
+}
+
struct Client : public Runnable {
Connection* connection;
Connection localConnection;
@@ -257,18 +266,18 @@
}
void run() {
- queueInit("pub_start");
- queueInit("pub_done");
- queueInit("sub_ready");
- queueInit("sub_done");
- if (opts.iterations > 1) queueInit("sub_iteration");
+ queueInit(fqn("pub_start"));
+ queueInit(fqn("pub_done"));
+ queueInit(fqn("sub_ready"));
+ queueInit(fqn("sub_done"));
+ if (opts.iterations > 1) queueInit(fqn("sub_iteration"));
if (opts.mode==SHARED) {
framing::FieldTable settings;//queue policy settings
settings.setInt("qpid.max_count", opts.queueMaxCount);
settings.setInt("qpid.max_size", opts.queueMaxSize);
for (size_t i = 0; i < opts.qt; ++i) {
ostringstream qname;
- qname << "perftest" << i;
+ qname << opts.baseName << i;
queueInit(qname.str(), opts.durable || opts.queueDurable, settings);
}
}
@@ -384,13 +393,13 @@
void run() { // Controller
try {
// Wait for subscribers to be ready.
- process(opts.totalSubs, "sub_ready", bind(expect, _1, "ready"));
+ process(opts.totalSubs, fqn("sub_ready"), bind(expect, _1, "ready"));
LocalQueue pubDone;
LocalQueue subDone;
subs.setFlowControl(0, SubscriptionManager::UNLIMITED, false);
- subs.subscribe(pubDone, "pub_done");
- subs.subscribe(subDone, "sub_done");
+ subs.subscribe(pubDone, fqn("pub_done"));
+ subs.subscribe(subDone, fqn("sub_done"));
double txrateTotal(0);
double mbytesTotal(0);
@@ -399,16 +408,16 @@
for (size_t j = 0; j < opts.iterations; ++j) {
AbsTime start=now();
- send(opts.totalPubs, "pub_start", "start"); // Start publishers
+ send(opts.totalPubs, fqn("pub_start"), "start"); // Start publishers
if (j) {
- send(opts.totalPubs, "sub_iteration", "next"); // Start subscribers on next iteration
+ send(opts.totalPubs, fqn("sub_iteration"), "next"); // Start subscribers on next iteration
}
Stats pubRates;
Stats subRates;
- process(opts.totalPubs, pubDone, "pub_done", boost::ref(pubRates));
- process(opts.totalSubs, subDone, "sub_done", boost::ref(subRates));
+ process(opts.totalPubs, pubDone, fqn("pub_done"), boost::ref(pubRates));
+ process(opts.totalSubs, subDone, fqn("sub_done"), boost::ref(subRates));
AbsTime end=now();
@@ -497,7 +506,7 @@
SubscriptionManager subs(session);
LocalQueue lq;
subs.setFlowControl(1, SubscriptionManager::UNLIMITED, true);
- subs.subscribe(lq, "pub_start");
+ subs.subscribe(lq, fqn("pub_start"));
for (size_t j = 0; j < opts.iterations; ++j) {
expect(lq.pop().getData(), "start");
@@ -533,7 +542,7 @@
double time=secs(start,end);
// Send result to controller.
- Message report(lexical_cast<string>(opts.count/time), "pub_done");
+ Message report(lexical_cast<string>(opts.count/time), fqn("pub_done"));
session.messageTransfer(arg::content=report, arg::acceptMode=1);
if (opts.txPub){
sync(session).txCommit();
@@ -587,7 +596,7 @@
LocalQueue lq;
Subscription subscription = subs.subscribe(lq, queue, settings);
// Notify controller we are ready.
- session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1);
+ session.messageTransfer(arg::content=Message("ready", fqn("sub_ready")), arg::acceptMode=1);
if (opts.txSub) {
if (opts.commitAsync) session.txCommit();
else sync(session).txCommit();
@@ -595,13 +604,13 @@
LocalQueue iterationControl;
if (opts.iterations > 1) {
- subs.subscribe(iterationControl, "sub_iteration", SubscriptionSettings(FlowControl::messageCredit(0)));
+ subs.subscribe(iterationControl, fqn("sub_iteration"), SubscriptionSettings(FlowControl::messageCredit(0)));
}
for (size_t j = 0; j < opts.iterations; ++j) {
if (j > 0) {
//need to wait here until all subs are done
- session.messageFlow("sub_iteration", 0, 1);
+ session.messageFlow(fqn("sub_iteration"), 0, 1);
iterationControl.pop();
//need to allocate some more credit for subscription
@@ -643,7 +652,7 @@
// Report to publisher.
Message result(lexical_cast<string>(opts.subQuota/secs(start,end)),
- "sub_done");
+ fqn("sub_done"));
session.messageTransfer(arg::content=result, arg::acceptMode=1);
if (opts.txSub) sync(session).txCommit();
}
@@ -680,7 +689,7 @@
// Start pubs/subs for each queue/topic.
for (size_t i = 0; i < opts.qt; ++i) {
ostringstream key;
- key << "perftest" << i; // Queue or topic name.
+ key << opts.baseName << i; // Queue or topic name.
if (opts.publish) {
size_t n = singleProcess ? opts.pubs : 1;
for (size_t j = 0; j < n; ++j) {
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org