You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/04/19 19:56:23 UTC

svn commit: r530500 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/ tests/

Author: gsim
Date: Thu Apr 19 10:56:21 2007
New Revision: 530500

URL: http://svn.apache.org/viewvc?view=rev&rev=530500
Log:
Some dtx-related updates.


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
    incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Apr 19 10:56:21 2007
@@ -112,6 +112,10 @@
   qpid/broker/DeliverableMessage.cpp \
   qpid/broker/DeliveryRecord.cpp \
   qpid/broker/DirectExchange.cpp \
+  qpid/broker/DtxBuffer.cpp \
+  qpid/broker/DtxHandlerImpl.cpp \
+  qpid/broker/DtxManager.cpp \
+  qpid/broker/DtxWorkRecord.cpp \
   qpid/broker/ExchangeRegistry.cpp \
   qpid/broker/FanOutExchange.cpp \
   qpid/broker/HeadersExchange.cpp \
@@ -161,6 +165,10 @@
   qpid/broker/Deliverable.h \
   qpid/broker/DeliverableMessage.h \
   qpid/broker/DirectExchange.h \
+  qpid/broker/DtxBuffer.h \
+  qpid/broker/DtxHandlerImpl.h \
+  qpid/broker/DtxManager.h \
+  qpid/broker/DtxWorkRecord.h \
   qpid/broker/ExchangeRegistry.h \
   qpid/broker/FanOutExchange.h \
   qpid/broker/HandlerImpl.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Apr 19 10:56:21 2007
@@ -53,7 +53,8 @@
     timeout(30000),
     stagingThreshold(0),
     cleaner(&queues, timeout/10),
-    factory(*this)
+    factory(*this),
+    dtxManager(store.get())
 {
     exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
     exchanges.declare(amq_direct, DirectExchange::typeName);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Thu Apr 19 10:56:21 2007
@@ -32,6 +32,7 @@
 #include "ExchangeRegistry.h"
 #include "ConnectionToken.h"
 #include "DirectExchange.h"
+#include "DtxManager.h"
 #include "qpid/framing/OutputHandler.h"
 #include "qpid/framing/ProtocolInitiation.h"
 #include "QueueRegistry.h"
@@ -83,6 +84,7 @@
     uint32_t getTimeout() { return timeout; }
     uint64_t getStagingThreshold() { return stagingThreshold; }
     AutoDelete& getCleaner() { return cleaner; }
+    DtxManager& getDtxManager() { return dtxManager; }
     
   private:
     Broker(const Configuration& config);
@@ -97,6 +99,7 @@
     uint64_t stagingThreshold;
     AutoDelete cleaner;
     ConnectionFactory factory;
+    DtxManager dtxManager;
 
     static MessageStore* createStore(const Configuration& config);
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Thu Apr 19 10:56:21 2007
@@ -42,7 +42,8 @@
     exchangeHandler(*this),
     messageHandler(*this),
     queueHandler(*this),
-    txHandler(*this)    
+    txHandler(*this),
+    dtxHandler(*this)
 {}
 
 
@@ -336,7 +337,7 @@
 } 
 
 void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){
-    channel.begin();
+    channel.startTx();
     client.selectOk(context.getRequestId());
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h Thu Apr 19 10:56:21 2007
@@ -21,6 +21,7 @@
 #include "qpid/framing/AMQP_ServerOperations.h"
 #include "HandlerImpl.h"
 #include "MessageHandlerImpl.h"
+#include "DtxHandlerImpl.h"
 #include "qpid/Exception.h"
 
 namespace qpid {
@@ -76,6 +77,9 @@
     TunnelHandler* getTunnelHandler() {
         throw ConnectionException(540, "Tunnel class not implemented"); }
 
+    DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
+    DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
+
     framing::AMQP_ClientProxy& getProxy() { return proxy; }
 
   private:
@@ -213,7 +217,7 @@
     MessageHandlerImpl messageHandler;
     QueueHandlerImpl queueHandler;
     TxHandlerImpl txHandler;
-        
+    DtxHandlerImpl dtxHandler;        
 };
 }} // namespace qpid::broker
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Thu Apr 19 10:56:21 2007
@@ -54,7 +54,6 @@
     ChannelAdapter(id, &con.getOutput(), con.getVersion()),
     connection(con),
     currentDeliveryTag(1),
-    transactional(false),
     prefetchSize(0),
     prefetchCount(0),
     framesize(_framesize),
@@ -104,24 +103,38 @@
     recover(true);
 }
 
-void Channel::begin(){
-    transactional = true;
+void Channel::startTx(){
+    txBuffer = TxBuffer::shared_ptr(new TxBuffer());
 }
 
 void Channel::commit(){
     TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
-    txBuffer.enlist(txAck);
-    if(txBuffer.prepare(store)){
-        txBuffer.commit();
+    txBuffer->enlist(txAck);
+    if (txBuffer->commitLocal(store)) {
+        accumulatedAck.clear();
     }
-    accumulatedAck.clear();
 }
 
 void Channel::rollback(){
-    txBuffer.rollback();
+    txBuffer->rollback();
     accumulatedAck.clear();
 }
 
+void Channel::startDtx(const std::string& xid, DtxManager& mgr){
+    dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer());
+    txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
+    mgr.start(xid, dtxBuffer);
+}
+
+void Channel::endDtx(){
+    TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
+    dtxBuffer->enlist(txAck);    
+    dtxBuffer->markEnded();
+    
+    dtxBuffer.reset();
+    txBuffer.reset();
+}
+
 void Channel::deliver(
     Message::shared_ptr& msg, const string& consumerTag,
     Queue::shared_ptr& queue, bool ackExpected)
@@ -180,23 +193,8 @@
         queue->dispatch();
 }
 
-void Channel::handleInlineTransfer(Message::shared_ptr msg)
-{
-    Exchange::shared_ptr exchange =
-        connection.broker.getExchanges().get(msg->getExchange());
-    if(transactional){
-        TxPublish* deliverable(new TxPublish(msg));
-        TxOp::shared_ptr op(deliverable);
-        exchange->route(
-            *deliverable, msg->getRoutingKey(),
-            &(msg->getApplicationHeaders()));
-        txBuffer.enlist(op);
-    }else{
-        DeliverableMessage deliverable(msg);
-        exchange->route(
-            deliverable, msg->getRoutingKey(),
-            &(msg->getApplicationHeaders()));
-    }
+void Channel::handleInlineTransfer(Message::shared_ptr msg){
+    complete(msg);
 }
 
 void Channel::handlePublish(Message* _message){
@@ -222,12 +220,12 @@
     Exchange::shared_ptr exchange =
         connection.broker.getExchanges().get(msg->getExchange());
     assert(exchange.get());
-    if(transactional) {
+    if (txBuffer) {
         TxPublish* deliverable(new TxPublish(msg));
         TxOp::shared_ptr op(deliverable);
         exchange->route(*deliverable, msg->getRoutingKey(),
                         &(msg->getApplicationHeaders()));
-        txBuffer.enlist(op);
+        txBuffer->enlist(op);
     } else {
         DeliverableMessage deliverable(msg);
         exchange->route(deliverable, msg->getRoutingKey(),
@@ -236,24 +234,24 @@
 }
 
 void Channel::ack(){
-	ack(getFirstAckRequest(), getLastAckRequest());
+    ack(getFirstAckRequest(), getLastAckRequest());
 }
 
 // Used by Basic
 void Channel::ack(uint64_t deliveryTag, bool multiple){
-	if (multiple)
-		ack(0, deliveryTag);
-	else
-		ack(deliveryTag, deliveryTag);
+    if (multiple)
+        ack(0, deliveryTag);
+    else
+        ack(deliveryTag, deliveryTag);
 }
 
 void Channel::ack(uint64_t firstTag, uint64_t lastTag){
-    if(transactional){
+    if (txBuffer) {
         accumulatedAck.update(firstTag, lastTag);
 
         //TODO: I think the outstanding prefetch size & count should be updated at this point...
         //TODO: ...this may then necessitate dispatching to consumers
-    }else{
+    } else {
         Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
     
         ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag));

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h Thu Apr 19 10:56:21 2007
@@ -31,6 +31,8 @@
 #include "AccumulatedAck.h"
 #include "Consumer.h"
 #include "DeliveryRecord.h"
+#include "DtxBuffer.h"
+#include "DtxManager.h"
 #include "MessageBuilder.h"
 #include "NameGenerator.h"
 #include "Prefetch.h"
@@ -80,7 +82,6 @@
     Connection& connection;
     uint64_t currentDeliveryTag;
     Queue::shared_ptr defaultQueue;
-    bool transactional;
     ConsumerImplMap consumers;
     uint32_t prefetchSize;    
     uint16_t prefetchCount;    
@@ -89,7 +90,8 @@
     NameGenerator tagGenerator;
     std::list<DeliveryRecord> unacked;
     sys::Mutex deliveryLock;
-    TxBuffer txBuffer;
+    TxBuffer::shared_ptr txBuffer;
+    DtxBuffer::shared_ptr dtxBuffer;
     AccumulatedAck accumulatedAck;
     MessageStore* const store;
     MessageBuilder messageBuilder;//builder for in-progress message
@@ -113,7 +115,7 @@
     ~Channel();
 
     bool isOpen() const { return opened; }
-    BrokerAdapter& getAdatper() { return *adapter; }
+    BrokerAdapter& getAdapter() { return *adapter; }
     
     void open() { opened = true; }
     void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
@@ -131,10 +133,12 @@
                  const framing::FieldTable* = 0);
     void cancel(const string& tag);
     bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected);
-    void begin();
     void close();
+    void startTx();
     void commit();
     void rollback();
+    void startDtx(const std::string& xid, DtxManager& mgr);
+    void endDtx();
     void ack();
     void ack(uint64_t deliveryTag, bool multiple);
     void ack(uint64_t deliveryTag, uint64_t endTag);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Thu Apr 19 10:56:21 2007
@@ -82,7 +82,7 @@
     string mechanisms("PLAIN");
     string locales("en_US");
     getChannel(0).init(0, *out, getVersion());
-    client = &getChannel(0).getAdatper().getProxy().getConnection();
+    client = &getChannel(0).getAdapter().getProxy().getConnection();
     client->start(
         header.getMajor(), header.getMinor(),
         properties, mechanisms, locales);

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp?view=auto&rev=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp Thu Apr 19 10:56:21 2007
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DtxBuffer.h"
+
+using namespace qpid::broker;
+using qpid::sys::Mutex;
+
+DtxBuffer::DtxBuffer() : ended(false) {}
+
+DtxBuffer::~DtxBuffer() {}
+
+void DtxBuffer::markEnded() 
+{ 
+    Mutex::ScopedLock locker(lock); 
+    ended = true; 
+}
+
+bool DtxBuffer::isEnded() 
+{ 
+    Mutex::ScopedLock locker(lock); 
+    return ended; 
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h?view=auto&rev=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h Thu Apr 19 10:56:21 2007
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DtxBuffer_
+#define _DtxBuffer_
+
+#include "TxBuffer.h"
+#include "qpid/sys/Mutex.h"
+
+namespace qpid {
+    namespace broker {
+        class DtxBuffer : public TxBuffer{
+            sys::Mutex lock;
+            bool ended;
+        public:
+            typedef boost::shared_ptr<DtxBuffer> shared_ptr;
+
+            DtxBuffer();
+            ~DtxBuffer();
+            void markEnded();
+            bool isEnded();
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp?view=auto&rev=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp Thu Apr 19 10:56:21 2007
@@ -0,0 +1,124 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "DtxHandlerImpl.h"
+
+#include "Broker.h"
+#include "BrokerChannel.h"
+
+using namespace qpid::broker;
+using qpid::framing::FieldTable;
+using qpid::framing::MethodContext;
+using std::string;
+
+DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : CoreRefs(parent) {}
+
+
+// DtxDemarcationHandler:
+
+void DtxHandlerImpl::end(const MethodContext& /*context*/,
+                         u_int16_t /*ticket*/,
+                         const string& /*xid*/,
+                         bool /*fail*/,
+                         bool /*suspend*/ )
+{
+    channel.endDtx();
+    //send end-ok
+    //TODO: handle fail and suspend
+    //TODO: check xid is as expected?
+}
+
+
+void DtxHandlerImpl::select(const MethodContext& /*context*/ )
+{
+    //don't need to do anything here really
+    //send select-ok
+}
+
+
+void DtxHandlerImpl::start(const MethodContext& /*context*/,
+                           u_int16_t /*ticket*/,
+                           const string& xid,
+                           bool /*join*/,
+                           bool /*resume*/ )
+{
+    channel.startDtx(xid, broker.getDtxManager());
+    //send start-ok
+    //TODO: handle join and resume
+}
+
+// DtxCoordinationHandler:
+
+void DtxHandlerImpl::prepare(const MethodContext& /*context*/,
+                             u_int16_t /*ticket*/,
+                             const string& xid )
+{
+    broker.getDtxManager().prepare(xid);
+    //send prepare-ok
+}
+
+void DtxHandlerImpl::commit(const MethodContext& /*context*/,
+                            u_int16_t /*ticket*/,
+                            const string& xid,
+                            bool /*onePhase*/ )
+{
+    broker.getDtxManager().commit(xid);
+    //send commit-ok
+    //TODO use onePhase flag to validate correct sequence
+}
+
+
+void DtxHandlerImpl::rollback(const MethodContext& /*context*/,
+                              u_int16_t /*ticket*/,
+                              const string& xid )
+{
+    broker.getDtxManager().rollback(xid);
+    //send rollback-ok
+}
+
+void DtxHandlerImpl::recover(const MethodContext& /*context*/,
+                             u_int16_t /*ticket*/,
+                             bool /*startscan*/,
+                             u_int32_t /*endscan*/ )
+{
+    //TODO
+}
+
+void DtxHandlerImpl::forget(const MethodContext& /*context*/,
+                            u_int16_t /*ticket*/,
+                            const string& /*xid*/ )
+{
+    //TODO
+}
+
+void DtxHandlerImpl::getTimeout(const MethodContext& /*context*/,
+                                const string& /*xid*/ )
+{
+    //TODO
+}
+
+
+void DtxHandlerImpl::setTimeout(const MethodContext& /*context*/,
+                                u_int16_t /*ticket*/,
+                                const string& /*xid*/,
+                                u_int32_t /*timeout*/ )
+{
+    //TODO
+}
+
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h?view=auto&rev=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h Thu Apr 19 10:56:21 2007
@@ -0,0 +1,98 @@
+#ifndef _broker_DtxHandlerImpl_h
+#define _broker_DtxHandlerImpl_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "HandlerImpl.h"
+
+namespace qpid {
+namespace broker {
+
+/* dummy interfaces until real ones are generated from published spec */
+class DtxCoordinationHandler{};
+class DtxDemarcationHandler{};
+
+class DtxHandlerImpl 
+    : public CoreRefs,
+      public /*framing::AMQP_ServerOperations::*/DtxCoordinationHandler,
+      public /*framing::AMQP_ServerOperations::*/DtxDemarcationHandler
+{    
+public:
+    DtxHandlerImpl(CoreRefs& parent);
+
+    // DtxCoordinationHandler:
+
+    void commit(const framing::MethodContext& context,
+                        u_int16_t ticket,
+                        const std::string& xid,
+                        bool onePhase );
+
+    void forget(const framing::MethodContext& context,
+                        u_int16_t ticket,
+                        const std::string& xid );
+
+    void getTimeout(const framing::MethodContext& context,
+                            const std::string& xid );
+
+    void prepare(const framing::MethodContext& context,
+                         u_int16_t ticket,
+                         const std::string& xid );
+
+    void recover(const framing::MethodContext& context,
+                         u_int16_t ticket,
+                         bool startscan,
+                         u_int32_t endscan );
+
+    void recoverOk(const framing::MethodContext& context,
+                           const framing::FieldTable& xids );
+
+    void rollback(const framing::MethodContext& context,
+                          u_int16_t ticket,
+                          const std::string& xid );
+
+    void setTimeout(const framing::MethodContext& context,
+                            u_int16_t ticket,
+                            const std::string& xid,
+                            u_int32_t timeout );
+
+    // DtxDemarcationHandler:
+
+    void end(const framing::MethodContext& context,
+                     u_int16_t ticket,
+                     const std::string& xid,
+                     bool fail,
+                     bool suspend );
+
+    void select(const framing::MethodContext& context );
+
+    void start(const framing::MethodContext& context,
+                       u_int16_t ticket,
+                       const std::string& xid,
+                       bool join,
+                       bool resume );
+};
+
+
+}} // namespace qpid::broker
+
+
+
+#endif  /*!_broker_DtxHandlerImpl_h*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp?view=auto&rev=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp Thu Apr 19 10:56:21 2007
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DtxManager.h"
+#include <boost/format.hpp>
+
+using namespace qpid::broker;
+
+DtxManager::DtxManager(TransactionalStore* const _store) : store(_store) {}
+
+DtxManager::~DtxManager() {}
+
+void DtxManager::start(std::string xid, DtxBuffer::shared_ptr ops)
+{
+    WorkMap::iterator i = work.find(xid);
+    if (i == work.end()) {
+        i = work.insert(xid, new DtxWorkRecord(xid, store)).first;
+    }
+    i->add(ops);
+}
+
+void DtxManager::prepare(const std::string& xid) 
+{ 
+    getWork(xid)->prepare();
+}
+
+void DtxManager::commit(const std::string& xid) 
+{ 
+    getWork(xid)->commit();
+}
+
+void DtxManager::rollback(const std::string& xid) 
+{ 
+    getWork(xid)->rollback();
+}
+
+DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid)
+{
+    WorkMap::iterator i = work.find(xid);
+    if (i == work.end()) throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid);
+    return i;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h?view=auto&rev=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h Thu Apr 19 10:56:21 2007
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DtxManager_
+#define _DtxManager_
+
+#include <boost/ptr_container/ptr_map.hpp>
+#include "DtxBuffer.h"
+#include "DtxWorkRecord.h"
+#include "TransactionalStore.h"
+#include "qpid/framing/amqp_types.h"
+
+namespace qpid {
+namespace broker {
+
+class DtxManager{
+    typedef boost::ptr_map<std::string, DtxWorkRecord> WorkMap;
+
+    WorkMap work;
+    TransactionalStore* const store;
+
+    WorkMap::iterator getWork(const std::string& xid);
+
+public:
+    DtxManager(TransactionalStore* const store);
+    ~DtxManager();
+    void start(std::string xid, DtxBuffer::shared_ptr work);
+    void prepare(const std::string& xid);
+    void commit(const std::string& xid);
+    void rollback(const std::string& xid);
+};
+
+}
+}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp?view=auto&rev=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp Thu Apr 19 10:56:21 2007
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DtxWorkRecord.h"
+#include <boost/format.hpp>
+#include <boost/mem_fn.hpp>
+using boost::mem_fn;
+
+using namespace qpid::broker;
+
+/**
+ * Creates an empty work record for the given xid, bound to the given store.
+ *
+ * 'completed' is explicitly initialised to false: checkCompletion() reads
+ * it before ever writing it, so leaving it uninitialised would make the
+ * "have all buffers ended" check depend on an indeterminate value.
+ */
+DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store), completed(false) {}
+
+// Nothing to do explicitly: the members (including the auto_ptr holding any
+// transaction context) are destroyed by their own destructors.
+DtxWorkRecord::~DtxWorkRecord() {}
+
+/**
+ * First phase of a two-phase commit for this xid.
+ *
+ * Verifies that all buffers have ended, begins a store transaction for
+ * the xid and prepares every buffer under it. If any buffer fails, the
+ * whole record is aborted.
+ *
+ * @returns true if all buffers prepared and the store was asked to
+ * prepare the transaction, false if the work was aborted instead.
+ */
+bool DtxWorkRecord::prepare()
+{
+    checkCompletion();
+    txn = store->begin(xid);
+
+    if (!prepare(txn.get())) {
+        //a buffer refused to prepare: discard the transaction and the work
+        abort();
+        return false;
+    }
+
+    store->prepare(*txn);
+    return true;
+}
+
+/**
+ * Prepares each buffer in turn under the supplied transaction context,
+ * stopping at the first one that fails.
+ *
+ * @returns true if every buffer prepared successfully (also when there
+ * are no buffers), false as soon as one does not.
+ */
+bool DtxWorkRecord::prepare(TransactionContext* _txn)
+{
+    for (Work::iterator buffer = work.begin(); buffer != work.end(); ++buffer) {
+        if (!(*buffer)->prepare(_txn)) {
+            return false;
+        }
+    }
+    return true;
+}
+
+/**
+ * Commits all work recorded against this xid.
+ *
+ * If prepare() has already created a two-phase transaction context, that
+ * context is committed on the store. Otherwise a store-local transaction
+ * is used as a one-phase optimisation: the buffers are prepared and the
+ * store committed in one step, or everything is aborted if any buffer
+ * fails to prepare.
+ *
+ * On both successful paths the buffers themselves are committed once the
+ * store commit has been issued.
+ *
+ * checkCompletion() throws a ConnectionException if any buffer has not
+ * yet ended.
+ */
+void DtxWorkRecord::commit()
+{
+    checkCompletion();
+    if (txn.get()) {
+        //already prepared
+        store->commit(*txn);
+        txn.reset();
+
+        for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
+    } else {
+        //1pc commit optimisation, don't need a 2pc transaction context:
+        std::auto_ptr<TransactionContext> localtxn = store->begin();
+        if (prepare(localtxn.get())) {
+            store->commit(*localtxn);
+            //the buffered operations must be carried out here as well,
+            //exactly as on the already-prepared path above
+            for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
+        } else {
+            store->abort(*localtxn);
+            abort();
+        }
+    }
+}
+
+/**
+ * Rolls back all work recorded against this xid.
+ *
+ * checkCompletion() runs first, so this throws a ConnectionException
+ * if any buffer has not yet ended; otherwise the work is aborted.
+ */
+void DtxWorkRecord::rollback()
+{
+    checkCompletion();
+    abort();
+}
+
+/**
+ * Appends a buffer of operations to the work tracked for this xid.
+ * The shared pointer is stored as given.
+ */
+void DtxWorkRecord::add(DtxBuffer::shared_ptr ops)
+{
+    work.push_back(ops);
+}
+
+/**
+ * Ensures every buffer associated with this xid has been ended.
+ *
+ * The result is cached in 'completed', so the buffers are only scanned
+ * until the check first succeeds. Throws a ConnectionException (code
+ * 503) naming the xid if any buffer is still open.
+ */
+void DtxWorkRecord::checkCompletion()
+{
+    if (completed) {
+        return;
+    }
+
+    //every DtxBuffer must have been ended before the work can be completed
+    for (Work::iterator buffer = work.begin(); buffer != work.end(); ++buffer) {
+        if ((*buffer)->isEnded()) {
+            continue;
+        }
+        throw ConnectionException(503, boost::format("Branch with xid %1% not completed!") % xid);
+    }
+    completed = true;
+}
+
+/**
+ * Abandons the work for this xid: aborts and releases the two-phase
+ * transaction context if one exists, then rolls back every buffer.
+ */
+void DtxWorkRecord::abort()
+{
+    TPCTransactionContext* const prepared = txn.get();
+    if (prepared) {
+        store->abort(*prepared);
+        txn.reset();
+    }
+
+    //roll back the buffers whether or not a store transaction existed
+    for_each(work.begin(), work.end(), mem_fn(&TxBuffer::rollback));
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h?view=auto&rev=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h Thu Apr 19 10:56:21 2007
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DtxWorkRecord_
+#define _DtxWorkRecord_
+
+#include <algorithm>
+#include <functional>
+#include <vector>
+#include "DtxBuffer.h"
+#include "TransactionalStore.h"
+#include "qpid/framing/amqp_types.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * The work done under one distributed transaction id (xid), held as a
+ * list of DtxBuffers, together with the operations that prepare, commit
+ * or roll that work back against a TransactionalStore.
+ *
+ * NOTE(review): the class holds a std::auto_ptr member, so the implicitly
+ * generated copy operations would transfer ownership of the transaction
+ * context; presumably instances are never copied - confirm.
+ */
+class DtxWorkRecord
+{
+    typedef std::vector<DtxBuffer::shared_ptr> Work;
+
+    const std::string xid;
+    TransactionalStore* const store;
+    // set by checkCompletion() once every buffer in 'work' reports isEnded()
+    bool completed;
+    Work work;
+    // two-phase transaction context: assigned in prepare(), released again
+    // by commit() and abort()
+    std::auto_ptr<TPCTransactionContext> txn;
+
+    // throws if any buffer has not ended yet
+    void checkCompletion();
+    // aborts any two-phase context and rolls back all buffers
+    void abort();
+    // prepares each buffer under the given context, stopping at the first failure
+    bool prepare(TransactionContext* txn);
+public:
+    DtxWorkRecord(const std::string& xid, TransactionalStore* const store);
+    ~DtxWorkRecord();
+    // first phase of 2pc; returns false (after aborting) if a buffer fails to prepare
+    bool prepare();
+    // commits previously prepared work, or performs a one-phase commit if not prepared
+    void commit();
+    // aborts the work
+    void rollback();
+    // appends a buffer to the work for this xid
+    void add(DtxBuffer::shared_ptr ops);
+};
+
+}
+}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp Thu Apr 19 10:56:21 2007
@@ -24,17 +24,13 @@
 using boost::mem_fn;
 using namespace qpid::broker;
 
-bool TxBuffer::prepare(TransactionalStore* const store)
+bool TxBuffer::prepare(TransactionContext* const ctxt)
 {
-    std::auto_ptr<TransactionContext> ctxt;
-    if(store) ctxt = store->begin();
     for(op_iterator i = ops.begin(); i < ops.end(); i++){
-        if(!(*i)->prepare(ctxt.get())){
-            if(store) store->abort(*ctxt);
+        if(!(*i)->prepare(ctxt)){
             return false;
         }
     }
-    if(store) store->commit(*ctxt);
     return true;
 }
 
@@ -53,4 +49,19 @@
 void TxBuffer::enlist(TxOp::shared_ptr op)
 {
     ops.push_back(op);
+}
+
+/**
+ * Runs a complete server-local commit: prepares all ops under a store
+ * transaction (when a store is supplied) and then either commits or
+ * rolls back both the store transaction and the ops.
+ *
+ * @param store the store to open the transaction on; may be null, in
+ * which case the ops are prepared with a null context and no store
+ * calls are made.
+ * @returns true if the ops prepared and were committed, false if they
+ * were rolled back.
+ */
+bool TxBuffer::commitLocal(TransactionalStore* const store)
+{
+    std::auto_ptr<TransactionContext> localCtxt;
+    if (store) {
+        localCtxt = store->begin();
+    }
+
+    const bool prepared = prepare(localCtxt.get());
+    if (!prepared) {
+        if (store) {
+            store->abort(*localCtxt);
+        }
+        rollback();
+        return false;
+    }
+
+    if (store) {
+        store->commit(*localCtxt);
+    }
+    commit();
+    return true;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h Thu Apr 19 10:56:21 2007
@@ -61,44 +61,52 @@
         class TxBuffer{
             typedef std::vector<TxOp::shared_ptr>::iterator op_iterator;
             std::vector<TxOp::shared_ptr> ops;
+        protected:
+
         public:
+            typedef boost::shared_ptr<TxBuffer> shared_ptr;
+            /**
+             * Adds an operation to the transaction.
+             */
+            void enlist(TxOp::shared_ptr op);
+
             /**
              * Requests that all ops are prepared. This should
              * primarily involve making sure that a persistent record
              * of the operations is stored where necessary.
-             * 
-             * All ops will be prepared under a transaction on the
-             * specified store. If any operation fails on prepare,
-             * this transaction will be rolled back. 
-             * 
+             *
              * Once prepared, a transaction can be committed (or in
              * the 2pc case, rolled back).
              * 
              * @returns true if all the operations prepared
              * successfully, false if not.
              */
-            bool prepare(TransactionalStore* const store);
+            bool prepare(TransactionContext* const ctxt);
+
             /**
-             * Signals that the ops all prepared all completed
-             * successfully and can now commit, i.e. the operation can
-             * now be fully carried out.
+             * Signals that the ops all prepared successfully and can
+             * now commit, i.e. the operation can now be fully carried
+             * out.
              * 
              * Should only be called after a call to prepare() returns
              * true.
              */
             void commit();
+
             /**
-             * Rolls back all the operations.
+             * Signals that all ops can be rolled back.
              * 
              * Should only be called either after a call to prepare()
              * returns true (2pc) or instead of a prepare call
              * ('server-local')
              */
             void rollback();
+
             /**
-             * Adds an operation to the transaction.
+             * Helper method for managing the process of server local
+             * commit
              */
-            void enlist(TxOp::shared_ptr op);
+            bool commitLocal(TransactionalStore* const store);
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp Thu Apr 19 10:56:21 2007
@@ -161,7 +161,9 @@
     };
 
     CPPUNIT_TEST_SUITE(TxBufferTest);
-    CPPUNIT_TEST(testPrepareAndCommit);
+    CPPUNIT_TEST(testCommitLocal);
+    CPPUNIT_TEST(testFailOnCommitLocal);
+    CPPUNIT_TEST(testPrepare);
     CPPUNIT_TEST(testFailOnPrepare);
     CPPUNIT_TEST(testRollback);
     CPPUNIT_TEST(testBufferIsClearedAfterRollback);
@@ -170,14 +172,14 @@
 
   public:
 
-    void testPrepareAndCommit(){
+    void testCommitLocal(){
         MockTransactionalStore store;
         store.expectBegin().expectCommit();
 
         MockTxOp::shared_ptr opA(new MockTxOp());
         opA->expectPrepare().expectCommit();
         MockTxOp::shared_ptr opB(new MockTxOp());
-        opB->expectPrepare().expectPrepare().expectCommit().expectCommit();//opB enlisted twice to test reative order
+        opB->expectPrepare().expectPrepare().expectCommit().expectCommit();//opB enlisted twice to test relative order
         MockTxOp::shared_ptr opC(new MockTxOp());
         opC->expectPrepare().expectCommit();
 
@@ -187,8 +189,7 @@
         buffer.enlist(static_pointer_cast<TxOp>(opB));//opB enlisted twice
         buffer.enlist(static_pointer_cast<TxOp>(opC));
 
-        CPPUNIT_ASSERT(buffer.prepare(&store));
-        buffer.commit();
+        CPPUNIT_ASSERT(buffer.commitLocal(&store));
         store.check();
         CPPUNIT_ASSERT(store.isCommitted());
         opA->check();
@@ -196,11 +197,51 @@
         opC->check();
     }
 
-    void testFailOnPrepare(){
+    /**
+     * commitLocal() must abort the store transaction and roll back every
+     * enlisted op when one of the ops fails to prepare.
+     */
+    void testFailOnCommitLocal(){
         MockTransactionalStore store;
         store.expectBegin().expectAbort();
 
         MockTxOp::shared_ptr opA(new MockTxOp());
+        opA->expectPrepare().expectRollback();
+        // presumably the 'true' argument makes this mock fail on prepare - confirm in MockTxOp
+        MockTxOp::shared_ptr opB(new MockTxOp(true));
+        opB->expectPrepare().expectRollback();
+        MockTxOp::shared_ptr opC(new MockTxOp());//will never get prepare as b will fail
+        opC->expectRollback();
+
+        TxBuffer buffer;
+        buffer.enlist(static_pointer_cast<TxOp>(opA));
+        buffer.enlist(static_pointer_cast<TxOp>(opB));
+        buffer.enlist(static_pointer_cast<TxOp>(opC));
+
+        CPPUNIT_ASSERT(!buffer.commitLocal(&store));
+        CPPUNIT_ASSERT(store.isAborted());
+        store.check();
+        opA->check();
+        opB->check();
+        opC->check();
+    }
+
+    /**
+     * prepare() on its own must prepare every enlisted op and nothing
+     * more: no commit or rollback is expected on the ops.
+     */
+    void testPrepare(){
+        MockTxOp::shared_ptr opA(new MockTxOp());
+        opA->expectPrepare();
+        MockTxOp::shared_ptr opB(new MockTxOp());
+        opB->expectPrepare();
+        MockTxOp::shared_ptr opC(new MockTxOp());
+        opC->expectPrepare();
+
+        TxBuffer buffer;
+        buffer.enlist(static_pointer_cast<TxOp>(opA));
+        buffer.enlist(static_pointer_cast<TxOp>(opB));
+        buffer.enlist(static_pointer_cast<TxOp>(opC));
+
+        // a null transaction context is passed: no store is involved in this test
+        CPPUNIT_ASSERT(buffer.prepare(0));
+        opA->check();
+        opB->check();
+        opC->check();
+    }
+
+    void testFailOnPrepare(){
+        MockTxOp::shared_ptr opA(new MockTxOp());
         opA->expectPrepare();
         MockTxOp::shared_ptr opB(new MockTxOp(true));
         opB->expectPrepare();
@@ -211,9 +252,7 @@
         buffer.enlist(static_pointer_cast<TxOp>(opB));
         buffer.enlist(static_pointer_cast<TxOp>(opC));
 
-        CPPUNIT_ASSERT(!buffer.prepare(&store));
-        store.check();
-        CPPUNIT_ASSERT(store.isAborted());
+        CPPUNIT_ASSERT(!buffer.prepare(0));
         opA->check();
         opB->check();
         opC->check();