You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/04/20 19:11:30 UTC

svn commit: r530853 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/ tests/

Author: gsim
Date: Fri Apr 20 10:11:23 2007
New Revision: 530853

URL: http://svn.apache.org/viewvc?view=rev&rev=530853
Log:
Added some dtx related unit tests
Added support for suspend and resume


Added:
    incubator/qpid/trunk/qpid/cpp/src/tests/DtxWorkRecordTest.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=530853&r1=530852&r2=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Fri Apr 20 10:11:23 2007
@@ -26,6 +26,7 @@
 #include <functional>
 
 #include <boost/bind.hpp>
+#include <boost/format.hpp>
 
 #include "BrokerChannel.h"
 #include "qpid/framing/ChannelAdapter.h"
@@ -121,18 +122,44 @@
 }
 
 void Channel::startDtx(const std::string& xid, DtxManager& mgr){
-    dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer());
+    dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
     txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
     mgr.start(xid, dtxBuffer);
 }
 
-void Channel::endDtx(){
+void Channel::endDtx(const std::string& xid){ // ends this channel's dtx work for xid; throws 503 on an xid mismatch
+    if (dtxBuffer->getXid() != xid) { // NOTE(review): dtxBuffer is dereferenced without a null check -- confirm a start always precedes end
+        throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end") 
+                                  % dtxBuffer->getXid() % xid);
+    }
+
     TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
     dtxBuffer->enlist(txAck);    
     dtxBuffer->markEnded();
     
     dtxBuffer.reset();
     txBuffer.reset();
+}
+
+void Channel::suspendDtx(const std::string& xid){ // marks this channel's dtx buffer as suspended; throws 503 on an xid mismatch
+    if (dtxBuffer->getXid() != xid) { // NOTE(review): dtxBuffer is dereferenced without a null check -- confirm a start always precedes suspend
+        throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend") 
+                                  % dtxBuffer->getXid() % xid);
+    }
+    dtxBuffer->setSuspended(true);
+    txBuffer.reset(); // only txBuffer is dropped; dtxBuffer is kept (resumeDtx reads it)
+}
+
+void Channel::resumeDtx(const std::string& xid){ // re-attaches the suspended dtx buffer for xid as this channel's tx buffer
+    if (dtxBuffer->getXid() != xid) { // throws 503 when xid differs from the one the buffer was created with
+        throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume") 
+                                  % dtxBuffer->getXid() % xid);
+    }
+    if (!dtxBuffer->isSuspended()) { // throws 503 when the buffer is not currently suspended
+        throw ConnectionException(503, boost::format("xid %1% not suspended")% xid);
+    }
+    dtxBuffer->setSuspended(false); // clear the flag on resume (previously left set to true, so the buffer stayed suspended)
+    txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer); // make the dtx buffer the channel's current tx buffer again
 }
 
 void Channel::deliver(

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h?view=diff&rev=530853&r1=530852&r2=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h Fri Apr 20 10:11:23 2007
@@ -138,7 +138,9 @@
     void commit();
     void rollback();
     void startDtx(const std::string& xid, DtxManager& mgr);
-    void endDtx();
+    void endDtx(const std::string& xid);
+    void suspendDtx(const std::string& xid);
+    void resumeDtx(const std::string& xid);
     void ack();
     void ack(uint64_t deliveryTag, bool multiple);
     void ack(uint64_t deliveryTag, uint64_t endTag);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp?view=diff&rev=530853&r1=530852&r2=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp Fri Apr 20 10:11:23 2007
@@ -23,7 +23,7 @@
 using namespace qpid::broker;
 using qpid::sys::Mutex;
 
-DtxBuffer::DtxBuffer() : ended(false) {}
+DtxBuffer::DtxBuffer(const std::string& _xid) : xid(_xid), ended(false), suspended(false) {}
 
 DtxBuffer::~DtxBuffer() {}
 
@@ -38,3 +38,19 @@
     Mutex::ScopedLock locker(lock); 
     return ended; 
 }
+
+void DtxBuffer::setSuspended(bool isSuspended) // stores the suspended flag for this buffer
+{ 
+    suspended = isSuspended; // NOTE(review): written without taking 'lock', unlike isEnded() -- confirm callers are single-threaded
+}
+
+bool DtxBuffer::isSuspended() // returns the flag last stored by setSuspended (false after construction)
+{ 
+    return suspended; // NOTE(review): read without taking 'lock', unlike isEnded() -- confirm callers are single-threaded
+}
+
+const std::string& DtxBuffer::getXid() // returns the xid this buffer was constructed with
+{
+    return xid; // reference to the member; valid for as long as this DtxBuffer is alive
+}
+

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h?view=diff&rev=530853&r1=530852&r2=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h Fri Apr 20 10:11:23 2007
@@ -28,14 +28,19 @@
     namespace broker {
         class DtxBuffer : public TxBuffer{
             sys::Mutex lock;
+            const std::string xid;
             bool ended;
+            bool suspended;           
         public:
             typedef boost::shared_ptr<DtxBuffer> shared_ptr;
 
-            DtxBuffer();
+            DtxBuffer(const std::string& xid = "");
             ~DtxBuffer();
             void markEnded();
             bool isEnded();
+            void setSuspended(bool suspended);
+            bool isSuspended();
+            const std::string& getXid();
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp?view=diff&rev=530853&r1=530852&r2=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp Fri Apr 20 10:11:23 2007
@@ -17,6 +17,7 @@
  */
 #include "DtxHandlerImpl.h"
 
+#include <boost/format.hpp>
 #include "Broker.h"
 #include "BrokerChannel.h"
 
@@ -30,18 +31,6 @@
 
 // DtxDemarcationHandler:
 
-void DtxHandlerImpl::end(const MethodContext& /*context*/,
-                         u_int16_t /*ticket*/,
-                         const string& /*xid*/,
-                         bool /*fail*/,
-                         bool /*suspend*/ )
-{
-    channel.endDtx();
-    //send end-ok
-    //TODO: handle fail and suspend
-    //TODO: check xid is as expected?
-}
-
 
 void DtxHandlerImpl::select(const MethodContext& /*context*/ )
 {
@@ -49,16 +38,38 @@
     //send select-ok
 }
 
+void DtxHandlerImpl::end(const MethodContext& /*context*/, // handles dtx end: suspends or ends the channel's dtx work for xid
+                         u_int16_t /*ticket*/,
+                         const string& xid,
+                         bool fail,
+                         bool suspend)
+{
+    if (fail && suspend) { // the two flags are rejected in combination (503)
+        throw ConnectionException(503, "Fail and suspend cannot both be set.");
+    }
+
+    //TODO: handle fail
+    if (suspend) {
+        channel.suspendDtx(xid); // suspend keeps the channel's dtx buffer so it can be resumed later
+    } else {
+        channel.endDtx(xid);
+    }
+    //send end-ok
+}
 
 void DtxHandlerImpl::start(const MethodContext& /*context*/,
                            u_int16_t /*ticket*/,
                            const string& xid,
                            bool /*join*/,
-                           bool /*resume*/ )
+                           bool resume)
 {
-    channel.startDtx(xid, broker.getDtxManager());
+    //TODO: handle join
+    if (resume) {
+        channel.resumeDtx(xid);
+    } else {
+        channel.startDtx(xid, broker.getDtxManager());
+    }
     //send start-ok
-    //TODO: handle join and resume
 }
 
 // DtxCoordinationHandler:

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp?view=diff&rev=530853&r1=530852&r2=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp Fri Apr 20 10:11:23 2007
@@ -25,7 +25,7 @@
 
 using namespace qpid::broker;
 
-DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store) {}
+DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store), completed(false) {}
 
 DtxWorkRecord::~DtxWorkRecord() {}
 
@@ -65,6 +65,7 @@
         std::auto_ptr<TransactionContext> localtxn = store->begin();
         if (prepare(localtxn.get())) {
             store->commit(*localtxn);
+            for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
         } else {
             store->abort(*localtxn);
             abort();
@@ -103,5 +104,4 @@
         txn.reset();
     }
     for_each(work.begin(), work.end(), mem_fn(&TxBuffer::rollback));
-
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h?view=diff&rev=530853&r1=530852&r2=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h Fri Apr 20 10:11:23 2007
@@ -31,6 +31,11 @@
 namespace qpid {
 namespace broker {
 
+/**
+ * Represents the work done under a particular distributed transaction
+ * across potentially multiple channels. Identified by a xid. Allows
+ * that work to be prepared, committed and rolled-back.
+ */
 class DtxWorkRecord
 {
     typedef std::vector<DtxBuffer::shared_ptr> Work;

Added: incubator/qpid/trunk/qpid/cpp/src/tests/DtxWorkRecordTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/DtxWorkRecordTest.cpp?view=auto&rev=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/DtxWorkRecordTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/DtxWorkRecordTest.cpp Fri Apr 20 10:11:23 2007
@@ -0,0 +1,202 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/DtxWorkRecord.h"
+#include "qpid_test_plugin.h"
+#include <iostream>
+#include <vector>
+#include "TxMocks.h"
+
+using namespace qpid::broker;
+using boost::static_pointer_cast;
+
+class DtxWorkRecordTest : public CppUnit::TestCase  
+{
+    CPPUNIT_TEST_SUITE(DtxWorkRecordTest);
+    CPPUNIT_TEST(testOnePhaseCommit);
+    CPPUNIT_TEST(testFailOnOnePhaseCommit);
+    CPPUNIT_TEST(testTwoPhaseCommit);
+    CPPUNIT_TEST(testFailOnTwoPhaseCommit);
+    CPPUNIT_TEST(testRollback);
+    CPPUNIT_TEST_SUITE_END();
+
+  public:
+
+    void testOnePhaseCommit(){
+        MockTransactionalStore store;
+        store.expectBegin().expectCommit();
+
+        MockTxOp::shared_ptr opA(new MockTxOp());
+        opA->expectPrepare().expectCommit();
+        MockTxOp::shared_ptr opB(new MockTxOp());
+        opB->expectPrepare().expectCommit();
+
+        DtxBuffer::shared_ptr bufferA(new DtxBuffer());
+        bufferA->enlist(static_pointer_cast<TxOp>(opA));
+        bufferA->markEnded();
+        DtxBuffer::shared_ptr bufferB(new DtxBuffer());
+        bufferB->enlist(static_pointer_cast<TxOp>(opB));
+        bufferB->markEnded();
+        
+        DtxWorkRecord work("my-xid", &store);
+        work.add(bufferA);
+        work.add(bufferB);
+
+        work.commit();
+
+        store.check();
+        CPPUNIT_ASSERT(store.isCommitted());
+        opA->check();
+        opB->check();
+    }
+
+    void testFailOnOnePhaseCommit(){
+        MockTransactionalStore store;
+        store.expectBegin().expectAbort();
+
+        MockTxOp::shared_ptr opA(new MockTxOp());
+        opA->expectPrepare().expectRollback();
+        MockTxOp::shared_ptr opB(new MockTxOp(true));
+        opB->expectPrepare().expectRollback();
+        MockTxOp::shared_ptr opC(new MockTxOp());
+        opC->expectRollback();
+
+        DtxBuffer::shared_ptr bufferA(new DtxBuffer());
+        bufferA->enlist(static_pointer_cast<TxOp>(opA));
+        bufferA->markEnded();
+        DtxBuffer::shared_ptr bufferB(new DtxBuffer());
+        bufferB->enlist(static_pointer_cast<TxOp>(opB));
+        bufferB->markEnded();
+        DtxBuffer::shared_ptr bufferC(new DtxBuffer());
+        bufferC->enlist(static_pointer_cast<TxOp>(opC));
+        bufferC->markEnded();
+        
+        DtxWorkRecord work("my-xid", &store);
+        work.add(bufferA);
+        work.add(bufferB);
+        work.add(bufferC);
+
+        work.commit();
+
+        CPPUNIT_ASSERT(store.isAborted());
+        store.check();
+
+        opA->check();
+        opB->check();
+        opC->check();
+    }
+
+    void testTwoPhaseCommit(){
+        MockTransactionalStore store;
+        store.expectBegin2PC().expectPrepare().expectCommit();
+
+        MockTxOp::shared_ptr opA(new MockTxOp());
+        opA->expectPrepare().expectCommit();
+        MockTxOp::shared_ptr opB(new MockTxOp());
+        opB->expectPrepare().expectCommit();
+
+        DtxBuffer::shared_ptr bufferA(new DtxBuffer());
+        bufferA->enlist(static_pointer_cast<TxOp>(opA));
+        bufferA->markEnded();
+        DtxBuffer::shared_ptr bufferB(new DtxBuffer());
+        bufferB->enlist(static_pointer_cast<TxOp>(opB));
+        bufferB->markEnded();
+        
+        DtxWorkRecord work("my-xid", &store);
+        work.add(bufferA);
+        work.add(bufferB);
+
+        CPPUNIT_ASSERT(work.prepare());
+        CPPUNIT_ASSERT(store.isPrepared());
+        work.commit();
+        store.check();
+        CPPUNIT_ASSERT(store.isCommitted());
+        opA->check();
+        opB->check();
+    }
+
+    void testFailOnTwoPhaseCommit(){
+        MockTransactionalStore store;
+        store.expectBegin2PC().expectAbort();
+
+        MockTxOp::shared_ptr opA(new MockTxOp());
+        opA->expectPrepare().expectRollback();
+        MockTxOp::shared_ptr opB(new MockTxOp(true));
+        opB->expectPrepare().expectRollback();
+        MockTxOp::shared_ptr opC(new MockTxOp());
+        opC->expectRollback();
+
+        DtxBuffer::shared_ptr bufferA(new DtxBuffer());
+        bufferA->enlist(static_pointer_cast<TxOp>(opA));
+        bufferA->markEnded();
+        DtxBuffer::shared_ptr bufferB(new DtxBuffer());
+        bufferB->enlist(static_pointer_cast<TxOp>(opB));
+        bufferB->markEnded();
+        DtxBuffer::shared_ptr bufferC(new DtxBuffer());
+        bufferC->enlist(static_pointer_cast<TxOp>(opC));
+        bufferC->markEnded();
+        
+        DtxWorkRecord work("my-xid", &store);
+        work.add(bufferA);
+        work.add(bufferB);
+        work.add(bufferC);
+
+        CPPUNIT_ASSERT(!work.prepare());
+        CPPUNIT_ASSERT(store.isAborted());
+        store.check();
+        opA->check();
+        opB->check();
+        opC->check();
+    }
+
+    void testRollback(){
+        MockTransactionalStore store;
+        store.expectBegin2PC().expectPrepare().expectAbort();
+
+        MockTxOp::shared_ptr opA(new MockTxOp());
+        opA->expectPrepare().expectRollback();
+        MockTxOp::shared_ptr opB(new MockTxOp());
+        opB->expectPrepare().expectRollback();
+
+        DtxBuffer::shared_ptr bufferA(new DtxBuffer());
+        bufferA->enlist(static_pointer_cast<TxOp>(opA));
+        bufferA->markEnded();
+        DtxBuffer::shared_ptr bufferB(new DtxBuffer());
+        bufferB->enlist(static_pointer_cast<TxOp>(opB));
+        bufferB->markEnded();
+        
+        DtxWorkRecord work("my-xid", &store);
+        work.add(bufferA);
+        work.add(bufferB);
+
+        CPPUNIT_ASSERT(work.prepare());
+        CPPUNIT_ASSERT(store.isPrepared());
+        work.rollback();
+        store.check();
+        CPPUNIT_ASSERT(store.isAborted());
+        opA->check();
+        opB->check();
+    }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(DtxWorkRecordTest);
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/DtxWorkRecordTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/DtxWorkRecordTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?view=diff&rev=530853&r1=530852&r2=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri Apr 20 10:11:23 2007
@@ -6,6 +6,7 @@
   AccumulatedAckTest	\
   BrokerChannelTest 	\
   ConfigurationTest	\
+  DtxWorkRecordTest     \
   ExchangeTest		\
   HeadersExchangeTest	\
   InMemoryContentTest	\
@@ -72,6 +73,7 @@
   InProcessBroker.h				\
   MockChannel.h					\
   MockConnectionInputHandler.h			\
+  TxMocks.h					\
   qpid_test_plugin.h				\
   APRBaseTest.cpp
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp?view=diff&rev=530853&r1=530852&r2=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp Fri Apr 20 10:11:23 2007
@@ -22,144 +22,13 @@
 #include "qpid_test_plugin.h"
 #include <iostream>
 #include <vector>
+#include "TxMocks.h"
 
 using namespace qpid::broker;
 using boost::static_pointer_cast;
 
-template <class T> void assertEqualVector(std::vector<T>& expected, std::vector<T>& actual){
-    unsigned int i = 0;
-    while(i < expected.size() && i < actual.size()){
-        CPPUNIT_ASSERT_EQUAL(expected[i], actual[i]);
-        i++;
-    }
-    CPPUNIT_ASSERT(i == expected.size());
-    CPPUNIT_ASSERT(i == actual.size());
-}
-
 class TxBufferTest : public CppUnit::TestCase  
 {
-    class MockTxOp : public TxOp{
-        enum op_codes {PREPARE=2, COMMIT=4, ROLLBACK=8};
-        std::vector<int> expected;
-        std::vector<int> actual;
-        bool failOnPrepare;
-    public:
-        typedef boost::shared_ptr<MockTxOp> shared_ptr;
-
-        MockTxOp() : failOnPrepare(false) {}
-        MockTxOp(bool _failOnPrepare) : failOnPrepare(_failOnPrepare) {}
-
-        bool prepare(TransactionContext*) throw(){
-            actual.push_back(PREPARE);
-            return !failOnPrepare;
-        }
-        void commit()  throw(){
-            actual.push_back(COMMIT);
-        }
-        void rollback()  throw(){
-            actual.push_back(ROLLBACK);
-        }
-        MockTxOp& expectPrepare(){
-            expected.push_back(PREPARE);
-            return *this;
-        }
-        MockTxOp& expectCommit(){
-            expected.push_back(COMMIT);
-            return *this;
-        }
-        MockTxOp& expectRollback(){
-            expected.push_back(ROLLBACK);
-            return *this;
-        }
-        void check(){
-            assertEqualVector(expected, actual);
-        }
-        ~MockTxOp(){}        
-    };
-
-    class MockTransactionalStore : public TransactionalStore{
-        enum op_codes {BEGIN=2, COMMIT=4, ABORT=8};
-        std::vector<int> expected;
-        std::vector<int> actual;
-
-        enum states {OPEN = 1, COMMITTED = 2, ABORTED = 3};
-        int state;
-
-        class TestTransactionContext : public TransactionContext{
-            MockTransactionalStore* store;
-        public:
-            TestTransactionContext(MockTransactionalStore* _store) : store(_store) {}
-            void commit(){
-                if(store->state != OPEN) throw "txn already completed";
-                store->state = COMMITTED;
-            }
-
-            void abort(){
-                if(store->state != OPEN) throw "txn already completed";
-                store->state = ABORTED;
-            }
-            ~TestTransactionContext(){}
-        };
-
-
-    public:
-        MockTransactionalStore() : state(OPEN){}
-
-        std::auto_ptr<TPCTransactionContext> begin(const std::string&){ 
-            throw "Operation not supported";
-        }
-        void prepare(TPCTransactionContext&){
-            throw "Operation not supported";
-        }
-        void collectPreparedXids(std::set<std::string>&)
-        {
-            throw "Operation not supported";            
-        }
-
-
-        std::auto_ptr<TransactionContext> begin(){ 
-            actual.push_back(BEGIN);
-            std::auto_ptr<TransactionContext> txn(new TestTransactionContext(this));
-            return txn;
-        }
-        void commit(TransactionContext& ctxt){
-            actual.push_back(COMMIT);
-            dynamic_cast<TestTransactionContext&>(ctxt).commit();
-        }
-        void abort(TransactionContext& ctxt){
-            actual.push_back(ABORT);
-            dynamic_cast<TestTransactionContext&>(ctxt).abort();
-        }        
-        MockTransactionalStore& expectBegin(){
-            expected.push_back(BEGIN);
-            return *this;
-        }
-        MockTransactionalStore& expectCommit(){
-            expected.push_back(COMMIT);
-            return *this;
-        }
-        MockTransactionalStore& expectAbort(){
-            expected.push_back(ABORT);
-            return *this;
-        }
-        void check(){
-            assertEqualVector(expected, actual);
-        }
-
-        bool isCommitted(){
-            return state == COMMITTED;
-        }
-        
-        bool isAborted(){
-            return state == ABORTED;
-        }
-        
-        bool isOpen() const{
-            return state == OPEN;
-        }
-        ~MockTransactionalStore(){}
-    };
-
     CPPUNIT_TEST_SUITE(TxBufferTest);
     CPPUNIT_TEST(testCommitLocal);
     CPPUNIT_TEST(testFailOnCommitLocal);

Added: incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h?view=auto&rev=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h Fri Apr 20 10:11:23 2007
@@ -0,0 +1,227 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _tests_TxMocks_h
+#define _tests_TxMocks_h
+
+
+#include "qpid/Exception.h"
+#include "qpid/broker/TransactionalStore.h"
+#include "qpid/broker/TxOp.h"
+#include <boost/format.hpp>
+#include <iostream>
+#include <vector>
+
+using namespace qpid::broker;
+using boost::static_pointer_cast;
+using std::string;
+
+template <class T> void assertEqualVector(std::vector<T>& expected, std::vector<T>& actual){ // checks both vectors hold equal elements in the same order
+    unsigned int i = 0;
+    while(i < expected.size() && i < actual.size()){ // compare the common prefix element by element
+        CPPUNIT_ASSERT_EQUAL(expected[i], actual[i]);
+        i++;
+    }
+    if (i < expected.size()) { // actual ran out first: report the first expected entry that never occurred
+        throw qpid::Exception(boost::format("Missing %1%") % expected[i]);
+    } else if (i < actual.size()) { // expected ran out first: report the first surplus entry
+        throw qpid::Exception(boost::format("Extra %1%") % actual[i]);
+    }
+    CPPUNIT_ASSERT_EQUAL(expected.size(), actual.size()); // only reached when neither branch above threw, i.e. the sizes already match
+}
+
+class TxOpConstants{
+protected:
+    const string PREPARE;
+    const string COMMIT;
+    const string ROLLBACK;
+
+    TxOpConstants() : PREPARE("PREPARE"), COMMIT("COMMIT"), ROLLBACK("ROLLBACK") {}
+};
+
+class MockTxOp : public TxOp, public TxOpConstants{
+    std::vector<string> expected;
+    std::vector<string> actual;
+    bool failOnPrepare;
+    string debugName;
+public:
+    typedef boost::shared_ptr<MockTxOp> shared_ptr;
+    
+    MockTxOp() : failOnPrepare(false) {}
+    MockTxOp(bool _failOnPrepare) : failOnPrepare(_failOnPrepare) {}
+        
+    void setDebugName(string name){
+        debugName = name;
+    }
+
+    void printExpected(){        
+        std::cout << std::endl << "MockTxOp[" << debugName << "] expects: ";
+        for (std::vector<string>::iterator i = expected.begin(); i < expected.end(); i++) {
+            if(i != expected.begin()) std::cout << ", ";
+            std::cout << *i;
+        }
+        std::cout << std::endl;
+    }
+
+    void printActual(){        
+        std::cout << std::endl << "MockTxOp[" << debugName << "] actual: ";
+        for (std::vector<string>::iterator i = actual.begin(); i < actual.end(); i++) {
+            if(i != actual.begin()) std::cout << ", ";
+            std::cout << *i;
+        }
+        std::cout << std::endl;
+    }
+            
+    bool prepare(TransactionContext*) throw(){
+        actual.push_back(PREPARE);
+        return !failOnPrepare;
+    }
+    void commit()  throw(){
+        actual.push_back(COMMIT);
+    }
+    void rollback()  throw(){
+        if(!debugName.empty()) std::cout << std::endl << "MockTxOp[" << debugName << "]::rollback()" << std::endl;
+        actual.push_back(ROLLBACK);
+    }
+    MockTxOp& expectPrepare(){
+        expected.push_back(PREPARE);
+        return *this;
+    }
+    MockTxOp& expectCommit(){
+        expected.push_back(COMMIT);
+        return *this;
+    }
+    MockTxOp& expectRollback(){
+        expected.push_back(ROLLBACK);
+        return *this;
+    }
+    void check(){
+        assertEqualVector(expected, actual);
+    }
+    ~MockTxOp(){}        
+};
+
+class MockTransactionalStore : public TransactionalStore{
+    const string BEGIN;
+    const string BEGIN2PC;
+    const string PREPARE;
+    const string COMMIT;
+    const string ABORT;
+    std::vector<string> expected;
+    std::vector<string> actual;
+    
+    enum states {OPEN = 1, PREPARED = 2, COMMITTED = 3, ABORTED = 4};
+    int state;
+    
+    class TestTransactionContext : public TPCTransactionContext{
+        MockTransactionalStore* store;
+    public:
+        TestTransactionContext(MockTransactionalStore* _store) : store(_store) {}
+        void prepare(){
+            if(!store->isOpen()) throw "txn already completed";
+            store->state = PREPARED;
+        }
+
+        void commit(){
+            if(!store->isOpen() && !store->isPrepared()) throw "txn already completed";
+            store->state = COMMITTED;
+        }
+        
+        void abort(){
+            if(!store->isOpen() && !store->isPrepared()) throw "txn already completed";
+            store->state = ABORTED;
+        }
+        ~TestTransactionContext(){}
+    };
+    
+public:
+    MockTransactionalStore() :
+            BEGIN("BEGIN"), BEGIN2PC("BEGIN2PC"), PREPARE("PREPARE"), COMMIT("COMMIT"), ABORT("ABORT"),  state(OPEN){}
+
+    void collectPreparedXids(std::set<std::string>&)
+    {
+        throw "Operation not supported";            
+    }
+        
+    std::auto_ptr<TPCTransactionContext> begin(const std::string&){ 
+        actual.push_back(BEGIN2PC);
+        std::auto_ptr<TPCTransactionContext> txn(new TestTransactionContext(this));
+        return txn;
+    }
+    std::auto_ptr<TransactionContext> begin(){ 
+        actual.push_back(BEGIN);
+        std::auto_ptr<TransactionContext> txn(new TestTransactionContext(this));
+        return txn;
+    }
+    void prepare(TPCTransactionContext& ctxt){
+        actual.push_back(PREPARE);
+        dynamic_cast<TestTransactionContext&>(ctxt).prepare();
+    }
+    void commit(TransactionContext& ctxt){
+        actual.push_back(COMMIT);
+        dynamic_cast<TestTransactionContext&>(ctxt).commit();
+    }
+    void abort(TransactionContext& ctxt){
+        actual.push_back(ABORT);
+        dynamic_cast<TestTransactionContext&>(ctxt).abort();
+    }        
+    MockTransactionalStore& expectBegin(){
+        expected.push_back(BEGIN);
+        return *this;
+    }
+    MockTransactionalStore& expectBegin2PC(){
+        expected.push_back(BEGIN2PC);
+        return *this;
+    }
+    MockTransactionalStore& expectPrepare(){
+        expected.push_back(PREPARE);
+        return *this;
+    }
+    MockTransactionalStore& expectCommit(){
+        expected.push_back(COMMIT);
+        return *this;
+    }
+    MockTransactionalStore& expectAbort(){
+        expected.push_back(ABORT);
+        return *this;
+    }
+    void check(){
+        assertEqualVector(expected, actual);
+    }
+    
+    bool isPrepared(){
+        return state == PREPARED;
+    }
+    
+    bool isCommitted(){
+        return state == COMMITTED;
+    }
+    
+    bool isAborted(){
+        return state == ABORTED;
+    }
+    
+    bool isOpen() const{
+        return state == OPEN;
+    }
+    ~MockTransactionalStore(){}
+};
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date