You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2010/01/20 13:17:35 UTC
svn commit: r901153 - in /qpid/trunk/qpid/cpp/src:
qpid/client/amqp0_10/SessionImpl.cpp tests/qpid_recv.cpp tests/qpid_send.cpp
Author: gsim
Date: Wed Jan 20 12:17:34 2010
New Revision: 901153
URL: http://svn.apache.org/viewvc?rev=901153&view=rev
Log:
QPID-664: added support for testing transactions to qpid-send and qpid-recv, fixed bug in rollback.
Modified:
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp
qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=901153&r1=901152&r2=901153&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Wed Jan 20 12:17:34 2010
@@ -398,7 +398,7 @@
void SessionImpl::acknowledgeImpl()
{
- incoming.accept();
+ if (!transactional) incoming.accept();
}
void SessionImpl::rejectImpl(qpid::messaging::Message& m)
Modified: qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp?rev=901153&r1=901152&r2=901153&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_recv.cpp Wed Jan 20 12:17:34 2010
@@ -48,12 +48,15 @@
bool help;
std::string url;
std::string address;
+ std::string connectionOptions;
int64_t timeout;
bool forever;
uint messages;
bool ignoreDuplicates;
uint capacity;
uint ackFrequency;
+ uint tx;
+ uint rollbackFrequency;
bool printHeaders;
qpid::log::Options log;
@@ -67,18 +70,23 @@
ignoreDuplicates(false),
capacity(0),
ackFrequency(1),
+ tx(0),
+ rollbackFrequency(0),
printHeaders(false),
log(argv0)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
("address,a", qpid::optValue(address, "ADDRESS"), "address to receive from")
+ ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "timeout in seconds to wait before exiting")
("forever,f", qpid::optValue(forever), "ignore timeout and wait forever")
("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)")
- ("credit-window", qpid::optValue(capacity, "N"), "Credit window (0 implies infinite window)")
+ ("capacity", qpid::optValue(capacity, "N"), "Credit window (0 implies infinite window)")
("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)")
+ ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
+ ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
("print-headers", qpid::optValue(printHeaders), "If specified print out all message headers as well as content")
("help", qpid::optValue(help), "print this usage statement");
add(log);
@@ -111,25 +119,6 @@
}
};
-struct Args : public qpid::TestOptions
-{
- string address;
- uint messages;
- bool ignoreDuplicates;
- uint capacity;
- uint ackFrequency;
-
- Args() : address("test-queue"), messages(0), ignoreDuplicates(false), capacity(0), ackFrequency(1)
- {
- addOptions()
- ("address", qpid::optValue(address, "ADDRESS"), "Address from which to request messages")
- ("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
- ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)")
- ("prefetch", qpid::optValue(capacity, "N"), "Number of messages that can be prefetched (0 implies no prefetch)")
- ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)");
- }
-};
-
const string EOS("eos");
class SequenceTracker
@@ -159,12 +148,17 @@
Options opts;
if (opts.parse(argc, argv)) {
try {
- Connection connection = Connection::open(opts.url);
- Session session = connection.newSession();
+ Variant::Map connectionOptions;
+ if (opts.connectionOptions.size()) {
+ parseOptionString(opts.connectionOptions, connectionOptions);
+ }
+ Connection connection = Connection::open(opts.url, connectionOptions);
+ Session session = connection.newSession(opts.tx > 0);
Receiver receiver = session.createReceiver(opts.address);
receiver.setCapacity(opts.capacity);
Message msg;
uint count = 0;
+ uint txCount = 0;
SequenceTracker sequenceTracker;
Duration timeout = opts.getTimeout();
bool done = false;
@@ -189,12 +183,26 @@
if (opts.messages && count >= opts.messages) done = true;
}
}
- if (opts.ackFrequency && (count % opts.ackFrequency == 0)) {
+ if (opts.tx && (count % opts.tx == 0)) {
+ if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {
+ session.rollback();
+ } else {
+ session.commit();
+ }
+ } else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) {
session.acknowledge();
}
//opts.rejectFrequency??
}
- session.acknowledge();
+ if (opts.tx) {
+ if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {
+ session.rollback();
+ } else {
+ session.commit();
+ }
+ } else {
+ session.acknowledge();
+ }
session.close();
connection.close();
return 0;
Modified: qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp?rev=901153&r1=901152&r2=901153&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid_send.cpp Wed Jan 20 12:17:34 2010
@@ -47,6 +47,7 @@
{
bool help;
std::string url;
+ std::string connectionOptions;
std::string address;
int64_t timeout;
uint count;
@@ -60,6 +61,8 @@
string_vector properties;
string_vector entries;
std::string content;
+ uint tx;
+ uint rollbackFrequency;
qpid::log::Options log;
Options(const std::string& argv0=std::string())
@@ -71,11 +74,14 @@
sendEos(0),
durable(false),
ttl(0),
+ tx(0),
+ rollbackFrequency(0),
log(argv0)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
("address,a", qpid::optValue(address, "ADDRESS"), "address to drain from")
+ ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
("timeout,t", qpid::optValue(timeout, "TIMEOUT"), "exit after the specified time")
("count,c", qpid::optValue(count, "COUNT"), "stop after count messages have been sent, zero disables")
("id,i", qpid::optValue(id, "ID"), "use the supplied id instead of generating one")
@@ -88,6 +94,8 @@
("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
("user-id", qpid::optValue(userid, "USERID"), "userid for message")
("content", qpid::optValue(content, "CONTENT"), "specify textual content")
+ ("tx", qpid::optValue(tx, "N"), "batch size for transactions (0 implies transaction are not used)")
+ ("rollback-frequency", qpid::optValue(rollbackFrequency, "N"), "rollback frequency (0 implies no transaction will be rolledback)")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -172,8 +180,12 @@
Options opts;
if (opts.parse(argc, argv)) {
try {
- Connection connection = Connection::open(opts.url);
- Session session = connection.newSession();
+ Variant::Map connectionOptions;
+ if (opts.connectionOptions.size()) {
+ parseOptionString(opts.connectionOptions, connectionOptions);
+ }
+ Connection connection = Connection::open(opts.url, connectionOptions);
+ Session session = connection.newSession(opts.tx > 0);
Sender sender = session.createSender(opts.address);
Message msg;
msg.setDurable(opts.durable);
@@ -186,16 +198,31 @@
opts.setProperties(msg);
std::string content;
uint sent = 0;
+ uint txCount = 0;
while (getline(std::cin, content)) {
msg.setContent(content);
msg.getHeaders()["sn"] = ++sent;
sender.send(msg);
+ if (opts.tx && (sent % opts.tx == 0)) {
+ if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {
+ session.rollback();
+ } else {
+ session.commit();
+ }
+ }
}
for (uint i = opts.sendEos; i > 0; --i) {
msg.getHeaders()["sn"] = ++sent;
msg.setContent(EOS);//TODO: add in ability to send digest or similar
sender.send(msg);
}
+ if (opts.tx) {
+ if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) {
+ session.rollback();
+ } else {
+ session.commit();
+ }
+ }
session.sync();
session.close();
connection.close();
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org