You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/04/05 13:48:07 UTC

svn commit: r525801 - in /incubator/qpid/trunk/qpid/cpp/src: ./ broker/ tests/

Author: gsim
Date: Thu Apr  5 04:48:05 2007
New Revision: 525801

URL: http://svn.apache.org/viewvc?view=rev&rev=525801
Log:
* Further (minor) changes to the interface between store and broker.
* TxBuffer now uses shared_ptr to TxOps (removed DeletingTxOp)
* Queue now persists the field table of settings 


Removed:
    incubator/qpid/trunk/qpid/cpp/src/broker/DeletingTxOp.cpp
    incubator/qpid/trunk/qpid/cpp/src/broker/DeletingTxOp.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/broker/BrokerChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageBase.h
    incubator/qpid/trunk/qpid/cpp/src/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/broker/BrokerQueue.h
    incubator/qpid/trunk/qpid/cpp/src/broker/Persistable.h
    incubator/qpid/trunk/qpid/cpp/src/broker/RecoverableMessage.h
    incubator/qpid/trunk/qpid/cpp/src/broker/RecoverableQueue.h
    incubator/qpid/trunk/qpid/cpp/src/broker/RecoveryManagerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/broker/TxBuffer.cpp
    incubator/qpid/trunk/qpid/cpp/src/broker/TxBuffer.h
    incubator/qpid/trunk/qpid/cpp/src/broker/TxOp.h
    incubator/qpid/trunk/qpid/cpp/src/tests/FieldTableTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=525801&r1=525800&r2=525801
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Apr  5 04:48:05 2007
@@ -136,8 +136,6 @@
   $(broker)/ConnectionToken.h				\
   $(broker)/Consumer.h					\
   $(broker)/Content.h					\
-  $(broker)/DeletingTxOp.cpp				\
-  $(broker)/DeletingTxOp.h				\
   $(broker)/Deliverable.h					\
   $(broker)/DeliverableMessage.cpp			\
   $(broker)/DeliverableMessage.h				\

Modified: incubator/qpid/trunk/qpid/cpp/src/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/BrokerChannel.cpp?view=diff&rev=525801&r1=525800&r2=525801
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/BrokerChannel.cpp Thu Apr  5 04:48:05 2007
@@ -28,7 +28,6 @@
 #include <boost/bind.hpp>
 
 #include "BrokerChannel.h"
-#include "DeletingTxOp.h"
 #include "../framing/ChannelAdapter.h"
 #include "../QpidError.h"
 #include "DeliverableMessage.h"
@@ -110,8 +109,8 @@
 }
 
 void Channel::commit(){
-    TxAck txAck(accumulatedAck, unacked);
-    txBuffer.enlist(&txAck);
+    TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
+    txBuffer.enlist(txAck);
     if(txBuffer.prepare(store)){
         txBuffer.commit();
     }
@@ -186,11 +185,12 @@
     Exchange::shared_ptr exchange =
         connection.broker.getExchanges().get(msg->getExchange());
     if(transactional){
-        TxPublish* deliverable = new TxPublish(msg);
+        TxPublish* deliverable(new TxPublish(msg));
+        TxOp::shared_ptr op(deliverable);
         exchange->route(
             *deliverable, msg->getRoutingKey(),
             &(msg->getApplicationHeaders()));
-        txBuffer.enlist(new DeletingTxOp(deliverable));
+        txBuffer.enlist(op);
     }else{
         DeliverableMessage deliverable(msg);
         exchange->route(
@@ -223,10 +223,11 @@
         connection.broker.getExchanges().get(msg->getExchange());
     assert(exchange.get());
     if(transactional) {
-        std::auto_ptr<TxPublish> deliverable(new TxPublish(msg));
+        TxPublish* deliverable(new TxPublish(msg));
+        TxOp::shared_ptr op(deliverable);
         exchange->route(*deliverable, msg->getRoutingKey(),
                         &(msg->getApplicationHeaders()));
-        txBuffer.enlist(new DeletingTxOp(deliverable.release()));
+        txBuffer.enlist(op);
     } else {
         DeliverableMessage deliverable(msg);
         exchange->route(deliverable, msg->getRoutingKey(),

Modified: incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageBase.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageBase.h?view=diff&rev=525801&r1=525800&r2=525801
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageBase.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/BrokerMessageBase.h Thu Apr  5 04:48:05 2007
@@ -89,7 +89,7 @@
     
     void setRouting(const std::string& _exchange, const std::string& _routingKey)
     { exchange = _exchange; routingKey = _routingKey; } 
-    void setPersistenceId(uint64_t _persistenceId) { persistenceId = _persistenceId; } // XXXX: Only used in tests?
+    void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
     void redeliver() { redelivered = true; }
 
     /**
@@ -171,7 +171,7 @@
     std::string routingKey;
     const bool mandatory;
     const bool immediate;
-    uint64_t persistenceId;
+    mutable uint64_t persistenceId;
     bool redelivered;
     AMQMethodBodyPtr respondTo;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/BrokerQueue.cpp?view=diff&rev=525801&r1=525800&r2=525801
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/BrokerQueue.cpp Thu Apr  5 04:48:05 2007
@@ -216,19 +216,20 @@
     const std::string qpidMaxCount("qpid.max_count");
 }
 
-void Queue::create(const FieldTable& settings)
+void Queue::create(const FieldTable& _settings)
 {
+    settings = _settings;
     //TODO: hold onto settings and persist them as part of encode
     //      in fact settings should be passed in on construction
     if (store) {
         store->create(*this);
     }
-    configure(settings);
+    configure(_settings);
 }
 
-void Queue::configure(const FieldTable& settings)
+void Queue::configure(const FieldTable& _settings)
 {
-    std::auto_ptr<QueuePolicy> _policy(new QueuePolicy(settings));
+    std::auto_ptr<QueuePolicy> _policy(new QueuePolicy(_settings));
     if (_policy->getMaxCount() || _policy->getMaxSize()) 
         setPolicy(_policy);
 }
@@ -255,7 +256,7 @@
     return persistenceId; 
 }
 
-void Queue::setPersistenceId(uint64_t _persistenceId)
+void Queue::setPersistenceId(uint64_t _persistenceId) const
 { 
     persistenceId = _persistenceId; 
 }
@@ -263,13 +264,12 @@
 void Queue::encode(framing::Buffer& buffer) const 
 {
     buffer.putShortString(name);
-    //TODO store all required properties
+    buffer.putFieldTable(settings);
 }
 
 uint32_t Queue::encodedSize() const
 {
-    //TODO, revise when storing full set of queue properties
-    return name.size() + 1/*short string size octet*/;
+    return name.size() + 1/*short string size octet*/ + settings.size();
 }
 
 Queue::shared_ptr Queue::decode(QueueRegistry& queues, framing::Buffer& buffer)
@@ -277,6 +277,8 @@
     string name;
     buffer.getShortString(name);
     std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true);
+    buffer.getFieldTable(result.first->settings);
+    result.first->configure(result.first->settings);
     return result.first;
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/BrokerQueue.h?view=diff&rev=525801&r1=525800&r2=525801
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/BrokerQueue.h Thu Apr  5 04:48:05 2007
@@ -70,6 +70,7 @@
             int64_t lastUsed;
             Consumer* exclusive;
             mutable uint64_t persistenceId;
+            framing::FieldTable settings;
             std::auto_ptr<QueuePolicy> policy;            
 
             void pop();
@@ -138,7 +139,7 @@
 
             //PersistableQueue support:
             uint64_t getPersistenceId() const;
-            void setPersistenceId(uint64_t persistenceId);
+            void setPersistenceId(uint64_t persistenceId) const;
             void encode(framing::Buffer& buffer) const;
             uint32_t encodedSize() const;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/broker/Persistable.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/Persistable.h?view=diff&rev=525801&r1=525800&r2=525801
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/Persistable.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/Persistable.h Thu Apr  5 04:48:05 2007
@@ -37,7 +37,7 @@
     /**
      * Allows the store to attach its own identifier to this object
      */
-    virtual void setPersistenceId(uint64_t id) = 0;
+    virtual void setPersistenceId(uint64_t id) const = 0;
     /**
      * Returns any identifier the store may have attached to this
      * object

Modified: incubator/qpid/trunk/qpid/cpp/src/broker/RecoverableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/RecoverableMessage.h?view=diff&rev=525801&r1=525800&r2=525801
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/RecoverableMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/RecoverableMessage.h Thu Apr  5 04:48:05 2007
@@ -36,6 +36,7 @@
 {
 public:
     typedef boost::shared_ptr<RecoverableMessage> shared_ptr;
+    virtual void setPersistenceId(uint64_t id) = 0;
     /**
      * Used by store to determine whether to load content on recovery
      * or let message load its own content as and when it requires it.

Modified: incubator/qpid/trunk/qpid/cpp/src/broker/RecoverableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/RecoverableQueue.h?view=diff&rev=525801&r1=525800&r2=525801
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/RecoverableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/RecoverableQueue.h Thu Apr  5 04:48:05 2007
@@ -36,6 +36,8 @@
 {
 public:
     typedef boost::shared_ptr<RecoverableQueue> shared_ptr;
+
+    virtual void setPersistenceId(uint64_t id) = 0;
     /**
      * Used during recovery to add stored messages back to the queue
      */

Modified: incubator/qpid/trunk/qpid/cpp/src/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/RecoveryManagerImpl.cpp?view=diff&rev=525801&r1=525800&r2=525801
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/RecoveryManagerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/RecoveryManagerImpl.cpp Thu Apr  5 04:48:05 2007
@@ -45,6 +45,7 @@
     RecoverableMessageImpl(Message::shared_ptr& _msg, uint64_t _stagingThreshold) 
         : msg(_msg), stagingThreshold(_stagingThreshold) {}
     ~RecoverableMessageImpl() {};
+    void setPersistenceId(uint64_t id);
     bool loadContent(uint64_t available);
     void decodeContent(framing::Buffer& buffer);
     void recover(Queue::shared_ptr queue);
@@ -56,6 +57,7 @@
 public:
     RecoverableQueueImpl(Queue::shared_ptr& _queue) : queue(_queue) {}
     ~RecoverableQueueImpl() {};
+    void setPersistenceId(uint64_t id);
     void recover(RecoverableMessage::shared_ptr msg);
 };
 
@@ -125,7 +127,17 @@
     queue->recover(msg);
 }
 
+void RecoverableMessageImpl::setPersistenceId(uint64_t id)
+{
+    msg->setPersistenceId(id);
+}
+
 void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg)
 {
     dynamic_pointer_cast<RecoverableMessageImpl>(msg)->recover(queue);
+}
+
+void RecoverableQueueImpl::setPersistenceId(uint64_t id)
+{
+    queue->setPersistenceId(id);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/TxBuffer.cpp?view=diff&rev=525801&r1=525800&r2=525801
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/TxBuffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/TxBuffer.cpp Thu Apr  5 04:48:05 2007
@@ -20,7 +20,8 @@
  */
 #include "TxBuffer.h"
 
-using std::mem_fun;
+#include <boost/mem_fn.hpp>
+using boost::mem_fn;
 using namespace qpid::broker;
 
 bool TxBuffer::prepare(TransactionalStore* const store)
@@ -39,17 +40,17 @@
 
 void TxBuffer::commit()
 {
-    for_each(ops.begin(), ops.end(), mem_fun(&TxOp::commit));
+    for_each(ops.begin(), ops.end(), mem_fn(&TxOp::commit));
     ops.clear();
 }
 
 void TxBuffer::rollback()
 {
-    for_each(ops.begin(), ops.end(), mem_fun(&TxOp::rollback));
+    for_each(ops.begin(), ops.end(), mem_fn(&TxOp::rollback));
     ops.clear();
 }
 
-void TxBuffer::enlist(TxOp* const op)
+void TxBuffer::enlist(TxOp::shared_ptr op)
 {
     ops.push_back(op);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/broker/TxBuffer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/TxBuffer.h?view=diff&rev=525801&r1=525800&r2=525801
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/TxBuffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/TxBuffer.h Thu Apr  5 04:48:05 2007
@@ -59,8 +59,8 @@
 namespace qpid {
     namespace broker {
         class TxBuffer{
-            typedef std::vector<TxOp*>::iterator op_iterator;
-            std::vector<TxOp*> ops;
+            typedef std::vector<TxOp::shared_ptr>::iterator op_iterator;
+            std::vector<TxOp::shared_ptr> ops;
         public:
             /**
              * Requests that all ops are prepared. This should
@@ -98,7 +98,7 @@
             /**
              * Adds an operation to the transaction.
              */
-            void enlist(TxOp* const op);
+            void enlist(TxOp::shared_ptr op);
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/broker/TxOp.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/broker/TxOp.h?view=diff&rev=525801&r1=525800&r2=525801
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/broker/TxOp.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/broker/TxOp.h Thu Apr  5 04:48:05 2007
@@ -22,11 +22,14 @@
 #define _TxOp_
 
 #include "TransactionalStore.h"
+#include <boost/shared_ptr.hpp>
 
 namespace qpid {
     namespace broker {
         class TxOp{
         public:
+            typedef boost::shared_ptr<TxOp> shared_ptr;
+
             virtual bool prepare(TransactionContext*) throw() = 0;
             virtual void commit()  throw() = 0;
             virtual void rollback()  throw() = 0;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/FieldTableTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/FieldTableTest.cpp?view=diff&rev=525801&r1=525800&r2=525801
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/FieldTableTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/FieldTableTest.cpp Thu Apr  5 04:48:05 2007
@@ -28,6 +28,7 @@
 {
     CPPUNIT_TEST_SUITE(FieldTableTest);
     CPPUNIT_TEST(testMe);
+    CPPUNIT_TEST(testAssignment);
     CPPUNIT_TEST_SUITE_END();
 
   public:
@@ -45,6 +46,38 @@
         buffer.getFieldTable(ft2);
         CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), ft2.getString("A"));
 
+    }
+
+    void testAssignment()
+    {
+        FieldTable a;
+        FieldTable b;
+
+        a.setString("A", "BBBB");
+        a.setInt("B", 1234);
+        b = a;
+        a.setString("A", "CCCC");
+        
+        CPPUNIT_ASSERT_EQUAL(std::string("CCCC"), a.getString("A"));
+        CPPUNIT_ASSERT_EQUAL(std::string("BBBB"), b.getString("A"));
+        CPPUNIT_ASSERT_EQUAL(1234, a.getInt("B"));
+        CPPUNIT_ASSERT_EQUAL(1234, b.getInt("B"));
+
+        FieldTable d;
+        {
+            FieldTable c;
+            c = a;
+            
+            Buffer buffer(c.size());
+            buffer.putFieldTable(c);
+            buffer.flip();     
+            buffer.getFieldTable(d);
+            CPPUNIT_ASSERT_EQUAL(c, d);
+            CPPUNIT_ASSERT_EQUAL(std::string("CCCC"), c.getString("A"));
+            CPPUNIT_ASSERT_EQUAL(1234, c.getInt("B"));
+        }
+        CPPUNIT_ASSERT_EQUAL(std::string("CCCC"), d.getString("A"));
+        CPPUNIT_ASSERT_EQUAL(1234, d.getInt("B"));
     }
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp?view=diff&rev=525801&r1=525800&r2=525801
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp Thu Apr  5 04:48:05 2007
@@ -24,6 +24,7 @@
 #include <vector>
 
 using namespace qpid::broker;
+using boost::static_pointer_cast;
 
 template <class T> void assertEqualVector(std::vector<T>& expected, std::vector<T>& actual){
     unsigned int i = 0;
@@ -43,6 +44,8 @@
         std::vector<int> actual;
         bool failOnPrepare;
     public:
+        typedef boost::shared_ptr<MockTxOp> shared_ptr;
+
         MockTxOp() : failOnPrepare(false) {}
         MockTxOp(bool _failOnPrepare) : failOnPrepare(_failOnPrepare) {}
 
@@ -166,100 +169,100 @@
         MockTransactionalStore store;
         store.expectBegin().expectCommit();
 
-        MockTxOp opA;
-        opA.expectPrepare().expectCommit();
-        MockTxOp opB;
-        opB.expectPrepare().expectPrepare().expectCommit().expectCommit();//opB enlisted twice to test reative order
-        MockTxOp opC;
-        opC.expectPrepare().expectCommit();
+        MockTxOp::shared_ptr opA(new MockTxOp());
+        opA->expectPrepare().expectCommit();
+        MockTxOp::shared_ptr opB(new MockTxOp());
+        opB->expectPrepare().expectPrepare().expectCommit().expectCommit();//opB enlisted twice to test relative order
+        MockTxOp::shared_ptr opC(new MockTxOp());
+        opC->expectPrepare().expectCommit();
 
         TxBuffer buffer;
-        buffer.enlist(&opA);
-        buffer.enlist(&opB);
-        buffer.enlist(&opB);//opB enlisted twice
-        buffer.enlist(&opC);
+        buffer.enlist(static_pointer_cast<TxOp>(opA));
+        buffer.enlist(static_pointer_cast<TxOp>(opB));
+        buffer.enlist(static_pointer_cast<TxOp>(opB));//opB enlisted twice
+        buffer.enlist(static_pointer_cast<TxOp>(opC));
 
         CPPUNIT_ASSERT(buffer.prepare(&store));
         buffer.commit();
         store.check();
         CPPUNIT_ASSERT(store.isCommitted());
-        opA.check();
-        opB.check();
-        opC.check();
+        opA->check();
+        opB->check();
+        opC->check();
     }
 
     void testFailOnPrepare(){
         MockTransactionalStore store;
         store.expectBegin().expectAbort();
 
-        MockTxOp opA;
-        opA.expectPrepare();
-        MockTxOp opB(true);
-        opB.expectPrepare();
-        MockTxOp opC;//will never get prepare as b will fail
+        MockTxOp::shared_ptr opA(new MockTxOp());
+        opA->expectPrepare();
+        MockTxOp::shared_ptr opB(new MockTxOp(true));
+        opB->expectPrepare();
+        MockTxOp::shared_ptr opC(new MockTxOp());//will never get prepare as b will fail
 
         TxBuffer buffer;
-        buffer.enlist(&opA);
-        buffer.enlist(&opB);
-        buffer.enlist(&opC);
+        buffer.enlist(static_pointer_cast<TxOp>(opA));
+        buffer.enlist(static_pointer_cast<TxOp>(opB));
+        buffer.enlist(static_pointer_cast<TxOp>(opC));
 
         CPPUNIT_ASSERT(!buffer.prepare(&store));
         store.check();
         CPPUNIT_ASSERT(store.isAborted());
-        opA.check();
-        opB.check();
-        opC.check();
+        opA->check();
+        opB->check();
+        opC->check();
     }
 
     void testRollback(){
-        MockTxOp opA;
-        opA.expectRollback();
-        MockTxOp opB(true);
-        opB.expectRollback();
-        MockTxOp opC;
-        opC.expectRollback();
+        MockTxOp::shared_ptr opA(new MockTxOp());
+        opA->expectRollback();
+        MockTxOp::shared_ptr opB(new MockTxOp(true));
+        opB->expectRollback();
+        MockTxOp::shared_ptr opC(new MockTxOp());
+        opC->expectRollback();
 
         TxBuffer buffer;
-        buffer.enlist(&opA);
-        buffer.enlist(&opB);
-        buffer.enlist(&opC);
+        buffer.enlist(static_pointer_cast<TxOp>(opA));
+        buffer.enlist(static_pointer_cast<TxOp>(opB));
+        buffer.enlist(static_pointer_cast<TxOp>(opC));
 
         buffer.rollback();
-        opA.check();
-        opB.check();
-        opC.check();
+        opA->check();
+        opB->check();
+        opC->check();
     }
 
     void testBufferIsClearedAfterRollback(){
-        MockTxOp opA;
-        opA.expectRollback();
-        MockTxOp opB;
-        opB.expectRollback();
+        MockTxOp::shared_ptr opA(new MockTxOp());
+        opA->expectRollback();
+        MockTxOp::shared_ptr opB(new MockTxOp());
+        opB->expectRollback();
 
         TxBuffer buffer;
-        buffer.enlist(&opA);
-        buffer.enlist(&opB);
+        buffer.enlist(static_pointer_cast<TxOp>(opA));
+        buffer.enlist(static_pointer_cast<TxOp>(opB));
 
         buffer.rollback();
         buffer.commit();//second call should not reach ops
-        opA.check();
-        opB.check();
+        opA->check();
+        opB->check();
     }
 
     void testBufferIsClearedAfterCommit(){
-        MockTxOp opA;
-        opA.expectCommit();
-        MockTxOp opB;
-        opB.expectCommit();
+        MockTxOp::shared_ptr opA(new MockTxOp());
+        opA->expectCommit();
+        MockTxOp::shared_ptr opB(new MockTxOp());
+        opB->expectCommit();
 
         TxBuffer buffer;
-        buffer.enlist(&opA);
-        buffer.enlist(&opB);
+        buffer.enlist(static_pointer_cast<TxOp>(opA));
+        buffer.enlist(static_pointer_cast<TxOp>(opB));
 
         buffer.commit();
         buffer.rollback();//second call should not reach ops
-        opA.check();
-        opB.check();
+        opA->check();
+        opB->check();
     }
 };