You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/04/12 19:49:57 UTC

svn commit: r933333 - in /qpid/trunk/qpid/cpp/src/tests: qpid_cpp_benchmark qpid_receive.cpp

Author: aconway
Date: Mon Apr 12 17:49:57 2010
New Revision: 933333

URL: http://svn.apache.org/viewvc?rev=933333&view=rev
Log:
qpid_cpp_benchmark waits for receivers to be ready before starting senders.

This avoids exaggerated latency numbers due to messages sitting on the
queue while receivers are connecting and subscribing.

Modified:
    qpid/trunk/qpid/cpp/src/tests/qpid_cpp_benchmark
    qpid/trunk/qpid/cpp/src/tests/qpid_receive.cpp

Modified: qpid/trunk/qpid/cpp/src/tests/qpid_cpp_benchmark
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_cpp_benchmark?rev=933333&r1=933332&r2=933333&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_cpp_benchmark (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_cpp_benchmark Mon Apr 12 17:49:57 2010
@@ -43,7 +43,7 @@ op.add_option("--content-size", default=
 op.add_option("--ack-frequency", default=0, metavar="N", type="int",
               help="receiver ack's every N messages, 0 means unconfirmed")
 
-def start_receive(queue, opts):
+def start_receive(queue, opts, ready_queue):
     return Popen(["qpid_receive",
                   "-b", opts.broker,
                   "-a", "%s;{create:always}"%(queue),
@@ -51,6 +51,7 @@ def start_receive(queue, opts):
                   "--print-content=no",
                   "--report-total",
                   "--ack-frequency", str(opts.ack_frequency),
+                  "--ready-address", ready_queue
                   ],
                  stdout=PIPE, stderr=STDOUT)
 
@@ -78,34 +79,42 @@ def delete_queues(queues, broker):
         except qpid.messaging.exceptions.SendError:pass # Ignore "no such queue"
     c.close()
 
-def wait_for_queues(queues, broker):
-    c = qpid.messaging.Connection(broker)
-    c.open()
-    s = c.session()
-    while True:
-        try:
-            for q in queues: s.sender(q)
-            break
-        except: pass
-    c.close()
-
 def skip_first_line(text): return "\n".join(text.split("\n")[1:])
 
 def print_output(processes):
     print wait_for_output(processes.pop(0)),
     for p in processes: print skip_first_line(wait_for_output(p)),
 
+class ReadyReceiver:
+    """A receiver for ready messages"""
+    def __init__(self, queue, broker):
+        delete_queues([queue], broker)
+        self.connection = qpid.messaging.Connection(broker)
+        self.connection.open()
+        self.receiver = self.connection.session().receiver(
+            "%s;{create:always,delete:always}"%(queue))
+        self.timeout=2
+
+    def wait(self, n):
+        try:
+            for i in xrange(n): self.receiver.fetch(self.timeout)
+        except qpid.messaging.Empty: raise "Timed out waiting for receivers to be ready"
+        self.connection.close()
+
 def main():
     opts, args = op.parse_args()
+    ready_queue="%s-ready"%(opts.queue_name)
     queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
     delete_queues(queues, opts.broker)
-    receivers = [start_receive(q, opts) for q in queues for i in xrange(opts.receivers)]
-    wait_for_queues(queues, opts.broker)     # Wait for receivers to be ready
+    ready_receiver = ReadyReceiver(ready_queue, opts.broker)
+    receivers = [start_receive(q, opts, ready_queue) for q in queues for i in xrange(opts.receivers)]
+    ready_receiver.wait(len(receivers)) # Wait for receivers to be ready.
     senders = [start_send(q, opts) for q in queues for i in xrange(opts.senders)]
     print "Send"
     print_output(senders)
     print "\nReceive"
     print_output(receivers)
+    print
     delete_queues(queues, opts.broker)
 
 if __name__ == "__main__": main()

Modified: qpid/trunk/qpid/cpp/src/tests/qpid_receive.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_receive.cpp?rev=933333&r1=933332&r2=933333&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_receive.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_receive.cpp Mon Apr 12 17:49:57 2010
@@ -22,6 +22,7 @@
 #include <qpid/messaging/Address.h>
 #include <qpid/messaging/Connection.h>
 #include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Sender.h>
 #include <qpid/messaging/Session.h>
 #include <qpid/messaging/Message.h>
 #include <qpid/Options.h>
@@ -63,6 +64,7 @@ struct Options : public qpid::Options
     qpid::log::Options log;
     bool reportTotal;
     uint reportEvery;
+    string readyAddress;
 
     Options(const std::string& argv0=std::string())
         : qpid::Options("Options"),
@@ -100,6 +102,8 @@ struct Options : public qpid::Options
             ("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
             ("report-total", qpid::optValue(reportTotal), "Report total throughput and latency statistics")
             ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.")
+            ("ready-address", qpid::optValue(readyAddress, "ADDRESS"),
+             "send a message to this address when ready to receive")
             ("help", qpid::optValue(help), "print this usage statement");
         add(log);
     }
@@ -173,6 +177,8 @@ int main(int argc, char ** argv)
             Duration timeout = opts.getTimeout();
             bool done = false;
             Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery);
+            if (!opts.readyAddress.empty()) 
+                session.createSender(opts.readyAddress).send(msg);
             while (!done && receiver.fetch(msg, timeout)) {
                 reporter.message(msg);
                 if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org