You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/09/02 23:51:51 UTC

svn commit: r1164729 - /qpid/branches/qpid-3346/qpid/cpp/src/tests/qpid-send.cpp

Author: kgiusti
Date: Fri Sep  2 21:51:50 2011
New Revision: 1164729

URL: http://svn.apache.org/viewvc?rev=1164729&view=rev
Log:
QPID-3346: update qpid-send to generate grouped message traffic

Modified:
    qpid/branches/qpid-3346/qpid/cpp/src/tests/qpid-send.cpp

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/qpid-send.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/qpid-send.cpp?rev=1164729&r1=1164728&r2=1164729&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/qpid-send.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/qpid-send.cpp Fri Sep  2 21:51:50 2011
@@ -28,6 +28,7 @@
 #include <qpid/messaging/FailoverUpdates.h>
 #include <qpid/sys/Time.h>
 #include <qpid/sys/Monitor.h>
+#include <qpid/sys/SystemInfo.h>
 #include "TestOptions.h"
 #include "Statistics.h"
 
@@ -76,6 +77,11 @@ struct Options : public qpid::Options
     uint flowControl;
     bool sequence;
     bool timestamp;
+    std::string groupKey;
+    std::string groupPrefix;
+    uint groupSize;
+    bool groupRandSize;
+    uint groupInterleave;
 
     Options(const std::string& argv0=std::string())
         : qpid::Options("Options"),
@@ -100,7 +106,11 @@ struct Options : public qpid::Options
           sendRate(0),
           flowControl(0),
           sequence(true),
-          timestamp(true)
+          timestamp(true),
+          groupPrefix("GROUP-"),
+          groupSize(10),
+          groupRandSize(false),
+          groupInterleave(1)
     {
         addOptions()
             ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
@@ -111,8 +121,8 @@ struct Options : public qpid::Options
             ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address")
             ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input")
             ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
-	    ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
-	    ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)")
+            ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
+            ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)")
             ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property")
             ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
             ("user-id", qpid::optValue(userid, "USERID"), "userid for message")
@@ -131,6 +141,11 @@ struct Options : public qpid::Options
             ("flow-control", qpid::optValue(flowControl,"N"), "Do end to end flow control to limit queue depth to 2*N. 0 means no flow control.")
             ("sequence", qpid::optValue(sequence, "yes|no"), "Add a sequence number messages property (required for duplicate/lost message detection)")
             ("timestamp", qpid::optValue(timestamp, "yes|no"), "Add a time stamp messages property (required for latency measurement)")
+            ("group-key", qpid::optValue(groupKey, "KEY"), "Generate groups of messages using message header 'KEY' to hold the group identifier")
+            ("group-prefix", qpid::optValue(groupPrefix, "STRING"), "Generate group identifers with 'STRING' prefix (if group-key specified)")
+            ("group-size", qpid::optValue(groupSize, "N"), "Number of messages per a group (if group-key specified)")
+            ("group-randomize-size", qpid::optValue(groupRandSize), "Randomize the number of messages per group to [1...group-size] (if group-key specified)")
+            ("group-interleave", qpid::optValue(groupInterleave, "N"), "Simultaineously interleave messages from N different groups (if group-key specified)")
             ("help", qpid::optValue(help), "print this usage statement");
         add(log);
     }
@@ -252,6 +267,68 @@ class MapContentGenerator   : public Con
     const Options& opts;
 };
 
+// Tags each generated message with a group identifier "<prefix><N>" (N = 0, 1, ...).
+// A group ends after 'size' messages, or after a random count in [1..size] when
+// 'randomize' is set; 'interleave' groups are kept open and used in rotation.
+// A 'size' or 'interleave' of 0 is treated as 1.
+class GroupGenerator {
+public:
+    GroupGenerator(const std::string& key, const std::string& prefix,
+                   const uint size, const bool randomize, const uint interleave)
+        : groupKey(key), groupPrefix(prefix),
+          groupSize(size ? size : 1),  // 0 would make rand() % groupSize undefined
+          randomizeSize(randomize), groupSuffix(0)
+    {
+        // Seed rand() with the process id.
+        if (randomize) srand(static_cast<unsigned int>(qpid::sys::SystemInfo::getProcessId()));
+        // Always open at least one group, even when interleave is 0.
+        for (uint i = 0; i < 1 || i < interleave; ++i)
+            newGroup();
+        current = groups.begin();
+    }
+
+    // Store the current group's id in msg's 'groupKey' property, then move on
+    // to the next open group; a group that reached its size is replaced.
+    void setGroupInfo(Message &msg)
+    {
+        if (current == groups.end())
+            current = groups.begin();
+        msg.getProperties()[groupKey] = current->id;
+        if (++(current->count) == current->size) {
+            newGroup();               // append the replacement before erasing
+            groups.erase(current++);  // post-increment steps off the node first
+        } else {
+            ++current;
+        }
+    }
+
+private:
+    // Held by value so the generator stays valid if built from temporaries.
+    const std::string groupKey;
+    const std::string groupPrefix;
+    const uint groupSize;           // never 0
+    const bool randomizeSize;
+    uint groupSuffix;               // number appended to the next group id
+
+    struct GroupState {
+        std::string id;
+        const uint size;            // messages to send before the group ends
+        uint count;                 // messages sent so far
+        GroupState(const std::string& i, const uint s) : id(i), size(s), count(0) {}
+    };
+    typedef std::list<GroupState> GroupList;
+    GroupList groups;               // groups currently open
+    GroupList::iterator current;    // group the next message will be tagged with
+
+    // Open a group with the next sequential id and append it to 'groups'.
+    void newGroup() {
+        std::ostringstream groupId(groupPrefix, std::ios_base::out|std::ios_base::ate);
+        groupId << groupSuffix++;
+        const uint size = randomizeSize ? (rand() % groupSize) + 1 : groupSize;
+        groups.push_back(GroupState(groupId.str(), size));
+    }
+};
+
 int main(int argc, char ** argv)
 {
     Connection connection;
@@ -296,6 +373,14 @@ int main(int argc, char ** argv)
             else
                 contentGen.reset(new FixedContentGenerator(opts.contentString));
 
+            std::auto_ptr<GroupGenerator> groupGen;
+            if (!opts.groupKey.empty())
+                groupGen.reset(new GroupGenerator(opts.groupKey,
+                                                  opts.groupPrefix,
+                                                  opts.groupSize,
+                                                  opts.groupRandSize,
+                                                  opts.groupInterleave));
+
             qpid::sys::AbsTime start = qpid::sys::now();
             int64_t interval = 0;
             if (opts.sendRate) interval = qpid::sys::TIME_SEC/opts.sendRate;
@@ -323,6 +408,10 @@ int main(int argc, char ** argv)
                     else
                         msg.setReplyTo(Address()); // Clear the reply address.
                 }
+
+                if (groupGen.get())
+                    groupGen->setGroupInfo(msg);
+
                 sender.send(msg);
                 reporter.message(msg);
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org