You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/11/07 22:01:30 UTC

svn commit: r592897 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/client/LocalQueue.cpp qpid/client/SubscriptionManager.cpp tests/perftest.cpp

Author: aconway
Date: Wed Nov  7 13:01:26 2007
New Revision: 592897

URL: http://svn.apache.org/viewvc?rev=592897&view=rev
Log:

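auto-ack support for perftest: adds an "auto-ack" option (default 100) to
ack every N messages. LocalQueue now calls autoAck.ack() on each message it
returns, and SubscriptionManager registers the listener with the dispatcher
before issuing messageSubscribe.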

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp?rev=592897&r1=592896&r2=592897&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp Wed Nov  7 13:01:26 2007
@@ -35,8 +35,11 @@
     if (!queue)
         throw ClosedException();
     FrameSet::shared_ptr content = queue->pop();
-    if (content->isA<MessageTransferBody>()) 
-        return Message(*content, session);
+    if (content->isA<MessageTransferBody>()) {
+        Message m(*content, session);
+        autoAck.ack(m);
+        return m;
+    }
     else
         throw CommandInvalidException(
             QPID_MSG("Unexpected method: " << content->getMethod()));

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=592897&r1=592896&r2=592897&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Wed Nov  7 13:01:26 2007
@@ -41,8 +41,8 @@
     MessageListener& listener, const std::string& q, const std::string& t)
 {
     std::string tag=t.empty() ? q:t;
-    session.messageSubscribe(arg::queue=q, arg::destination=tag);
     dispatcher.listen(tag, &listener);
+    session.messageSubscribe(arg::queue=q, arg::destination=tag);
     setFlowControl(tag, messages, bytes, window);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=592897&r1=592896&r2=592897&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Wed Nov  7 13:01:26 2007
@@ -44,10 +44,11 @@
     bool durable;
     int consumers;
     std::string mode;
+    int autoAck;
     
     Opts() :
         listen(false), publish(false), count(500000), size(64), consumers(1),
-        mode("shared")
+        mode("shared"), autoAck(100)
     {
         addOptions() 
             ("listen", optValue(listen), "Consume messages.")
@@ -56,7 +57,8 @@
             ("size", optValue(size, "BYTES"), "Size of messages.")
             ("durable", optValue(durable, "N"), "Publish messages as durable.")
             ("consumers", optValue(consumers, "N"), "Number of consumers.")
-            ("mode", optValue(mode, "shared|fanout|topic"), "consume mode");
+            ("mode", optValue(mode, "shared|fanout|topic"), "consume mode")
+            ("auto-ack", optValue(autoAck, "N"), "ack every N messages.");
     }
 };
 
@@ -219,12 +221,14 @@
         session.messageTransfer(arg::content=Message("ready", "control"));
 
         SubscriptionManager subs(session);
-        LocalQueue consume;
+        LocalQueue consume(AckPolicy(opts.autoAck));
         subs.subscribe(consume, consumeQueue);
         int consumed=0;
         AbsTime start=now();
-        while (consume.pop().getData() != "done")
+        Message msg;
+        while ((msg=consume.pop()).getData() != "done")
             ++consumed;
+        msg.acknowledge();      // Ack all outstanding messages.
         AbsTime end=now();
 
         // Report to publisher.