You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/11/27 22:23:22 UTC

svn commit: r598770 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/ qpid/client/ qpid/framing/ qpid/log/ tests/

Author: aconway
Date: Tue Nov 27 13:23:22 2007
New Revision: 598770

URL: http://svn.apache.org/viewvc?rev=598770&view=rev
Log:

perftest improvements.
NOTE: command-line options have changed; see perftest --help.
 - Supports multiple publishers.
 - Subscribers set credit to receive exactly the expected number of messages.
 - All transfers unconfirmed by default.

client/Connector.cpp: Added connector ID to RECV/SENT logging

client/Completion.h: Added default ctor.

broker/Broker.cpp: --ack defaults to 0 - session acks disabled.

client/SessionCore.cpp: Ignore surplus frames in CLOSING state, other than the session closed response.

log/Options.cpp: By default log to stdout instead of stderr. Easier to grep.

framing/AMQContentBody.cpp: Log message content even when NDEBUG is defined.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=598770&r1=598769&r2=598770&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Nov 27 13:23:22 2007
@@ -74,7 +74,7 @@
     storeForce(false),
     enableMgmt(0),
     mgmtPubInterval(10),
-    ack(100)
+    ack(0)
 {
     int c = sys::SystemInfo::concurrency();
     if (c > 0) workerThreads=c;
@@ -102,7 +102,7 @@
         ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"),
          "Management Publish Interval")
         ("ack", optValue(ack, "N"),
-         "Send ack/solicit-ack at least every N frames. 0 disables voluntary acks/solitict-ack");
+         "Send session.ack/solicit-ack at least every N frames. 0 disables voluntary ack/solitict-ack");
 }
 
 const std::string empty;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h?rev=598770&r1=598769&r2=598770&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Completion.h Tue Nov 27 13:23:22 2007
@@ -36,6 +36,8 @@
     shared_ptr<SessionCore> session;
 
 public:
+    Completion() {}
+
     Completion(Future f, shared_ptr<SessionCore> s) : future(f), session(s) {}
 
     void sync()

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=598770&r1=598769&r2=598770&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Tue Nov 27 13:23:22 2007
@@ -108,7 +108,7 @@
     writeFrameQueue.push(frame);
     aio->queueWrite();
 
-    QPID_LOG(trace, "SENT: " << frame);
+    QPID_LOG(trace, "SENT [" << this << "]: " << frame);
 }
 
 void Connector::handleClosed() {
@@ -180,8 +180,8 @@
 
 	AMQFrame frame;
 	while(frame.decode(in)){
-	    QPID_LOG(trace, "RECV: " << frame);
-		input->received(frame);
+	    QPID_LOG(trace, "RECV [" << this << "]: " << frame);
+            input->received(frame);
 	}
 	// TODO: unreading needs to go away, and when we can cope
 	// with multiple sub-buffers in the general buffer scheme, it will

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=598770&r1=598769&r2=598770&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Tue Nov 27 13:23:22 2007
@@ -87,7 +87,6 @@
     // We can be CLOSED or SUSPENDED by error at any time.
     state.waitFor(States(s, CLOSED, SUSPENDED));
     check();
-    assert(state==s);
     invariant();
 }
 
@@ -97,7 +96,8 @@
       sync(false),
       channel(ch),
       proxy(channel),
-      state(OPENING)
+      state(OPENING),
+      detachedLifetime(0)
 {
     l3.out = &out;
     attaching(conn);
@@ -166,10 +166,11 @@
 
 static const std::string CANNOT_REOPEN_SESSION="Cannot re-open a session.";
 
-void SessionCore::open(uint32_t detachedLifetime) { // user thread
+void SessionCore::open(uint32_t timeout) { // user thread
     Lock l(state);
     check(state==OPENING && !session,
           COMMAND_INVALID, CANNOT_REOPEN_SESSION);
+    detachedLifetime=timeout;
     proxy.open(detachedLifetime);
     waitFor(OPEN);
 }
@@ -364,8 +365,22 @@
     return Future(l3.send(command, content));
 }
 
+namespace {
+bool isCloseResponse(const AMQFrame& frame) {
+    return frame.getMethod() &&
+        frame.getMethod()->amqpClassId() == SESSION_CLASS_ID &&
+        frame.getMethod()->amqpMethodId() == SESSION_CLOSED_METHOD_ID;
+}
+}
+
 // Network thread.
 void SessionCore::handleIn(AMQFrame& frame) {
+    {
+        Lock l(state);
+        // Ignore frames received while closing other than closed response.
+        if (state==CLOSING && !isCloseResponse(frame))
+            return;             
+    }
     try {
         // Cast to expose private SessionHandler functions.
         if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
@@ -382,7 +397,7 @@
 {
     Lock l(state);
     if (state==OPEN) {
-        if (session->sent(frame)) 
+        if (detachedLifetime > 0 && session->sent(frame)) 
             proxy.solicitAck();
         channel.handle(frame);
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h?rev=598770&r1=598769&r2=598770&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h Tue Nov 27 13:23:22 2007
@@ -133,6 +133,7 @@
     framing::ChannelHandler channel;
     framing::AMQP_ServerProxy::Session proxy;
     mutable StateMonitor state;
+    uint32_t detachedLifetime;
 };
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp?rev=598770&r1=598769&r2=598770&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp Tue Nov 27 13:23:22 2007
@@ -40,7 +40,5 @@
 void qpid::framing::AMQContentBody::print(std::ostream& out) const
 {
     out << "content (" << size() << " bytes)";
-#ifndef NDEBUG
-    out << " " << data.substr(0,10) << "...";
-#endif
+    out << " " << data.substr(0,16) << "...";
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp?rev=598770&r1=598769&r2=598770&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp Tue Nov 27 13:23:22 2007
@@ -29,8 +29,8 @@
                                  const std::string& exchange)
 {
     setData(data);
-    getDeliveryProperties().setRoutingKey(routingKey);
-    getDeliveryProperties().setExchange(exchange);
+    if (routingKey.size()) getDeliveryProperties().setRoutingKey(routingKey);
+    if (exchange.size()) getDeliveryProperties().setExchange(exchange);
 }
 
 AMQHeaderBody TransferContent::getHeader() const

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp?rev=598770&r1=598769&r2=598770&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp Tue Nov 27 13:23:22 2007
@@ -28,7 +28,7 @@
 Options::Options(const std::string& name) : qpid::Options(name),
     time(true), level(true), thread(false), source(false), function(false), trace(false)
 {
-    outputs.push_back("stderr");
+    outputs.push_back("stdout");
     selectors.push_back("error+");
 
     ostringstream levels;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=598770&r1=598769&r2=598770&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Tue Nov 27 13:23:22 2007
@@ -28,109 +28,172 @@
 #include "qpid/client/Message.h"
 #include "qpid/sys/Time.h"
 
+#include <boost/lexical_cast.hpp>
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+
 #include <iostream>
 #include <sstream>
+#include <numeric>
+#include <algorithm>
 
 using namespace std;
 using namespace qpid;
 using namespace client;
 using namespace sys;
+using boost::lexical_cast;
+using boost::bind;
+
+enum Mode { SHARED, FANOUT, TOPIC };
+const char* modeNames[] = { "shared", "fanout", "topic" };
+
+// istream/ostream ops so Options can read/display Mode.
+istream& operator>>(istream& in, Mode& mode) {
+    string s;
+    in >> s;
+    int i = find(modeNames, modeNames+3, s) - modeNames;
+    if (i >= 3)  throw Exception("Invalid mode: "+s);
+    mode = Mode(i);
+    return in;
+}
+
+ostream& operator<<(ostream& out, Mode mode) {
+    return out << modeNames[mode];
+}
+
 
 struct Opts : public TestOptions {
 
-    bool listen;
-    bool publish;
-    bool purge;
-    size_t count;
+    // Actions
+    bool setup, control, publish, subscribe;
+
+    // Publisher
+    size_t pubs;
+    size_t count ;
     size_t size;
+    bool confirm;
     bool durable;
-    size_t consumers;
-    std::string mode;
-    size_t autoAck;
+
+    // Subscriber
+    size_t subs;
+    size_t ack;
+
+    // General
+    size_t qt;
+    Mode mode;
     bool summary;
-	bool confirmMode;
-	bool acquireMode;
     
     Opts() :
-        listen(false), publish(false), purge(false),
-        count(500000), size(64), consumers(1),
-        mode("shared"), autoAck(100),
-        summary(false), confirmMode(false), acquireMode(false)
+        setup(false), control(false), publish(false), subscribe(false),
+        pubs(1), count(500000), size(64), confirm(false), durable(false),
+        subs(1), ack(0),
+        qt(1), mode(SHARED), summary(false)
     {
-        addOptions() 
-            ("listen", optValue(listen), "Consume messages.")
-            ("publish", optValue(publish), "Produce messages.")
-            ("purge", optValue(purge), "Purge shared queues.")
-            ("count", optValue(count, "N"), "Messages to send.")
-            ("size", optValue(size, "BYTES"), "Size of messages.")
+        addOptions()
+            ("setup", optValue(setup), "Create shared queues.")
+            ("control", optValue(control), "Run test, print report.")
+            ("publish", optValue(publish), "Publish messages.")
+            ("subscribe", optValue(subscribe), "Subscribe for messages.")
+
+            ("mode", optValue(mode, "shared|fanout|topic"), "Test mode."
+             "\nshared: --qt queues, --npubs publishers and --nsubs subscribers per queue.\n"
+             "\nfanout: --npubs publishers, --nsubs subscribers, fanout exchange."
+             "\ntopic: --qt topics, --npubs publishers and --nsubs subscribers per topic.\n")
+
+            ("npubs", optValue(pubs, "N"), "Create N publishers.")
+            ("count", optValue(count, "N"), "Each publisher sends N messages.")
+            ("size", optValue(size, "BYTES"), "Size of messages in bytes.")
+            ("pub-confirm", optValue(confirm), "Publisher use confirm-mode.")
             ("durable", optValue(durable, "N"), "Publish messages as durable.")
-            ("consumers", optValue(consumers, "N"), "Number of consumers.")
-            ("mode", optValue(mode, "shared|fanout|topic"), "consume mode")
-            ("auto-ack", optValue(autoAck, "N"), "ack every N messages.")
-            ("summary,s", optValue(summary), "summary output only")
-            ("confirm-mode", optValue(confirmMode, "N"), "confirm mode")
-            ("acquire-mode", optValue(acquireMode, "N"), "acquire mode (N - pre acquire, Y - no acquire");
+
+            ("nsubs", optValue(subs, "N"), "Create N subscribers.")
+            ("sub-ack", optValue(ack, "N"), "N>0: Subscriber acks batches of N.\n"
+             "N==0: Subscriber uses unconfirmed mode")
+            
+            ("qt", optValue(qt, "N"), "Create N queues or topics.")
+            ("summary,s", optValue(summary), "Summary output only.");
+    }
+
+    // Computed values
+    size_t totalPubs;
+    size_t totalSubs;
+    size_t transfers;
+    size_t subQuota;
+
+    void parse(int argc, char** argv) {
+        TestOptions::parse(argc, argv);
+        switch (mode) {
+          case SHARED:
+            if (count % subs) {
+                count += subs - (count % subs);
+                cout << "WARNING: Adjusted --count to " << count
+                     << " the nearest multiple of --nsubs" << endl;
+            }                    
+            totalPubs = pubs*qt;
+            totalSubs = subs*qt;
+            subQuota = (pubs*count)/subs;
+            break;
+          case FANOUT:
+            if (qt != 1) cerr << "WARNING: Fanout mode, ignoring --qt="
+                              << qt << endl;
+            qt=1;
+            totalPubs = pubs;
+            totalSubs = subs;
+            subQuota = totalPubs*count;
+            break;
+          case TOPIC:
+            totalPubs = pubs*qt;
+            totalSubs = subs*qt;
+            subQuota = pubs*count;
+            break;
+        }
+        transfers=(totalPubs*count) + (totalSubs*subQuota);
     }
 };
 
-Opts opts;
-enum Mode { SHARED, FANOUT, TOPIC };
-Mode mode;
 
-struct ListenThread : public Runnable { Thread thread; void run(); };
-struct PublishThread : public Runnable { Thread thread; void run(); };
+Opts opts;
 
-// Create and purge the shared queues 
-void setup() {
+struct Client : public Runnable {
     Connection connection;
-    opts.open(connection);
-    Session_0_10 session = connection.newSession();
-    session.setSynchronous(true); // Make sure this is all completed.
-    session.queueDeclare(arg::queue="control"); // Control queue
-    if (opts.purge) {
-        if (!opts.summary) cout << "Purging shared queues" << endl;
-        session.queuePurge(arg::queue="control");
-    }
-    if (mode==SHARED) {
-        session.queueDeclare(arg::queue="perftest", arg::durable=opts.durable); // Shared data queue
-        if (opts.purge)		
-            session.queuePurge(arg::queue="perftest");
-    }
-    session.close();
-    connection.close();
-}
+    Session_0_10 session;
+    Thread thread;
 
-int main(int argc, char** argv) {
-    try {
-        opts.parse(argc, argv);
-        if (opts.mode=="shared") mode=SHARED;
-        else if (opts.mode=="fanout") mode = FANOUT;
-        else if (opts.mode=="topic") mode = TOPIC;
-        else throw Exception("Invalid mode");
-        if (!opts.listen && !opts.publish && !opts.purge)
-            opts.listen = opts.publish = opts.purge = true;
-        setup();
-        std::vector<ListenThread> listen(opts.consumers);
-        PublishThread publish;
-        if (opts.listen) 
-            for (size_t i = 0; i < opts.consumers; ++i)
-                listen[i].thread=Thread(listen[i]);
-        if (opts.publish)
-            publish.thread=Thread(publish);
-        if (opts.listen)
-            for (size_t i = 0; i < opts.consumers; ++i)
-                listen[i].thread.join();
-        if (opts.publish)
-            publish.thread.join();
+    Client() {
+        opts.open(connection);
+        session = connection.newSession();
     }
-    catch (const std::exception& e) {
-        cout << "Unexpected exception: " << e.what() << endl;
+
+    ~Client() {
+        session.close();
+        connection.close();
     }
-}
+};
 
-double secs(Duration d) { return double(d)/TIME_SEC; }
-double secs(AbsTime start, AbsTime finish) { return secs(Duration(start,finish)); }
+struct Setup : public Client {
+    
+    void queueInit(string name, bool durable=false) {
+        session.queueDeclare(arg::queue=name, arg::durable=durable);
+        session.queuePurge(arg::queue=name);
+    }
 
+    void run() {
+        queueInit("pub_start");
+        queueInit("pub_done");
+        queueInit("sub_ready");
+        queueInit("sub_done");
+        if (opts.mode==SHARED) {
+            for (size_t i = 0; i < opts.qt; ++i) {
+                ostringstream qname;
+                qname << "perftest" << i;
+                queueInit(qname.str(), opts.durable); 
+            }
+        }
+        // Make sure this is all completed before we return.
+        session.execution().sendSyncRequest();
+    }
+};
 
 void expect(string actual, string expect) {
     if (expect != actual)
@@ -138,176 +201,297 @@
 
 }
 
-const char* exchange() {
-    switch (mode) {
-      case SHARED: return "";   // Deafult exchange.
-      case FANOUT: return "amq.fanout"; 
-      case TOPIC: return "amq.topic"; 
-    }
-    assert(0);
-    return 0;
+double secs(Duration d) { return double(d)/TIME_SEC; }
+double secs(AbsTime start, AbsTime finish) {
+    return secs(Duration(start,finish));
 }
 
-void PublishThread::run() {
-    try {
-        Connection connection;
-        opts.open(connection);
-        Session_0_10 session = connection.newSession();
 
-        // Wait for consumers.
-        if (!opts.summary) cout << "Waiting for consumers ready " << flush;
-        SubscriptionManager subs(session);
-        LocalQueue control;
-        subs.subscribe(control, "control");
-        for (size_t i = 0; i < opts.consumers; ++i) {
-            if (!opts.summary) cout << "." << flush;
-            expect(control.pop().getData(), "ready");
+// Collect rates & print stats.
+class Stats {
+    vector<double> values;
+    double sum;
+
+  public:
+    Stats() : sum(0) {}
+    
+    // Functor to collect rates.
+    void operator()(const string& data) {
+        double d=lexical_cast<double>(data);
+        values.push_back(d);
+        sum += d;
+    }
+    
+    double mean() const {
+        return sum/values.size();
+    }
+
+    double stdev() const {
+        if (values.size() <= 1) return 0;
+        double avg = mean();
+        double ssq = 0;
+        for (vector<double>::const_iterator i = values.begin();
+             i != values.end(); ++i) {
+            double x=*i;
+            x -= avg;
+            ssq += x*x;
         }
-        if (!opts.summary) cout << endl;
+        return sqrt(ssq/(values.size()-1));
+    }
+    
+    ostream& print(ostream& out) {
+        ostream_iterator<double> o(out, "\n");
+        copy(values.begin(), values.end(), o);
+        out << "Average: " << mean();
+        if (values.size() > 1)
+            out << " (std.dev. " << stdev() << ")";
+        return out << endl;
+    }
+};
+    
 
-        size_t msgSize=max(opts.size, sizeof(size_t));
-        Message msg(string(msgSize, 'X'), "perftest");
-        if (opts.durable)
-	    msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
-
-        AbsTime start=now();
-        if (!opts.summary) cout << "Publishing " << opts.count
-                                << " messages " << flush;
-        for (size_t i=0; i<opts.count; i++) {
-            // Stamp the iteration into the message data, careful to avoid
-            // any heap allocation.
-            char* data = const_cast<char*>(msg.getData().data());
-            *reinterpret_cast<uint32_t*>(data) = i;
-            session.messageTransfer(arg::destination=exchange(),
-                                    arg::content=msg, arg::confirmMode=opts.confirmMode,
-									arg::acquireMode=opts.acquireMode);
-            if (!opts.summary && (i%10000)==0){
-                 cout << "." << flush;
-                 session.execution().sendSyncRequest();
-            }
+// Manage control queues, collect and print reports.
+struct Controller : public Client {
+ 
+   SubscriptionManager subs;
+
+    Controller() : subs(session) {}
+
+    /** Process messages from queue by applying a functor. */
+    void process(size_t n, string queue,
+                 boost::function<void (const string&)> msgFn)
+    {
+        if (!opts.summary) 
+            cout << "Processing " << n << " messages from "
+                 << queue << " " << flush;
+        LocalQueue lq;
+        subs.setFlowControl(n, SubscriptionManager::UNLIMITED, false);
+        subs.subscribe(lq, queue);
+        for (size_t i = 0; i < n; ++i) {
+            if (!opts.summary) cout << "." << flush;
+            msgFn(lq.pop().getData());
         }
-		
-        //Completion compl;
         if (!opts.summary) cout << " done." << endl;
-        msg.setData("done");    // Send done messages.
-        if (mode==SHARED)
-            for (size_t i = 0; i < opts.consumers; ++i)
-                 session.messageTransfer(arg::destination=exchange(), arg::content=msg);
-        else
-            session.messageTransfer(arg::destination=exchange(), arg::content=msg);
-        session.execution().sendSyncRequest();
-        AbsTime end=now();
+    }
 
-        // Report
-        double publish_rate=(opts.count)/secs(start,end);
+    void send(size_t n, string queue, string data) {
         if (!opts.summary)
-            cout << endl
-                 << "publish count:" << opts.count << endl
-                 << "publish secs:" << secs(start,end) << endl
-                 << "publish rate:" << publish_rate << endl;
-        
-        double consume_rate = 0; // Average rate for consumers.
-        //  Wait for consumer(s) to finish.
-        if (!opts.summary) cout << "Waiting for consumers done " << endl;
-        for (size_t i = 0; i < opts.consumers; ++i) {
-            string report=control.pop().getData();
-            if (!opts.summary)
-                cout << endl << report;
+            cout << "Sending " << data << " " << n << " times to " << queue
+                 << endl;
+        Message msg(data, queue);
+        for (size_t i = 0; i < n; ++i) 
+            session.messageTransfer(arg::content=msg);
+    }
+
+    void run() {                // Controller
+        try {
+            // Wait for subscribers to be ready.
+            process(opts.totalSubs, "sub_ready", bind(expect, _1, "ready"));
+
+            Stats pubRates;
+            Stats subRates;
+
+            AbsTime start=now();
+            send(opts.totalPubs, "pub_start", "start"); // Start publishers
+            process(opts.totalPubs, "pub_done", boost::ref(pubRates));
+            process(opts.totalSubs, "sub_done", boost::ref(subRates));
+            AbsTime end=now(); 
+            double time=secs(start, end);
+
+            if (!opts.summary) {
+                cout << endl << "Publish rates: " << endl;
+                pubRates.print(cout);
+                cout << endl << "Subscribe rates: " << endl;
+                subRates.print(cout);
+                cout << endl << "Total transfers: " << opts.transfers << endl;
+                cout << "Total time (secs): " << time << endl;
+                cout << "Total rate: " << opts.transfers/time << endl;
+            }
             else {
-                double rate=boost::lexical_cast<double>(report);
-                consume_rate += rate/opts.consumers;
+                cout << pubRates.mean() << "\t"
+                     << subRates.mean() << "\t"
+                     << opts.transfers/time << endl;
             }
         }
-        end=now();
+        catch (const std::exception& e) {
+            cout << "Controller exception: " << e.what() << endl;
+            exit(1);
+        }
+    }
+};
 
-        // Count total transfers from publisher and to subscribers.
-        int transfers;
-        if (mode==SHARED)       // each message sent/receivd once.
-            transfers=2*opts.count; 
-        else                    // sent once, received N times.
-            transfers=opts.count*(opts.consumers + 1);
-        double total_rate=transfers/secs(start, end);
-        if (opts.summary)
-            cout << opts.mode << '(' << opts.count
-                 << ':' << opts.consumers << ')'
-                 << '\t' << publish_rate
-                 << '\t' << consume_rate
-                 << '\t' << total_rate
-                 << endl;
-        else
-            cout << endl
-                 << "total transfers:" << transfers << endl
-                 << "total secs:" << secs(start, end) << endl
-                 << "total rate:" << total_rate << endl;
-		
-        connection.close();
+struct PublishThread : public Client {
+    string destination;
+    string routingKey;
+
+    PublishThread() {};
+    
+    PublishThread(string key, string dest=string()) {
+        destination=dest;
+        routingKey=key;
     }
-    catch (const std::exception& e) {
-        cout << "PublishThread exception: " << e.what() << endl;
+    
+    void run() {                // Publisher
+        Completion completion;
+        try {
+            size_t msgSize=max(opts.size, sizeof(size_t));
+            Message msg(string(msgSize, 'X'), routingKey);
+            if (opts.durable)
+                msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+
+            SubscriptionManager subs(session);
+            LocalQueue lq(AckPolicy(opts.ack));
+            subs.setFlowControl(1, SubscriptionManager::UNLIMITED, false); 
+            subs.subscribe(lq, "pub_start"); 
+            expect(lq.pop().getData(), "start");
+            
+            AbsTime start=now();
+            for (size_t i=0; i<opts.count; i++) {
+                // Stamp the iteration into the message data, avoid
+                // any heap allocation.
+                char* data = const_cast<char*>(msg.getData().data());
+                *reinterpret_cast<uint32_t*>(data) = i;
+                completion = session.messageTransfer(
+                    arg::destination=destination,
+                    arg::content=msg,
+                    arg::confirmMode=opts.confirm);
+            }
+            if (opts.confirm) completion.sync();
+            AbsTime end=now();
+            double time=secs(start,end);
+
+            // Send result to controller.
+            msg.setData(lexical_cast<string>(opts.count/time));
+            msg.getDeliveryProperties().setRoutingKey("pub_done");
+            session.messageTransfer(arg::content=msg);
+            session.close();
+        }
+        catch (const std::exception& e) {
+            cout << "PublishThread exception: " << e.what() << endl;
+            exit(1);
+        }
+    }
+};
+
+struct SubscribeThread : public Client {
+
+    string queue;
+
+    SubscribeThread() {}
+    
+    SubscribeThread(string q) { queue = q; }
+
+    SubscribeThread(string key, string ex) {
+        queue=session.getId().str(); // Unique name.
+        session.queueDeclare(arg::queue=queue,
+                             arg::exclusive=true,
+                             arg::autoDelete=true,
+                             arg::durable=opts.durable);
+        session.queueBind(arg::queue=queue,
+                          arg::exchange=ex,
+                          arg::routingKey=key);
+    }
+    
+    void run() {                // Subscribe
+        try {
+            SubscriptionManager subs(session);
+            LocalQueue lq(AckPolicy(opts.ack));
+            subs.setConfirmMode(opts.ack > 0);
+            subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED,
+                                false);
+            subs.subscribe(lq, queue);
+            // Notify controller we are ready.
+            session.messageTransfer(arg::content=Message("ready", "sub_ready"));
+
+            Message msg;
+            AbsTime start=now();
+            for (size_t i = 0; i < opts.subQuota; ++i) {
+                msg=lq.pop();
+                // FIXME aconway 2007-11-23: Verify message sequence numbers.
+                // Need an array of counters, one per publisher and need
+                // publisher ID in the message for multiple publishers.
+            }
+            if (opts.ack !=0)
+                msg.acknowledge(); // Cumulative ack for final batch.
+            AbsTime end=now();
+
+            // FIXME aconway 2007-11-23: close the subscription,
+            // release any pending messages.
+
+            // Report to publisher.
+            Message result(lexical_cast<string>(opts.subQuota/secs(start,end)),
+                           "sub_done");
+            session.messageTransfer(arg::content=result);
+            session.close();
+        }
+        catch (const std::exception& e) {
+            cout << "Publisher exception: " << e.what() << endl;
+            exit(1);
+        }
+    }
+};
+
+int main(int argc, char** argv) {
+    string exchange;
+    switch (opts.mode) {
+      case FANOUT: exchange="amq.fanout"; break;
+      case TOPIC: exchange="amq.topic"; break;
+      case SHARED: break;
     }
-}
 
-void ListenThread::run() {
     try {
-        Connection connection;
-        opts.open(connection);
-        Session_0_10 session = connection.newSession();
+        opts.parse(argc, argv);
+        if (!opts.setup && !opts.control && !opts.publish && !opts.subscribe)
+            opts.setup = opts.control = opts.publish = opts.subscribe = true;
 
-        string consumeQueue;
-        if (mode == SHARED) {
-            consumeQueue="perftest";
-        }
-        else {
-            consumeQueue=session.getId().str(); // Unique name.
-            session.queueDeclare(arg::queue=consumeQueue,
-                                 arg::exclusive=true,
-                                 arg::autoDelete=true,
-                                 arg::durable=opts.durable);
-            session.queueBind(arg::queue=consumeQueue,
-                              arg::exchange=exchange(),
-                              arg::routingKey="perftest");
-        }
-        // Notify publisher we are ready.
-        session.messageTransfer(arg::content=Message("ready", "control"));
-
-        SubscriptionManager subs(session);
-        LocalQueue consume(AckPolicy(opts.autoAck));
-		subs.setConfirmMode(opts.confirmMode);
-		subs.setAcquireMode(opts.acquireMode);
-        subs.subscribe(consume, consumeQueue);
-        int consumed=0;
-        AbsTime start=now();
-        Message msg;
-        size_t i = 0;
-        while ((msg=consume.pop()).getData() != "done") {
-            char* data=const_cast<char*>(msg.getData().data());
-            size_t j=*reinterpret_cast<size_t*>(data);
-            if (i > j)
-                throw Exception(
-                    QPID_MSG("Messages out of order " << i
-                             << " before " << j));
-            else
-                i = j;
-            ++consumed;
-        }
-        msg.acknowledge();      // Ack all outstanding messages -- ??
-        AbsTime end=now();
-
-        // Report to publisher.
-        ostringstream report;
-        double consume_rate=consumed/secs(start,end);
-        if (opts.summary)
-            report << consume_rate;
-        else
-            report << "consume count: " << consumed << endl
-                   << "consume secs: " << secs(start, end) << endl
-                   << "consume rate: " << consume_rate << endl;
-        
-        session.messageTransfer(arg::content=Message(report.str(), "control"));
-        connection.close();
+        if (opts.setup) Setup().run();          // Set up queues
+
+        boost::ptr_vector<Client> subs(opts.subs);
+        boost::ptr_vector<Client> pubs(opts.pubs);
+
+        // Start pubs/subs for each queue/topic.
+        for (size_t i = 0; i < opts.qt; ++i) {
+            ostringstream key;
+            key << "perftest" << i; // Queue or topic name.
+            if (opts.publish) {
+                for (size_t j = 0; j < opts.pubs; ++j)  {
+                    pubs.push_back(new PublishThread(key.str(), exchange));
+                    pubs.back().thread=Thread(pubs.back());
+                }
+            }
+            if (opts.subscribe) {
+                for (size_t j = 0; j < opts.subs; ++j)  {
+                    if (opts.mode==SHARED)
+                        subs.push_back(new SubscribeThread(key.str()));
+                    else
+                        subs.push_back(new SubscribeThread(key.str(),exchange));
+                    subs.back().thread=Thread(subs.back());
+                }
+            }
+        }
+
+        if (opts.control) Controller().run();
+
+
+        // Wait for started threads.
+        if (opts.publish) {
+            for (boost::ptr_vector<Client>::iterator i=pubs.begin();
+                 i != pubs.end();
+                 ++i) 
+                i->thread.join();
+        }
+            
+
+        if (opts.subscribe) {
+            for (boost::ptr_vector<Client>::iterator i=subs.begin();
+                 i != subs.end();
+                 ++i) 
+                i->thread.join();
+        }
+        return 0;
     }
     catch (const std::exception& e) {
-        cout << "PublishThread exception: " << e.what() << endl;
+        cout << "Unexpected exception: " << e.what() << endl; 
+       return 1;
     }
 }
-