You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/11/27 22:23:22 UTC
svn commit: r598770 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/
qpid/client/ qpid/framing/ qpid/log/ tests/
Author: aconway
Date: Tue Nov 27 13:23:22 2007
New Revision: 598770
URL: http://svn.apache.org/viewvc?rev=598770&view=rev
Log:
perftest improvements.
NOTE: options have changed, see perftest --help.
- Supports multiple publishers.
- Subscribers set credit to receive exactly the expected no. of messages.
- All transfers unconfirmed by default.
client/Connector.cpp: Added connector ID to RECV/SENT logging
client/Completion.h: Added default ctor.
broker/Broker.cpp: --ack defaults to 0 - session acks disabled.
client/SessionCore.cpp: Ignore surplus frames in CLOSING state.
log/Options.cpp: By default log to stdout instead of stderr. Easier to grep.
framing/AMQContentBody.h: Log message content even in NDEBUG mode.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=598770&r1=598769&r2=598770&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Nov 27 13:23:22 2007
@@ -74,7 +74,7 @@
storeForce(false),
enableMgmt(0),
mgmtPubInterval(10),
- ack(100)
+ ack(0)
{
int c = sys::SystemInfo::concurrency();
if (c > 0) workerThreads=c;
@@ -102,7 +102,7 @@
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"),
"Management Publish Interval")
("ack", optValue(ack, "N"),
- "Send ack/solicit-ack at least every N frames. 0 disables voluntary acks/solitict-ack");
+ "Send session.ack/solicit-ack at least every N frames. 0 disables voluntary ack/solitict-ack");
}
const std::string empty;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h?rev=598770&r1=598769&r2=598770&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h Tue Nov 27 13:23:22 2007
@@ -36,6 +36,8 @@
shared_ptr<SessionCore> session;
public:
+ Completion() {}
+
Completion(Future f, shared_ptr<SessionCore> s) : future(f), session(s) {}
void sync()
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=598770&r1=598769&r2=598770&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Tue Nov 27 13:23:22 2007
@@ -108,7 +108,7 @@
writeFrameQueue.push(frame);
aio->queueWrite();
- QPID_LOG(trace, "SENT: " << frame);
+ QPID_LOG(trace, "SENT [" << this << "]: " << frame);
}
void Connector::handleClosed() {
@@ -180,8 +180,8 @@
AMQFrame frame;
while(frame.decode(in)){
- QPID_LOG(trace, "RECV: " << frame);
- input->received(frame);
+ QPID_LOG(trace, "RECV [" << this << "]: " << frame);
+ input->received(frame);
}
// TODO: unreading needs to go away, and when we can cope
// with multiple sub-buffers in the general buffer scheme, it will
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=598770&r1=598769&r2=598770&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Tue Nov 27 13:23:22 2007
@@ -87,7 +87,6 @@
// We can be CLOSED or SUSPENDED by error at any time.
state.waitFor(States(s, CLOSED, SUSPENDED));
check();
- assert(state==s);
invariant();
}
@@ -97,7 +96,8 @@
sync(false),
channel(ch),
proxy(channel),
- state(OPENING)
+ state(OPENING),
+ detachedLifetime(0)
{
l3.out = &out;
attaching(conn);
@@ -166,10 +166,11 @@
static const std::string CANNOT_REOPEN_SESSION="Cannot re-open a session.";
-void SessionCore::open(uint32_t detachedLifetime) { // user thread
+void SessionCore::open(uint32_t timeout) { // user thread
Lock l(state);
check(state==OPENING && !session,
COMMAND_INVALID, CANNOT_REOPEN_SESSION);
+ detachedLifetime=timeout;
proxy.open(detachedLifetime);
waitFor(OPEN);
}
@@ -364,8 +365,22 @@
return Future(l3.send(command, content));
}
+namespace {
+// True only for a method frame whose class/method IDs are
+// SESSION_CLASS_ID / SESSION_CLOSED_METHOD_ID; false for any other frame,
+// including frames for which getMethod() returns null.
+bool isCloseResponse(const AMQFrame& frame) {
+    if (!frame.getMethod())
+        return false;
+    if (frame.getMethod()->amqpClassId() != SESSION_CLASS_ID)
+        return false;
+    return frame.getMethod()->amqpMethodId() == SESSION_CLOSED_METHOD_ID;
+}
+}
+
// Network thread.
void SessionCore::handleIn(AMQFrame& frame) {
+ {
+ Lock l(state);
+ // Ignore frames received while closing other than closed response.
+ if (state==CLOSING && !isCloseResponse(frame))
+ return;
+ }
try {
// Cast to expose private SessionHandler functions.
if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
@@ -382,7 +397,7 @@
{
Lock l(state);
if (state==OPEN) {
- if (session->sent(frame))
+ if (detachedLifetime > 0 && session->sent(frame))
proxy.solicitAck();
channel.handle(frame);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h?rev=598770&r1=598769&r2=598770&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h Tue Nov 27 13:23:22 2007
@@ -133,6 +133,7 @@
framing::ChannelHandler channel;
framing::AMQP_ServerProxy::Session proxy;
mutable StateMonitor state;
+ uint32_t detachedLifetime;
};
}} // namespace qpid::client
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp?rev=598770&r1=598769&r2=598770&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp Tue Nov 27 13:23:22 2007
@@ -40,7 +40,5 @@
void qpid::framing::AMQContentBody::print(std::ostream& out) const
{
out << "content (" << size() << " bytes)";
-#ifndef NDEBUG
- out << " " << data.substr(0,10) << "...";
-#endif
+ out << " " << data.substr(0,16) << "...";
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp?rev=598770&r1=598769&r2=598770&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp Tue Nov 27 13:23:22 2007
@@ -29,8 +29,8 @@
const std::string& exchange)
{
setData(data);
- getDeliveryProperties().setRoutingKey(routingKey);
- getDeliveryProperties().setExchange(exchange);
+ if (routingKey.size()) getDeliveryProperties().setRoutingKey(routingKey);
+ if (exchange.size()) getDeliveryProperties().setExchange(exchange);
}
AMQHeaderBody TransferContent::getHeader() const
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp?rev=598770&r1=598769&r2=598770&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp Tue Nov 27 13:23:22 2007
@@ -28,7 +28,7 @@
Options::Options(const std::string& name) : qpid::Options(name),
time(true), level(true), thread(false), source(false), function(false), trace(false)
{
- outputs.push_back("stderr");
+ outputs.push_back("stdout");
selectors.push_back("error+");
ostringstream levels;
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=598770&r1=598769&r2=598770&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Tue Nov 27 13:23:22 2007
@@ -28,109 +28,172 @@
#include "qpid/client/Message.h"
#include "qpid/sys/Time.h"
+#include <boost/lexical_cast.hpp>
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+
#include <iostream>
#include <sstream>
+#include <numeric>
+#include <algorithm>
using namespace std;
using namespace qpid;
using namespace client;
using namespace sys;
+using boost::lexical_cast;
+using boost::bind;
+
+enum Mode { SHARED, FANOUT, TOPIC };
+// Command-line spelling of each Mode, indexed by the enumerator value.
+const char* modeNames[] = { "shared", "fanout", "topic" };
+// Number of entries in modeNames, derived from the table so that adding a
+// mode does not require touching the parser below.
+const size_t modeCount = sizeof(modeNames)/sizeof(modeNames[0]);
+
+// istream/ostream ops so Options can read/display Mode.
+
+// Read one whitespace-delimited word and map it to a Mode.
+// Throws Exception("Invalid mode: <word>") if the word is not in modeNames.
+istream& operator>>(istream& in, Mode& mode) {
+    string s;
+    in >> s;
+    const char** const last = modeNames+modeCount;
+    const char** const match = find(modeNames, last, s);
+    if (match == last) throw Exception("Invalid mode: "+s);
+    mode = Mode(match - modeNames);
+    return in;
+}
+
+// Write the command-line spelling of mode.
+ostream& operator<<(ostream& out, Mode mode) {
+    return out << modeNames[mode];
+}
+
+
struct Opts : public TestOptions {
- bool listen;
- bool publish;
- bool purge;
- size_t count;
+ // Actions
+ bool setup, control, publish, subscribe;
+
+ // Publisher
+ size_t pubs;
+ size_t count ;
size_t size;
+ bool confirm;
bool durable;
- size_t consumers;
- std::string mode;
- size_t autoAck;
+
+ // Subscriber
+ size_t subs;
+ size_t ack;
+
+ // General
+ size_t qt;
+ Mode mode;
bool summary;
- bool confirmMode;
- bool acquireMode;
Opts() :
- listen(false), publish(false), purge(false),
- count(500000), size(64), consumers(1),
- mode("shared"), autoAck(100),
- summary(false), confirmMode(false), acquireMode(false)
+ setup(false), control(false), publish(false), subscribe(false),
+ pubs(1), count(500000), size(64), confirm(false), durable(false),
+ subs(1), ack(0),
+ qt(1), mode(SHARED), summary(false)
{
- addOptions()
- ("listen", optValue(listen), "Consume messages.")
- ("publish", optValue(publish), "Produce messages.")
- ("purge", optValue(purge), "Purge shared queues.")
- ("count", optValue(count, "N"), "Messages to send.")
- ("size", optValue(size, "BYTES"), "Size of messages.")
+ addOptions()
+ ("setup", optValue(setup), "Create shared queues.")
+ ("control", optValue(control), "Run test, print report.")
+ ("publish", optValue(publish), "Publish messages.")
+ ("subscribe", optValue(subscribe), "Subscribe for messages.")
+
+ ("mode", optValue(mode, "shared|fanout|topic"), "Test mode."
+ "\nshared: --qt queues, --npubs publishers and --nsubs subscribers per queue.\n"
+ "\nfanout: --npubs publishers, --nsubs subscribers, fanout exchange."
+ "\ntopic: --qt topics, --npubs publishers and --nsubs subscribers per topic.\n")
+
+ ("npubs", optValue(pubs, "N"), "Create N publishers.")
+ ("count", optValue(count, "N"), "Each publisher sends N messages.")
+ ("size", optValue(size, "BYTES"), "Size of messages in bytes.")
+ ("pub-confirm", optValue(confirm), "Publisher use confirm-mode.")
("durable", optValue(durable, "N"), "Publish messages as durable.")
- ("consumers", optValue(consumers, "N"), "Number of consumers.")
- ("mode", optValue(mode, "shared|fanout|topic"), "consume mode")
- ("auto-ack", optValue(autoAck, "N"), "ack every N messages.")
- ("summary,s", optValue(summary), "summary output only")
- ("confirm-mode", optValue(confirmMode, "N"), "confirm mode")
- ("acquire-mode", optValue(acquireMode, "N"), "acquire mode (N - pre acquire, Y - no acquire");
+
+ ("nsubs", optValue(subs, "N"), "Create N subscribers.")
+ ("sub-ack", optValue(ack, "N"), "N>0: Subscriber acks batches of N.\n"
+ "N==0: Subscriber uses unconfirmed mode")
+
+ ("qt", optValue(qt, "N"), "Create N queues or topics.")
+ ("summary,s", optValue(summary), "Summary output only.");
+ }
+
+ // Computed values
+ size_t totalPubs;
+ size_t totalSubs;
+ size_t transfers;
+ size_t subQuota;
+
+    /**
+     * Parse the command line via TestOptions::parse, then derive the
+     * computed values (totalPubs, totalSubs, subQuota, transfers) from the
+     * raw options according to the selected mode.
+     *
+     * SHARED: may round --count up so it is a multiple of --nsubs.
+     * FANOUT: forces qt to 1, warning if another value was given.
+     */
+    void parse(int argc, char** argv) {
+        TestOptions::parse(argc, argv);
+        switch (mode) {
+          case SHARED:
+            // Each subscriber takes (pubs*count)/subs messages, so count is
+            // rounded up to a multiple of subs. With no subscribers there
+            // is nothing to round to, and count % 0 would be undefined.
+            if (subs && count % subs) {
+                count += subs - (count % subs);
+                cout << "WARNING: Adjusted --count to " << count
+                     << " the nearest multiple of --nsubs" << endl;
+            }
+            totalPubs = pubs*qt;
+            totalSubs = subs*qt;
+            // No subscribers means no per-subscriber quota (avoids /0).
+            subQuota = subs ? (pubs*count)/subs : 0;
+            break;
+          case FANOUT:
+            if (qt != 1) cerr << "WARNING: Fanout mode, ignoring --qt="
+                              << qt << endl;
+            qt=1;
+            totalPubs = pubs;
+            totalSubs = subs;
+            subQuota = totalPubs*count;
+            break;
+          case TOPIC:
+            totalPubs = pubs*qt;
+            totalSubs = subs*qt;
+            subQuota = pubs*count;
+            break;
+        }
+        // Messages sent by all publishers plus messages received by all
+        // subscribers.
+        transfers=(totalPubs*count) + (totalSubs*subQuota);
+    }
};
-Opts opts;
-enum Mode { SHARED, FANOUT, TOPIC };
-Mode mode;
-struct ListenThread : public Runnable { Thread thread; void run(); };
-struct PublishThread : public Runnable { Thread thread; void run(); };
+Opts opts;
-// Create and purge the shared queues
-void setup() {
+struct Client : public Runnable {
Connection connection;
- opts.open(connection);
- Session_0_10 session = connection.newSession();
- session.setSynchronous(true); // Make sure this is all completed.
- session.queueDeclare(arg::queue="control"); // Control queue
- if (opts.purge) {
- if (!opts.summary) cout << "Purging shared queues" << endl;
- session.queuePurge(arg::queue="control");
- }
- if (mode==SHARED) {
- session.queueDeclare(arg::queue="perftest", arg::durable=opts.durable); // Shared data queue
- if (opts.purge)
- session.queuePurge(arg::queue="perftest");
- }
- session.close();
- connection.close();
-}
+ Session_0_10 session;
+ Thread thread;
-int main(int argc, char** argv) {
- try {
- opts.parse(argc, argv);
- if (opts.mode=="shared") mode=SHARED;
- else if (opts.mode=="fanout") mode = FANOUT;
- else if (opts.mode=="topic") mode = TOPIC;
- else throw Exception("Invalid mode");
- if (!opts.listen && !opts.publish && !opts.purge)
- opts.listen = opts.publish = opts.purge = true;
- setup();
- std::vector<ListenThread> listen(opts.consumers);
- PublishThread publish;
- if (opts.listen)
- for (size_t i = 0; i < opts.consumers; ++i)
- listen[i].thread=Thread(listen[i]);
- if (opts.publish)
- publish.thread=Thread(publish);
- if (opts.listen)
- for (size_t i = 0; i < opts.consumers; ++i)
- listen[i].thread.join();
- if (opts.publish)
- publish.thread.join();
+ Client() {
+ opts.open(connection);
+ session = connection.newSession();
}
- catch (const std::exception& e) {
- cout << "Unexpected exception: " << e.what() << endl;
+
+ ~Client() {
+ session.close();
+ connection.close();
}
-}
+};
-double secs(Duration d) { return double(d)/TIME_SEC; }
-double secs(AbsTime start, AbsTime finish) { return secs(Duration(start,finish)); }
+// Declares and purges the four control queues and, in SHARED mode, one
+// "perftest<i>" data queue per --qt.
+struct Setup : public Client {
+
+    // Declare the queue with the given durability, then purge it.
+    void queueInit(string name, bool durable=false) {
+        session.queueDeclare(arg::queue=name, arg::durable=durable);
+        session.queuePurge(arg::queue=name);
+    }
+
+    void run() {
+        static const char* const controlQueues[] = {
+            "pub_start", "pub_done", "sub_ready", "sub_done"
+        };
+        const size_t nControl = sizeof(controlQueues)/sizeof(controlQueues[0]);
+        for (size_t q = 0; q != nControl; ++q)
+            queueInit(controlQueues[q]);
+
+        if (opts.mode==SHARED) {
+            for (size_t n = 0; n != opts.qt; ++n)
+                queueInit("perftest" + lexical_cast<string>(n), opts.durable);
+        }
+        // Make sure this is all completed before we return.
+        session.execution().sendSyncRequest();
+    }
+};
void expect(string actual, string expect) {
if (expect != actual)
@@ -138,176 +201,297 @@
}
-const char* exchange() {
- switch (mode) {
- case SHARED: return ""; // Deafult exchange.
- case FANOUT: return "amq.fanout";
- case TOPIC: return "amq.topic";
- }
- assert(0);
- return 0;
+// Express a Duration as a double by dividing it by TIME_SEC.
+double secs(Duration d) { return static_cast<double>(d) / TIME_SEC; }
+// Same conversion for the Duration built from (start, finish).
+double secs(AbsTime start, AbsTime finish) {
+    const Duration elapsed(start, finish);
+    return secs(elapsed);
+}
-void PublishThread::run() {
- try {
- Connection connection;
- opts.open(connection);
- Session_0_10 session = connection.newSession();
- // Wait for consumers.
- if (!opts.summary) cout << "Waiting for consumers ready " << flush;
- SubscriptionManager subs(session);
- LocalQueue control;
- subs.subscribe(control, "control");
- for (size_t i = 0; i < opts.consumers; ++i) {
- if (!opts.summary) cout << "." << flush;
- expect(control.pop().getData(), "ready");
+// Collect rates & print stats.
+// Each rate arrives as text, is parsed with lexical_cast<double>, and is
+// kept so the individual values, mean and standard deviation can be printed.
+class Stats {
+    vector<double> values;
+    double sum;                 // Running total of everything in values.
+
+  public:
+    Stats() : sum(0) {}
+
+    // Functor to collect rates: parse one value and add it to the sample.
+    void operator()(const string& data) {
+        double d=lexical_cast<double>(data);
+        values.push_back(d);
+        sum += d;
+    }
+
+    // Arithmetic mean of the collected values. An empty sample yields 0
+    // rather than the NaN produced by dividing by zero.
+    double mean() const {
+        return values.empty() ? 0 : sum/values.size();
+    }
+
+    // Sample standard deviation (n-1 divisor); 0 for fewer than two values.
+    double stdev() const {
+        if (values.size() <= 1) return 0;
+        double avg = mean();
+        double ssq = 0;
+        for (vector<double>::const_iterator i = values.begin();
+             i != values.end(); ++i) {
+            double x=*i;
+            x -= avg;
+            ssq += x*x;
+        }
- if (!opts.summary) cout << endl;
+        return sqrt(ssq/(values.size()-1));
+    }
+
+    // Write one value per line, then the average, plus the standard
+    // deviation when there is more than one value.
+    ostream& print(ostream& out) const {
+        ostream_iterator<double> o(out, "\n");
+        copy(values.begin(), values.end(), o);
+        out << "Average: " << mean();
+        if (values.size() > 1)
+            out << " (std.dev. " << stdev() << ")";
+        return out << endl;
+    }
+};
+
+
- size_t msgSize=max(opts.size, sizeof(size_t));
- Message msg(string(msgSize, 'X'), "perftest");
- if (opts.durable)
- msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
-
- AbsTime start=now();
- if (!opts.summary) cout << "Publishing " << opts.count
- << " messages " << flush;
- for (size_t i=0; i<opts.count; i++) {
- // Stamp the iteration into the message data, careful to avoid
- // any heap allocation.
- char* data = const_cast<char*>(msg.getData().data());
- *reinterpret_cast<uint32_t*>(data) = i;
- session.messageTransfer(arg::destination=exchange(),
- arg::content=msg, arg::confirmMode=opts.confirmMode,
- arg::acquireMode=opts.acquireMode);
- if (!opts.summary && (i%10000)==0){
- cout << "." << flush;
- session.execution().sendSyncRequest();
- }
+// Manage control queues, collect and print reports.
+struct Controller : public Client {
+
+ SubscriptionManager subs;
+
+ Controller() : subs(session) {}
+
+ /** Process messages from queue by applying a functor. */
+ void process(size_t n, string queue,
+ boost::function<void (const string&)> msgFn)
+ {
+ if (!opts.summary)
+ cout << "Processing " << n << " messages from "
+ << queue << " " << flush;
+ LocalQueue lq;
+ subs.setFlowControl(n, SubscriptionManager::UNLIMITED, false);
+ subs.subscribe(lq, queue);
+ for (size_t i = 0; i < n; ++i) {
+ if (!opts.summary) cout << "." << flush;
+ msgFn(lq.pop().getData());
}
-
- //Completion compl;
if (!opts.summary) cout << " done." << endl;
- msg.setData("done"); // Send done messages.
- if (mode==SHARED)
- for (size_t i = 0; i < opts.consumers; ++i)
- session.messageTransfer(arg::destination=exchange(), arg::content=msg);
- else
- session.messageTransfer(arg::destination=exchange(), arg::content=msg);
- session.execution().sendSyncRequest();
- AbsTime end=now();
+ }
- // Report
- double publish_rate=(opts.count)/secs(start,end);
+ void send(size_t n, string queue, string data) {
if (!opts.summary)
- cout << endl
- << "publish count:" << opts.count << endl
- << "publish secs:" << secs(start,end) << endl
- << "publish rate:" << publish_rate << endl;
-
- double consume_rate = 0; // Average rate for consumers.
- // Wait for consumer(s) to finish.
- if (!opts.summary) cout << "Waiting for consumers done " << endl;
- for (size_t i = 0; i < opts.consumers; ++i) {
- string report=control.pop().getData();
- if (!opts.summary)
- cout << endl << report;
+ cout << "Sending " << data << " " << n << " times to " << queue
+ << endl;
+ Message msg(data, queue);
+ for (size_t i = 0; i < n; ++i)
+ session.messageTransfer(arg::content=msg);
+ }
+
+ void run() { // Controller
+ try {
+ // Wait for subscribers to be ready.
+ process(opts.totalSubs, "sub_ready", bind(expect, _1, "ready"));
+
+ Stats pubRates;
+ Stats subRates;
+
+ AbsTime start=now();
+ send(opts.totalPubs, "pub_start", "start"); // Start publishers
+ process(opts.totalPubs, "pub_done", boost::ref(pubRates));
+ process(opts.totalSubs, "sub_done", boost::ref(subRates));
+ AbsTime end=now();
+ double time=secs(start, end);
+
+ if (!opts.summary) {
+ cout << endl << "Publish rates: " << endl;
+ pubRates.print(cout);
+ cout << endl << "Subscribe rates: " << endl;
+ subRates.print(cout);
+ cout << endl << "Total transfers: " << opts.transfers << endl;
+ cout << "Total time (secs): " << time << endl;
+ cout << "Total rate: " << opts.transfers/time << endl;
+ }
else {
- double rate=boost::lexical_cast<double>(report);
- consume_rate += rate/opts.consumers;
+ cout << pubRates.mean() << "\t"
+ << subRates.mean() << "\t"
+ << opts.transfers/time << endl;
}
}
- end=now();
+ catch (const std::exception& e) {
+ cout << "Controller exception: " << e.what() << endl;
+ exit(1);
+ }
+ }
+};
- // Count total transfers from publisher and to subscribers.
- int transfers;
- if (mode==SHARED) // each message sent/receivd once.
- transfers=2*opts.count;
- else // sent once, received N times.
- transfers=opts.count*(opts.consumers + 1);
- double total_rate=transfers/secs(start, end);
- if (opts.summary)
- cout << opts.mode << '(' << opts.count
- << ':' << opts.consumers << ')'
- << '\t' << publish_rate
- << '\t' << consume_rate
- << '\t' << total_rate
- << endl;
- else
- cout << endl
- << "total transfers:" << transfers << endl
- << "total secs:" << secs(start, end) << endl
- << "total rate:" << total_rate << endl;
-
- connection.close();
+struct PublishThread : public Client {
+ string destination;
+ string routingKey;
+
+ PublishThread() {};
+
+ PublishThread(string key, string dest=string()) {
+ destination=dest;
+ routingKey=key;
}
- catch (const std::exception& e) {
- cout << "PublishThread exception: " << e.what() << endl;
+
+ void run() { // Publisher
+ Completion completion;
+ try {
+ size_t msgSize=max(opts.size, sizeof(size_t));
+ Message msg(string(msgSize, 'X'), routingKey);
+ if (opts.durable)
+ msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+
+ SubscriptionManager subs(session);
+ LocalQueue lq(AckPolicy(opts.ack));
+ subs.setFlowControl(1, SubscriptionManager::UNLIMITED, false);
+ subs.subscribe(lq, "pub_start");
+ expect(lq.pop().getData(), "start");
+
+ AbsTime start=now();
+ for (size_t i=0; i<opts.count; i++) {
+ // Stamp the iteration into the message data, avoid
+ // any heap allocation.
+ char* data = const_cast<char*>(msg.getData().data());
+ *reinterpret_cast<uint32_t*>(data) = i;
+ completion = session.messageTransfer(
+ arg::destination=destination,
+ arg::content=msg,
+ arg::confirmMode=opts.confirm);
+ }
+ if (opts.confirm) completion.sync();
+ AbsTime end=now();
+ double time=secs(start,end);
+
+ // Send result to controller.
+ msg.setData(lexical_cast<string>(opts.count/time));
+ msg.getDeliveryProperties().setRoutingKey("pub_done");
+ session.messageTransfer(arg::content=msg);
+ session.close();
+ }
+ catch (const std::exception& e) {
+ cout << "PublishThread exception: " << e.what() << endl;
+ exit(1);
+ }
+ }
+};
+
+// Subscriber: pops opts.subQuota messages from one queue and sends the
+// measured receive rate to the "sub_done" queue.
+struct SubscribeThread : public Client {
+
+    string queue;               // Name of the queue to subscribe to.
+
+    SubscribeThread() {}
+
+    // Subscribe to the already-existing queue q.
+    SubscribeThread(string q) { queue = q; }
+
+    // Declare an exclusive, auto-delete queue named after the session ID
+    // and bind it to exchange ex with routing key key.
+    SubscribeThread(string key, string ex) {
+        queue=session.getId().str(); // Unique name.
+        session.queueDeclare(arg::queue=queue,
+                             arg::exclusive=true,
+                             arg::autoDelete=true,
+                             arg::durable=opts.durable);
+        session.queueBind(arg::queue=queue,
+                          arg::exchange=ex,
+                          arg::routingKey=key);
+    }
+
+    void run() {                // Subscribe
+        try {
+            SubscriptionManager subs(session);
+            LocalQueue lq(AckPolicy(opts.ack));
+            subs.setConfirmMode(opts.ack > 0);
+            subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED,
+                                false);
+            subs.subscribe(lq, queue);
+            // Notify controller we are ready.
+            session.messageTransfer(arg::content=Message("ready", "sub_ready"));
+
+            Message msg;
+            AbsTime start=now();
+            for (size_t i = 0; i < opts.subQuota; ++i) {
+                msg=lq.pop();
+                // FIXME aconway 2007-11-23: Verify message sequence numbers.
+                // Need an array of counters, one per publisher and need
+                // publisher ID in the message for multiple publishers.
+            }
+            if (opts.ack !=0)
+                msg.acknowledge(); // Cumulative ack for final batch.
+            AbsTime end=now();
+
+            // FIXME aconway 2007-11-23: close the subscription,
+            // release any pending messages.
+
+            // Report the receive rate to the controller via "sub_done".
+            Message result(lexical_cast<string>(opts.subQuota/secs(start,end)),
+                           "sub_done");
+            session.messageTransfer(arg::content=result);
+            session.close();
+        }
+        catch (const std::exception& e) {
+            // Name this class in the message; it previously said
+            // "Publisher", which hid where the failure came from.
+            cout << "SubscribeThread exception: " << e.what() << endl;
+            exit(1);
+        }
+    }
+};
+
+
+// Entry point: parse options, optionally set up the queues, start the
+// requested publisher/subscriber threads, run the controller, then join
+// the started threads. Returns 0 on success, 1 if an exception is caught.
+int main(int argc, char** argv) {
-}
-void ListenThread::run() {
try {
- Connection connection;
- opts.open(connection);
- Session_0_10 session = connection.newSession();
+        opts.parse(argc, argv);
+        // With no action selected, perform all of them in this process.
+        if (!opts.setup && !opts.control && !opts.publish && !opts.subscribe)
+            opts.setup = opts.control = opts.publish = opts.subscribe = true;
+
+        // Choose the exchange only after parse(): before parsing, opts.mode
+        // still holds its default (SHARED), so --mode fanout|topic would
+        // never select an exchange.
+        string exchange;        // Left empty in SHARED mode.
+        switch (opts.mode) {
+          case FANOUT: exchange="amq.fanout"; break;
+          case TOPIC: exchange="amq.topic"; break;
+          case SHARED: break;
+        }
- string consumeQueue;
- if (mode == SHARED) {
- consumeQueue="perftest";
- }
- else {
- consumeQueue=session.getId().str(); // Unique name.
- session.queueDeclare(arg::queue=consumeQueue,
- arg::exclusive=true,
- arg::autoDelete=true,
- arg::durable=opts.durable);
- session.queueBind(arg::queue=consumeQueue,
- arg::exchange=exchange(),
- arg::routingKey="perftest");
- }
- // Notify publisher we are ready.
- session.messageTransfer(arg::content=Message("ready", "control"));
-
- SubscriptionManager subs(session);
- LocalQueue consume(AckPolicy(opts.autoAck));
- subs.setConfirmMode(opts.confirmMode);
- subs.setAcquireMode(opts.acquireMode);
- subs.subscribe(consume, consumeQueue);
- int consumed=0;
- AbsTime start=now();
- Message msg;
- size_t i = 0;
- while ((msg=consume.pop()).getData() != "done") {
- char* data=const_cast<char*>(msg.getData().data());
- size_t j=*reinterpret_cast<size_t*>(data);
- if (i > j)
- throw Exception(
- QPID_MSG("Messages out of order " << i
- << " before " << j));
- else
- i = j;
- ++consumed;
- }
- msg.acknowledge(); // Ack all outstanding messages -- ??
- AbsTime end=now();
-
- // Report to publisher.
- ostringstream report;
- double consume_rate=consumed/secs(start,end);
- if (opts.summary)
- report << consume_rate;
- else
- report << "consume count: " << consumed << endl
- << "consume secs: " << secs(start, end) << endl
- << "consume rate: " << consume_rate << endl;
-
- session.messageTransfer(arg::content=Message(report.str(), "control"));
- connection.close();
+        if (opts.setup) Setup().run(); // Set up queues
+
+        boost::ptr_vector<Client> subs(opts.subs);
+        boost::ptr_vector<Client> pubs(opts.pubs);
+
+        // Start pubs/subs for each queue/topic.
+        for (size_t i = 0; i < opts.qt; ++i) {
+            ostringstream key;
+            key << "perftest" << i; // Queue or topic name.
+            if (opts.publish) {
+                for (size_t j = 0; j < opts.pubs; ++j) {
+                    pubs.push_back(new PublishThread(key.str(), exchange));
+                    pubs.back().thread=Thread(pubs.back());
+                }
+            }
+            if (opts.subscribe) {
+                for (size_t j = 0; j < opts.subs; ++j) {
+                    if (opts.mode==SHARED)
+                        subs.push_back(new SubscribeThread(key.str()));
+                    else
+                        subs.push_back(new SubscribeThread(key.str(),exchange));
+                    subs.back().thread=Thread(subs.back());
+                }
+            }
+        }
+
+        if (opts.control) Controller().run();
+
+        // Wait for started threads.
+        if (opts.publish) {
+            for (boost::ptr_vector<Client>::iterator i=pubs.begin();
+                 i != pubs.end();
+                 ++i)
+                i->thread.join();
+        }
+
+        if (opts.subscribe) {
+            for (boost::ptr_vector<Client>::iterator i=subs.begin();
+                 i != subs.end();
+                 ++i)
+                i->thread.join();
+        }
+        return 0;
}
catch (const std::exception& e) {
- cout << "PublishThread exception: " << e.what() << endl;
+        cout << "Unexpected exception: " << e.what() << endl;
+        return 1;
}
}
-