You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/11/07 22:01:30 UTC
svn commit: r592897 - in /incubator/qpid/trunk/qpid/cpp/src:
qpid/client/LocalQueue.cpp qpid/client/SubscriptionManager.cpp
tests/perftest.cpp
Author: aconway
Date: Wed Nov 7 13:01:26 2007
New Revision: 592897
URL: http://svn.apache.org/viewvc?rev=592897&view=rev
Log:
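Add auto-ack support for perftest: LocalQueue now applies its auto-ack policy to each message it returns, SubscriptionManager registers the listener with the dispatcher before issuing the subscribe, and perftest gains an --auto-ack N option (default 100) to ack every N messages.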
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp?rev=592897&r1=592896&r2=592897&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp Wed Nov 7 13:01:26 2007
@@ -35,8 +35,11 @@
if (!queue)
throw ClosedException();
FrameSet::shared_ptr content = queue->pop();
- if (content->isA<MessageTransferBody>())
- return Message(*content, session);
+ if (content->isA<MessageTransferBody>()) {
+ Message m(*content, session);
+ autoAck.ack(m);
+ return m;
+ }
else
throw CommandInvalidException(
QPID_MSG("Unexpected method: " << content->getMethod()));
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=592897&r1=592896&r2=592897&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Wed Nov 7 13:01:26 2007
@@ -41,8 +41,8 @@
MessageListener& listener, const std::string& q, const std::string& t)
{
std::string tag=t.empty() ? q:t;
- session.messageSubscribe(arg::queue=q, arg::destination=tag);
dispatcher.listen(tag, &listener);
+ session.messageSubscribe(arg::queue=q, arg::destination=tag);
setFlowControl(tag, messages, bytes, window);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=592897&r1=592896&r2=592897&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Wed Nov 7 13:01:26 2007
@@ -44,10 +44,11 @@
bool durable;
int consumers;
std::string mode;
+ int autoAck;
Opts() :
listen(false), publish(false), count(500000), size(64), consumers(1),
- mode("shared")
+ mode("shared"), autoAck(100)
{
addOptions()
("listen", optValue(listen), "Consume messages.")
@@ -56,7 +57,8 @@
("size", optValue(size, "BYTES"), "Size of messages.")
("durable", optValue(durable, "N"), "Publish messages as durable.")
("consumers", optValue(consumers, "N"), "Number of consumers.")
- ("mode", optValue(mode, "shared|fanout|topic"), "consume mode");
+ ("mode", optValue(mode, "shared|fanout|topic"), "consume mode")
+ ("auto-ack", optValue(autoAck, "N"), "ack every N messages.");
}
};
@@ -219,12 +221,14 @@
session.messageTransfer(arg::content=Message("ready", "control"));
SubscriptionManager subs(session);
- LocalQueue consume;
+ LocalQueue consume(AckPolicy(opts.autoAck));
subs.subscribe(consume, consumeQueue);
int consumed=0;
AbsTime start=now();
- while (consume.pop().getData() != "done")
+ Message msg;
+ while ((msg=consume.pop()).getData() != "done")
++consumed;
+ msg.acknowledge(); // Ack all outstanding messages.
AbsTime end=now();
// Report to publisher.