You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/11/20 18:24:55 UTC

svn commit: r719298 - in /incubator/qpid/trunk/qpid/cpp/src/tests: receiver.cpp sender.cpp txjob.cpp

Author: gsim
Date: Thu Nov 20 09:24:55 2008
New Revision: 719298

URL: http://svn.apache.org/viewvc?rev=719298&view=rev
Log:
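Added some extra test options: receiver gains --credit-window and --ack-frequency; sender's --send-eos now takes a count N of EOS messages to send instead of a boolean flag. (Also strips trailing whitespace in txjob.cpp.)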


Modified:
    incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp?rev=719298&r1=719297&r2=719298&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp Thu Nov 20 09:24:55 2008
@@ -41,13 +41,17 @@
     string queue;
     uint messages;
     bool ignoreDuplicates;
+    uint creditWindow;
+    uint ackFrequency;
 
-    Args() : queue("test-queue"), messages(0), ignoreDuplicates(false)
+    Args() : queue("test-queue"), messages(0), ignoreDuplicates(false), creditWindow(0), ackFrequency(1)
     {
         addOptions()            
             ("queue", qpid::optValue(queue, "QUEUE NAME"), "Queue from which to request messages")
             ("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
-            ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)");
+            ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)")
+            ("credit-window", qpid::optValue(creditWindow, "N"), "Credit window (0 implies infinite window)")
+            ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)");
     }
 };
 
@@ -56,13 +60,14 @@
 class Receiver : public MessageListener, public FailoverManager::Command
 {
   public:
-    Receiver(const string& queue, uint messages, bool ignoreDuplicates);
+    Receiver(const string& queue, uint messages, bool ignoreDuplicates, uint creditWindow, uint ackFrequency);
     void received(Message& message);
     void execute(AsyncSession& session, bool isRetry);
   private:
     const string queue;
     const uint count;
     const bool skipDups;
+    SubscriptionSettings settings;
     Subscription subscription;
     uint processed;
     uint lastSn;
@@ -70,8 +75,12 @@
     bool isDuplicate(Message& message);
 };
 
-Receiver::Receiver(const string& q, uint messages, bool ignoreDuplicates) : 
-    queue(q), count(messages), skipDups(ignoreDuplicates), processed(0), lastSn(0) {}
+Receiver::Receiver(const string& q, uint messages, bool ignoreDuplicates, uint creditWindow, uint ackFrequency) : 
+    queue(q), count(messages), skipDups(ignoreDuplicates), processed(0), lastSn(0) 
+{
+    if (creditWindow) settings.flowControl = FlowControl::messageWindow(creditWindow);
+    settings.autoAck = ackFrequency;
+}
 
 void Receiver::received(Message & message) 
 {
@@ -96,7 +105,7 @@
 void Receiver::execute(AsyncSession& session, bool /*isRetry*/)
 {
     SubscriptionManager subs(session);
-    subscription = subs.subscribe(*this, queue);
+    subscription = subs.subscribe(*this, queue, settings);
     subs.run();
 }
 
@@ -106,7 +115,7 @@
     try {
         opts.parse(argc, argv);
         FailoverManager connection(opts.con);
-        Receiver receiver(opts.queue, opts.messages, opts.ignoreDuplicates);
+        Receiver receiver(opts.queue, opts.messages, opts.ignoreDuplicates, opts.creditWindow, opts.ackFrequency);
         connection.execute(receiver);
         connection.close();
         return 0;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp?rev=719298&r1=719297&r2=719298&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp Thu Nov 20 09:24:55 2008
@@ -39,14 +39,14 @@
 {
     string destination;
     string key;
-    bool sendEos;
+    uint sendEos;
 
-    Args() : key("test-queue"), sendEos(false)
+    Args() : key("test-queue"), sendEos(0)
     {
         addOptions()            
             ("exchange", qpid::optValue(destination, "EXCHANGE"), "Exchange to send messages to")
             ("routing-key", qpid::optValue(key, "KEY"), "Routing key to add to messages")
-            ("send-eos", qpid::optValue(sendEos), "Send EOS message to mark end of input");
+            ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input");
     }
 };
 
@@ -55,16 +55,16 @@
 class Sender : public FailoverManager::Command
 {
   public:
-    Sender(const std::string& destination, const std::string& key, bool sendEos);
+    Sender(const std::string& destination, const std::string& key, uint sendEos);
     void execute(AsyncSession& session, bool isRetry);
   private:
     MessageReplayTracker sender;
     Message message;  
-    const bool sendEos;
+    const uint sendEos;
     uint sent;
 };
 
-Sender::Sender(const std::string& destination, const std::string& key, bool eos) : 
+Sender::Sender(const std::string& destination, const std::string& key, uint eos) : 
     sender(10), message(destination, key), sendEos(eos), sent(0) {}
 
 void Sender::execute(AsyncSession& session, bool isRetry)
@@ -77,7 +77,7 @@
         message.getHeaders().setInt("sn", ++sent);
         sender.send(message);
     }
-    if (sendEos) {
+    for (uint i = sendEos; i > 0; --i) {
         message.setData(EOS);
         sender.send(message);
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp?rev=719298&r1=719297&r2=719298&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp Thu Nov 20 09:24:55 2008
@@ -81,7 +81,7 @@
         }
 
         if (opts.quit) {
-            async(session).messageTransfer(arg::content=Message("quit", opts.workQueue));            
+            async(session).messageTransfer(arg::content=Message("quit", opts.workQueue));
         }
 
         session.sync();