You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/04/20 19:11:30 UTC
svn commit: r530853 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/
tests/
Author: gsim
Date: Fri Apr 20 10:11:23 2007
New Revision: 530853
URL: http://svn.apache.org/viewvc?view=rev&rev=530853
Log:
Added some dtx related unit tests
Added support for suspend and resume
Added:
incubator/qpid/trunk/qpid/cpp/src/tests/DtxWorkRecordTest.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=530853&r1=530852&r2=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Fri Apr 20 10:11:23 2007
@@ -26,6 +26,7 @@
#include <functional>
#include <boost/bind.hpp>
+#include <boost/format.hpp>
#include "BrokerChannel.h"
#include "qpid/framing/ChannelAdapter.h"
@@ -121,18 +122,44 @@
}
void Channel::startDtx(const std::string& xid, DtxManager& mgr){
- dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer());
+ dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
mgr.start(xid, dtxBuffer);
}
-void Channel::endDtx(){
+void Channel::endDtx(const std::string& xid){ // ends this channel's work on the dtx buffer created by startDtx()
+ if (dtxBuffer->getXid() != xid) { // NOTE(review): dtxBuffer is dereferenced unconditionally; presumably a dtx was started first - confirm
+ throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end")
+ % dtxBuffer->getXid() % xid);
+ }
+
+ TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked)); // TxAck built from the channel's accumulatedAck/unacked state
+ dtxBuffer->enlist(txAck);
+ dtxBuffer->markEnded();
+ dtxBuffer.reset(); // the channel drops both of its references to the buffer
+ txBuffer.reset();
+}
+
+void Channel::suspendDtx(const std::string& xid){ // marks the current dtx buffer suspended and detaches it from txBuffer
+ if (dtxBuffer->getXid() != xid) { // xid must match the one the buffer was created with
+ throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend")
+ % dtxBuffer->getXid() % xid);
+ }
+ dtxBuffer->setSuspended(true);
+ txBuffer.reset(); // dtxBuffer itself is kept so resumeDtx() can re-attach it
+}
+
+void Channel::resumeDtx(const std::string& xid){ // re-attaches a previously suspended dtx buffer as the active txBuffer
+ if (dtxBuffer->getXid() != xid) { // xid must match the one the buffer was created with
+ throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume")
+ % dtxBuffer->getXid() % xid);
+ }
+ if (!dtxBuffer->isSuspended()) { // only a suspended transaction may be resumed
+ throw ConnectionException(503, boost::format("xid %1% not suspended")% xid);
+ }
+ dtxBuffer->setSuspended(false); // clear the flag: once resumed the buffer is active again, so a repeated resume is rejected above
+ txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
}
void Channel::deliver(
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h?view=diff&rev=530853&r1=530852&r2=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h Fri Apr 20 10:11:23 2007
@@ -138,7 +138,9 @@
void commit();
void rollback();
void startDtx(const std::string& xid, DtxManager& mgr);
- void endDtx();
+ void endDtx(const std::string& xid);
+ void suspendDtx(const std::string& xid);
+ void resumeDtx(const std::string& xid);
void ack();
void ack(uint64_t deliveryTag, bool multiple);
void ack(uint64_t deliveryTag, uint64_t endTag);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp?view=diff&rev=530853&r1=530852&r2=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp Fri Apr 20 10:11:23 2007
@@ -23,7 +23,7 @@
using namespace qpid::broker;
using qpid::sys::Mutex;
-DtxBuffer::DtxBuffer() : ended(false) {}
+DtxBuffer::DtxBuffer(const std::string& _xid) : xid(_xid), ended(false), suspended(false) {}
DtxBuffer::~DtxBuffer() {}
@@ -38,3 +38,19 @@
Mutex::ScopedLock locker(lock);
return ended;
}
+
+void DtxBuffer::setSuspended(bool isSuspended)
+{
+ suspended = isSuspended; // plain write; no lock is taken here, whereas the code returning `ended` holds `lock`
+}
+
+bool DtxBuffer::isSuspended()
+{
+ return suspended; // last value passed to setSuspended(); false after construction
+}
+
+const std::string& DtxBuffer::getXid()
+{
+ return xid; // reference to the member set in the constructor; valid only while this DtxBuffer is alive
+}
+
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h?view=diff&rev=530853&r1=530852&r2=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h Fri Apr 20 10:11:23 2007
@@ -28,14 +28,19 @@
namespace broker {
class DtxBuffer : public TxBuffer{
sys::Mutex lock;
+ const std::string xid;
bool ended;
+ bool suspended;
public:
typedef boost::shared_ptr<DtxBuffer> shared_ptr;
- DtxBuffer();
+ DtxBuffer(const std::string& xid = "");
~DtxBuffer();
void markEnded();
bool isEnded();
+ void setSuspended(bool suspended);
+ bool isSuspended();
+ const std::string& getXid();
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp?view=diff&rev=530853&r1=530852&r2=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp Fri Apr 20 10:11:23 2007
@@ -17,6 +17,7 @@
*/
#include "DtxHandlerImpl.h"
+#include <boost/format.hpp>
#include "Broker.h"
#include "BrokerChannel.h"
@@ -30,18 +31,6 @@
// DtxDemarcationHandler:
-void DtxHandlerImpl::end(const MethodContext& /*context*/,
- u_int16_t /*ticket*/,
- const string& /*xid*/,
- bool /*fail*/,
- bool /*suspend*/ )
-{
- channel.endDtx();
- //send end-ok
- //TODO: handle fail and suspend
- //TODO: check xid is as expected?
-}
-
void DtxHandlerImpl::select(const MethodContext& /*context*/ )
{
@@ -49,16 +38,38 @@
//send select-ok
}
+void DtxHandlerImpl::end(const MethodContext& /*context*/, // handles dtx end: suspends or ends the channel's dtx for xid
+ u_int16_t /*ticket*/,
+ const string& xid,
+ bool fail,
+ bool suspend)
+{
+ if (fail && suspend) { // the two flags are rejected when set together
+ throw ConnectionException(503, "Fail and suspend cannot both be set.");
+ }
+
+ //TODO: handle fail
+ if (suspend) {
+ channel.suspendDtx(xid);
+ } else {
+ channel.endDtx(xid);
+ }
+ //send end-ok
+}
void DtxHandlerImpl::start(const MethodContext& /*context*/,
u_int16_t /*ticket*/,
const string& xid,
bool /*join*/,
- bool /*resume*/ )
+ bool resume)
{
- channel.startDtx(xid, broker.getDtxManager());
+ //TODO: handle join
+ if (resume) {
+ channel.resumeDtx(xid);
+ } else {
+ channel.startDtx(xid, broker.getDtxManager());
+ }
//send start-ok
- //TODO: handle join and resume
}
// DtxCoordinationHandler:
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp?view=diff&rev=530853&r1=530852&r2=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp Fri Apr 20 10:11:23 2007
@@ -25,7 +25,7 @@
using namespace qpid::broker;
-DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store) {}
+DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store), completed(false) {}
DtxWorkRecord::~DtxWorkRecord() {}
@@ -65,6 +65,7 @@
std::auto_ptr<TransactionContext> localtxn = store->begin();
if (prepare(localtxn.get())) {
store->commit(*localtxn);
+ for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
} else {
store->abort(*localtxn);
abort();
@@ -103,5 +104,4 @@
txn.reset();
}
for_each(work.begin(), work.end(), mem_fn(&TxBuffer::rollback));
-
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h?view=diff&rev=530853&r1=530852&r2=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h Fri Apr 20 10:11:23 2007
@@ -31,6 +31,11 @@
namespace qpid {
namespace broker {
+/**
+ * Represents the work done under a particular distributed transaction
+ * across potentially multiple channels. Identified by a xid. Allows
+ * that work to be prepared, committed and rolled-back.
+ */
class DtxWorkRecord
{
typedef std::vector<DtxBuffer::shared_ptr> Work;
Added: incubator/qpid/trunk/qpid/cpp/src/tests/DtxWorkRecordTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/DtxWorkRecordTest.cpp?view=auto&rev=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/DtxWorkRecordTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/DtxWorkRecordTest.cpp Fri Apr 20 10:11:23 2007
@@ -0,0 +1,202 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/DtxWorkRecord.h"
+#include "qpid_test_plugin.h"
+#include <iostream>
+#include <vector>
+#include "TxMocks.h"
+
+using namespace qpid::broker;
+using boost::static_pointer_cast;
+
+class DtxWorkRecordTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(DtxWorkRecordTest);
+ CPPUNIT_TEST(testOnePhaseCommit);
+ CPPUNIT_TEST(testFailOnOnePhaseCommit);
+ CPPUNIT_TEST(testTwoPhaseCommit);
+ CPPUNIT_TEST(testFailOnTwoPhaseCommit);
+ CPPUNIT_TEST(testRollback);
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ void testOnePhaseCommit(){
+ MockTransactionalStore store;
+ store.expectBegin().expectCommit();
+
+ MockTxOp::shared_ptr opA(new MockTxOp());
+ opA->expectPrepare().expectCommit();
+ MockTxOp::shared_ptr opB(new MockTxOp());
+ opB->expectPrepare().expectCommit();
+
+ DtxBuffer::shared_ptr bufferA(new DtxBuffer());
+ bufferA->enlist(static_pointer_cast<TxOp>(opA));
+ bufferA->markEnded();
+ DtxBuffer::shared_ptr bufferB(new DtxBuffer());
+ bufferB->enlist(static_pointer_cast<TxOp>(opB));
+ bufferB->markEnded();
+
+ DtxWorkRecord work("my-xid", &store);
+ work.add(bufferA);
+ work.add(bufferB);
+
+ work.commit();
+
+ store.check();
+ CPPUNIT_ASSERT(store.isCommitted());
+ opA->check();
+ opB->check();
+ }
+
+ void testFailOnOnePhaseCommit(){
+ MockTransactionalStore store;
+ store.expectBegin().expectAbort();
+
+ MockTxOp::shared_ptr opA(new MockTxOp());
+ opA->expectPrepare().expectRollback();
+ MockTxOp::shared_ptr opB(new MockTxOp(true));
+ opB->expectPrepare().expectRollback();
+ MockTxOp::shared_ptr opC(new MockTxOp());
+ opC->expectRollback();
+
+ DtxBuffer::shared_ptr bufferA(new DtxBuffer());
+ bufferA->enlist(static_pointer_cast<TxOp>(opA));
+ bufferA->markEnded();
+ DtxBuffer::shared_ptr bufferB(new DtxBuffer());
+ bufferB->enlist(static_pointer_cast<TxOp>(opB));
+ bufferB->markEnded();
+ DtxBuffer::shared_ptr bufferC(new DtxBuffer());
+ bufferC->enlist(static_pointer_cast<TxOp>(opC));
+ bufferC->markEnded();
+
+ DtxWorkRecord work("my-xid", &store);
+ work.add(bufferA);
+ work.add(bufferB);
+ work.add(bufferC);
+
+ work.commit();
+
+ CPPUNIT_ASSERT(store.isAborted());
+ store.check();
+
+ opA->check();
+ opB->check();
+ opC->check();
+ }
+
+ void testTwoPhaseCommit(){
+ MockTransactionalStore store;
+ store.expectBegin2PC().expectPrepare().expectCommit();
+
+ MockTxOp::shared_ptr opA(new MockTxOp());
+ opA->expectPrepare().expectCommit();
+ MockTxOp::shared_ptr opB(new MockTxOp());
+ opB->expectPrepare().expectCommit();
+
+ DtxBuffer::shared_ptr bufferA(new DtxBuffer());
+ bufferA->enlist(static_pointer_cast<TxOp>(opA));
+ bufferA->markEnded();
+ DtxBuffer::shared_ptr bufferB(new DtxBuffer());
+ bufferB->enlist(static_pointer_cast<TxOp>(opB));
+ bufferB->markEnded();
+
+ DtxWorkRecord work("my-xid", &store);
+ work.add(bufferA);
+ work.add(bufferB);
+
+ CPPUNIT_ASSERT(work.prepare());
+ CPPUNIT_ASSERT(store.isPrepared());
+ work.commit();
+ store.check();
+ CPPUNIT_ASSERT(store.isCommitted());
+ opA->check();
+ opB->check();
+ }
+
+ void testFailOnTwoPhaseCommit(){
+ MockTransactionalStore store;
+ store.expectBegin2PC().expectAbort();
+
+ MockTxOp::shared_ptr opA(new MockTxOp());
+ opA->expectPrepare().expectRollback();
+ MockTxOp::shared_ptr opB(new MockTxOp(true));
+ opB->expectPrepare().expectRollback();
+ MockTxOp::shared_ptr opC(new MockTxOp());
+ opC->expectRollback();
+
+ DtxBuffer::shared_ptr bufferA(new DtxBuffer());
+ bufferA->enlist(static_pointer_cast<TxOp>(opA));
+ bufferA->markEnded();
+ DtxBuffer::shared_ptr bufferB(new DtxBuffer());
+ bufferB->enlist(static_pointer_cast<TxOp>(opB));
+ bufferB->markEnded();
+ DtxBuffer::shared_ptr bufferC(new DtxBuffer());
+ bufferC->enlist(static_pointer_cast<TxOp>(opC));
+ bufferC->markEnded();
+
+ DtxWorkRecord work("my-xid", &store);
+ work.add(bufferA);
+ work.add(bufferB);
+ work.add(bufferC);
+
+ CPPUNIT_ASSERT(!work.prepare());
+ CPPUNIT_ASSERT(store.isAborted());
+ store.check();
+ opA->check();
+ opB->check();
+ opC->check();
+ }
+
+ void testRollback(){
+ MockTransactionalStore store;
+ store.expectBegin2PC().expectPrepare().expectAbort();
+
+ MockTxOp::shared_ptr opA(new MockTxOp());
+ opA->expectPrepare().expectRollback();
+ MockTxOp::shared_ptr opB(new MockTxOp());
+ opB->expectPrepare().expectRollback();
+
+ DtxBuffer::shared_ptr bufferA(new DtxBuffer());
+ bufferA->enlist(static_pointer_cast<TxOp>(opA));
+ bufferA->markEnded();
+ DtxBuffer::shared_ptr bufferB(new DtxBuffer());
+ bufferB->enlist(static_pointer_cast<TxOp>(opB));
+ bufferB->markEnded();
+
+ DtxWorkRecord work("my-xid", &store);
+ work.add(bufferA);
+ work.add(bufferB);
+
+ CPPUNIT_ASSERT(work.prepare());
+ CPPUNIT_ASSERT(store.isPrepared());
+ work.rollback();
+ store.check();
+ CPPUNIT_ASSERT(store.isAborted());
+ opA->check();
+ opB->check();
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(DtxWorkRecordTest);
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/DtxWorkRecordTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/DtxWorkRecordTest.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?view=diff&rev=530853&r1=530852&r2=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri Apr 20 10:11:23 2007
@@ -6,6 +6,7 @@
AccumulatedAckTest \
BrokerChannelTest \
ConfigurationTest \
+ DtxWorkRecordTest \
ExchangeTest \
HeadersExchangeTest \
InMemoryContentTest \
@@ -72,6 +73,7 @@
InProcessBroker.h \
MockChannel.h \
MockConnectionInputHandler.h \
+ TxMocks.h \
qpid_test_plugin.h \
APRBaseTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp?view=diff&rev=530853&r1=530852&r2=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp Fri Apr 20 10:11:23 2007
@@ -22,144 +22,13 @@
#include "qpid_test_plugin.h"
#include <iostream>
#include <vector>
+#include "TxMocks.h"
using namespace qpid::broker;
using boost::static_pointer_cast;
-template <class T> void assertEqualVector(std::vector<T>& expected, std::vector<T>& actual){
- unsigned int i = 0;
- while(i < expected.size() && i < actual.size()){
- CPPUNIT_ASSERT_EQUAL(expected[i], actual[i]);
- i++;
- }
- CPPUNIT_ASSERT(i == expected.size());
- CPPUNIT_ASSERT(i == actual.size());
-}
-
class TxBufferTest : public CppUnit::TestCase
{
- class MockTxOp : public TxOp{
- enum op_codes {PREPARE=2, COMMIT=4, ROLLBACK=8};
- std::vector<int> expected;
- std::vector<int> actual;
- bool failOnPrepare;
- public:
- typedef boost::shared_ptr<MockTxOp> shared_ptr;
-
- MockTxOp() : failOnPrepare(false) {}
- MockTxOp(bool _failOnPrepare) : failOnPrepare(_failOnPrepare) {}
-
- bool prepare(TransactionContext*) throw(){
- actual.push_back(PREPARE);
- return !failOnPrepare;
- }
- void commit() throw(){
- actual.push_back(COMMIT);
- }
- void rollback() throw(){
- actual.push_back(ROLLBACK);
- }
- MockTxOp& expectPrepare(){
- expected.push_back(PREPARE);
- return *this;
- }
- MockTxOp& expectCommit(){
- expected.push_back(COMMIT);
- return *this;
- }
- MockTxOp& expectRollback(){
- expected.push_back(ROLLBACK);
- return *this;
- }
- void check(){
- assertEqualVector(expected, actual);
- }
- ~MockTxOp(){}
- };
-
- class MockTransactionalStore : public TransactionalStore{
- enum op_codes {BEGIN=2, COMMIT=4, ABORT=8};
- std::vector<int> expected;
- std::vector<int> actual;
-
- enum states {OPEN = 1, COMMITTED = 2, ABORTED = 3};
- int state;
-
- class TestTransactionContext : public TransactionContext{
- MockTransactionalStore* store;
- public:
- TestTransactionContext(MockTransactionalStore* _store) : store(_store) {}
- void commit(){
- if(store->state != OPEN) throw "txn already completed";
- store->state = COMMITTED;
- }
-
- void abort(){
- if(store->state != OPEN) throw "txn already completed";
- store->state = ABORTED;
- }
- ~TestTransactionContext(){}
- };
-
-
- public:
- MockTransactionalStore() : state(OPEN){}
-
- std::auto_ptr<TPCTransactionContext> begin(const std::string&){
- throw "Operation not supported";
- }
- void prepare(TPCTransactionContext&){
- throw "Operation not supported";
- }
- void collectPreparedXids(std::set<std::string>&)
- {
- throw "Operation not supported";
- }
-
-
- std::auto_ptr<TransactionContext> begin(){
- actual.push_back(BEGIN);
- std::auto_ptr<TransactionContext> txn(new TestTransactionContext(this));
- return txn;
- }
- void commit(TransactionContext& ctxt){
- actual.push_back(COMMIT);
- dynamic_cast<TestTransactionContext&>(ctxt).commit();
- }
- void abort(TransactionContext& ctxt){
- actual.push_back(ABORT);
- dynamic_cast<TestTransactionContext&>(ctxt).abort();
- }
- MockTransactionalStore& expectBegin(){
- expected.push_back(BEGIN);
- return *this;
- }
- MockTransactionalStore& expectCommit(){
- expected.push_back(COMMIT);
- return *this;
- }
- MockTransactionalStore& expectAbort(){
- expected.push_back(ABORT);
- return *this;
- }
- void check(){
- assertEqualVector(expected, actual);
- }
-
- bool isCommitted(){
- return state == COMMITTED;
- }
-
- bool isAborted(){
- return state == ABORTED;
- }
-
- bool isOpen() const{
- return state == OPEN;
- }
- ~MockTransactionalStore(){}
- };
-
CPPUNIT_TEST_SUITE(TxBufferTest);
CPPUNIT_TEST(testCommitLocal);
CPPUNIT_TEST(testFailOnCommitLocal);
Added: incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h?view=auto&rev=530853
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h Fri Apr 20 10:11:23 2007
@@ -0,0 +1,227 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _tests_TxMocks_h
+#define _tests_TxMocks_h
+
+
+#include "qpid/Exception.h"
+#include "qpid/broker/TransactionalStore.h"
+#include "qpid/broker/TxOp.h"
+#include <boost/format.hpp>
+#include <iostream>
+#include <vector>
+
+using namespace qpid::broker;
+using boost::static_pointer_cast;
+using std::string;
+
+template <class T> void assertEqualVector(std::vector<T>& expected, std::vector<T>& actual){ // checks both vectors hold equal elements in the same order
+ unsigned int i = 0;
+ while(i < expected.size() && i < actual.size()){ // compare the common prefix element by element
+ CPPUNIT_ASSERT_EQUAL(expected[i], actual[i]);
+ i++;
+ }
+ if (i < expected.size()) { // actual ran out first: report the first expected element that is missing
+ throw qpid::Exception(boost::format("Missing %1%") % expected[i]);
+ } else if (i < actual.size()) { // expected ran out first: report the first surplus actual element
+ throw qpid::Exception(boost::format("Extra %1%") % actual[i]);
+ }
+ CPPUNIT_ASSERT_EQUAL(expected.size(), actual.size());
+}
+
+class TxOpConstants{
+protected:
+ const string PREPARE;
+ const string COMMIT;
+ const string ROLLBACK;
+
+ TxOpConstants() : PREPARE("PREPARE"), COMMIT("COMMIT"), ROLLBACK("ROLLBACK") {}
+};
+
+class MockTxOp : public TxOp, public TxOpConstants{
+ std::vector<string> expected;
+ std::vector<string> actual;
+ bool failOnPrepare;
+ string debugName;
+public:
+ typedef boost::shared_ptr<MockTxOp> shared_ptr;
+
+ MockTxOp() : failOnPrepare(false) {}
+ MockTxOp(bool _failOnPrepare) : failOnPrepare(_failOnPrepare) {}
+
+ void setDebugName(string name){
+ debugName = name;
+ }
+
+ void printExpected(){
+ std::cout << std::endl << "MockTxOp[" << debugName << "] expects: ";
+ for (std::vector<string>::iterator i = expected.begin(); i < expected.end(); i++) {
+ if(i != expected.begin()) std::cout << ", ";
+ std::cout << *i;
+ }
+ std::cout << std::endl;
+ }
+
+ void printActual(){
+ std::cout << std::endl << "MockTxOp[" << debugName << "] actual: ";
+ for (std::vector<string>::iterator i = actual.begin(); i < actual.end(); i++) {
+ if(i != actual.begin()) std::cout << ", ";
+ std::cout << *i;
+ }
+ std::cout << std::endl;
+ }
+
+ bool prepare(TransactionContext*) throw(){
+ actual.push_back(PREPARE);
+ return !failOnPrepare;
+ }
+ void commit() throw(){
+ actual.push_back(COMMIT);
+ }
+ void rollback() throw(){
+ if(!debugName.empty()) std::cout << std::endl << "MockTxOp[" << debugName << "]::rollback()" << std::endl;
+ actual.push_back(ROLLBACK);
+ }
+ MockTxOp& expectPrepare(){
+ expected.push_back(PREPARE);
+ return *this;
+ }
+ MockTxOp& expectCommit(){
+ expected.push_back(COMMIT);
+ return *this;
+ }
+ MockTxOp& expectRollback(){
+ expected.push_back(ROLLBACK);
+ return *this;
+ }
+ void check(){
+ assertEqualVector(expected, actual);
+ }
+ ~MockTxOp(){}
+};
+
+class MockTransactionalStore : public TransactionalStore{
+ const string BEGIN;
+ const string BEGIN2PC;
+ const string PREPARE;
+ const string COMMIT;
+ const string ABORT;
+ std::vector<string> expected;
+ std::vector<string> actual;
+
+ enum states {OPEN = 1, PREPARED = 2, COMMITTED = 3, ABORTED = 4};
+ int state;
+
+ class TestTransactionContext : public TPCTransactionContext{
+ MockTransactionalStore* store;
+ public:
+ TestTransactionContext(MockTransactionalStore* _store) : store(_store) {}
+ void prepare(){
+ if(!store->isOpen()) throw "txn already completed";
+ store->state = PREPARED;
+ }
+
+ void commit(){
+ if(!store->isOpen() && !store->isPrepared()) throw "txn already completed";
+ store->state = COMMITTED;
+ }
+
+ void abort(){
+ if(!store->isOpen() && !store->isPrepared()) throw "txn already completed";
+ store->state = ABORTED;
+ }
+ ~TestTransactionContext(){}
+ };
+
+public:
+ MockTransactionalStore() :
+ BEGIN("BEGIN"), BEGIN2PC("BEGIN2PC"), PREPARE("PREPARE"), COMMIT("COMMIT"), ABORT("ABORT"), state(OPEN){}
+
+ void collectPreparedXids(std::set<std::string>&)
+ {
+ throw "Operation not supported";
+ }
+
+ std::auto_ptr<TPCTransactionContext> begin(const std::string&){
+ actual.push_back(BEGIN2PC);
+ std::auto_ptr<TPCTransactionContext> txn(new TestTransactionContext(this));
+ return txn;
+ }
+ std::auto_ptr<TransactionContext> begin(){
+ actual.push_back(BEGIN);
+ std::auto_ptr<TransactionContext> txn(new TestTransactionContext(this));
+ return txn;
+ }
+ void prepare(TPCTransactionContext& ctxt){
+ actual.push_back(PREPARE);
+ dynamic_cast<TestTransactionContext&>(ctxt).prepare();
+ }
+ void commit(TransactionContext& ctxt){
+ actual.push_back(COMMIT);
+ dynamic_cast<TestTransactionContext&>(ctxt).commit();
+ }
+ void abort(TransactionContext& ctxt){
+ actual.push_back(ABORT);
+ dynamic_cast<TestTransactionContext&>(ctxt).abort();
+ }
+ MockTransactionalStore& expectBegin(){
+ expected.push_back(BEGIN);
+ return *this;
+ }
+ MockTransactionalStore& expectBegin2PC(){
+ expected.push_back(BEGIN2PC);
+ return *this;
+ }
+ MockTransactionalStore& expectPrepare(){
+ expected.push_back(PREPARE);
+ return *this;
+ }
+ MockTransactionalStore& expectCommit(){
+ expected.push_back(COMMIT);
+ return *this;
+ }
+ MockTransactionalStore& expectAbort(){
+ expected.push_back(ABORT);
+ return *this;
+ }
+ void check(){
+ assertEqualVector(expected, actual);
+ }
+
+ bool isPrepared(){
+ return state == PREPARED;
+ }
+
+ bool isCommitted(){
+ return state == COMMITTED;
+ }
+
+ bool isAborted(){
+ return state == ABORTED;
+ }
+
+ bool isOpen() const{
+ return state == OPEN;
+ }
+ ~MockTransactionalStore(){}
+};
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h
------------------------------------------------------------------------------
svn:keywords = Rev Date