You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2008/08/18 21:14:23 UTC

svn commit: r686852 - /incubator/qpid/branches/qpid.0-10/cpp/src/tests/txtest.cpp

Author: kpvdr
Date: Mon Aug 18 12:14:22 2008
New Revision: 686852

URL: http://svn.apache.org/viewvc?rev=686852&view=rev
Log:
Added --dtx option to txtest for DTX transaction testing

Modified:
    incubator/qpid/branches/qpid.0-10/cpp/src/tests/txtest.cpp

Modified: incubator/qpid/branches/qpid.0-10/cpp/src/tests/txtest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/cpp/src/tests/txtest.cpp?rev=686852&r1=686851&r2=686852&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/cpp/src/tests/txtest.cpp (original)
+++ incubator/qpid/branches/qpid.0-10/cpp/src/tests/txtest.cpp Mon Aug 18 12:14:22 2008
@@ -20,6 +20,7 @@
  */
 
 #include <algorithm>
+#include <iomanip>
 #include <iostream>
 #include <memory>
 #include <sstream>
@@ -47,10 +48,12 @@
     uint msgsPerTx;
     uint txCount;
     uint totalMsgCount;
+    bool dtx;
 
     Args() : init(true), transfer(true), check(true), 
              size(256), durable(true), queues(2), 
-             base("tx-test"), msgsPerTx(1), txCount(1), totalMsgCount(10)
+             base("tx-test"), msgsPerTx(1), txCount(1), totalMsgCount(10),
+             dtx(false)
     {
         addOptions()            
 
@@ -63,7 +66,8 @@
             ("queue-base-name", optValue(base, "<name>"), "base name for queues")
             ("messages-per-tx", optValue(msgsPerTx, "N"), "number of messages transferred per transaction")
             ("tx-count", optValue(txCount, "N"), "number of transactions per 'agent'")
-            ("total-messages", optValue(totalMsgCount, "N"), "total number of messages in 'circulation'");
+            ("total-messages", optValue(totalMsgCount, "N"), "total number of messages in 'circulation'")
+            ("dtx", optValue(dtx, "yes|no"), "use distributed transactions");
     }
 };
 
@@ -120,14 +124,17 @@
     std::string src;
     std::string dest;
     Thread thread;
+    unsigned long xid_cnt;
+    framing::Xid xid;
 
-    Transfer(const std::string& to, const std::string& from) : src(to), dest(from) {}
+    Transfer(const std::string& to, const std::string& from) : src(to), dest(from), xid_cnt(0), xid(0x4c414e47, "", from) {}
 
     void run() 
     {
         try {
         
-            session.txSelect();
+            if (opts.dtx) session.dtxSelect();
+            else session.txSelect();
             SubscriptionManager subs(session);
             
             LocalQueue lq(AckPolicy(0));//manual acking
@@ -137,6 +144,10 @@
             for (uint t = 0; t < opts.txCount; t++) {
                 Message in;
                 Message out("", dest);
+                if (opts.dtx) {
+                    setNextXid(xid);
+                    session.dtxStart(arg::xid=xid);
+                }
                 for (uint m = 0; m < opts.msgsPerTx; m++) {
                     in = lq.pop();
                     out.setData(in.getData());
@@ -145,12 +156,24 @@
                     session.messageTransfer(arg::content=out, arg::acceptMode=1);
                 }
                 lq.getAckPolicy().ackOutstanding(session);
-                session.txCommit();
+                if (opts.dtx) {
+                    session.dtxEnd(arg::xid=xid);
+                    session.dtxPrepare(arg::xid=xid);
+                    session.dtxCommit(arg::xid=xid);
+                } else {
+                    session.txCommit();
+                }
             }
         } catch(const std::exception& e) {
             std::cout << "Transfer interrupted: " << e.what() << std::endl;
         }
     }
+
+    void setNextXid(framing::Xid& xid) {
+        std::ostringstream oss;
+        oss << std::setfill('0') << std::hex << "xid-" << std::setw(12) << (++xid_cnt);
+        xid.setGlobalId(oss.str());
+    }
 };
 
 struct Controller : public Client