You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/11/09 23:14:45 UTC

svn commit: r1033264 - in /qpid/trunk/qpid/cpp/src/tests: qpid-cpp-benchmark qpid-receive.cpp

Author: aconway
Date: Tue Nov  9 22:14:45 2010
New Revision: 1033264

URL: http://svn.apache.org/viewvc?rev=1033264&view=rev
Log:
Added --receive-rate to qpid-recieve to allow simulation of a slow receiver.

Modified:
    qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark
    qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark?rev=1033264&r1=1033263&r2=1033264&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark Tue Nov  9 22:14:45 2010
@@ -38,8 +38,10 @@ op.add_option("-m", "--messages", defaul
               help="send N messages per sender (default %default)")
 op.add_option("--queue-name", default="benchmark", metavar="NAME",
                help="base name for queues (default %default)")
-op.add_option("--send-rate", default=0, metavar="R",
-              help="send rate limited to R messages/second, 0 means no limit (default %default)")
+op.add_option("--send-rate", default=0, metavar="N",
+              help="send rate limited to N messages/second, 0 means no limit (default %default)")
+op.add_option("--receive-rate", default=0, metavar="N",
+              help="receive rate limited to N messages/second, 0 means no limit (default %default)")
 op.add_option("--content-size", default=1024, type="int", metavar="BYTES",
               help="message size in bytes (default %default)")
 op.add_option("--ack-frequency", default=0, metavar="N", type="int",
@@ -79,6 +81,7 @@ def start_receive(queue, opts, ready_que
                "-m", str((opts.senders*opts.messages)/opts.receivers),
                "--forever",
                "--print-content=no",
+               "--receive-rate", str(opts.receive_rate),
                "--report-total",
                "--ack-frequency", str(opts.ack_frequency),
                "--ready-address", ready_queue,

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp?rev=1033264&r1=1033263&r2=1033264&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp Tue Nov  9 22:14:45 2010
@@ -29,6 +29,7 @@
 #include <qpid/Options.h>
 #include <qpid/log/Logger.h>
 #include <qpid/log/Options.h>
+#include "qpid/sys/Time.h"
 #include "TestOptions.h"
 #include "Statistics.h"
 
@@ -64,6 +65,7 @@ struct Options : public qpid::Options
     uint reportEvery;
     bool reportHeader;
     string readyAddress;
+    uint receiveRate;
 
     Options(const std::string& argv0=std::string())
         : qpid::Options("Options"),
@@ -83,7 +85,8 @@ struct Options : public qpid::Options
           log(argv0),
           reportTotal(false),
           reportEvery(0),
-          reportHeader(true)
+          reportHeader(true),
+          receiveRate(0)
     {
         addOptions()
             ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
@@ -104,6 +107,7 @@ struct Options : public qpid::Options
             ("report-every", qpid::optValue(reportEvery,"N"), "Report throughput and latency statistics every N messages.")
             ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.")
             ("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a message to this address when ready to receive")
+            ("receive-rate", qpid::optValue(receiveRate,"N"), "Receive at rate of N messages/second. 0 means receive as fast as possible.")
             ("help", qpid::optValue(help), "print this usage statement");
         add(log);
     }
@@ -181,7 +185,14 @@ int main(int argc, char ** argv)
             Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery, opts.reportHeader);
             if (!opts.readyAddress.empty())
                 session.createSender(opts.readyAddress).send(msg);
+            
+            uint received = 0;
+            qpid::sys::AbsTime start = qpid::sys::now();
+            int64_t interval = 0;
+            if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate;
+
             while (!done && receiver.fetch(msg, timeout)) {
+                ++received;
                 reporter.message(msg);
                 if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) {
                     if (msg.getContent() == EOS) {
@@ -213,6 +224,11 @@ int main(int argc, char ** argv)
                 } else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) {
                     session.acknowledge();
                 }
+                if (opts.receiveRate) {
+                    qpid::sys::AbsTime waitTill(start, received*interval);
+                    int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
+                    if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
+                }
                 //opts.rejectFrequency??
             }
             if (opts.reportTotal) reporter.report();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org