You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2008/08/18 21:14:23 UTC
svn commit: r686852 -
/incubator/qpid/branches/qpid.0-10/cpp/src/tests/txtest.cpp
Author: kpvdr
Date: Mon Aug 18 12:14:22 2008
New Revision: 686852
URL: http://svn.apache.org/viewvc?rev=686852&view=rev
Log:
Added --dtx option to txtest for DTX transaction testing
Modified:
incubator/qpid/branches/qpid.0-10/cpp/src/tests/txtest.cpp
Modified: incubator/qpid/branches/qpid.0-10/cpp/src/tests/txtest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/cpp/src/tests/txtest.cpp?rev=686852&r1=686851&r2=686852&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/cpp/src/tests/txtest.cpp (original)
+++ incubator/qpid/branches/qpid.0-10/cpp/src/tests/txtest.cpp Mon Aug 18 12:14:22 2008
@@ -20,6 +20,7 @@
*/
#include <algorithm>
+#include <iomanip>
#include <iostream>
#include <memory>
#include <sstream>
@@ -47,10 +48,12 @@
uint msgsPerTx;
uint txCount;
uint totalMsgCount;
+ bool dtx;
Args() : init(true), transfer(true), check(true),
size(256), durable(true), queues(2),
- base("tx-test"), msgsPerTx(1), txCount(1), totalMsgCount(10)
+ base("tx-test"), msgsPerTx(1), txCount(1), totalMsgCount(10),
+ dtx(false)
{
addOptions()
@@ -63,7 +66,8 @@
("queue-base-name", optValue(base, "<name>"), "base name for queues")
("messages-per-tx", optValue(msgsPerTx, "N"), "number of messages transferred per transaction")
("tx-count", optValue(txCount, "N"), "number of transactions per 'agent'")
- ("total-messages", optValue(totalMsgCount, "N"), "total number of messages in 'circulation'");
+ ("total-messages", optValue(totalMsgCount, "N"), "total number of messages in 'circulation'")
+ ("dtx", optValue(dtx, "yes|no"), "use distributed transactions");
}
};
@@ -120,14 +124,17 @@
std::string src;
std::string dest;
Thread thread;
+ unsigned long xid_cnt;
+ framing::Xid xid;
- Transfer(const std::string& to, const std::string& from) : src(to), dest(from) {}
+ Transfer(const std::string& to, const std::string& from) : src(to), dest(from), xid_cnt(0), xid(0x4c414e47, "", from) {}
void run()
{
try {
- session.txSelect();
+ if (opts.dtx) session.dtxSelect();
+ else session.txSelect();
SubscriptionManager subs(session);
LocalQueue lq(AckPolicy(0));//manual acking
@@ -137,6 +144,10 @@
for (uint t = 0; t < opts.txCount; t++) {
Message in;
Message out("", dest);
+ if (opts.dtx) {
+ setNextXid(xid);
+ session.dtxStart(arg::xid=xid);
+ }
for (uint m = 0; m < opts.msgsPerTx; m++) {
in = lq.pop();
out.setData(in.getData());
@@ -145,12 +156,24 @@
session.messageTransfer(arg::content=out, arg::acceptMode=1);
}
lq.getAckPolicy().ackOutstanding(session);
- session.txCommit();
+ if (opts.dtx) {
+ session.dtxEnd(arg::xid=xid);
+ session.dtxPrepare(arg::xid=xid);
+ session.dtxCommit(arg::xid=xid);
+ } else {
+ session.txCommit();
+ }
}
} catch(const std::exception& e) {
std::cout << "Transfer interrupted: " << e.what() << std::endl;
}
}
+
+ void setNextXid(framing::Xid& xid) {
+ std::ostringstream oss;
+ oss << std::setfill('0') << std::hex << "xid-" << std::setw(12) << (++xid_cnt);
+ xid.setGlobalId(oss.str());
+ }
};
struct Controller : public Client