You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/06/05 17:54:28 UTC

svn commit: r544522 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/src/tests/ python/ python/qpid/ python/tests_0-9/

Author: gsim
Date: Tue Jun  5 08:54:22 2007
New Revision: 544522

URL: http://svn.apache.org/viewvc?view=rev&rev=544522
Log:
Some tests and fixes for dtx preview.


Added:
    incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
    incubator/qpid/trunk/qpid/cpp/src/tests/DtxWorkRecordTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/python_tests
    incubator/qpid/trunk/qpid/python/amqp-doc
    incubator/qpid/trunk/qpid/python/qpid/spec.py
    incubator/qpid/trunk/qpid/python/qpid/testlib.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=544522&r1=544521&r2=544522
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Tue Jun  5 08:54:22 2007
@@ -61,6 +61,7 @@
     prefetchCount(0),
     framesize(_framesize),
     tagGenerator("sgen"),
+    dtxSelected(false),
     accumulatedAck(0),
     store(_store),
     messageBuilder(this, _store, _stagingThreshold),
@@ -103,6 +104,9 @@
 void Channel::close(){
     opened = false;
     consumers.clear();
+    if (dtxBuffer.get()) {
+        dtxBuffer->fail();
+    }
     recover(true);
 }
 
@@ -123,22 +127,41 @@
     accumulatedAck.clear();
 }
 
-void Channel::startDtx(const std::string& xid, DtxManager& mgr){
+void Channel::selectDtx(){
+    dtxSelected = true;
+}
+
+void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join){
+    if (!dtxSelected) {
+        throw ConnectionException(503, "Channel has not been selected for use with dtx");
+    }
     dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
     txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
-    mgr.start(xid, dtxBuffer);
+    if (join) {
+        mgr.join(xid, dtxBuffer);
+    } else {
+        mgr.start(xid, dtxBuffer);
+    }
 }
 
-void Channel::endDtx(const std::string& xid){
+void Channel::endDtx(const std::string& xid, bool fail){
+    if (!dtxBuffer) {
+        throw ConnectionException(503, boost::format("xid %1% not associated with this channel") % xid);
+    }
     if (dtxBuffer->getXid() != xid) {
         throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end") 
                                   % dtxBuffer->getXid() % xid);
     }
 
-    TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
-    accumulatedAck.clear();
-    dtxBuffer->enlist(txAck);    
-    dtxBuffer->markEnded();
+    if (fail) {
+        accumulatedAck.clear();
+        dtxBuffer->fail();
+    } else {
+        TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+        accumulatedAck.clear();
+        dtxBuffer->enlist(txAck);    
+        dtxBuffer->markEnded();
+    }
     
     dtxBuffer.reset();
     txBuffer.reset();
@@ -250,7 +273,7 @@
     Exchange::shared_ptr exchange =
         connection.broker.getExchanges().get(msg->getExchange());
     assert(exchange.get());
-    if (txBuffer) {
+    if (txBuffer.get()) {
         TxPublish* deliverable(new TxPublish(msg));
         TxOp::shared_ptr op(deliverable);
         exchange->route(*deliverable, msg->getRoutingKey(),
@@ -276,7 +299,7 @@
 }
 
 void Channel::ack(uint64_t firstTag, uint64_t lastTag){
-    if (txBuffer) {
+    if (txBuffer.get()) {
         accumulatedAck.update(firstTag, lastTag);
 
         //TODO: I think the outstanding prefetch size & count should be updated at this point...

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h?view=diff&rev=544522&r1=544521&r2=544522
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h Tue Jun  5 08:54:22 2007
@@ -92,6 +92,7 @@
     sys::Mutex deliveryLock;
     TxBuffer::shared_ptr txBuffer;
     DtxBuffer::shared_ptr dtxBuffer;
+    bool dtxSelected;
     AccumulatedAck accumulatedAck;
     MessageStore* const store;
     MessageBuilder messageBuilder;//builder for in-progress message
@@ -137,8 +138,9 @@
     void startTx();
     void commit();
     void rollback();
-    void startDtx(const std::string& xid, DtxManager& mgr);
-    void endDtx(const std::string& xid);
+    void selectDtx();
+    void startDtx(const std::string& xid, DtxManager& mgr, bool join);
+    void endDtx(const std::string& xid, bool fail);
     void suspendDtx(const std::string& xid);
     void resumeDtx(const std::string& xid);
     void ack();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp?view=diff&rev=544522&r1=544521&r2=544522
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp Tue Jun  5 08:54:22 2007
@@ -23,7 +23,7 @@
 using namespace qpid::broker;
 using qpid::sys::Mutex;
 
-DtxBuffer::DtxBuffer(const std::string& _xid) : xid(_xid), ended(false), suspended(false) {}
+DtxBuffer::DtxBuffer(const std::string& _xid) : xid(_xid), ended(false), suspended(false), failed(false) {}
 
 DtxBuffer::~DtxBuffer() {}
 
@@ -47,6 +47,20 @@
 bool DtxBuffer::isSuspended() 
 { 
     return suspended; 
+}
+
+void DtxBuffer::fail()
+{
+    Mutex::ScopedLock locker(lock); 
+    rollback();
+    failed = true;
+    ended = true;
+}
+
+bool DtxBuffer::isRollbackOnly()
+{
+    Mutex::ScopedLock locker(lock); 
+    return failed;
 }
 
 const std::string& DtxBuffer::getXid()

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h?view=diff&rev=544522&r1=544521&r2=544522
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h Tue Jun  5 08:54:22 2007
@@ -31,6 +31,8 @@
             const std::string xid;
             bool ended;
             bool suspended;           
+            bool failed;
+
         public:
             typedef boost::shared_ptr<DtxBuffer> shared_ptr;
 
@@ -40,6 +42,8 @@
             bool isEnded();
             void setSuspended(bool suspended);
             bool isSuspended();
+            void fail();
+            bool isRollbackOnly();
             const std::string& getXid();
         };
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp?view=diff&rev=544522&r1=544521&r2=544522
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp Tue Jun  5 08:54:22 2007
@@ -23,6 +23,7 @@
 
 using namespace qpid::broker;
 using qpid::framing::AMQP_ClientProxy;
+using qpid::framing::Buffer;
 using qpid::framing::FieldTable;
 using qpid::framing::MethodContext;
 using std::string;
@@ -35,12 +36,22 @@
 {
 }
 
+const int XA_RBROLLBACK(1);
+const int XA_RBTIMEOUT(2);
+const int XA_HEURHAZ(3);
+const int XA_HEURCOM(4);
+const int XA_HEURRB(5);
+const int XA_HEURMIX(6);
+const int XA_RDONLY(7);
+const int XA_OK(8);
+
 
 // DtxDemarcationHandler:
 
 
 void DtxHandlerImpl::select(const MethodContext& context )
 {
+    channel.selectDtx();
     dClient.selectOk(context.getRequestId());
 }
 
@@ -50,52 +61,58 @@
                          bool fail,
                          bool suspend)
 {
-    if (fail && suspend) {
-        throw ConnectionException(503, "End and suspend cannot both be set.");
-    }
 
-    //TODO: handle fail
-    if (suspend) {
-        channel.suspendDtx(xid);
+    if (fail) {
+        channel.endDtx(xid, true);
+        if (suspend) {
+            throw ConnectionException(503, "End and suspend cannot both be set.");
+        } else {
+            dClient.endOk(XA_RBROLLBACK, context.getRequestId());
+        }
     } else {
-        channel.endDtx(xid);
+        if (suspend) {
+            channel.suspendDtx(xid);
+        } else {
+            channel.endDtx(xid, false);
+        }
+        dClient.endOk(XA_OK, context.getRequestId());
     }
-    dClient.endOk(0/*TODO - set flags*/, context.getRequestId());
 }
 
 void DtxHandlerImpl::start(const MethodContext& context,
                            u_int16_t /*ticket*/,
                            const string& xid,
-                           bool /*join*/,
+                           bool join,
                            bool resume)
 {
-    //TODO: handle join
+    if (join && resume) {
+        throw ConnectionException(503, "Join and resume cannot both be set.");
+    }
     if (resume) {
         channel.resumeDtx(xid);
     } else {
-        channel.startDtx(xid, broker.getDtxManager());
+        channel.startDtx(xid, broker.getDtxManager(), join);
     }
-    dClient.startOk(0/*TODO - set flags*/, context.getRequestId());
+    dClient.startOk(XA_OK, context.getRequestId());
 }
 
 // DtxCoordinationHandler:
 
 void DtxHandlerImpl::prepare(const MethodContext& context,
                              u_int16_t /*ticket*/,
-                             const string& xid )
+                             const string& xid)
 {
-    broker.getDtxManager().prepare(xid);
-    cClient.prepareOk(0/*TODO - set flags*/, context.getRequestId());
+    bool ok = broker.getDtxManager().prepare(xid);
+    cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId());
 }
 
 void DtxHandlerImpl::commit(const MethodContext& context,
                             u_int16_t /*ticket*/,
                             const string& xid,
-                            bool /*onePhase*/ )
+                            bool onePhase)
 {
-    //TODO use onePhase flag to validate correct sequence
-    broker.getDtxManager().commit(xid);
-    cClient.commitOk(0/*TODO - set flags*/, context.getRequestId());
+    bool ok = broker.getDtxManager().commit(xid, onePhase);
+    cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId());
 }
 
 
@@ -104,22 +121,54 @@
                               const string& xid )
 {
     broker.getDtxManager().rollback(xid);
-    cClient.rollbackOk(0/*TODO - set flags*/, context.getRequestId());
+    cClient.rollbackOk(XA_OK, context.getRequestId());
 }
 
-void DtxHandlerImpl::recover(const MethodContext& /*context*/,
+void DtxHandlerImpl::recover(const MethodContext& context,
                              u_int16_t /*ticket*/,
                              bool /*startscan*/,
                              u_int32_t /*endscan*/ )
 {
     //TODO
+
+    //TODO: what do startscan and endscan actually mean?
+
+    // response should hold one key-value pair with key = 'xids' and
+    // value = sequence of xids
+
+    // until sequences are supported (0-10 encoding), an alternate
+    // scheme is used for testing:
+    //
+    //   key = 'xids' and value = a longstr containing shortstrs for each xid
+    //
+    // note that this restricts the length of the xids more than is
+    // strictly 'legal', but that is ok for testing
+    std::set<std::string> xids;
+    broker.getStore().collectPreparedXids(xids);        
+    uint size(0);
+    for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) {
+        size += i->size() + 1/*shortstr size*/;        
+    }
+    Buffer buffer(size + 4/*longstr size*/);
+    buffer.putLong(size);
+    for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) {
+        buffer.putShortString(*i);
+    }
+    buffer.flip();
+    string data;
+    buffer.getLongString(data);
+
+    FieldTable response;
+    response.setString("xids", data);
+    cClient.recoverOk(response, context.getRequestId());
 }
 
 void DtxHandlerImpl::forget(const MethodContext& /*context*/,
                             u_int16_t /*ticket*/,
-                            const string& /*xid*/ )
+                            const string& xid)
 {
-    //TODO
+    //Currently no heuristic completion is supported, so this should never be used.
+    throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid);
 }
 
 void DtxHandlerImpl::getTimeout(const MethodContext& /*context*/,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp?view=diff&rev=544522&r1=544521&r2=544522
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp Tue Jun  5 08:54:22 2007
@@ -21,6 +21,7 @@
 #include "DtxManager.h"
 #include <boost/format.hpp>
 #include <iostream>
+using qpid::sys::Mutex;
 
 using namespace qpid::broker;
 
@@ -30,31 +31,40 @@
 
 void DtxManager::start(std::string xid, DtxBuffer::shared_ptr ops)
 {
-    getOrCreateWork(xid)->add(ops);
+    createWork(xid)->add(ops);
+}
+
+void DtxManager::join(std::string xid, DtxBuffer::shared_ptr ops)
+{
+    getWork(xid)->add(ops);
 }
 
 void DtxManager::recover(std::string xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops)
 {
-    getOrCreateWork(xid)->recover(txn, ops);
+    createWork(xid)->recover(txn, ops);
 }
 
-void DtxManager::prepare(const std::string& xid) 
+bool DtxManager::prepare(const std::string& xid) 
 { 
-    getWork(xid)->prepare();
+    return getWork(xid)->prepare();
 }
 
-void DtxManager::commit(const std::string& xid) 
+bool DtxManager::commit(const std::string& xid, bool onePhase) 
 { 
-    getWork(xid)->commit();
+    bool result = getWork(xid)->commit(onePhase);
+    remove(xid);
+    return result;
 }
 
 void DtxManager::rollback(const std::string& xid) 
 { 
     getWork(xid)->rollback();
+    remove(xid);
 }
 
 DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid)
 {
+    Mutex::ScopedLock locker(lock); 
     WorkMap::iterator i = work.find(xid);
     if (i == work.end()) {
         throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid);
@@ -62,11 +72,24 @@
     return i;
 }
 
-DtxManager::WorkMap::iterator DtxManager::getOrCreateWork(std::string& xid)
+void DtxManager::remove(const std::string& xid)
 {
+    Mutex::ScopedLock locker(lock); 
     WorkMap::iterator i = work.find(xid);
     if (i == work.end()) {
-        i = work.insert(xid, new DtxWorkRecord(xid, store)).first;
+        throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid);
+    } else {
+        work.erase(i);
+    }
+}
+
+DtxManager::WorkMap::iterator DtxManager::createWork(std::string& xid)
+{
+    Mutex::ScopedLock locker(lock); 
+    WorkMap::iterator i = work.find(xid);
+    if (i != work.end()) {
+        throw ConnectionException(503, boost::format("Xid %1% is already known (use 'join' to add work to an existing xid)!") % xid);
+    } else {
+        return work.insert(xid, new DtxWorkRecord(xid, store)).first;
     }
-    return i;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h?view=diff&rev=544522&r1=544521&r2=544522
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h Tue Jun  5 08:54:22 2007
@@ -26,6 +26,7 @@
 #include "DtxWorkRecord.h"
 #include "TransactionalStore.h"
 #include "qpid/framing/amqp_types.h"
+#include "qpid/sys/Mutex.h"
 
 namespace qpid {
 namespace broker {
@@ -35,17 +36,20 @@
 
     WorkMap work;
     TransactionalStore* const store;
+    qpid::sys::Mutex lock;
 
+    void remove(const std::string& xid);
     WorkMap::iterator getWork(const std::string& xid);
-    WorkMap::iterator getOrCreateWork(std::string& xid);
+    WorkMap::iterator createWork(std::string& xid);
 
 public:
     DtxManager(TransactionalStore* const store);
     ~DtxManager();
     void start(std::string xid, DtxBuffer::shared_ptr work);
+    void join(std::string xid, DtxBuffer::shared_ptr work);
     void recover(std::string xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr work);
-    void prepare(const std::string& xid);
-    void commit(const std::string& xid);
+    bool prepare(const std::string& xid);
+    bool commit(const std::string& xid, bool onePhase);
     void rollback(const std::string& xid);
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp?view=diff&rev=544522&r1=544521&r2=544522
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp Tue Jun  5 08:54:22 2007
@@ -22,24 +22,32 @@
 #include <boost/format.hpp>
 #include <boost/mem_fn.hpp>
 using boost::mem_fn;
+using qpid::sys::Mutex;
 
 using namespace qpid::broker;
 
-DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store), completed(false) {}
+DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : 
+    xid(_xid), store(_store), completed(false), rolledback(false), prepared(false) {}
 
 DtxWorkRecord::~DtxWorkRecord() {}
 
 bool DtxWorkRecord::prepare()
 {
-    checkCompletion();
-    txn = store->begin(xid);
-    if (prepare(txn.get())) {
-        store->prepare(*txn);
-        return true;
+    Mutex::ScopedLock locker(lock);     
+    if (check()) {
+        txn = store->begin(xid);
+        if (prepare(txn.get())) {
+            store->prepare(*txn);
+            prepared = true;
+        } else {
+            abort();
+            //TODO: this should probably be flagged as internal error
+        }
     } else {
+        //some part of the work has been marked rollback only
         abort();
-        return false;
     }
+    return prepared;
 }
 
 bool DtxWorkRecord::prepare(TransactionContext* _txn)
@@ -51,50 +59,77 @@
     return succeeded;
 }
 
-void DtxWorkRecord::commit()
+bool DtxWorkRecord::commit(bool onePhase)
 {
-    checkCompletion();
-    if (txn.get()) {
-        //already prepared
-        store->commit(*txn);
-        txn.reset();
+    Mutex::ScopedLock locker(lock); 
+    if (check()) {
+        if (prepared) {
+            //already prepared i.e. 2pc
+            if (onePhase) {
+                throw ConnectionException(503, 
+                    boost::format("Branch with xid %1% has been prepared, one-phase option not valid!") % xid);        
+            }
 
-        for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
-    } else {
-        //1pc commit optimisation, don't need a 2pc transaction context:
-        std::auto_ptr<TransactionContext> localtxn = store->begin();
-        if (prepare(localtxn.get())) {
-            store->commit(*localtxn);
+            store->commit(*txn);
+            txn.reset();
+            
             for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
+            return true;
         } else {
-            store->abort(*localtxn);
-            abort();
+            //1pc commit optimisation, don't need a 2pc transaction context:
+            if (!onePhase) {
+                throw ConnectionException(503, 
+                    boost::format("Branch with xid %1% has not been prepared, one-phase option required!") % xid);        
+            }
+            std::auto_ptr<TransactionContext> localtxn = store->begin();
+            if (prepare(localtxn.get())) {
+                store->commit(*localtxn);
+                for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
+                return true;
+            } else {
+                store->abort(*localtxn);
+                abort();
+                //TODO: this should probably be flagged as internal error
+                return false;
+            }
         }
+    } else {
+        //some part of the work has been marked rollback only
+        abort();
+        return false;
     }
 }
 
 void DtxWorkRecord::rollback()
 {
-    checkCompletion();
+    Mutex::ScopedLock locker(lock); 
+    check();
     abort();
 }
 
 void DtxWorkRecord::add(DtxBuffer::shared_ptr ops)
 {
+    Mutex::ScopedLock locker(lock); 
+    if (completed) {
+        throw ConnectionException(503, boost::format("Branch with xid %1% has been completed!") % xid);
+    }
     work.push_back(ops);
 }
 
-void DtxWorkRecord::checkCompletion()
+bool DtxWorkRecord::check()
 {
     if (!completed) {
         //iterate through all DtxBuffers and ensure they are all ended
         for (Work::iterator i = work.begin(); i != work.end(); i++) {
             if (!(*i)->isEnded()) {
                 throw ConnectionException(503, boost::format("Branch with xid %1% not completed!") % xid);
+            } else if ((*i)->isRollbackOnly()) {
+                rolledback = true;
             }
         }
         completed = true;
     }
+    return !rolledback;
 }
 
 void DtxWorkRecord::abort()
@@ -112,4 +147,5 @@
     txn = _txn;
     ops->markEnded();
     completed = true;
+    prepared = true;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h?view=diff&rev=544522&r1=544521&r2=544522
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h Tue Jun  5 08:54:22 2007
@@ -27,6 +27,7 @@
 #include "DtxBuffer.h"
 #include "TransactionalStore.h"
 #include "qpid/framing/amqp_types.h"
+#include "qpid/sys/Mutex.h"
 
 namespace qpid {
 namespace broker {
@@ -43,17 +44,20 @@
     const std::string xid;
     TransactionalStore* const store;
     bool completed;
+    bool rolledback;
+    bool prepared;
     Work work;
     std::auto_ptr<TPCTransactionContext> txn;
+    qpid::sys::Mutex lock;
 
-    void checkCompletion();
+    bool check();
     void abort();
     bool prepare(TransactionContext* txn);
 public:
     DtxWorkRecord(const std::string& xid, TransactionalStore* const store);
     ~DtxWorkRecord();
     bool prepare();
-    void commit();
+    bool commit(bool onePhase);
     void rollback();
     void add(DtxBuffer::shared_ptr ops);
     void recover(std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?view=diff&rev=544522&r1=544521&r2=544522
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Tue Jun  5 08:54:22 2007
@@ -25,6 +25,26 @@
 
 #include <iostream>
 
+namespace qpid{
+namespace broker{
+
+const std::string nullxid = "";
+
+class DummyCtxt : public TPCTransactionContext 
+{
+    const std::string xid;
+public:
+    DummyCtxt(const std::string& _xid) : xid(_xid) {}
+    static std::string getXid(TransactionContext& ctxt) 
+    {
+        DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt));
+        return c ? c->xid : nullxid;
+    }
+};
+
+}
+}
+
 using namespace qpid::broker;
 
 NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
@@ -92,24 +112,27 @@
     return std::auto_ptr<TransactionContext>();
 }
 
-std::auto_ptr<TPCTransactionContext> NullMessageStore::begin(const std::string&)
+std::auto_ptr<TPCTransactionContext> NullMessageStore::begin(const std::string& xid)
 {
-    return std::auto_ptr<TPCTransactionContext>();
+    return std::auto_ptr<TPCTransactionContext>(new DummyCtxt(xid));
 }
 
-void NullMessageStore::prepare(TPCTransactionContext&)
+void NullMessageStore::prepare(TPCTransactionContext& ctxt)
 {
+    prepared.insert(DummyCtxt::getXid(ctxt));
 }
 
-void NullMessageStore::commit(TransactionContext&)
+void NullMessageStore::commit(TransactionContext& ctxt)
 {
+    prepared.erase(DummyCtxt::getXid(ctxt));
 }
 
-void NullMessageStore::abort(TransactionContext&)
+void NullMessageStore::abort(TransactionContext& ctxt)
 {
+    prepared.erase(DummyCtxt::getXid(ctxt));
 }
 
-void NullMessageStore::collectPreparedXids(std::set<string>&)
+void NullMessageStore::collectPreparedXids(std::set<string>& out)
 {
-
+    out.insert(prepared.begin(), prepared.end());
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?view=diff&rev=544522&r1=544521&r2=544522
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Tue Jun  5 08:54:22 2007
@@ -21,6 +21,7 @@
 #ifndef _NullMessageStore_
 #define _NullMessageStore_
 
+#include <set>
 #include "BrokerMessage.h"
 #include "MessageStore.h"
 #include "BrokerQueue.h"
@@ -33,6 +34,7 @@
  */
 class NullMessageStore : public MessageStore
 {
+    std::set<std::string> prepared;
     const bool warn;
 public:
     NullMessageStore(bool warn = false);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/DtxWorkRecordTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/DtxWorkRecordTest.cpp?view=diff&rev=544522&r1=544521&r2=544522
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/DtxWorkRecordTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/DtxWorkRecordTest.cpp Tue Jun  5 08:54:22 2007
@@ -59,7 +59,7 @@
         work.add(bufferA);
         work.add(bufferB);
 
-        work.commit();
+        work.commit(true);
 
         store.check();
         CPPUNIT_ASSERT(store.isCommitted());
@@ -93,7 +93,7 @@
         work.add(bufferB);
         work.add(bufferC);
 
-        work.commit();
+        work.commit(true);
 
         CPPUNIT_ASSERT(store.isAborted());
         store.check();
@@ -125,7 +125,7 @@
 
         CPPUNIT_ASSERT(work.prepare());
         CPPUNIT_ASSERT(store.isPrepared());
-        work.commit();
+        work.commit(false);
         store.check();
         CPPUNIT_ASSERT(store.isCommitted());
         opA->check();

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/python_tests
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/python_tests?view=diff&rev=544522&r1=544521&r2=544522
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/python_tests (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/python_tests Tue Jun  5 08:54:22 2007
@@ -1,7 +1,7 @@
 #!/bin/sh
 # Run the python tests.
 if test -d ../../../python ;  then
-    cd ../../../python && ./run-tests -v -s "0-9" -I cpp_failing_0-9.txt $PYTHON_TESTS
+    cd ../../../python && ./run-tests -v -s "0-9" -e ../specs/amqp-dtx-preview.0-9.xml -I cpp_failing_0-9.txt $PYTHON_TESTS
 else
     echo Warning: python tests not found.
 fi

Modified: incubator/qpid/trunk/qpid/python/amqp-doc
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/amqp-doc?view=diff&rev=544522&r1=544521&r2=544522
==============================================================================
--- incubator/qpid/trunk/qpid/python/amqp-doc (original)
+++ incubator/qpid/trunk/qpid/python/amqp-doc Tue Jun  5 08:54:22 2007
@@ -37,15 +37,17 @@
 """ % (msg, sys.argv[0])).strip()
 
 try:
-  opts, args = getopt(sys.argv[1:], "s:e", ["regexp", "spec="])
+  opts, args = getopt(sys.argv[1:], "s:ea:", ["regexp", "spec=", "additional="])
 except GetoptError, e:
   die(str(e))
 
 regexp = False
 spec = "../specs/amqp.0-9.xml"
+errata = []
 for k, v in opts:
   if k == "-e" or k == "--regexp": regexp = True
   if k == "-s" or k == "--spec": spec = v
+  if k == "-a" or k == "--additional": errata.append(v)
 
 if regexp:
   def match(pattern, value):
@@ -57,7 +59,7 @@
   def match(pattern, value):
     return fnmatch(value, pattern)
 
-spec = load(spec)
+spec = load(spec, *errata)
 methods = {}
 patterns = args
 for pattern in patterns:

Modified: incubator/qpid/trunk/qpid/python/qpid/spec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/spec.py?view=diff&rev=544522&r1=544521&r2=544522
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/spec.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/spec.py Tue Jun  5 08:54:22 2007
@@ -309,8 +309,10 @@
     for nd in root["constant"]:
       const = Constant(spec, pythonize(nd["@name"]), int(nd["@value"]),
                        nd.get("@class"), get_docs(nd))
-      spec.constants.add(const)
-
+      try:
+        spec.constants.add(const)
+      except ValueError, e:  
+        print "Warning:", e
     # domains are typedefs
     for nd in root["domain"]:
       spec.domains.add(Domain(spec, nd.index(), pythonize(nd["@name"]),
@@ -320,18 +322,20 @@
     # classes
     for c_nd in root["class"]:
       cname = pythonize(c_nd["@name"])
-      if root == spec_root:
+      if spec.classes.byname.has_key(cname):
+        klass = spec.classes.byname[cname]
+      else:
         klass = Class(spec, cname, int(c_nd["@index"]), c_nd["@handler"],
                       get_docs(c_nd))
         spec.classes.add(klass)
-      else:
-        klass = spec.classes.byname[cname]
 
       added_methods = []
       load_fields(c_nd, klass.fields, spec.domains.byname)
       for m_nd in c_nd["method"]:
         mname = pythonize(m_nd["@name"])
-        if root == spec_root:
+        if klass.methods.byname.has_key(mname):
+          meth = klass.methods.byname[mname]
+        else:
           meth = Method(klass, mname,
                         int(m_nd["@index"]),
                         m_nd.get_bool("@content", False),
@@ -341,8 +345,6 @@
                         get_docs(m_nd))
           klass.methods.add(meth)
           added_methods.append(meth)
-        else:
-          meth = klass.methods.byname[mname]
         load_fields(m_nd, meth.fields, spec.domains.byname)
       # resolve the responses
       for m in added_methods:

Modified: incubator/qpid/trunk/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/testlib.py?view=diff&rev=544522&r1=544521&r2=544522
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/testlib.py Tue Jun  5 08:54:22 2007
@@ -99,7 +99,7 @@
         self.specfile = "0-8"
         self.errata = []
         try:
-            opts, self.tests = getopt(args, "s:b:h?dvi:I:", ["help", "spec", "server", "verbose", "ignore", "ignore-file"])
+            opts, self.tests = getopt(args, "s:e:b:h?dvi:I:", ["help", "spec", "errata=", "server", "verbose", "ignore", "ignore-file"])
         except GetoptError, e:
             self._die(str(e))
         for opt, value in opts:
@@ -278,14 +278,14 @@
         self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
 
     def assertChannelException(self, expectedCode, message):
-        if not isinstance(message, Message): self.fail("expected channel_close method")
+        if not isinstance(message, Message): self.fail("expected channel_close method, got %s" % (message))
         self.assertEqual("channel", message.method.klass.name)
         self.assertEqual("close", message.method.name)
         self.assertEqual(expectedCode, message.reply_code)
 
 
     def assertConnectionException(self, expectedCode, message): 
-        if not isinstance(message, Message): self.fail("expected connection_close method")
+        if not isinstance(message, Message): self.fail("expected connection_close method, got %s" % (message))
         self.assertEqual("connection", message.method.klass.name)
         self.assertEqual("close", message.method.name)
         self.assertEqual(expectedCode, message.reply_code)

Added: incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py?view=auto&rev=544522
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py Tue Jun  5 08:54:22 2007
@@ -0,0 +1,540 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+from struct import pack, unpack
+
+class DtxTests(TestBase):
+    """
+    Tests for the amqp dtx related classes.
+
+    Tests of the form test_simple_xxx test the basic transactional
+    behaviour. The approach here is to 'swap' a message from one queue
+    to another by consuming and re-publishing in the same
+    transaction. That transaction is then completed in different ways
+    and the appropriate result verified.
+
+    The other tests enforce more specific rules and behaviour on a
+    per-method or per-field basis.        
+    """
+
+    XA_RBROLLBACK = 1
+    XA_OK = 8
+
+    def test_simple_commit(self):
+        """
+        Test basic one-phase commit behaviour.
+
+        A message with id "commit" is moved from queue-a to queue-b
+        under the xid built from "my-xid" (see txswap). Until the
+        transaction is completed neither queue may report a message;
+        after a one-phase commit the message must be on queue-b only.
+        """
+        channel = self.channel
+        tx = self.xid("my-xid")
+        self.txswap(tx, "commit")
+
+        #neither queue should have any messages accessible
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+
+        #commit
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).flags)
+
+        #check result
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(1, "queue-b")
+        self.assertMessageId("commit", "queue-b")
+
+    def test_simple_prepare_commit(self):
+        """
+        Test basic two-phase commit behaviour.
+
+        Same swap as the one-phase test, but the transaction is
+        prepared first and then committed with one_phase=False. The
+        message must stay invisible on both queues between prepare and
+        commit, and end up on queue-b afterwards.
+        """
+        channel = self.channel
+        tx = self.xid("my-xid")
+        self.txswap(tx, "prepare-commit")
+
+        #prepare
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).flags)
+
+        #neither queue should have any messages accessible
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+
+        #commit
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).flags)
+
+        #check result
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(1, "queue-b")
+        self.assertMessageId("prepare-commit", "queue-b")
+
+
+    def test_simple_rollback(self):
+        """
+        Test basic rollback behaviour.
+
+        The message is swapped from queue-a to queue-b under a
+        transaction which is then rolled back without being prepared;
+        the message must reappear on queue-a and queue-b must stay
+        empty.
+        """
+        channel = self.channel
+        tx = self.xid("my-xid")
+        self.txswap(tx, "rollback")
+
+        #neither queue should have any messages accessible
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+
+        #rollback
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags)
+
+        #check result
+        self.assertMessageCount(1, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+        self.assertMessageId("rollback", "queue-a")
+
+    def test_simple_prepare_rollback(self):
+        """
+        Test basic rollback behaviour after the transaction has been prepared.
+
+        The message must stay invisible on both queues between prepare
+        and rollback, and be back on queue-a once the rollback is done.
+        """
+        channel = self.channel
+        tx = self.xid("my-xid")
+        self.txswap(tx, "prepare-rollback")
+
+        #prepare
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).flags)
+
+        #neither queue should have any messages accessible
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+
+        #rollback
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags)
+
+        #check result
+        self.assertMessageCount(1, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+        self.assertMessageId("prepare-rollback", "queue-a")    
+
+    def test_select_required(self):
+        """
+        Check that an error is flagged if start is issued on a channel
+        that has not been selected for dtx.
+
+        Only dtx_demarcation_start is exercised here; the expected
+        outcome is a Closed exception carrying a 503 connection close.
+        """
+        channel = self.channel
+        tx = self.xid("dummy")
+        try:
+            channel.dtx_demarcation_start(xid=tx)
+            
+            #if we get here we have failed, but need to do some cleanup:
+            channel.dtx_demarcation_end(xid=tx)
+            channel.dtx_coordination_rollback(xid=tx)
+            self.fail("Channel not selected for use with dtx, expected exception!")
+        except Closed, e:
+            self.assertConnectionException(503, e.args[0])
+
+    def test_start_already_known(self):
+        """
+        Verify that an attempt to start an association with a
+        transaction that is already known is not allowed (unless the
+        join flag is set).
+
+        The second start is issued from a channel on a separate
+        connection and is expected to raise Closed with a 503
+        connection close. The exception is captured so that cleanup of
+        the first association can run before it is verified.
+        """
+        #create two channels on different connection & select them for use with dtx:
+        channel1 = self.channel
+        channel1.dtx_demarcation_select()
+
+        other = self.connect()
+        channel2 = other.channel(1)
+        channel2.channel_open()
+        channel2.dtx_demarcation_select()
+
+        #create a xid
+        tx = self.xid("dummy")
+        #start work on one channel under that xid:
+        channel1.dtx_demarcation_start(xid=tx)
+        #then start on the other without the join set
+        error = None
+        try:
+            channel2.dtx_demarcation_start(xid=tx)
+        except Closed, e:
+            #remember the exception; it is checked after cleanup
+            error = e
+
+        #cleanup:
+        if error is None:
+            channel2.dtx_demarcation_end(xid=tx)
+            other.close()
+        channel1.dtx_demarcation_end(xid=tx)
+        channel1.dtx_coordination_rollback(xid=tx)
+
+        #verification:
+        if error is None:
+            self.fail("Xid already known, expected exception!")
+        self.assertConnectionException(503, error.args[0])
+
+    def test_forget_xid_on_completion(self):
+        """
+        Verify that a xid is 'forgotten' - and can therefore be used
+        again - once it is completed.
+
+        test_simple_commit is run first to complete a transaction
+        under "my-xid"; starting a new association with the same xid
+        must then succeed. The new association is ended and rolled
+        back as cleanup.
+        """
+        channel = self.channel
+        #do some transactional work & complete the transaction
+        self.test_simple_commit()
+        
+        #start association for the same xid as the previously completed txn
+        tx = self.xid("my-xid")
+        channel.dtx_demarcation_start(xid=tx)
+        channel.dtx_demarcation_end(xid=tx)
+        channel.dtx_coordination_rollback(xid=tx)
+
+    def test_start_join_and_resume(self):
+        """
+        Ensure the correct error is signalled when both the join and
+        resume flags are set on starting an association between a
+        channel and a transaction.
+
+        The expected outcome is a Closed exception carrying a 503
+        connection close.
+        """
+        channel = self.channel
+        channel.dtx_demarcation_select()
+        tx = self.xid("dummy")
+        try:
+            channel.dtx_demarcation_start(xid=tx, join=True, resume=True)
+            #failed, but need some cleanup:
+            channel.dtx_demarcation_end(xid=tx)
+            channel.dtx_coordination_rollback(xid=tx)
+            self.fail("Join and resume both set, expected exception!")
+        except Closed, e:
+            self.assertConnectionException(503, e.args[0])
+
+    def test_start_join(self):
+        """
+        Verify 'join' behaviour, where a channel is associated with a
+        transaction that is already associated with another channel.
+
+        Two channels of the same client work under one xid: channel1
+        moves message 'a' from queue "one" to "two", channel2 moves
+        'b' the other way. After a one-phase commit each queue must
+        hold exactly the message moved into it.
+        """
+        #create two channels & select them for use with dtx:
+        channel1 = self.channel
+        channel1.dtx_demarcation_select()
+
+        channel2 = self.client.channel(2)
+        channel2.channel_open()
+        channel2.dtx_demarcation_select()
+
+        #setup
+        channel1.queue_declare(queue="one", exclusive=True)
+        channel1.queue_declare(queue="two", exclusive=True)
+        channel1.message_transfer(routing_key="one", message_id="a", body="DtxMessage")
+        channel1.message_transfer(routing_key="two", message_id="b", body="DtxMessage")
+
+        #create a xid
+        tx = self.xid("dummy")
+        #start work on one channel under that xid:
+        channel1.dtx_demarcation_start(xid=tx)
+        #then start on the other with the join flag set
+        channel2.dtx_demarcation_start(xid=tx, join=True)
+
+        #do work through each channel
+        self.swap(channel1, "one", "two")#swap 'a' from 'one' to 'two'
+        self.swap(channel2, "two", "one")#swap 'b' from 'two' to 'one'
+
+        #mark end on both channels
+        channel1.dtx_demarcation_end(xid=tx)
+        channel2.dtx_demarcation_end(xid=tx)
+        
+        #commit and check
+        channel1.dtx_coordination_commit(xid=tx, one_phase=True)
+        self.assertMessageCount(1, "one")
+        self.assertMessageCount(1, "two")
+        self.assertMessageId("a", "two")
+        self.assertMessageId("b", "one")
+        
+
+    def test_suspend_resume(self):
+        """
+        Test suspension and resumption of an association.
+
+        On a single channel, message 'a' is moved from "one" to "two",
+        the association is ended with suspend=True, then started again
+        with resume=True and 'b' is moved from "two" to "one". After a
+        one-phase commit both moves must be visible.
+        """
+        channel = self.channel
+        channel.dtx_demarcation_select()
+
+        #setup
+        channel.queue_declare(queue="one", exclusive=True)
+        channel.queue_declare(queue="two", exclusive=True)
+        channel.message_transfer(routing_key="one", message_id="a", body="DtxMessage")
+        channel.message_transfer(routing_key="two", message_id="b", body="DtxMessage")
+
+        tx = self.xid("dummy")
+
+        channel.dtx_demarcation_start(xid=tx)
+        self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two'
+        channel.dtx_demarcation_end(xid=tx, suspend=True)
+
+        channel.dtx_demarcation_start(xid=tx, resume=True)
+        self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one'
+        channel.dtx_demarcation_end(xid=tx)
+        
+        #commit and check
+        channel.dtx_coordination_commit(xid=tx, one_phase=True)
+        self.assertMessageCount(1, "one")
+        self.assertMessageCount(1, "two")
+        self.assertMessageId("a", "two")
+        self.assertMessageId("b", "one")
+
+    def test_end_suspend_and_fail(self):
+        """
+        Verify that the correct error is signalled if the suspend and
+        fail flag are both set when disassociating a transaction from
+        the channel.
+
+        The expected outcome is a Closed exception carrying a 503
+        connection close. The transaction is afterwards rolled back
+        through a channel on a newly opened connection.
+        """
+        channel = self.channel
+        channel.dtx_demarcation_select()
+        tx = self.xid("suspend_and_fail")
+        channel.dtx_demarcation_start(xid=tx)
+        try:
+            channel.dtx_demarcation_end(xid=tx, suspend=True, fail=True)
+            self.fail("Suspend and fail both set, expected exception!")
+        except Closed, e:
+            self.assertConnectionException(503, e.args[0])
+
+        #cleanup on a fresh connection (presumably because the original
+        #one was closed by the expected exception - confirm)
+        other = self.connect()
+        channel = other.channel(1)
+        channel.channel_open()
+        channel.dtx_coordination_rollback(xid=tx)
+        channel.channel_close()
+        other.close()
+    
+
+    def test_end_unknown_xid(self):
+        """
+        Verifies that the correct exception is thrown when an attempt
+        is made to end the association for a xid not previously
+        associated with the channel.
+
+        The test currently expects a Closed exception carrying a 503
+        connection close; see the note in the handler below.
+        """
+        channel = self.channel
+        channel.dtx_demarcation_select()
+        tx = self.xid("unknown-xid")
+        try:
+            channel.dtx_demarcation_end(xid=tx)
+            self.fail("Attempted to end association with unknown xid, expected exception!")
+        except Closed, e:
+            #FYI: this is currently *not* the exception specified, but I think the spec is wrong! Confirming...
+            self.assertConnectionException(503, e.args[0])
+
+    def test_end(self):
+        """
+        Verify that the association is terminated by end and subsequent
+        operations are non-transactional.
+
+        Message "one" is published inside the transaction and "two"
+        after dtx_demarcation_end on the same channel. "two" must be
+        available immediately; "one" only after the transaction is
+        committed (from the test's default channel, once the working
+        channel has been closed).
+        """
+        channel = self.client.channel(2)
+        channel.channel_open()
+        channel.queue_declare(queue="tx-queue", exclusive=True)
+
+        #publish a message under a transaction
+        channel.dtx_demarcation_select()
+        tx = self.xid("dummy")
+        channel.dtx_demarcation_start(xid=tx)
+        channel.message_transfer(routing_key="tx-queue", message_id="one", body="DtxMessage")        
+        channel.dtx_demarcation_end(xid=tx)
+
+        #now that association with txn is ended, publish another message
+        channel.message_transfer(routing_key="tx-queue", message_id="two", body="DtxMessage")
+
+        #check the second message is available, but not the first
+        self.assertMessageCount(1, "tx-queue")
+        channel.message_consume(queue="tx-queue", destination="results", no_ack=False)
+        msg = self.client.queue("results").get(timeout=1)
+        self.assertEqual("two", msg.message_id)
+        channel.message_cancel(destination="results")
+        #ack the message then close the channel
+        msg.ok()
+        channel.channel_close()
+
+        channel = self.channel        
+        #commit the transaction and check that the first message (and
+        #only the first message) is then delivered
+        channel.dtx_coordination_commit(xid=tx, one_phase=True)
+        self.assertMessageCount(1, "tx-queue")
+        self.assertMessageId("one", "tx-queue")
+
+    def test_invalid_commit_one_phase_true(self):
+        """
+        Test that a commit with one_phase = True is rejected if the
+        transaction in question has already been prepared.
+
+        The transactional work is done through a channel on a separate
+        connection. The expected outcome is a Closed exception carrying
+        a 503 connection close; the transaction is then rolled back
+        from the test's default channel before the exception is
+        verified.
+        """
+        other = self.connect()
+        tester = other.channel(1)
+        tester.channel_open()
+        tester.queue_declare(queue="dummy", exclusive=True)
+        tester.dtx_demarcation_select()
+        tx = self.xid("dummy")
+        tester.dtx_demarcation_start(xid=tx)
+        tester.message_transfer(routing_key="dummy", body="whatever")
+        tester.dtx_demarcation_end(xid=tx)
+        tester.dtx_coordination_prepare(xid=tx)
+        error = None
+        try:
+            tester.dtx_coordination_commit(xid=tx, one_phase=True)
+        except Closed, e:
+            #remember the exception; it is checked after cleanup
+            error = e
+
+        if error is None:
+            tester.channel_close()
+            other.close()
+            self.fail("Invalid use of one_phase=True, expected exception!")
+
+        self.channel.dtx_coordination_rollback(xid=tx)
+        self.assertConnectionException(503, error.args[0])
+
+    def test_invalid_commit_one_phase_false(self):
+        """
+        Test that a commit with one_phase = False is rejected if the
+        transaction in question has not yet been prepared.
+
+        The transactional work is done through a channel on a separate
+        connection. The expected outcome is a Closed exception carrying
+        a 503 connection close; the transaction is then rolled back
+        from the test's default channel before the exception is
+        verified.
+        """
+        other = self.connect()
+        tester = other.channel(1)
+        tester.channel_open()
+        tester.queue_declare(queue="dummy", exclusive=True)
+        tester.dtx_demarcation_select()
+        tx = self.xid("dummy")
+        tester.dtx_demarcation_start(xid=tx)
+        tester.message_transfer(routing_key="dummy", body="whatever")
+        tester.dtx_demarcation_end(xid=tx)
+        error = None
+        try:
+            tester.dtx_coordination_commit(xid=tx, one_phase=False)
+        except Closed, e:
+            #remember the exception; it is checked after cleanup
+            error = e
+
+        if error is None:
+            tester.channel_close()
+            other.close()
+            self.fail("Invalid use of one_phase=False, expected exception!")
+
+        self.channel.dtx_coordination_rollback(xid=tx)
+        self.assertConnectionException(503, error.args[0])
+
+    def test_implicit_end(self):
+        """
+        Test that an association is implicitly ended when the channel
+        is closed and the transaction in question is marked as
+        rollback only.
+
+        Only an explicit channel_close is exercised here. A second
+        channel gets and re-publishes a message under the transaction
+        and is then closed without calling end; a prepare issued from
+        the first channel must return XA_RBROLLBACK.
+        """
+        channel1 = self.channel
+        channel2 = self.client.channel(2)
+        channel2.channel_open()
+
+        #setup:
+        channel2.queue_declare(queue="dummy", exclusive=True)
+        channel2.message_transfer(routing_key="dummy", body="whatever")
+        tx = self.xid("dummy")
+
+        channel2.dtx_demarcation_select()
+        channel2.dtx_demarcation_start(xid=tx)
+        channel2.message_get(queue="dummy", destination="dummy")
+        self.client.queue("dummy").get(timeout=1).ok()
+        channel2.message_transfer(routing_key="dummy", body="whatever")
+        #close without an explicit dtx_demarcation_end
+        channel2.channel_close()
+
+        self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).flags)
+        channel1.dtx_coordination_rollback(xid=tx)
+
+    def test_recover(self):
+        """
+        Test basic recover behaviour.
+
+        Nine transactions (tx1..tx9) each publish one message; numbers
+        2, 5, 6 and 8 are prepared and left in doubt, the rest are
+        rolled back. dtx_coordination_recover must then report at
+        least the four prepared xids. Every recovered xid is rolled
+        back as cleanup.
+        """
+        channel = self.channel
+
+        channel.dtx_demarcation_select()
+        channel.queue_declare(queue="dummy", exclusive=True)
+
+        prepared = []
+        for i in range(1, 10):
+            tx = self.xid("tx%s" % (i))
+            channel.dtx_demarcation_start(xid=tx)
+            channel.message_transfer(routing_key="dummy", body="message%s" % (i))
+            channel.dtx_demarcation_end(xid=tx)
+            if i in [2, 5, 6, 8]:
+                channel.dtx_coordination_prepare(xid=tx)
+                prepared.append(tx)
+            else:    
+                channel.dtx_coordination_rollback(xid=tx)
+
+        indoubt = channel.dtx_coordination_recover().xids
+        #convert indoubt table to a list of xids (note: this will change for 0-10)
+        #the "xids" entry is decoded as a sequence of records, each a
+        #one-byte length followed by that many bytes of xid
+        data = indoubt["xids"]
+        xids = []
+        pos = 0
+        while pos < len(data):
+            size = unpack("!B", data[pos])[0]
+            start = pos + 1
+            end = start + size
+            xid = data[start:end]
+            xids.append(xid)
+            pos = end
+        
+        #rollback the prepared transactions returned by recover
+        for x in xids:
+            channel.dtx_coordination_rollback(xid=x)            
+
+        #validate against the expected list of prepared transactions
+        #(only a missing expected xid fails the test; unexpected extra
+        #xids are reported in the failure message but do not cause one)
+        actual = set(xids)
+        expected = set(prepared)
+        intersection = actual.intersection(expected)
+        
+        if intersection != expected:
+            missing = expected.difference(actual)
+            extra = actual.difference(expected)
+            #prepared transactions that recover did not report still need cleaning up
+            for x in missing:
+                channel.dtx_coordination_rollback(xid=x)            
+            self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra))
+
+    def xid(self, txid, branchqual = ''):
+        """
+        Build the encoded xid string used by the dtx methods.
+
+        Layout: a 32-bit format id (always 0), a one-byte length of
+        txid, a one-byte length of branchqual, followed by txid and
+        branchqual themselves.
+
+        The '!' prefix selects network byte order with standard sizes
+        and no padding, so the header is always six bytes; without it
+        'L' takes the platform's native size (eight bytes on many
+        64-bit systems) and the encoding differs between machines.
+        """
+        return pack('!LBB', 0, len(txid), len(branchqual)) + txid + branchqual
+        
+    def txswap(self, tx, id):
+        """
+        Move one message from queue-a to queue-b under transaction tx.
+
+        Declares both queues as exclusive, publishes a message with the
+        given message id to queue-a, selects the channel for dtx, and
+        performs the swap between a start and an end of the
+        association. Both start and end must report XA_OK. The
+        transaction is left for the caller to complete.
+        """
+        ch = self.channel
+        source, target = "queue-a", "queue-b"
+
+        #set up the two queues and seed the source with the message
+        for name in (source, target):
+            ch.queue_declare(queue=name, exclusive=True)
+        ch.message_transfer(routing_key=source, message_id=id, body="DtxMessage")
+
+        #associate the channel with the transaction
+        ch.dtx_demarcation_select()
+        started = ch.dtx_demarcation_start(xid=tx)
+        self.assertEqual(self.XA_OK, started.flags)
+
+        #the transactional work itself
+        self.swap(ch, source, target)
+
+        #end the association
+        ended = ch.dtx_demarcation_end(xid=tx)
+        self.assertEqual(self.XA_OK, ended.flags)
+
+    def swap(self, channel, src, dest):
+        """
+        Get one message from queue src through the given channel,
+        acknowledge it, and publish a message with the same message id
+        and body to dest.
+
+        The message is fetched via the "temp-swap" destination of
+        self.client, waiting at most one second for it to arrive.
+        """
+        channel.message_get(destination="temp-swap", queue=src)
+        received = self.client.queue("temp-swap").get(timeout=1)
+        received.ok()
+
+        channel.message_transfer(routing_key=dest,
+                                 message_id=received.message_id,
+                                 body=received.body)
+
+    def assertMessageCount(self, expected, queue):
+        """
+        Assert that a passive declare of the named queue reports
+        exactly the expected message count.
+        """
+        declared = self.channel.queue_declare(queue=queue, passive=True)
+        self.assertEqual(expected, declared.message_count)
+
+    def assertMessageId(self, expected, queue):
+        """
+        Assert that the next message delivered from the named queue
+        carries the expected message id.
+
+        A no-ack consumer is registered on the "results" destination,
+        one message is awaited for up to a second, and the consumer is
+        cancelled again once the id has been checked.
+        """
+        ch = self.channel
+        ch.message_consume(queue=queue, destination="results", no_ack=True)
+        delivered = self.client.queue("results").get(timeout=1)
+        self.assertEqual(expected, delivered.message_id)
+        ch.message_cancel(destination="results")

Propchange: incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py
------------------------------------------------------------------------------
    svn:eol-style = native