You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/04/09 22:31:37 UTC
svn commit: r932580 - in /qpid/trunk/qpid/cpp/src/tests: qpid_cpp_benchmark
qpid_receive.cpp qpid_send.cpp
Author: aconway
Date: Fri Apr 9 20:31:36 2010
New Revision: 932580
URL: http://svn.apache.org/viewvc?rev=932580&view=rev
Log:
Rationalize message count and message content options in new API send/receive/benchmark.
Modified:
qpid/trunk/qpid/cpp/src/tests/qpid_cpp_benchmark
qpid/trunk/qpid/cpp/src/tests/qpid_receive.cpp
qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp
Modified: qpid/trunk/qpid/cpp/src/tests/qpid_cpp_benchmark
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_cpp_benchmark?rev=932580&r1=932579&r2=932580&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_cpp_benchmark (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_cpp_benchmark Fri Apr 9 20:31:36 2010
@@ -37,7 +37,9 @@ op.add_option("-m", "--messages", defaul
op.add_option("--queue-name", default="benchmark",
help="base name for queues (default %default)")
op.add_option("--send-rate", default=0, metavar="R",
- help="send rate limited to R messages/second, 0 means no limit")
+ help="send rate limited to R messages/second, 0 means no limit (default %default)")
+op.add_option("--content-size", default=1024, type="int", metavar="BYTES",
+ help="message size in bytes (default %default)")
def start_receive(queue, opts):
return Popen(["qpid_receive",
@@ -53,9 +55,9 @@ def start_send(queue, opts):
return Popen(["qpid_send",
"-b", opts.broker,
"-a", queue,
- "--count", str(opts.messages),
+ "--messages", str(opts.messages),
"--send-eos", str(opts.receivers),
- "--content", "benchmark",
+ "--content-size", str(opts.content_size),
"--rate", str(opts.send_rate),
"--report-total"],
stdout=PIPE, stderr=STDOUT)
Modified: qpid/trunk/qpid/cpp/src/tests/qpid_receive.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_receive.cpp?rev=932580&r1=932579&r2=932580&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_receive.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_receive.cpp Fri Apr 9 20:31:36 2010
@@ -89,7 +89,7 @@ struct Options : public qpid::Options
("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "timeout in seconds to wait before exiting")
("forever,f", qpid::optValue(forever), "ignore timeout and wait forever")
- ("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
+ ("messages,m", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)")
("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)")
("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)")
Modified: qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp?rev=932580&r1=932579&r2=932580&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp Fri Apr 9 20:31:36 2010
@@ -34,13 +34,12 @@
#include <iostream>
#include <memory>
+using namespace std;
using namespace qpid::messaging;
using namespace qpid::types;
using qpid::client::amqp0_10::FailoverUpdates;
typedef std::vector<std::string> string_vector;
-using namespace std;
-
namespace qpid {
namespace tests {
@@ -50,7 +49,7 @@ struct Options : public qpid::Options
std::string url;
std::string connectionOptions;
std::string address;
- uint count;
+ uint messages;
std::string id;
std::string replyto;
uint sendEos;
@@ -60,7 +59,9 @@ struct Options : public qpid::Options
std::string correlationid;
string_vector properties;
string_vector entries;
- std::string content;
+ std::string contentString;
+ uint contentSize;
+ bool contentStdin;
uint tx;
uint rollbackFrequency;
uint capacity;
@@ -74,10 +75,13 @@ struct Options : public qpid::Options
: qpid::Options("Options"),
help(false),
url("amqp:tcp:127.0.0.1"),
- count(0),
+ messages(1),
sendEos(0),
durable(false),
ttl(0),
+ contentString(),
+ contentSize(0),
+ contentStdin(false),
tx(0),
rollbackFrequency(0),
capacity(1000),
@@ -91,17 +95,19 @@ struct Options : public qpid::Options
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from")
("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
- ("count,c", qpid::optValue(count, "COUNT"), "stop after count messages have been sent, zero disables")
+ ("messages,m", qpid::optValue(messages, "N"), "stop after N messages have been sent, 0 means no limit")
("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one")
("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address")
("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input")
("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property")
- ("map,M", qpid::optValue(entries, "NAME=VALUE"), "specify entry for map content")
("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
("user-id", qpid::optValue(userid, "USERID"), "userid for message")
- ("content", qpid::optValue(content, "CONTENT"), "use CONTENT as message content instead of reading from stdin")
+ ("content-string", qpid::optValue(contentString, "CONTENT"), "use CONTENT as message content")
+ ("content-size", qpid::optValue(contentSize, "N"), "create an N-byte message content")
+ ("content-map,M", qpid::optValue(entries, "NAME=VALUE"), "specify entry for map content")
+ ("content-stdin", qpid::optValue(contentStdin), "read message content from stdin, one line per message")
("capacity", qpid::optValue(capacity, "N"), "size of the senders outgoing message queue")
("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
@@ -191,26 +197,42 @@ using namespace qpid::tests;
class ContentGenerator {
public:
virtual ~ContentGenerator() {}
- virtual bool getContent(std::string& content) = 0;
+ virtual bool setContent(Message& msg) = 0;
};
class GetlineContentGenerator : public ContentGenerator {
public:
- virtual bool getContent(std::string& content) { return getline(std::cin, content); }
+ virtual bool setContent(Message& msg) {
+ string content;
+ bool got = getline(std::cin, content);
+ if (got) msg.setContent(content);
+ return got;
+ }
};
class FixedContentGenerator : public ContentGenerator {
public:
- FixedContentGenerator(std::string s) : content(s) {}
- virtual bool getContent(std::string& contentOut) {
- contentOut = content;
+ FixedContentGenerator(const string& s) : content(s) {}
+ virtual bool setContent(Message& msg) {
+ msg.setContent(content);
return true;
}
private:
std::string content;
};
-
+class MapContentGenerator : public ContentGenerator {
+ public:
+ MapContentGenerator(const Options& opt) : opts(opt) {}
+ virtual bool setContent(Message& msg) {
+ Variant::Map map;
+ opts.setEntries(map);
+ encode(map, msg);
+ return true;
+ }
+ private:
+ const Options& opts;
+};
int main(int argc, char ** argv)
{
@@ -232,23 +254,27 @@ int main(int argc, char ** argv)
if (!opts.userid.empty()) msg.setUserId(opts.userid);
if (!opts.correlationid.empty()) msg.setCorrelationId(opts.correlationid);
opts.setProperties(msg);
- std::string content;
uint sent = 0;
uint txCount = 0;
Reporter<Throughput> reporter(std::cout, opts.reportEvery);
std::auto_ptr<ContentGenerator> contentGen;
- if (!opts.content.empty())
- contentGen.reset(new FixedContentGenerator(opts.content));
- else
+ if (opts.contentStdin) {
+ opts.messages = 0; // Don't limit # messages sent.
contentGen.reset(new GetlineContentGenerator);
+ }
+ else if (opts.entries.size() > 0)
+ contentGen.reset(new MapContentGenerator(opts));
+ else if (opts.contentSize > 0)
+ contentGen.reset(new FixedContentGenerator(string(opts.contentSize, 'X')));
+ else
+ contentGen.reset(new FixedContentGenerator(opts.contentString));
qpid::sys::AbsTime start = qpid::sys::now();
int64_t interval = 0;
if (opts.rate) interval = qpid::sys::TIME_SEC/opts.rate;
- while (contentGen->getContent(content)) {
- msg.setContent(content);
+ while (contentGen->setContent(msg)) {
msg.getProperties()["sn"] = ++sent;
msg.getProperties()["ts"] = int64_t(
qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
@@ -261,7 +287,7 @@ int main(int argc, char ** argv)
else
session.commit();
}
- if (opts.count && sent >= opts.count) break;
+ if (opts.messages && sent >= opts.messages) break;
if (opts.rate) {
qpid::sys::AbsTime waitTill(start, sent*interval);
int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org