You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/11/09 23:14:45 UTC
svn commit: r1033264 - in /qpid/trunk/qpid/cpp/src/tests: qpid-cpp-benchmark
qpid-receive.cpp
Author: aconway
Date: Tue Nov 9 22:14:45 2010
New Revision: 1033264
URL: http://svn.apache.org/viewvc?rev=1033264&view=rev
Log:
Added --receive-rate to qpid-recieve to allow simulation of a slow receiver.
Modified:
qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark
qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp
Modified: qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark?rev=1033264&r1=1033263&r2=1033264&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark Tue Nov 9 22:14:45 2010
@@ -38,8 +38,10 @@ op.add_option("-m", "--messages", defaul
help="send N messages per sender (default %default)")
op.add_option("--queue-name", default="benchmark", metavar="NAME",
help="base name for queues (default %default)")
-op.add_option("--send-rate", default=0, metavar="R",
- help="send rate limited to R messages/second, 0 means no limit (default %default)")
+op.add_option("--send-rate", default=0, metavar="N",
+ help="send rate limited to N messages/second, 0 means no limit (default %default)")
+op.add_option("--receive-rate", default=0, metavar="N",
+ help="receive rate limited to N messages/second, 0 means no limit (default %default)")
op.add_option("--content-size", default=1024, type="int", metavar="BYTES",
help="message size in bytes (default %default)")
op.add_option("--ack-frequency", default=0, metavar="N", type="int",
@@ -79,6 +81,7 @@ def start_receive(queue, opts, ready_que
"-m", str((opts.senders*opts.messages)/opts.receivers),
"--forever",
"--print-content=no",
+ "--receive-rate", str(opts.receive_rate),
"--report-total",
"--ack-frequency", str(opts.ack_frequency),
"--ready-address", ready_queue,
Modified: qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp?rev=1033264&r1=1033263&r2=1033264&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp Tue Nov 9 22:14:45 2010
@@ -29,6 +29,7 @@
#include <qpid/Options.h>
#include <qpid/log/Logger.h>
#include <qpid/log/Options.h>
+#include "qpid/sys/Time.h"
#include "TestOptions.h"
#include "Statistics.h"
@@ -64,6 +65,7 @@ struct Options : public qpid::Options
uint reportEvery;
bool reportHeader;
string readyAddress;
+ uint receiveRate;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
@@ -83,7 +85,8 @@ struct Options : public qpid::Options
log(argv0),
reportTotal(false),
reportEvery(0),
- reportHeader(true)
+ reportHeader(true),
+ receiveRate(0)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
@@ -104,6 +107,7 @@ struct Options : public qpid::Options
("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.")
("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.")
("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a message to this address when ready to receive")
+ ("receive-rate", qpid::optValue(receiveRate,"N"), "Receive at rate of N messages/second. 0 means receive as fast as possible.")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -181,7 +185,14 @@ int main(int argc, char ** argv)
Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery, opts.reportHeader);
if (!opts.readyAddress.empty())
session.createSender(opts.readyAddress).send(msg);
+
+ uint received = 0;
+ qpid::sys::AbsTime start = qpid::sys::now();
+ int64_t interval = 0;
+ if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate;
+
while (!done && receiver.fetch(msg, timeout)) {
+ ++received;
reporter.message(msg);
if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) {
if (msg.getContent() == EOS) {
@@ -213,6 +224,11 @@ int main(int argc, char ** argv)
} else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) {
session.acknowledge();
}
+ if (opts.receiveRate) {
+ qpid::sys::AbsTime waitTill(start, received*interval);
+ int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
+ if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
+ }
//opts.rejectFrequency??
}
if (opts.reportTotal) reporter.report();
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org