You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/04/12 19:49:57 UTC
svn commit: r933333 - in /qpid/trunk/qpid/cpp/src/tests: qpid_cpp_benchmark
qpid_receive.cpp
Author: aconway
Date: Mon Apr 12 17:49:57 2010
New Revision: 933333
URL: http://svn.apache.org/viewvc?rev=933333&view=rev
Log:
qpid_cpp_benchmark waits for receivers to be ready before starting senders.
This avoids exaggerated latency numbers due to messages sitting on the
queue while receivers are connecting and subscribing.
Modified:
qpid/trunk/qpid/cpp/src/tests/qpid_cpp_benchmark
qpid/trunk/qpid/cpp/src/tests/qpid_receive.cpp
Modified: qpid/trunk/qpid/cpp/src/tests/qpid_cpp_benchmark
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_cpp_benchmark?rev=933333&r1=933332&r2=933333&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_cpp_benchmark (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_cpp_benchmark Mon Apr 12 17:49:57 2010
@@ -43,7 +43,7 @@ op.add_option("--content-size", default=
op.add_option("--ack-frequency", default=0, metavar="N", type="int",
help="receiver ack's every N messages, 0 means unconfirmed")
-def start_receive(queue, opts):
+def start_receive(queue, opts, ready_queue):
return Popen(["qpid_receive",
"-b", opts.broker,
"-a", "%s;{create:always}"%(queue),
@@ -51,6 +51,7 @@ def start_receive(queue, opts):
"--print-content=no",
"--report-total",
"--ack-frequency", str(opts.ack_frequency),
+ "--ready-address", ready_queue
],
stdout=PIPE, stderr=STDOUT)
@@ -78,34 +79,42 @@ def delete_queues(queues, broker):
except qpid.messaging.exceptions.SendError:pass # Ignore "no such queue"
c.close()
-def wait_for_queues(queues, broker):
- c = qpid.messaging.Connection(broker)
- c.open()
- s = c.session()
- while True:
- try:
- for q in queues: s.sender(q)
- break
- except: pass
- c.close()
-
def skip_first_line(text): return "\n".join(text.split("\n")[1:])
def print_output(processes):
print wait_for_output(processes.pop(0)),
for p in processes: print skip_first_line(wait_for_output(p)),
+class ReadyReceiver:
+ """A receiver for ready messages"""
+ def __init__(self, queue, broker):
+ delete_queues([queue], broker)
+ self.connection = qpid.messaging.Connection(broker)
+ self.connection.open()
+ self.receiver = self.connection.session().receiver(
+ "%s;{create:always,delete:always}"%(queue))
+ self.timeout=2
+
+ def wait(self, n):
+ try:
+ for i in xrange(n): self.receiver.fetch(self.timeout)
+ except qpid.messaging.Empty: raise "Timed out waiting for receivers to be ready"
+ self.connection.close()
+
def main():
opts, args = op.parse_args()
+ ready_queue="%s-ready"%(opts.queue_name)
queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
delete_queues(queues, opts.broker)
- receivers = [start_receive(q, opts) for q in queues for i in xrange(opts.receivers)]
- wait_for_queues(queues, opts.broker) # Wait for receivers to be ready
+ ready_receiver = ReadyReceiver(ready_queue, opts.broker)
+ receivers = [start_receive(q, opts, ready_queue) for q in queues for i in xrange(opts.receivers)]
+ ready_receiver.wait(len(receivers)) # Wait for receivers to be ready.
senders = [start_send(q, opts) for q in queues for i in xrange(opts.senders)]
print "Send"
print_output(senders)
print "\nReceive"
print_output(receivers)
+ print
delete_queues(queues, opts.broker)
if __name__ == "__main__": main()
Modified: qpid/trunk/qpid/cpp/src/tests/qpid_receive.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_receive.cpp?rev=933333&r1=933332&r2=933333&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_receive.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_receive.cpp Mon Apr 12 17:49:57 2010
@@ -22,6 +22,7 @@
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Message.h>
#include <qpid/Options.h>
@@ -63,6 +64,7 @@ struct Options : public qpid::Options
qpid::log::Options log;
bool reportTotal;
uint reportEvery;
+ string readyAddress;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
@@ -100,6 +102,8 @@ struct Options : public qpid::Options
("failover-updates", qpid::optValue(failoverUpdates), "Listen for membership updates distributed via amq.failover")
("report-total", qpid::optValue(reportTotal), "Report total throughput and latency statistics")
("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.")
+ ("ready-address", qpid::optValue(readyAddress, "ADDRESS"),
+ "send a message to this address when ready to receive")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -173,6 +177,8 @@ int main(int argc, char ** argv)
Duration timeout = opts.getTimeout();
bool done = false;
Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery);
+ if (!opts.readyAddress.empty())
+ session.createSender(opts.readyAddress).send(msg);
while (!done && receiver.fetch(msg, timeout)) {
reporter.message(msg);
if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) {
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org