You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/11/20 18:24:55 UTC
svn commit: r719298 - in /incubator/qpid/trunk/qpid/cpp/src/tests: receiver.cpp sender.cpp txjob.cpp
Author: gsim
Date: Thu Nov 20 09:24:55 2008
New Revision: 719298
URL: http://svn.apache.org/viewvc?rev=719298&view=rev
Log:
Added some extra test options.
Modified:
incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp?rev=719298&r1=719297&r2=719298&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp Thu Nov 20 09:24:55 2008
@@ -41,13 +41,17 @@
string queue;
uint messages;
bool ignoreDuplicates;
+ uint creditWindow;
+ uint ackFrequency;
- Args() : queue("test-queue"), messages(0), ignoreDuplicates(false)
+ Args() : queue("test-queue"), messages(0), ignoreDuplicates(false), creditWindow(0), ackFrequency(1)
{
addOptions()
("queue", qpid::optValue(queue, "QUEUE NAME"), "Queue from which to request messages")
("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
- ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)");
+ ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)")
+ ("credit-window", qpid::optValue(creditWindow, "N"), "Credit window (0 implies infinite window)")
+ ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)");
}
};
@@ -56,13 +60,14 @@
class Receiver : public MessageListener, public FailoverManager::Command
{
public:
- Receiver(const string& queue, uint messages, bool ignoreDuplicates);
+ Receiver(const string& queue, uint messages, bool ignoreDuplicates, uint creditWindow, uint ackFrequency);
void received(Message& message);
void execute(AsyncSession& session, bool isRetry);
private:
const string queue;
const uint count;
const bool skipDups;
+ SubscriptionSettings settings;
Subscription subscription;
uint processed;
uint lastSn;
@@ -70,8 +75,12 @@
bool isDuplicate(Message& message);
};
-Receiver::Receiver(const string& q, uint messages, bool ignoreDuplicates) :
- queue(q), count(messages), skipDups(ignoreDuplicates), processed(0), lastSn(0) {}
+Receiver::Receiver(const string& q, uint messages, bool ignoreDuplicates, uint creditWindow, uint ackFrequency) :
+ queue(q), count(messages), skipDups(ignoreDuplicates), processed(0), lastSn(0)
+{
+ if (creditWindow) settings.flowControl = FlowControl::messageWindow(creditWindow);
+ settings.autoAck = ackFrequency;
+}
void Receiver::received(Message & message)
{
@@ -96,7 +105,7 @@
void Receiver::execute(AsyncSession& session, bool /*isRetry*/)
{
SubscriptionManager subs(session);
- subscription = subs.subscribe(*this, queue);
+ subscription = subs.subscribe(*this, queue, settings);
subs.run();
}
@@ -106,7 +115,7 @@
try {
opts.parse(argc, argv);
FailoverManager connection(opts.con);
- Receiver receiver(opts.queue, opts.messages, opts.ignoreDuplicates);
+ Receiver receiver(opts.queue, opts.messages, opts.ignoreDuplicates, opts.creditWindow, opts.ackFrequency);
connection.execute(receiver);
connection.close();
return 0;
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp?rev=719298&r1=719297&r2=719298&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp Thu Nov 20 09:24:55 2008
@@ -39,14 +39,14 @@
{
string destination;
string key;
- bool sendEos;
+ uint sendEos;
- Args() : key("test-queue"), sendEos(false)
+ Args() : key("test-queue"), sendEos(0)
{
addOptions()
("exchange", qpid::optValue(destination, "EXCHANGE"), "Exchange to send messages to")
("routing-key", qpid::optValue(key, "KEY"), "Routing key to add to messages")
- ("send-eos", qpid::optValue(sendEos), "Send EOS message to mark end of input");
+ ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input");
}
};
@@ -55,16 +55,16 @@
class Sender : public FailoverManager::Command
{
public:
- Sender(const std::string& destination, const std::string& key, bool sendEos);
+ Sender(const std::string& destination, const std::string& key, uint sendEos);
void execute(AsyncSession& session, bool isRetry);
private:
MessageReplayTracker sender;
Message message;
- const bool sendEos;
+ const uint sendEos;
uint sent;
};
-Sender::Sender(const std::string& destination, const std::string& key, bool eos) :
+Sender::Sender(const std::string& destination, const std::string& key, uint eos) :
sender(10), message(destination, key), sendEos(eos), sent(0) {}
void Sender::execute(AsyncSession& session, bool isRetry)
@@ -77,7 +77,7 @@
message.getHeaders().setInt("sn", ++sent);
sender.send(message);
}
- if (sendEos) {
+ for (uint i = sendEos; i > 0; --i) {
message.setData(EOS);
sender.send(message);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp?rev=719298&r1=719297&r2=719298&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp Thu Nov 20 09:24:55 2008
@@ -81,7 +81,7 @@
}
if (opts.quit) {
- async(session).messageTransfer(arg::content=Message("quit", opts.workQueue));
+ async(session).messageTransfer(arg::content=Message("quit", opts.workQueue));
}
session.sync();