You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/07/31 15:35:56 UTC

svn commit: r1367535 - in /qpid/branches/asyncstore/cpp/src: qpid/asyncStore/ qpid/broker/ tests/storePerftools/asyncPerf/

Author: kpvdr
Date: Tue Jul 31 13:35:53 2012
New Revision: 1367535

URL: http://svn.apache.org/viewvc?rev=1367535&view=rev
Log:
QPID-3858: WIP: Durable transactions fixed

Modified:
    qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
    qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
    qpid/branches/asyncstore/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
    qpid/branches/asyncstore/cpp/src/qpid/asyncStore/TxnHandleImpl.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/AsyncStore.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigHandle.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigHandle.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/EnqueueHandle.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/EnqueueHandle.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/EventHandle.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/MessageHandle.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/MessageHandle.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/QueueHandle.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/QueueHandle.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/TxnBuffer.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/TxnBuffer.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/TxnHandle.cpp
    qpid/branches/asyncstore/cpp/src/qpid/broker/TxnHandle.h
    qpid/branches/asyncstore/cpp/src/qpid/broker/TxnOp.h
    qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
    qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
    qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
    qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
    qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
    qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
    qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
    qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
    qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
    qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
    qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h
    qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
    qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h

Modified: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp Tue Jul 31 13:35:53 2012
@@ -81,16 +81,18 @@ AsyncStoreImpl::createTxnHandle(qpid::br
 }
 
 qpid::broker::TxnHandle
-AsyncStoreImpl::createTxnHandle(const std::string& xid)
+AsyncStoreImpl::createTxnHandle(const std::string& xid,
+                                const bool tpcFlag)
 {
-    return qpid::broker::TxnHandle(new TxnHandleImpl(xid));
+    return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tpcFlag));
 }
 
 qpid::broker::TxnHandle
 AsyncStoreImpl::createTxnHandle(const std::string& xid,
+                                const bool tpcFlag,
                                 qpid::broker::TxnBuffer* tb)
 {
-    return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tb));
+    return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tpcFlag, tb));
 }
 
 void

Modified: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.h?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/AsyncStoreImpl.h Tue Jul 31 13:35:53 2012
@@ -60,8 +60,10 @@ public:
 
     qpid::broker::TxnHandle createTxnHandle();
     qpid::broker::TxnHandle createTxnHandle(qpid::broker::TxnBuffer* tb);
-    qpid::broker::TxnHandle createTxnHandle(const std::string& xid);
     qpid::broker::TxnHandle createTxnHandle(const std::string& xid,
+                                            const bool tpcFlag);
+    qpid::broker::TxnHandle createTxnHandle(const std::string& xid,
+                                            const bool tpcFlag,
                                             qpid::broker::TxnBuffer* tb);
 
     void submitPrepare(qpid::broker::TxnHandle& txnHandle,

Modified: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp Tue Jul 31 13:35:53 2012
@@ -23,53 +23,32 @@
 
 #include "TxnHandleImpl.h"
 
-#include "qpid/Exception.h"
-#include "qpid/broker/TxnBuffer.h"
-#include "qpid/log/Statement.h"
-
-#include <uuid/uuid.h>
-
 namespace qpid {
 namespace asyncStore {
 
 TxnHandleImpl::TxnHandleImpl() :
         m_tpcFlag(false),
-        m_asyncOpCnt(0UL),
         m_txnBuffer(0)
-{
-    createLocalXid();
-}
+{}
 
 TxnHandleImpl::TxnHandleImpl(qpid::broker::TxnBuffer* tb) :
         m_tpcFlag(false),
-        m_asyncOpCnt(0UL),
         m_txnBuffer(tb)
-{
-    createLocalXid();
-}
+{}
 
-TxnHandleImpl::TxnHandleImpl(const std::string& xid) :
+TxnHandleImpl::TxnHandleImpl(const std::string& xid, const bool tpcFlag) :
         m_xid(xid),
-        m_tpcFlag(!xid.empty()),
-        m_asyncOpCnt(0UL),
+        m_tpcFlag(tpcFlag),
         m_txnBuffer(0)
-{
-    if (m_xid.empty()) {
-        createLocalXid();
-    }
-}
+{}
 
 TxnHandleImpl::TxnHandleImpl(const std::string& xid,
+                             const bool tpcFlag,
                              qpid::broker::TxnBuffer* tb) :
         m_xid(xid),
-        m_tpcFlag(!xid.empty()),
-        m_asyncOpCnt(0UL),
+        m_tpcFlag(tpcFlag),
         m_txnBuffer(tb)
-{
-    if (m_xid.empty()) {
-        createLocalXid();
-    }
-}
+{}
 
 TxnHandleImpl::~TxnHandleImpl()
 {}
@@ -86,38 +65,4 @@ TxnHandleImpl::is2pc() const
     return m_tpcFlag;
 }
 
-void
-TxnHandleImpl::incrOpCnt()
-{
-    qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_asyncOpCntMutex);
-    ++m_asyncOpCnt;
-}
-
-void
-TxnHandleImpl::decrOpCnt()
-{
-    qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_asyncOpCntMutex);
-    if (m_asyncOpCnt == 0UL) {
-        throw qpid::Exception("Transaction async operation count underflow");
-    }
-    if (--m_asyncOpCnt == 0UL && m_txnBuffer) {
-        m_txnBuffer->asyncLocalCommit();
-    }
-}
-
-// private
-void
-TxnHandleImpl::createLocalXid()
-{
-    uuid_t uuid;
-
-    // TODO: This call might not be thread safe - Valgrind's helgrind tool emits warnings for this:
-    ::uuid_generate_random(uuid);
-
-    char uuidStr[37]; // 36-char uuid + trailing '\0'
-    ::uuid_unparse(uuid, uuidStr);
-    m_xid.assign(uuidStr);
-    QPID_LOG(debug, "Local XID created: \"" << m_xid << "\"");
-}
-
 }} // namespace qpid::asyncStore

Modified: qpid/branches/asyncstore/cpp/src/qpid/asyncStore/TxnHandleImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/asyncStore/TxnHandleImpl.h?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/asyncStore/TxnHandleImpl.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/asyncStore/TxnHandleImpl.h Tue Jul 31 13:35:53 2012
@@ -43,26 +43,16 @@ class TxnHandleImpl : public virtual qpi
 public:
     TxnHandleImpl();
     TxnHandleImpl(qpid::broker::TxnBuffer* tb);
-    TxnHandleImpl(const std::string& xid);
-    TxnHandleImpl(const std::string& xid, qpid::broker::TxnBuffer* tb);
+    TxnHandleImpl(const std::string& xid, const bool tpcFlag);
+    TxnHandleImpl(const std::string& xid, const bool tpcFlag, qpid::broker::TxnBuffer* tb);
     virtual ~TxnHandleImpl();
     const std::string& getXid() const;
     bool is2pc() const;
 
-    void submitPrepare();
-    void submitCommit();
-    void submitAbort();
-
-    void incrOpCnt();
-    void decrOpCnt();
 private:
     std::string m_xid;
     bool m_tpcFlag;
-    uint32_t m_asyncOpCnt;
-    qpid::sys::Mutex m_asyncOpCntMutex;
     qpid::broker::TxnBuffer* const m_txnBuffer;
-
-    void createLocalXid();
 };
 
 }} // namespace qpid::asyncStore

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/AsyncStore.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/AsyncStore.h?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/AsyncStore.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/AsyncStore.h Tue Jul 31 13:35:53 2012
@@ -36,7 +36,6 @@ class AsyncResultHandle;
 class AsyncResultQueue {
 public:
     virtual ~AsyncResultQueue() {}
-    // TODO: Remove boost::shared_ptr<> from this interface
     virtual void submit(boost::shared_ptr<AsyncResultHandle>) = 0;
 };
 
@@ -79,11 +78,12 @@ public:
 
     virtual TxnHandle createTxnHandle() = 0;
     virtual TxnHandle createTxnHandle(TxnBuffer* tb) = 0;
-    virtual TxnHandle createTxnHandle(const std::string& xid) = 0;
     virtual TxnHandle createTxnHandle(const std::string& xid,
-                                       TxnBuffer* tb) = 0;
+                                      const bool tpcFlag) = 0;
+    virtual TxnHandle createTxnHandle(const std::string& xid,
+                                      const bool tpcFlag,
+                                      TxnBuffer* tb) = 0;
 
-    // TODO: Remove boost::shared_ptr<BrokerAsyncContext> from this interface
     virtual void submitPrepare(TxnHandle&,
                                boost::shared_ptr<TpcTxnAsyncContext>) = 0; // Distributed txns only
     virtual void submitCommit(TxnHandle&,
@@ -112,8 +112,6 @@ public:
 
     // --- Store async interface ---
 
-    // TODO: Remove boost::shared_ptr<BrokerAsyncContext> from this interface
-
     // TODO: Switch from BrokerAsyncContext (parent class) to ConfigAsyncContext
     // when theses features (and async context classes) are developed.
     virtual void submitCreate(ConfigHandle&,

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigHandle.cpp?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigHandle.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigHandle.cpp Tue Jul 31 13:35:53 2012
@@ -55,4 +55,6 @@ ConfigHandle::operator=(const ConfigHand
     return PrivateImpl::assign(*this, r);
 }
 
+// --- ConfigHandleImpl methods ---
+
 }} // namespace qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigHandle.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigHandle.h?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigHandle.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/ConfigHandle.h Tue Jul 31 13:35:53 2012
@@ -43,7 +43,7 @@ public:
     ~ConfigHandle();
     ConfigHandle& operator=(const ConfigHandle& r);
 
-    // ConfigHandleImpl methods
+    // --- ConfigHandleImpl methods ---
     // <none>
 
 private:

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/EnqueueHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/EnqueueHandle.cpp?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/EnqueueHandle.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/EnqueueHandle.cpp Tue Jul 31 13:35:53 2012
@@ -55,4 +55,6 @@ EnqueueHandle::operator=(const EnqueueHa
     return PrivateImpl::assign(*this, r);
 }
 
+// --- EnqueueHandleImpl methods ---
+
 }} // namespace qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/EnqueueHandle.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/EnqueueHandle.h?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/EnqueueHandle.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/EnqueueHandle.h Tue Jul 31 13:35:53 2012
@@ -43,7 +43,7 @@ public:
     ~EnqueueHandle();
     EnqueueHandle& operator=(const EnqueueHandle& r);
 
-    // EnqueueHandleImpl methods
+    // --- EnqueueHandleImpl methods ---
     // <none>
 
 private:

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/EventHandle.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/EventHandle.h?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/EventHandle.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/EventHandle.h Tue Jul 31 13:35:53 2012
@@ -45,7 +45,7 @@ public:
     ~EventHandle();
     EventHandle& operator=(const EventHandle& r);
 
-    // EventHandleImpl methods
+    // --- EventHandleImpl methods ---
     const std::string& getKey() const;
 
 private:

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageHandle.cpp?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageHandle.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageHandle.cpp Tue Jul 31 13:35:53 2012
@@ -55,4 +55,6 @@ MessageHandle::operator=(const MessageHa
     return PrivateImpl::assign(*this, r);
 }
 
+// --- MessageHandleImpl methods ---
+
 }} // namespace qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/MessageHandle.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/MessageHandle.h?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/MessageHandle.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/MessageHandle.h Tue Jul 31 13:35:53 2012
@@ -43,7 +43,7 @@ public:
     ~MessageHandle();
     MessageHandle& operator=(const MessageHandle& r);
 
-    // MessageHandleImpl methods
+    // --- MessageHandleImpl methods ---
     // <none>
 
 private:

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.cpp?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.cpp Tue Jul 31 13:35:53 2012
@@ -29,13 +29,30 @@
 
 namespace qpid {
 namespace broker {
+QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
+                                     AsyncResultCallback rcb,
+                                     AsyncResultQueue* const arq) :
+        m_q(q),
+        m_rcb(rcb),
+        m_arq(arq)
+{}
+
+QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
+                                     boost::intrusive_ptr<PersistableMessage> msg,
+                                     AsyncResultCallback rcb,
+                                     AsyncResultQueue* const arq) :
+        m_q(q),
+        m_msg(msg),
+        m_rcb(rcb),
+        m_arq(arq)
+{}
 
 QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
-                                     TxnHandle& th,
+                                     TxnBuffer* tb,
                                      AsyncResultCallback rcb,
                                      AsyncResultQueue* const arq) :
         m_q(q),
-        m_th(th),
+        m_tb(tb),
         m_rcb(rcb),
         m_arq(arq)
 {
@@ -44,12 +61,12 @@ QueueAsyncContext::QueueAsyncContext(boo
 
 QueueAsyncContext::QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
                                      boost::intrusive_ptr<PersistableMessage> msg,
-                                     TxnHandle& th,
+                                     TxnBuffer* tb,
                                      AsyncResultCallback rcb,
                                      AsyncResultQueue* const arq) :
         m_q(q),
         m_msg(msg),
-        m_th(th),
+        m_tb(tb),
         m_rcb(rcb),
         m_arq(arq)
 {
@@ -72,10 +89,9 @@ QueueAsyncContext::getMessage() const
     return m_msg;
 }
 
-TxnHandle
-QueueAsyncContext::getTxnHandle() const
-{
-    return m_th;
+TxnBuffer*
+QueueAsyncContext::getTxnBuffer() const {
+    return m_tb;
 }
 
 AsyncResultQueue*

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.h?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueAsyncContext.h Tue Jul 31 13:35:53 2012
@@ -45,18 +45,25 @@ class QueueAsyncContext: public BrokerAs
 {
 public:
     QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
-                      TxnHandle& th,
                       AsyncResultCallback rcb,
                       AsyncResultQueue* const arq);
     QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
                       boost::intrusive_ptr<PersistableMessage> msg,
-                      TxnHandle& th,
+                      AsyncResultCallback rcb,
+                      AsyncResultQueue* const arq);
+    QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
+                      TxnBuffer* tb,
+                      AsyncResultCallback rcb,
+                      AsyncResultQueue* const arq);
+    QueueAsyncContext(boost::shared_ptr<PersistableQueue> q,
+                      boost::intrusive_ptr<PersistableMessage> msg,
+                      TxnBuffer* tb,
                       AsyncResultCallback rcb,
                       AsyncResultQueue* const arq);
     virtual ~QueueAsyncContext();
     boost::shared_ptr<PersistableQueue> getQueue() const;
     boost::intrusive_ptr<PersistableMessage> getMessage() const;
-    TxnHandle getTxnHandle() const;
+    TxnBuffer* getTxnBuffer() const;
     AsyncResultQueue* getAsyncResultQueue() const;
     AsyncResultCallback getAsyncResultCallback() const;
     void invokeCallback(const AsyncResultHandle* const arh) const;
@@ -65,7 +72,7 @@ public:
 private:
     boost::shared_ptr<PersistableQueue> m_q;
     boost::intrusive_ptr<PersistableMessage> m_msg;
-    TxnHandle m_th; // TODO: get rid of this when tests::storePerftools::asyncPerf::SimpleQueue has solved its TxnHandle issues.
+    TxnBuffer* m_tb;
     AsyncResultCallback m_rcb;
     AsyncResultQueue* const m_arq;
 };

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueHandle.cpp?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueHandle.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueHandle.cpp Tue Jul 31 13:35:53 2012
@@ -56,6 +56,7 @@ QueueHandle::operator=(const QueueHandle
 }
 
 // --- QueueHandleImpl methods ---
+
 const std::string&
 QueueHandle::getName() const
 {

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/QueueHandle.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/QueueHandle.h?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/QueueHandle.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/QueueHandle.h Tue Jul 31 13:35:53 2012
@@ -45,7 +45,7 @@ public:
     ~QueueHandle();
     QueueHandle& operator=(const QueueHandle& r);
 
-    // QueueHandleImpl methods
+    // --- QueueHandleImpl methods ---
     const std::string& getName() const;
 
 private:

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxnBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxnBuffer.cpp?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/TxnBuffer.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxnBuffer.cpp Tue Jul 31 13:35:53 2012
@@ -29,31 +29,86 @@
 
 #include "qpid/log/Statement.h"
 
+#include <uuid/uuid.h>
+
 namespace qpid {
 namespace broker {
 
+qpid::sys::Mutex TxnBuffer::s_uuidMutex;
+
 TxnBuffer::TxnBuffer(AsyncResultQueue& arq) :
         m_store(0),
         m_resultQueue(arq),
+        m_tpcFlag(false),
+        m_submitOpCnt(0),
+        m_completeOpCnt(0),
         m_state(NONE)
-{}
+{
+    createLocalXid();
+}
 
-TxnBuffer::~TxnBuffer()
-{}
+TxnBuffer::TxnBuffer(AsyncResultQueue& arq, std::string& xid) :
+        m_store(0),
+        m_resultQueue(arq),
+        m_xid(xid),
+        m_tpcFlag(!xid.empty()),
+        m_submitOpCnt(0),
+        m_completeOpCnt(0),
+        m_state(NONE)
+{
+    if (m_xid.empty()) {
+        createLocalXid();
+    }
+}
+
+TxnBuffer::~TxnBuffer() {}
+
+TxnHandle&
+TxnBuffer::getTxnHandle() {
+    return m_txnHandle;
+}
+
+const std::string&
+TxnBuffer::getXid() const {
+    return m_xid;
+}
+
+bool
+TxnBuffer::is2pc() const {
+    return m_tpcFlag;
+}
 
 void
-TxnBuffer::enlist(boost::shared_ptr<TxnOp> op)
-{
+TxnBuffer::incrOpCnt() {
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_submitOpCntMutex);
+    ++m_submitOpCnt;
+}
+
+void
+TxnBuffer::decrOpCnt() {
+    const uint32_t numOps = getNumOps();
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l2(m_completeOpCntMutex);
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l3(m_submitOpCntMutex);
+    if (m_completeOpCnt == m_submitOpCnt) {
+        throw qpid::Exception("Transaction async operation count underflow");
+    }
+    ++m_completeOpCnt;
+    if (numOps == m_submitOpCnt && numOps == m_completeOpCnt) {
+        asyncLocalCommit();
+    }
+}
+
+void
+TxnBuffer::enlist(boost::shared_ptr<TxnOp> op) {
     qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
     m_ops.push_back(op);
 }
 
 bool
-TxnBuffer::prepare(TxnHandle& th)
-{
+TxnBuffer::prepare() {
     qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
     for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
-        if (!(*i)->prepare(th)) {
+        if (!(*i)->prepare(this)) {
             return false;
         }
     }
@@ -61,8 +116,7 @@ TxnBuffer::prepare(TxnHandle& th)
 }
 
 void
-TxnBuffer::commit()
-{
+TxnBuffer::commit() {
     qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
     for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
         (*i)->commit();
@@ -71,8 +125,7 @@ TxnBuffer::commit()
 }
 
 void
-TxnBuffer::rollback()
-{
+TxnBuffer::rollback() {
     qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
     for(std::vector<boost::shared_ptr<TxnOp> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
         (*i)->rollback();
@@ -81,45 +134,44 @@ TxnBuffer::rollback()
 }
 
 bool
-TxnBuffer::commitLocal(AsyncTransactionalStore* const store)
-{
-    if (store) {
-        try {
-            m_store = store;
-            asyncLocalCommit();
-        } catch (std::exception& e) {
-            QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed: " << e.what());
-        } catch (...) {
-            QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed (unknown exception)");
-        }
+TxnBuffer::commitLocal(AsyncTransactionalStore* const store) {
+    try {
+        m_store = store;
+        asyncLocalCommit();
+    } catch (std::exception& e) {
+        QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed: " << e.what());
+    } catch (...) {
+        QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed (unknown exception)");
     }
     return false;
 }
 
 void
-TxnBuffer::asyncLocalCommit()
-{
-    assert(m_store != 0);
+TxnBuffer::asyncLocalCommit() {
     switch(m_state) {
     case NONE:
         m_state = PREPARE;
-        m_txnHandle = m_store->createTxnHandle(this);
-        prepare(m_txnHandle);
-        break;
+        if (m_store) {
+            m_txnHandle = m_store->createTxnHandle(this);
+        }
+        prepare(/*shared_from_this()*/);
+        if (m_store) {
+            break;
+        }
     case PREPARE:
         m_state = COMMIT;
-        {
+        if (m_store) {
             boost::shared_ptr<TxnAsyncContext> tac(new TxnAsyncContext(this,
                                                                        &handleAsyncCommitResult,
                                                                        &m_resultQueue));
             m_store->testOp();
             m_store->submitCommit(m_txnHandle, tac);
+            break;
         }
-        break;
     case COMMIT:
         commit();
         m_state = COMPLETE;
-        delete this; // TODO: ugly! Find a better way to handle the life cycle of this class
+        delete this;
         break;
     case COMPLETE:
     default: ;
@@ -142,8 +194,7 @@ TxnBuffer::handleAsyncCommitResult(const
 }
 
 void
-TxnBuffer::asyncLocalAbort()
-{
+TxnBuffer::asyncLocalAbort() {
     assert(m_store != 0);
     switch (m_state) {
     case NONE:
@@ -160,7 +211,7 @@ TxnBuffer::asyncLocalAbort()
     case ROLLBACK:
         rollback();
         m_state = COMPLETE;
-        delete this; // TODO: ugly! Find a better way to handle the life cycle of this class
+        delete this;
     default: ;
     }
 }
@@ -171,11 +222,33 @@ TxnBuffer::handleAsyncAbortResult(const 
     if (arh) {
         boost::shared_ptr<TxnAsyncContext> tac = boost::dynamic_pointer_cast<TxnAsyncContext>(arh->getBrokerAsyncContext());
         if (arh->getErrNo()) {
-            QPID_LOG(error, "TxnBuffer::handleAsyncAbortResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo()
-                    << " (" << arh->getErrMsg() << ")");
+            QPID_LOG(error, "TxnBuffer::handleAsyncAbortResult: Transactional operation " << tac->getOpStr()
+                     << " failed: err=" << arh->getErrNo() << " (" << arh->getErrMsg() << ")");
         }
         tac->getTxnBuffer()->asyncLocalAbort();
     }
 }
 
+// private
+uint32_t
+TxnBuffer::getNumOps() const {
+    qpid::sys::ScopedLock<qpid::sys::Mutex> l(m_opsMutex);
+    return m_ops.size();
+}
+
+// private
+void
+TxnBuffer::createLocalXid()
+{
+    uuid_t uuid;
+    {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(s_uuidMutex);
+        ::uuid_generate_random(uuid); // Not thread-safe
+    }
+    char uuidStr[37]; // 36-char uuid + trailing '\0'
+    ::uuid_unparse(uuid, uuidStr);
+    m_xid.assign(uuidStr);
+    QPID_LOG(debug, "Local XID created: \"" << m_xid << "\"");
+}
+
 }} // namespace qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxnBuffer.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxnBuffer.h?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/TxnBuffer.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxnBuffer.h Tue Jul 31 13:35:53 2012
@@ -42,10 +42,16 @@ class TxnOp;
 class TxnBuffer {
 public:
     TxnBuffer(AsyncResultQueue& arq);
+    TxnBuffer(AsyncResultQueue& arq, std::string& xid);
     virtual ~TxnBuffer();
+    TxnHandle& getTxnHandle();
+    const std::string& getXid() const;
+    bool is2pc() const;
+    void incrOpCnt();
+    void decrOpCnt();
 
     void enlist(boost::shared_ptr<TxnOp> op);
-    bool prepare(TxnHandle& th);
+    bool prepare();
     void commit();
     void rollback();
     bool commitLocal(AsyncTransactionalStore* const store);
@@ -57,14 +63,25 @@ public:
     static void handleAsyncAbortResult(const AsyncResultHandle* const arh);
 
 private:
+    mutable qpid::sys::Mutex m_opsMutex;
+    mutable qpid::sys::Mutex m_submitOpCntMutex;
+    mutable qpid::sys::Mutex m_completeOpCntMutex;
+    static qpid::sys::Mutex s_uuidMutex;
+
     std::vector<boost::shared_ptr<TxnOp> > m_ops;
-    qpid::sys::Mutex m_opsMutex;
     TxnHandle m_txnHandle;
     AsyncTransactionalStore* m_store;
     AsyncResultQueue& m_resultQueue;
+    std::string m_xid;
+    bool m_tpcFlag;
+    uint32_t m_submitOpCnt;
+    uint32_t m_completeOpCnt;
 
     typedef enum {NONE = 0, PREPARE, COMMIT, ROLLBACK, COMPLETE} e_txnState;
     e_txnState m_state;
+
+    uint32_t getNumOps() const;
+    void createLocalXid();
 };
 
 }} // namespace qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxnHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxnHandle.cpp?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/TxnHandle.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxnHandle.cpp Tue Jul 31 13:35:53 2012
@@ -57,28 +57,5 @@ TxnHandle::operator=(const TxnHandle& r)
 
 // --- TxnHandleImpl methods ---
 
-const std::string&
-TxnHandle::getXid() const
-{
-    return impl->getXid();
-}
-
-bool
-TxnHandle::is2pc() const
-{
-    return impl->is2pc();
-}
-
-void
-TxnHandle::incrOpCnt()
-{
-    impl->incrOpCnt();
-}
-
-void
-TxnHandle::decrOpCnt()
-{
-    impl->decrOpCnt();
-}
 
 }} // namespace qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxnHandle.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxnHandle.h?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/TxnHandle.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxnHandle.h Tue Jul 31 13:35:53 2012
@@ -45,11 +45,8 @@ public:
     ~TxnHandle();
     TxnHandle& operator=(const TxnHandle& r);
 
-    // TxnHandleImpl methods
-    const std::string& getXid() const;
-    bool is2pc() const;
-    void incrOpCnt();
-    void decrOpCnt();
+    // --- TxnHandleImpl methods ---
+    // <none>
 
 private:
     friend class PrivateImplRef<TxnHandle>;

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxnOp.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxnOp.h?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/TxnOp.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxnOp.h Tue Jul 31 13:35:53 2012
@@ -24,15 +24,17 @@
 #ifndef qpid_broker_TxnOp_h_
 #define qpid_broker_TxnOp_h_
 
+#include <boost/shared_ptr.hpp>
+
 namespace qpid {
 namespace broker {
 
-class TxnHandle;
+class TxnBuffer;
 
 class TxnOp{
 public:
     virtual ~TxnOp() {}
-    virtual bool prepare(TxnHandle& th) throw() = 0;
+    virtual bool prepare(qpid::broker::TxnBuffer*) throw() = 0;
     virtual void commit()  throw() = 0;
     virtual void rollback()  throw() = 0;
 };

Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.cpp Tue Jul 31 13:35:53 2012
@@ -82,9 +82,9 @@ DeliveryRecord::isRedundant() const
 }
 
 void
-DeliveryRecord::dequeue(qpid::broker::TxnHandle& txn)
+DeliveryRecord::dequeue(qpid::broker::TxnBuffer* tb)
 {
-    m_queuedMessage->getQueue()->dequeue(txn, m_queuedMessage);
+    m_queuedMessage->getQueue()->dequeue(tb, m_queuedMessage);
 }
 
 void

Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/DeliveryRecord.h Tue Jul 31 13:35:53 2012
@@ -30,7 +30,7 @@
 
 namespace qpid  {
 namespace broker {
-class TxnHandle;
+class TxnBuffer;
 }}
 
 namespace tests {
@@ -51,7 +51,7 @@ public:
     bool setEnded();
     bool isEnded() const;
     bool isRedundant() const;
-    void dequeue(qpid::broker::TxnHandle& txn);
+    void dequeue(qpid::broker::TxnBuffer* tb);
     void committed() const;
     boost::shared_ptr<QueuedMessage> getQueuedMessage() const;
 private:

Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageConsumer.cpp Tue Jul 31 13:35:53 2012
@@ -47,22 +47,18 @@ MessageConsumer::MessageConsumer(const T
         m_queue(queue)
 {}
 
-MessageConsumer::~MessageConsumer()
-{}
+MessageConsumer::~MessageConsumer() {}
 
 void
-MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr)
-{
+MessageConsumer::record(boost::shared_ptr<DeliveryRecord> dr) {
     m_unacked.push_back(dr);
 }
 
 void
-MessageConsumer::commitComplete()
-{}
+MessageConsumer::commitComplete() {}
 
 void*
-MessageConsumer::runConsumers()
-{
+MessageConsumer::runConsumers() {
     const bool useTxns = m_perfTestParams.m_deqTxnBlockSize > 0U;
     uint16_t opsInTxnCnt = 0U;
     qpid::broker::TxnBuffer* tb = 0;
@@ -78,17 +74,13 @@ MessageConsumer::runConsumers()
             ++numMsgs;
             if (useTxns) {
                 // --- Transactional dequeue ---
+                boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked));
+                m_unacked.clear();
+                tb->enlist(ta);
                 if (++opsInTxnCnt >= m_perfTestParams.m_deqTxnBlockSize) {
-                    if (m_perfTestParams.m_durable) {
-                        boost::shared_ptr<TxnAccept> ta(new TxnAccept(m_unacked));
-                        m_unacked.clear();
-                        tb->enlist(ta);
-                        tb->commitLocal(m_store);
-                        if (numMsgs < m_perfTestParams.m_numMsgs) {
-                            tb = new qpid::broker::TxnBuffer(m_resultQueue);
-                        }
-                    } else {
-                        tb->commit();
+                    tb->commitLocal(m_store);
+                    if (numMsgs < m_perfTestParams.m_numMsgs) {
+                        tb = new qpid::broker::TxnBuffer(m_resultQueue);
                     }
                     opsInTxnCnt = 0U;
                 }
@@ -105,11 +97,7 @@ MessageConsumer::runConsumers()
     }
 
     if (opsInTxnCnt) {
-        if (m_perfTestParams.m_durable) {
-            tb->commitLocal(m_store);
-        } else {
-            tb->commit();
-        }
+        tb->commitLocal(m_store);
     }
 
     return reinterpret_cast<void*>(0);
@@ -117,8 +105,7 @@ MessageConsumer::runConsumers()
 
 //static
 void*
-MessageConsumer::startConsumers(void* ptr)
-{
+MessageConsumer::startConsumers(void* ptr) {
     return reinterpret_cast<MessageConsumer*>(ptr)->runConsumers();
 }
 

Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.cpp Tue Jul 31 13:35:53 2012
@@ -49,12 +49,10 @@ MessageProducer::MessageProducer(const T
         m_queue(queue)
 {}
 
-MessageProducer::~MessageProducer()
-{}
+MessageProducer::~MessageProducer() {}
 
 void*
-MessageProducer::runProducers()
-{
+MessageProducer::runProducers() {
     const bool useTxns = m_perfTestParams.m_enqTxnBlockSize > 0U;
     uint16_t recsInTxnCnt = 0U;
     qpid::broker::TxnBuffer* tb = 0;
@@ -68,17 +66,13 @@ MessageProducer::runProducers()
             op->deliverTo(m_queue);
             tb->enlist(op);
             if (++recsInTxnCnt >= m_perfTestParams.m_enqTxnBlockSize) {
-                if (m_perfTestParams.m_durable) {
-                    tb->commitLocal(m_store);
+                tb->commitLocal(m_store);
 
-                    // TxnBuffer instance tb carries async state that precludes it being re-used for the next
-                    // transaction until the current commit cycle completes. So use another instance. This
-                    // instance should auto-delete when the async commit cycle completes.
-                    if ((numMsgs + 1) < m_perfTestParams.m_numMsgs) {
-                        tb = new qpid::broker::TxnBuffer(m_resultQueue);
-                    }
-                } else {
-                    tb->commit();
+                // TxnBuffer instance tb carries async state that precludes it being re-used for the next
+                // transaction until the current commit cycle completes. So use another instance. This
+                // instance should auto-delete when the async commit cycle completes.
+                if ((numMsgs + 1) < m_perfTestParams.m_numMsgs) {
+                    tb = new qpid::broker::TxnBuffer(m_resultQueue);
                 }
                 recsInTxnCnt = 0U;
             }
@@ -87,11 +81,7 @@ MessageProducer::runProducers()
         }
     }
     if (recsInTxnCnt) {
-        if (m_perfTestParams.m_durable) {
-            tb->commitLocal(m_store);
-        } else {
-            tb->commit();
-        }
+        tb->commitLocal(m_store);
     }
     return reinterpret_cast<void*>(0);
 }

Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/MessageProducer.h Tue Jul 31 13:35:53 2012
@@ -32,6 +32,7 @@ class AsyncStoreImpl;
 }
 namespace broker {
 class AsyncResultQueue;
+class TxnBuffer;
 }}
 
 namespace tests {
@@ -40,7 +41,6 @@ namespace asyncPerf {
 
 class SimpleQueue;
 class TestOptions;
-class TxnBuffer;
 
 class MessageProducer
 {

Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.cpp Tue Jul 31 13:35:53 2012
@@ -89,9 +89,9 @@ QueuedMessage::enqHandle()
 }
 
 void
-QueuedMessage::prepareEnqueue(qpid::broker::TxnHandle& th)
+QueuedMessage::prepareEnqueue(qpid::broker::TxnBuffer* tb)
 {
-    m_queue->enqueue(th, shared_from_this());
+    m_queue->enqueue(tb, shared_from_this());
 }
 
 void

Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/QueuedMessage.h Tue Jul 31 13:35:53 2012
@@ -59,7 +59,7 @@ public:
     qpid::broker::EnqueueHandle& enqHandle();
 
     // --- Transaction handling ---
-    void prepareEnqueue(qpid::broker::TxnHandle& th);
+    void prepareEnqueue(qpid::broker::TxnBuffer* tb);
     void commitEnqueue();
     void abortEnqueue();
 

Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.cpp Tue Jul 31 13:35:53 2012
@@ -31,7 +31,7 @@
 
 #include "qpid/broker/AsyncResultHandle.h"
 #include "qpid/broker/QueueAsyncContext.h"
-#include "qpid/broker/TxnHandle.h"
+#include "qpid/broker/TxnBuffer.h"
 
 #include <string.h> // memcpy()
 
@@ -65,33 +65,27 @@ SimpleQueue::SimpleQueue(const std::stri
     }
 }
 
-SimpleQueue::~SimpleQueue()
-{}
+SimpleQueue::~SimpleQueue() {}
 
 const qpid::broker::QueueHandle&
-SimpleQueue::getHandle() const
-{
+SimpleQueue::getHandle() const {
     return m_queueHandle;
 }
 
 qpid::broker::QueueHandle&
-SimpleQueue::getHandle()
-{
+SimpleQueue::getHandle() {
     return m_queueHandle;
 }
 
 qpid::broker::AsyncStore*
-SimpleQueue::getStore()
-{
+SimpleQueue::getStore() {
     return m_store;
 }
 
 void
-SimpleQueue::asyncCreate()
-{
+SimpleQueue::asyncCreate() {
     if (m_store) {
         boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
-                                                                                                   s_nullTxnHandle,
                                                                                                    &handleAsyncCreateResult,
                                                                                                    &m_resultQueue));
         m_store->submitCreate(m_queueHandle, this, qac);
@@ -123,7 +117,6 @@ SimpleQueue::asyncDestroy(const bool del
     if (m_store) {
         if (deleteQueue) {
             boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
-                                                                                                       s_nullTxnHandle,
                                                                                                        &handleAsyncDestroyResult,
                                                                                                        &m_resultQueue));
             m_store->submitDestroy(m_queueHandle, qac);
@@ -151,16 +144,14 @@ SimpleQueue::handleAsyncDestroyResult(co
 }
 
 void
-SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg)
-{
+SimpleQueue::deliver(boost::intrusive_ptr<SimpleMessage> msg) {
     boost::shared_ptr<QueuedMessage> qm(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)));
-    enqueue(s_nullTxnHandle, qm);
+    enqueue(qm);
     push(qm);
 }
 
 bool
-SimpleQueue::dispatch(MessageConsumer& mc)
-{
+SimpleQueue::dispatch(MessageConsumer& mc) {
     boost::shared_ptr<QueuedMessage> qm;
     if (m_messages->consume(qm)) {
         boost::shared_ptr<DeliveryRecord> dr(new DeliveryRecord(qm, mc, false));
@@ -171,110 +162,95 @@ SimpleQueue::dispatch(MessageConsumer& m
 }
 
 bool
-SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm)
-{
-    return enqueue(s_nullTxnHandle, qm);
+SimpleQueue::enqueue(boost::shared_ptr<QueuedMessage> qm) {
+    return enqueue(0, qm);
 }
 
 bool
-SimpleQueue::enqueue(qpid::broker::TxnHandle& th,
-                     boost::shared_ptr<QueuedMessage> qm)
-{
+SimpleQueue::enqueue(qpid::broker::TxnBuffer* tb,
+                     boost::shared_ptr<QueuedMessage> qm) {
     ScopedUse u(m_barrier);
     if (!u.m_acquired) {
         return false;
     }
     if (qm->payload()->isPersistent() && m_store) {
         qm->payload()->enqueueAsync(shared_from_this(), m_store);
-        return asyncEnqueue(th, qm);
+        return asyncEnqueue(tb, qm);
     }
     return false;
 }
 
 bool
-SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm)
-{
-    return dequeue(s_nullTxnHandle, qm);
+SimpleQueue::dequeue(boost::shared_ptr<QueuedMessage> qm) {
+    return dequeue(0, qm);
 }
 
 bool
-SimpleQueue::dequeue(qpid::broker::TxnHandle& th,
-                     boost::shared_ptr<QueuedMessage> qm)
-{
+SimpleQueue::dequeue(qpid::broker::TxnBuffer* tb,
+                     boost::shared_ptr<QueuedMessage> qm) {
     ScopedUse u(m_barrier);
     if (!u.m_acquired) {
         return false;
     }
     if (qm->payload()->isPersistent() && m_store) {
         qm->payload()->dequeueAsync(shared_from_this(), m_store);
-        return asyncDequeue(th, qm);
+        return asyncDequeue(tb, qm);
     }
     return true;
 }
 
 void
-SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg)
-{
+SimpleQueue::process(boost::intrusive_ptr<SimpleMessage> msg) {
     push(boost::shared_ptr<QueuedMessage>(new QueuedMessage(this, msg)));
 }
 
 void
-SimpleQueue::enqueueAborted(boost::intrusive_ptr<SimpleMessage> /*msg*/)
-{}
+SimpleQueue::enqueueAborted(boost::intrusive_ptr<SimpleMessage>) {}
 
 void
-SimpleQueue::encode(qpid::framing::Buffer& buffer) const
-{
+SimpleQueue::encode(qpid::framing::Buffer& buffer) const {
     buffer.putShortString(m_name);
 }
 
 uint32_t
-SimpleQueue::encodedSize() const
-{
+SimpleQueue::encodedSize() const {
     return m_name.size() + 1;
 }
 
 uint64_t
-SimpleQueue::getPersistenceId() const
-{
+SimpleQueue::getPersistenceId() const {
     return m_persistenceId;
 }
 
 void
-SimpleQueue::setPersistenceId(uint64_t persistenceId) const
-{
+SimpleQueue::setPersistenceId(uint64_t persistenceId) const {
     m_persistenceId = persistenceId;
 }
 
 void
-SimpleQueue::flush()
-{
+SimpleQueue::flush() {
     //if(m_store) m_store->flush(*this);
 }
 
 const std::string&
-SimpleQueue::getName() const
-{
+SimpleQueue::getName() const {
     return m_name;
 }
 
 void
-SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst)
-{
+SimpleQueue::setExternalQueueStore(qpid::broker::ExternalQueueStore* inst) {
     if (externalQueueStore != inst && externalQueueStore)
         delete externalQueueStore;
     externalQueueStore = inst;
 }
 
 uint64_t
-SimpleQueue::getSize()
-{
+SimpleQueue::getSize() {
     return m_persistableData.size();
 }
 
 void
-SimpleQueue::write(char* target)
-{
+SimpleQueue::write(char* target) {
     ::memcpy(target, m_persistableData.data(), m_persistableData.size());
 }
 
@@ -344,21 +320,20 @@ SimpleQueue::push(boost::shared_ptr<Queu
 
 // private
 bool
-SimpleQueue::asyncEnqueue(qpid::broker::TxnHandle& th,
-                          boost::shared_ptr<QueuedMessage> qm)
-{
+SimpleQueue::asyncEnqueue(qpid::broker::TxnBuffer* tb,
+                          boost::shared_ptr<QueuedMessage> qm) {
     assert(qm.get());
-//    qm.payload()->setPersistenceId(m_store->getNextRid()); // TODO: rid is set by store itself - find way to do this
     boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
                                                                                                qm->payload(),
-                                                                                               th,
+                                                                                               tb,
                                                                                                &handleAsyncEnqueueResult,
                                                                                                &m_resultQueue));
-    // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store)
-    if (th.isValid()) {
-        th.incrOpCnt();
+    if (tb) {
+        tb->incrOpCnt();
+        m_store->submitEnqueue(qm->enqHandle(), tb->getTxnHandle(), qac);
+    } else {
+        m_store->submitEnqueue(qm->enqHandle(), s_nullTxnHandle, qac);
     }
-    m_store->submitEnqueue(qm->enqHandle(), th, qac);
     ++m_asyncOpCounter;
     return true;
 }
@@ -382,22 +357,21 @@ SimpleQueue::handleAsyncEnqueueResult(co
 
 // private
 bool
-SimpleQueue::asyncDequeue(qpid::broker::TxnHandle& th,
+SimpleQueue::asyncDequeue(/*boost::shared_ptr<qpid::broker::TxnBuffer>*/qpid::broker::TxnBuffer* tb,
                           boost::shared_ptr<QueuedMessage> qm)
 {
     assert(qm.get());
     boost::shared_ptr<qpid::broker::QueueAsyncContext> qac(new qpid::broker::QueueAsyncContext(shared_from_this(),
                                                                                                qm->payload(),
-                                                                                               th,
+                                                                                               tb,
                                                                                                &handleAsyncDequeueResult,
                                                                                                &m_resultQueue));
-    // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store)
-    if (th.isValid()) {
-        th.incrOpCnt();
-    }
-    m_store->submitDequeue(qm->enqHandle(),
-                           th,
-                           qac);
+    if (tb) {
+        tb->incrOpCnt();
+        m_store->submitDequeue(qm->enqHandle(), tb->getTxnHandle(), qac);
+    } else {
+        m_store->submitDequeue(qm->enqHandle(), s_nullTxnHandle, qac);
+    }
     ++m_asyncOpCounter;
     return true;
 }
@@ -420,8 +394,7 @@ SimpleQueue::handleAsyncDequeueResult(co
 
 // private
 void
-SimpleQueue::destroyCheck(const std::string& opDescr) const
-{
+SimpleQueue::destroyCheck(const std::string& opDescr) const {
     if (m_destroyPending || m_destroyed) {
         std::ostringstream oss;
         oss << opDescr << " on queue \"" << m_name << "\" after call to destroy";
@@ -431,55 +404,54 @@ SimpleQueue::destroyCheck(const std::str
 
 // private
 void
-SimpleQueue::createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
-{
-    assert(qc->getQueue().get() == this);
+SimpleQueue::createComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+    if (qc.get()) {
+        assert(qc->getQueue().get() == this);
+    }
     --m_asyncOpCounter;
 }
 
 // private
 void
-SimpleQueue::flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
-{
-    assert(qc->getQueue().get() == this);
+SimpleQueue::flushComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+    if (qc.get()) {
+        assert(qc->getQueue().get() == this);
+    }
     --m_asyncOpCounter;
 }
 
 // private
 void
-SimpleQueue::destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
-{
-    assert(qc->getQueue().get() == this);
+SimpleQueue::destroyComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+    if (qc.get()) {
+        assert(qc->getQueue().get() == this);
+    }
     --m_asyncOpCounter;
     m_destroyed = true;
 }
 
 // private
 void
-SimpleQueue::enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
-{
-    assert(qc->getQueue().get() == this);
-    --m_asyncOpCounter;
-
-    // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store)
-    qpid::broker::TxnHandle th = qc->getTxnHandle();
-    if (th.isValid()) { // transactional enqueue
-        th.decrOpCnt();
+SimpleQueue::enqueueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+    if (qc.get()) {
+        assert(qc->getQueue().get() == this);
+        if (qc->getTxnBuffer()) { // transactional enqueue
+            qc->getTxnBuffer()->decrOpCnt();
+        }
     }
+    --m_asyncOpCounter;
 }
 
 // private
 void
-SimpleQueue::dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc)
-{
-    assert(qc->getQueue().get() == this);
-    --m_asyncOpCounter;
-
-    // TODO : This must be done from inside store, not here (the txn handle is opaque outside the store)
-    qpid::broker::TxnHandle th = qc->getTxnHandle();
-    if (th.isValid()) { // transactional enqueue
-        th.decrOpCnt();
+SimpleQueue::dequeueComplete(const boost::shared_ptr<qpid::broker::QueueAsyncContext> qc) {
+    if (qc.get()) {
+        assert(qc->getQueue().get() == this);
+        if (qc->getTxnBuffer()) { // transactional enqueue
+            qc->getTxnBuffer()->decrOpCnt();
+        }
     }
+    --m_asyncOpCounter;
 }
 
 }}} // namespace tests::storePerftools::asyncPerf

Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/SimpleQueue.h Tue Jul 31 13:35:53 2012
@@ -38,6 +38,7 @@ namespace qpid {
 namespace broker {
 class AsyncResultQueue;
 class QueueAsyncContext;
+class TxnBuffer;
 }
 namespace framing {
 class FieldTable;
@@ -76,10 +77,10 @@ public:
     void deliver(boost::intrusive_ptr<SimpleMessage> msg);
     bool dispatch(MessageConsumer& mc);
     bool enqueue(boost::shared_ptr<QueuedMessage> qm);
-    bool enqueue(qpid::broker::TxnHandle& th,
+    bool enqueue(qpid::broker::TxnBuffer* tb,
                  boost::shared_ptr<QueuedMessage> qm);
     bool dequeue(boost::shared_ptr<QueuedMessage> qm);
-    bool dequeue(qpid::broker::TxnHandle& th,
+    bool dequeue(qpid::broker::TxnBuffer* tb,
                  boost::shared_ptr<QueuedMessage> qm);
     void process(boost::intrusive_ptr<SimpleMessage> msg);
     void enqueueAborted(boost::intrusive_ptr<SimpleMessage> msg);
@@ -134,10 +135,10 @@ private:
               bool isRecovery = false);
 
     // -- Async ops ---
-    bool asyncEnqueue(qpid::broker::TxnHandle& th,
+    bool asyncEnqueue(qpid::broker::TxnBuffer* tb,
                       boost::shared_ptr<QueuedMessage> qm);
     static void handleAsyncEnqueueResult(const qpid::broker::AsyncResultHandle* const arh);
-    bool asyncDequeue(qpid::broker::TxnHandle& th,
+    bool asyncDequeue(qpid::broker::TxnBuffer* tb,
                       boost::shared_ptr<QueuedMessage> qm);
     static void handleAsyncDequeueResult(const qpid::broker::AsyncResultHandle* const arh);
 

Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.cpp Tue Jul 31 13:35:53 2012
@@ -35,18 +35,17 @@ TxnAccept::TxnAccept(std::deque<boost::s
         m_ops(ops)
 {}
 
-TxnAccept::~TxnAccept()
-{}
+TxnAccept::~TxnAccept() {}
 
 // --- Interface TxnOp ---
 
 bool
-TxnAccept::prepare(qpid::broker::TxnHandle& th) throw()
-{
+TxnAccept::prepare(qpid::broker::TxnBuffer* tb) throw() {
     try {
         for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) {
-            (*i)->dequeue(th);
+            (*i)->dequeue(tb);
         }
+        return true;
     } catch (const std::exception& e) {
         QPID_LOG(error, "TxnAccept: Failed to prepare transaction: " << e.what());
     } catch (...) {
@@ -56,8 +55,7 @@ TxnAccept::prepare(qpid::broker::TxnHand
 }
 
 void
-TxnAccept::commit()  throw()
-{
+TxnAccept::commit() throw() {
     try {
         for (std::deque<boost::shared_ptr<DeliveryRecord> >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) {
             (*i)->committed();
@@ -71,7 +69,6 @@ TxnAccept::commit()  throw()
 }
 
 void
-TxnAccept::rollback()  throw()
-{}
+TxnAccept::rollback() throw() {}
 
 }}} // namespace tests::storePerftools::asyncPerf

Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnAccept.h Tue Jul 31 13:35:53 2012
@@ -41,9 +41,9 @@ public:
     virtual ~TxnAccept();
 
     // --- Interface TxnOp ---
-    bool prepare(qpid::broker::TxnHandle& th) throw();
-    void commit()  throw();
-    void rollback()  throw();
+    bool prepare(qpid::broker::TxnBuffer* tb) throw();
+    void commit() throw();
+    void rollback() throw();
 private:
     std::deque<boost::shared_ptr<DeliveryRecord> > m_ops;
 };

Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.cpp Tue Jul 31 13:35:53 2012
@@ -38,15 +38,13 @@ TxnPublish::TxnPublish(boost::intrusive_
         m_msg(msg)
 {}
 
-TxnPublish::~TxnPublish()
-{}
+TxnPublish::~TxnPublish() {}
 
 bool
-TxnPublish::prepare(qpid::broker::TxnHandle& th) throw()
-{
-    try{
+TxnPublish::prepare(qpid::broker::TxnBuffer* tb) throw() {
+    try {
         while (!m_queues.empty()) {
-            m_queues.front()->prepareEnqueue(th);
+            m_queues.front()->prepareEnqueue(tb);
             m_prepared.push_back(m_queues.front());
             m_queues.pop_front();
         }
@@ -60,8 +58,7 @@ TxnPublish::prepare(qpid::broker::TxnHan
 }
 
 void
-TxnPublish::commit() throw()
-{
+TxnPublish::commit() throw() {
     try {
         for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
             (*i)->commitEnqueue();
@@ -74,8 +71,7 @@ TxnPublish::commit() throw()
 }
 
 void
-TxnPublish::rollback() throw()
-{
+TxnPublish::rollback() throw() {
     try {
         for (std::list<boost::shared_ptr<QueuedMessage> >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) {
             (*i)->abortEnqueue();
@@ -88,21 +84,18 @@ TxnPublish::rollback() throw()
 }
 
 uint64_t
-TxnPublish::contentSize()
-{
+TxnPublish::contentSize() {
     return m_msg->contentSize();
 }
 
 void
-TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue)
-{
+TxnPublish::deliverTo(const boost::shared_ptr<SimpleQueue>& queue) {
     m_queues.push_back(boost::shared_ptr<QueuedMessage>(new QueuedMessage(queue.get(), m_msg)));
     m_delivered = true;
 }
 
 SimpleMessage&
-TxnPublish::getMessage()
-{
+TxnPublish::getMessage() {
     return *m_msg;
 }
 

Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h?rev=1367535&r1=1367534&r2=1367535&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/TxnPublish.h Tue Jul 31 13:35:53 2012
@@ -48,7 +48,7 @@ public:
     virtual ~TxnPublish();
 
     // --- Interface TxOp ---
-    bool prepare(qpid::broker::TxnHandle& th) throw();
+    bool prepare(qpid::broker::TxnBuffer* tb) throw();
     void commit() throw();
     void rollback() throw();
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org