You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/04/09 22:31:37 UTC

svn commit: r932580 - in /qpid/trunk/qpid/cpp/src/tests: qpid_cpp_benchmark qpid_receive.cpp qpid_send.cpp

Author: aconway
Date: Fri Apr  9 20:31:36 2010
New Revision: 932580

URL: http://svn.apache.org/viewvc?rev=932580&view=rev
Log:
Rationalize message count and message content options in new API send/receive/benchmark.

Modified:
    qpid/trunk/qpid/cpp/src/tests/qpid_cpp_benchmark
    qpid/trunk/qpid/cpp/src/tests/qpid_receive.cpp
    qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp

Modified: qpid/trunk/qpid/cpp/src/tests/qpid_cpp_benchmark
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_cpp_benchmark?rev=932580&r1=932579&r2=932580&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_cpp_benchmark (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_cpp_benchmark Fri Apr  9 20:31:36 2010
@@ -37,7 +37,9 @@ op.add_option("-m", "--messages", defaul
 op.add_option("--queue-name", default="benchmark",
                help="base name for queues (default %default)")
 op.add_option("--send-rate", default=0, metavar="R",
-              help="send rate limited to R messages/second, 0 means no limit")
+              help="send rate limited to R messages/second, 0 means no limit (default %default)")
+op.add_option("--content-size", default=1024, type="int", metavar="BYTES", 
+              help="message size in bytes (default %default)")
 
 def start_receive(queue, opts):
     return Popen(["qpid_receive",
@@ -53,9 +55,9 @@ def start_send(queue, opts):
     return Popen(["qpid_send",
                   "-b", opts.broker,
                   "-a", queue,
-                  "--count", str(opts.messages),
+                  "--messages", str(opts.messages),
                   "--send-eos", str(opts.receivers),
-                  "--content", "benchmark",
+                  "--content-size", str(opts.content_size),
                   "--rate", str(opts.send_rate),
                   "--report-total"],
                  stdout=PIPE, stderr=STDOUT)

Modified: qpid/trunk/qpid/cpp/src/tests/qpid_receive.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_receive.cpp?rev=932580&r1=932579&r2=932580&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_receive.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_receive.cpp Fri Apr  9 20:31:36 2010
@@ -89,7 +89,7 @@ struct Options : public qpid::Options
             ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
             ("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "timeout in seconds to wait before exiting")
             ("forever,f", qpid::optValue(forever), "ignore timeout and wait forever")
-            ("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
+            ("messages,m", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
             ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)")
             ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)")
             ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)")

Modified: qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp?rev=932580&r1=932579&r2=932580&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp Fri Apr  9 20:31:36 2010
@@ -34,13 +34,12 @@
 #include <iostream>
 #include <memory>
 
+using namespace std;
 using namespace qpid::messaging;
 using namespace qpid::types;
 using qpid::client::amqp0_10::FailoverUpdates;
 typedef std::vector<std::string> string_vector;
 
-using namespace std;
-
 namespace qpid {
 namespace tests {
 
@@ -50,7 +49,7 @@ struct Options : public qpid::Options
     std::string url;
     std::string connectionOptions;
     std::string address;
-    uint count;
+    uint messages;
     std::string id;
     std::string replyto;
     uint sendEos;
@@ -60,7 +59,9 @@ struct Options : public qpid::Options
     std::string correlationid;
     string_vector properties;
     string_vector entries;
-    std::string content;
+    std::string contentString;
+    uint contentSize;
+    bool contentStdin;
     uint tx;
     uint rollbackFrequency;
     uint capacity;
@@ -74,10 +75,13 @@ struct Options : public qpid::Options
         : qpid::Options("Options"),
           help(false),
           url("amqp:tcp:127.0.0.1"),
-          count(0),
+          messages(1),
           sendEos(0),
           durable(false),
           ttl(0),
+          contentString(),
+          contentSize(0),
+          contentStdin(false),
           tx(0),
           rollbackFrequency(0),
           capacity(1000),
@@ -91,17 +95,19 @@ struct Options : public qpid::Options
             ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
             ("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from")
             ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
-            ("count,c", qpid::optValue(count, "COUNT"), "stop after count messages have been sent, zero disables")
+            ("messages,m", qpid::optValue(messages, "N"), "stop after N messages have been sent, 0 means no limit")
             ("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one")
             ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address")
             ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input")
             ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
 	    ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
             ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property")
-            ("map,M", qpid::optValue(entries, "NAME=VALUE"), "specify entry for map content")
             ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
             ("user-id", qpid::optValue(userid, "USERID"), "userid for message")
-            ("content", qpid::optValue(content, "CONTENT"), "use CONTENT as message content instead of reading from stdin")
+            ("content-string", qpid::optValue(contentString, "CONTENT"), "use CONTENT as message content")
+            ("content-size", qpid::optValue(contentSize, "N"), "create an N-byte message content")
+            ("content-map,M", qpid::optValue(entries, "NAME=VALUE"), "specify entry for map content")
+            ("content-stdin", qpid::optValue(contentStdin), "read message content from stdin, one line per message")
             ("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue")
             ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
             ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
@@ -191,26 +197,42 @@ using namespace qpid::tests;
 class ContentGenerator {
   public:
     virtual ~ContentGenerator() {}
-    virtual bool getContent(std::string& content) = 0;
+    virtual bool setContent(Message& msg) = 0;
 };
 
 class GetlineContentGenerator : public ContentGenerator {
   public:
-    virtual bool getContent(std::string& content) { return getline(std::cin, content); }
+    virtual bool setContent(Message& msg) {
+        string content;
+        bool got = getline(std::cin, content);
+        if (got) msg.setContent(content);
+        return got;
+    }
 };
 
 class FixedContentGenerator   : public ContentGenerator {
   public:
-    FixedContentGenerator(std::string s) : content(s) {}
-    virtual bool getContent(std::string& contentOut) {
-        contentOut = content;
+    FixedContentGenerator(const string& s) : content(s) {}
+    virtual bool setContent(Message& msg) {
+        msg.setContent(content);
         return true;
     }
   private:
     std::string content;
 };
 
-
+class MapContentGenerator   : public ContentGenerator {
+  public:
+    MapContentGenerator(const Options& opt) : opts(opt) {}
+    virtual bool setContent(Message& msg) {
+        Variant::Map map;
+        opts.setEntries(map);
+        encode(map, msg);
+        return true;
+    }
+  private:
+    const Options& opts;
+};
 
 int main(int argc, char ** argv)
 {
@@ -232,23 +254,27 @@ int main(int argc, char ** argv)
             if (!opts.userid.empty()) msg.setUserId(opts.userid);
             if (!opts.correlationid.empty()) msg.setCorrelationId(opts.correlationid);
             opts.setProperties(msg);
-            std::string content;
             uint sent = 0;
             uint txCount = 0;
             Reporter<Throughput> reporter(std::cout, opts.reportEvery);
 
             std::auto_ptr<ContentGenerator> contentGen;
-            if (!opts.content.empty())
-                contentGen.reset(new FixedContentGenerator(opts.content));
-            else
+            if (opts.contentStdin) {
+                opts.messages = 0; // Don't limit # messages sent.
                 contentGen.reset(new GetlineContentGenerator);
+            }
+            else if (opts.entries.size() > 0)
+                contentGen.reset(new MapContentGenerator(opts));
+            else if (opts.contentSize > 0)
+                contentGen.reset(new FixedContentGenerator(string(opts.contentSize, 'X')));
+            else 
+                contentGen.reset(new FixedContentGenerator(opts.contentString));
 
             qpid::sys::AbsTime start = qpid::sys::now();
             int64_t interval = 0;
             if (opts.rate) interval = qpid::sys::TIME_SEC/opts.rate;
 
-            while (contentGen->getContent(content)) {
-                msg.setContent(content);
+            while (contentGen->setContent(msg)) {
                 msg.getProperties()["sn"] = ++sent;
                 msg.getProperties()["ts"] = int64_t(
                     qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
@@ -261,7 +287,7 @@ int main(int argc, char ** argv)
                     else
                         session.commit();
                 }
-                if (opts.count && sent >= opts.count) break;
+                if (opts.messages && sent >= opts.messages) break;
                 if (opts.rate) {
                     qpid::sys::AbsTime waitTill(start, sent*interval);
                     int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org