You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/09/02 23:51:51 UTC
svn commit: r1164729 -
/qpid/branches/qpid-3346/qpid/cpp/src/tests/qpid-send.cpp
Author: kgiusti
Date: Fri Sep 2 21:51:50 2011
New Revision: 1164729
URL: http://svn.apache.org/viewvc?rev=1164729&view=rev
Log:
QPID-3346: update qpid-send to generate grouped message traffic
Modified:
qpid/branches/qpid-3346/qpid/cpp/src/tests/qpid-send.cpp
Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/qpid-send.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/qpid-send.cpp?rev=1164729&r1=1164728&r2=1164729&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/qpid-send.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/qpid-send.cpp Fri Sep 2 21:51:50 2011
@@ -28,6 +28,7 @@
#include <qpid/messaging/FailoverUpdates.h>
#include <qpid/sys/Time.h>
#include <qpid/sys/Monitor.h>
+#include <qpid/sys/SystemInfo.h>
#include "TestOptions.h"
#include "Statistics.h"
@@ -76,6 +77,11 @@ struct Options : public qpid::Options
uint flowControl;
bool sequence;
bool timestamp;
+ std::string groupKey;
+ std::string groupPrefix;
+ uint groupSize;
+ bool groupRandSize;
+ uint groupInterleave;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
@@ -100,7 +106,11 @@ struct Options : public qpid::Options
sendRate(0),
flowControl(0),
sequence(true),
- timestamp(true)
+ timestamp(true),
+ groupPrefix("GROUP-"),
+ groupSize(10),
+ groupRandSize(false),
+ groupInterleave(1)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
@@ -111,8 +121,8 @@ struct Options : public qpid::Options
("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address")
("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input")
("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
- ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
- ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)")
+ ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
+ ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)")
("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property")
("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
("user-id", qpid::optValue(userid, "USERID"), "userid for message")
@@ -131,6 +141,11 @@ struct Options : public qpid::Options
("flow-control", qpid::optValue(flowControl,"N"), "Do end to end flow control to limit queue depth to 2*N. 0 means no flow control.")
("sequence", qpid::optValue(sequence, "yes|no"), "Add a sequence number messages property (required for duplicate/lost message detection)")
("timestamp", qpid::optValue(timestamp, "yes|no"), "Add a time stamp messages property (required for latency measurement)")
+ ("group-key", qpid::optValue(groupKey, "KEY"), "Generate groups of messages using message header 'KEY' to hold the group identifier")
+ ("group-prefix", qpid::optValue(groupPrefix, "STRING"), "Generate group identifers with 'STRING' prefix (if group-key specified)")
+ ("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group (if group-key specified)")
+ ("group-randomize-size", qpid::optValue(groupRandSize), "Randomize the number of messages per group to [1...group-size] (if group-key specified)")
+ ("group-interleave", qpid::optValue(groupInterleave, "N"), "Simultaineously interleave messages from N different groups (if group-key specified)")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -252,6 +267,68 @@ class MapContentGenerator : public Con
const Options& opts;
};
+// Tags each generated message with a group identifier.
+//
+// A list of "active" groups is kept; successive calls to setGroupInfo()
+// walk that list in order, so messages from the active groups are
+// interleaved.  Every group has a target size; once that many messages have
+// been tagged with its id the group is dropped from the list and a newly
+// numbered group is appended in its place.  Group ids are formed as
+// <prefix><N>, where N counts up from 0 per generator instance.
+//
+class GroupGenerator {
+public:
+    // key        - name of the message property that receives the group id
+    // prefix     - text placed in front of the numeric part of each group id
+    // size       - messages per group, or the upper bound when 'randomize'
+    //              is set; 0 is treated as 1
+    // randomize  - when true each group's size is drawn from [1...size] and
+    //              rand() is seeded with the process id
+    // interleave - number of groups active at the same time; 0 is treated
+    //              as 1
+    GroupGenerator(const std::string& key,
+                   const std::string& prefix,
+                   const uint size,
+                   const bool randomize,
+                   const uint interleave)
+        : groupKey(key), groupPrefix(prefix),
+          // A size of 0 would make 'rand() % groupSize' in newGroup() a
+          // modulo by zero, and would give non-random groups a target that
+          // is never reached; clamp it to the smallest meaningful size.
+          groupSize(size ? size : 1),
+          randomizeSize(randomize), groupSuffix(0)
+    {
+        if (randomize)
+            srand(static_cast<unsigned int>(qpid::sys::SystemInfo::getProcessId()));
+
+        // The 'i < 1' term guarantees at least one active group even when
+        // interleave is 0, so 'groups' is never empty after construction.
+        for (uint i = 0; i < 1 || i < interleave; ++i) {
+            newGroup();
+        }
+        current = groups.begin();
+    }
+
+    // Stores the id of the current group in msg's property map under the
+    // configured key, then advances to the next active group.  A group that
+    // has just reached its target size is replaced by a new one.
+    void setGroupInfo(Message &msg)
+    {
+        // Wrap around once the previous call stepped past the last group.
+        if (current == groups.end())
+            current = groups.begin();
+        msg.getProperties()[groupKey] = current->id;
+        // std::cout << "SENDING GROUPID=[" << current->id << "]" << std::endl;
+        if (++(current->count) == current->size) {
+            // Append the replacement before erasing: the list never becomes
+            // empty, and if the finished group was the last element the
+            // post-increment lands on the new group instead of end().
+            newGroup();
+            groups.erase(current++);
+        } else
+            ++current;
+    }
+
+ private:
+    // Not copyable: 'current' is an iterator into this object's own list, so
+    // a member-wise copy would keep iterating the source object's list.
+    // Declared but intentionally not defined.
+    GroupGenerator(const GroupGenerator&);
+    GroupGenerator& operator=(const GroupGenerator&);
+
+    // Held by value so the generator does not depend on the lifetime of the
+    // strings passed to the constructor.
+    const std::string groupKey;
+    const std::string groupPrefix;
+    const uint groupSize;       // always >= 1, see constructor
+    const bool randomizeSize;
+
+    uint groupSuffix;           // numeric part of the next group id
+
+    // Book-keeping for one active group.
+    struct GroupState {
+        std::string id;         // value written into the message property
+        uint size;              // number of messages that completes the group
+        uint count;             // messages tagged with this id so far
+        GroupState( const std::string& i, const uint s )
+            : id(i), size(s), count(0) {}
+    };
+    typedef std::list<GroupState> GroupList;
+    GroupList groups;               // currently active groups
+    GroupList::iterator current;    // group that tags the next message
+
+    // Appends a new group, with the next sequential id, to the active list.
+    void newGroup() {
+        // 'ate' positions the stream at the end of the prefix so the suffix
+        // is appended to it rather than overwriting it.
+        std::ostringstream groupId(groupPrefix, std::ios_base::out|std::ios_base::ate);
+        groupId << groupSuffix++;
+        // groupSize >= 1, so the modulo is well defined and yields [1...groupSize].
+        const uint size = (randomizeSize) ? (rand() % groupSize) + 1 : groupSize;
+        // std::cout << "New group: GROUPID=[" << groupId.str() << "] size=" << size << std::endl;
+        groups.push_back( GroupState( groupId.str(), size ) );
+    }
+};
+
+
int main(int argc, char ** argv)
{
Connection connection;
@@ -296,6 +373,14 @@ int main(int argc, char ** argv)
else
contentGen.reset(new FixedContentGenerator(opts.contentString));
+ std::auto_ptr<GroupGenerator> groupGen;
+ if (!opts.groupKey.empty())
+ groupGen.reset(new GroupGenerator(opts.groupKey,
+ opts.groupPrefix,
+ opts.groupSize,
+ opts.groupRandSize,
+ opts.groupInterleave));
+
qpid::sys::AbsTime start = qpid::sys::now();
int64_t interval = 0;
if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate;
@@ -323,6 +408,10 @@ int main(int argc, char ** argv)
else
msg.setReplyTo(Address()); // Clear the reply address.
}
+
+ if (groupGen.get())
+ groupGen->setGroupInfo(msg);
+
sender.send(msg);
reporter.message(msg);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org