You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/03/30 17:50:10 UTC

svn commit: r524139 [1/2] - in /incubator/qpid/trunk/qpid/cpp: lib/broker/ tests/

Author: gsim
Date: Fri Mar 30 08:50:07 2007
New Revision: 524139

URL: http://svn.apache.org/viewvc?view=rev&rev=524139
Log:
Refactored the MessageStore interface to restrict visibility of broker core from store implementations.


Added:
    incubator/qpid/trunk/qpid/cpp/lib/broker/Persistable.h   (with props)
    incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableExchange.h   (with props)
    incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableMessage.h   (with props)
    incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableQueue.h   (with props)
    incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableMessage.h   (with props)
    incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableQueue.h   (with props)
    incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.h   (with props)
Removed:
    incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.cpp
Modified:
    incubator/qpid/trunk/qpid/cpp/lib/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageBase.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageMessage.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageMessage.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am
    incubator/qpid/trunk/qpid/cpp/lib/broker/MessageBuilder.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.h
    incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.cpp
    incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.h
    incubator/qpid/trunk/qpid/cpp/tests/BrokerChannelTest.cpp
    incubator/qpid/trunk/qpid/cpp/tests/LazyLoadedContentTest.cpp
    incubator/qpid/trunk/qpid/cpp/tests/MessageBuilderTest.cpp
    incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp
    incubator/qpid/trunk/qpid/cpp/tests/TxBufferTest.cpp
    incubator/qpid/trunk/qpid/cpp/tests/TxPublishTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/Broker.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/Broker.cpp Fri Mar 30 08:50:07 2007
@@ -29,6 +29,7 @@
 #include "MessageStoreModule.h"
 #include "NullMessageStore.h"
 #include "ProtocolInitiation.h"
+#include "RecoveryManagerImpl.h"
 #include "Connection.h"
 #include "sys/ConnectionInputHandler.h"
 #include "sys/ConnectionInputHandlerFactory.h"
@@ -61,9 +62,8 @@
     exchanges.declare(amq_match, HeadersExchange::typeName);
 
     if(store.get()) {
-        RecoveryManager recoverer(queues, exchanges);
-        MessageStoreSettings storeSettings = { getStagingThreshold() };
-        store->recover(recoverer, &storeSettings);
+        RecoveryManagerImpl recoverer(queues, exchanges, conf.getStagingThreshold());
+        store->recover(recoverer);
     }
 
     cleaner.start();

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerChannel.cpp Fri Mar 30 08:50:07 2007
@@ -264,7 +264,7 @@
             throw ConnectionException(530, "Received ack for unrecognised delivery tag");
         }else if(i!=j){
             ack_iterator end = ++i;
-            for_each(j, end, mem_fun_ref(&DeliveryRecord::discard));
+            for_each(j, end, bind2nd(mem_fun_ref(&DeliveryRecord::discard), 0));
             unacked.erase(unacked.begin(), end);
 
             //recalculate the prefetch:

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp Fri Mar 30 08:50:07 2007
@@ -33,6 +33,7 @@
 #include "AMQMethodBody.h"
 #include "AMQFrame.h"
 #include "framing/ChannelAdapter.h"
+#include "RecoveryManagerImpl.h"
 
 using namespace boost;
 using namespace qpid::broker;
@@ -134,6 +135,9 @@
 
 void BasicMessage::decodeHeader(Buffer& buffer)
 {
+    //don't care about the type here, but want encode/decode to be symmetric
+    RecoveryManagerImpl::decodeMessageType(buffer);    
+
     string exchange;
     string routingKey;
 
@@ -169,40 +173,42 @@
     }
 }
 
-void BasicMessage::encode(Buffer& buffer)
+void BasicMessage::encode(Buffer& buffer) const
 {
     encodeHeader(buffer);
     encodeContent(buffer);
 }
 
-void BasicMessage::encodeHeader(Buffer& buffer)
+void BasicMessage::encodeHeader(Buffer& buffer) const
 {
+    RecoveryManagerImpl::encodeMessageType(*this, buffer);
     buffer.putShortString(getExchange());
     buffer.putShortString(getRoutingKey());    
     buffer.putLong(header->size());
     header->encode(buffer);
 }
 
-void BasicMessage::encodeContent(Buffer& buffer)
+void BasicMessage::encodeContent(Buffer& buffer) const
 {
     Mutex::ScopedLock locker(contentLock);
     if (content.get()) content->encode(buffer);
 }
 
-uint32_t BasicMessage::encodedSize()
+uint32_t BasicMessage::encodedSize() const
 {
     return  encodedHeaderSize() + encodedContentSize();
 }
 
-uint32_t BasicMessage::encodedContentSize()
+uint32_t BasicMessage::encodedContentSize() const
 {
     Mutex::ScopedLock locker(contentLock);
     return content.get() ? content->size() : 0;
 }
 
-uint32_t BasicMessage::encodedHeaderSize()
+uint32_t BasicMessage::encodedHeaderSize() const
 {
-    return getExchange().size() + 1
+    return RecoveryManagerImpl::encodedMessageTypeSize()
+        +getExchange().size() + 1
         + getRoutingKey().size() + 1
         + header->size() + 4;//4 extra bytes for size
 }
@@ -216,7 +222,7 @@
 {
     Mutex::ScopedLock locker(contentLock);
     if (!isPersistent() && getPersistenceId() == 0) {
-        store->stage(this);
+        store->stage(*this);
     }
     if (!content.get() || content->size() > 0) {
         //set content to lazy loading mode (but only if there is

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.h Fri Mar 30 08:50:07 2007
@@ -53,7 +53,7 @@
 class BasicMessage : public Message {
     boost::shared_ptr<framing::AMQHeaderBody> header;
     std::auto_ptr<Content> content;
-    sys::Mutex contentLock;
+    mutable sys::Mutex contentLock;
     uint64_t size;
 
     void sendContent(framing::ChannelAdapter&, uint32_t framesize);
@@ -92,25 +92,25 @@
     void decodeHeader(framing::Buffer& buffer);
     void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0);
 
-    void encode(framing::Buffer& buffer);
-    void encodeHeader(framing::Buffer& buffer);
-    void encodeContent(framing::Buffer& buffer);
+    void encode(framing::Buffer& buffer) const;
+    void encodeHeader(framing::Buffer& buffer) const;
+    void encodeContent(framing::Buffer& buffer) const;
     /**
      * @returns the size of the buffer needed to encode this
      * message in its entirety
      */
-    uint32_t encodedSize();
+    uint32_t encodedSize() const;
     /**
      * @returns the size of the buffer needed to encode the
      * 'header' of this message (not just the header frame,
      * but other meta data e.g.routing key and exchange)
      */
-    uint32_t encodedHeaderSize();
+    uint32_t encodedHeaderSize() const;
     /**
      * @returns the size of the buffer needed to encode the
      * (possibly partial) content held by this message
      */
-    uint32_t encodedContentSize();
+    uint32_t encodedContentSize() const;
     /**
      * Releases the in-memory content data held by this
      * message. Must pass in a store from which the data can

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageBase.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageBase.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageBase.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageBase.h Fri Mar 30 08:50:07 2007
@@ -25,6 +25,7 @@
 #include <string>
 #include <boost/shared_ptr.hpp>
 #include "Content.h"
+#include "PersistableMessage.h"
 #include "framing/amqp_types.h"
 
 namespace qpid {
@@ -49,7 +50,7 @@
  * abstracting away the operations
  * TODO; AMS: for the moment this is mostly a placeholder
  */
-class Message {
+class Message : public PersistableMessage{
   public:
     typedef boost::shared_ptr<Message> shared_ptr;
     typedef boost::shared_ptr<framing::AMQMethodBody> AMQMethodBodyPtr;
@@ -117,25 +118,25 @@
         return publisher;
     }
 
-    virtual void encode(framing::Buffer& buffer) = 0;
-    virtual void encodeHeader(framing::Buffer& buffer) = 0;
+    virtual void encode(framing::Buffer& buffer) const = 0;
+    virtual void encodeHeader(framing::Buffer& buffer) const = 0;
 
     /**
      * @returns the size of the buffer needed to encode this
      * message in its entirety
      */
-    virtual uint32_t encodedSize() = 0;
+    virtual uint32_t encodedSize() const = 0;
     /**
      * @returns the size of the buffer needed to encode the
      * 'header' of this message (not just the header frame,
      * but other meta data e.g.routing key and exchange)
      */
-    virtual uint32_t encodedHeaderSize() = 0;
+    virtual uint32_t encodedHeaderSize() const = 0;
     /**
      * @returns the size of the buffer needed to encode the
      * (possibly partial) content held by this message
      */
-    virtual uint32_t encodedContentSize() = 0;
+    virtual uint32_t encodedContentSize() const = 0;
     /**
      * If headers have been received, returns the expected
      * content size else returns 0.
@@ -145,6 +146,7 @@
     virtual void decodeHeader(framing::Buffer& buffer) = 0;
     virtual void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0) = 0;
 
+    static shared_ptr decode(framing::Buffer& buffer); 
             
     // TODO: AMS 29/1/2007 Don't think these are really part of base class
             

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageMessage.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageMessage.cpp Fri Mar 30 08:50:07 2007
@@ -29,6 +29,7 @@
 #include "framing/AMQFrame.h"
 #include "framing/FieldTable.h"
 #include "framing/BasicHeaderProperties.h"
+#include "RecoveryManagerImpl.h"
 
 #include <algorithm>
 
@@ -217,17 +218,17 @@
     return transfer->getDeliveryMode() == PERSISTENT;
 }
 
-uint32_t MessageMessage::encodedSize()
+uint32_t MessageMessage::encodedSize() const
 {
     return encodedHeaderSize() + encodedContentSize();
 }
 
-uint32_t MessageMessage::encodedHeaderSize()
+uint32_t MessageMessage::encodedHeaderSize() const
 {
-    return transfer->size() - transfer->baseSize(); 
+    return RecoveryManagerImpl::encodedMessageTypeSize() + transfer->size() - transfer->baseSize(); 
 }
 
-uint32_t MessageMessage::encodedContentSize()
+uint32_t MessageMessage::encodedContentSize() const
 {
     return 0;
 }
@@ -237,13 +238,14 @@
     return 0;
 }
 
-void MessageMessage::encode(Buffer& buffer)
+void MessageMessage::encode(Buffer& buffer) const
 {
     encodeHeader(buffer);
 }
 
-void MessageMessage::encodeHeader(Buffer& buffer)
+void MessageMessage::encodeHeader(Buffer& buffer) const
 {
+    RecoveryManagerImpl::encodeMessageType(*this, buffer);
     if (transfer->getBody().isInline()) {
         transfer->encodeContent(buffer);
     } else {
@@ -259,6 +261,9 @@
 
 void MessageMessage::decodeHeader(Buffer& buffer)
 {
+    //don't care about the type here, but want encode/decode to be symmetric
+    RecoveryManagerImpl::decodeMessageType(buffer);    
+
     transfer->decodeContent(buffer);
 }
 
@@ -269,7 +274,7 @@
 
 MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version,
                                                   const string& destination, 
-                                                  const framing::Content& body)
+                                                  const framing::Content& body) const
 {
     return new MessageTransferBody(version, 
                                    transfer->getTicket(),

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageMessage.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessageMessage.h Fri Mar 30 08:50:07 2007
@@ -71,11 +71,11 @@
     const framing::FieldTable& getApplicationHeaders();
     bool isPersistent();
             
-    void encode(framing::Buffer& buffer);
-    void encodeHeader(framing::Buffer& buffer);
-    uint32_t encodedSize();
-    uint32_t encodedHeaderSize();
-    uint32_t encodedContentSize();
+    void encode(framing::Buffer& buffer) const;
+    void encodeHeader(framing::Buffer& buffer) const;
+    uint32_t encodedSize() const;
+    uint32_t encodedHeaderSize() const;
+    uint32_t encodedContentSize() const;
     uint64_t expectedContentSize();
     void decodeHeader(framing::Buffer& buffer);
     void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0);
@@ -86,7 +86,7 @@
                          uint32_t framesize);
     framing::MessageTransferBody* copyTransfer(const framing::ProtocolVersion& version,
                                                const std::string& destination, 
-                                               const framing::Content& body);
+                                               const framing::Content& body) const;
   
     framing::RequestId requestId;
     const TransferPtr transfer;

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp Fri Mar 30 08:50:07 2007
@@ -26,6 +26,7 @@
 #include <sys/Monitor.h>
 #include <sys/Time.h>
 #include <iostream>
+#include "QueueRegistry.h"
 
 using namespace qpid::broker;
 using namespace qpid::sys;
@@ -53,7 +54,7 @@
 Queue::~Queue(){}
 
 void Queue::deliver(Message::shared_ptr& msg){
-    enqueue(0, msg, 0);
+    enqueue(0, msg);
     process(msg);
 }
 
@@ -195,17 +196,17 @@
     return lastUsed && (now()*TIME_MSEC - lastUsed > autodelete);
 }
 
-void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid)
+void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg)
 {
     if (msg->isPersistent() && store) {
-        store->enqueue(ctxt, msg.get(), *this, xid);
+        store->enqueue(ctxt, *msg.get(), *this);
     }
 }
 
-void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid)
+void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg)
 {
     if (msg->isPersistent() && store) {
-        store->dequeue(ctxt, msg.get(), *this, xid);
+        store->dequeue(ctxt, *msg.get(), *this);
     }
 }
 
@@ -217,8 +218,10 @@
 
 void Queue::create(const FieldTable& settings)
 {
+    //TODO: hold onto settings and persist them as part of encode
+    //      in fact settings should be passed in on construction
     if (store) {
-        store->create(*this, settings);
+        store->create(*this);
     }
     configure(settings);
 }
@@ -246,3 +249,34 @@
 {
     return policy.get();
 }
+
+uint64_t Queue::getPersistenceId() const 
+{ 
+    return persistenceId; 
+}
+
+void Queue::setPersistenceId(uint64_t _persistenceId)
+{ 
+    persistenceId = _persistenceId; 
+}
+
+void Queue::encode(framing::Buffer& buffer) const 
+{
+    buffer.putShortString(name);
+    //TODO store all required properties
+}
+
+uint32_t Queue::encodedSize() const
+{
+    //TODO, revise when storing full set of queue properties
+    return name.size() + 1/*short string size octet*/;
+}
+
+Queue::shared_ptr Queue::decode(QueueRegistry& queues, framing::Buffer& buffer)
+{
+    string name;
+    buffer.getShortString(name);
+    std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true);
+    return result.first;
+}
+

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.h Fri Mar 30 08:50:07 2007
@@ -31,6 +31,7 @@
 #include <BrokerMessage.h>
 #include <FieldTable.h>
 #include <sys/Monitor.h>
+#include "PersistableQueue.h"
 #include <QueuePolicy.h>
 
 // TODO aconway 2007-02-06: Use auto_ptr and boost::ptr_vector to
@@ -39,6 +40,7 @@
 namespace qpid {
     namespace broker {
         class MessageStore;
+        class QueueRegistry;
 
         /**
          * Thrown when exclusive access would be violated.
@@ -51,7 +53,7 @@
          * registered consumers or be stored until dequeued or until one
          * or more consumers registers.
          */
-        class Queue{
+        class Queue : public PersistableQueue{
             typedef std::vector<Consumer*> Consumers;
             typedef std::queue<Message::shared_ptr> Messages;
             
@@ -119,22 +121,28 @@
             inline const string& getName() const { return name; }
             inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
             inline bool hasExclusiveConsumer() const { return exclusive; }
-            inline uint64_t getPersistenceId() const { return persistenceId; }
-            inline void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
 
             bool canAutoDelete() const;
 
-            void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid);
+            void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg);
             /**
              * dequeue from store (only done once messages is acknowledged)
              */
-            void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid);
+            void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg);
             /**
              * dequeues from memory only
              */
             Message::shared_ptr dequeue();
 
             const QueuePolicy* const getPolicy();
+
+            //PersistableQueue support:
+            uint64_t getPersistenceId() const;
+            void setPersistenceId(uint64_t persistenceId);
+            void encode(framing::Buffer& buffer) const;
+            uint32_t encodedSize() const;
+
+            static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer);
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/DeliveryRecord.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/DeliveryRecord.cpp Fri Mar 30 08:50:07 2007
@@ -42,12 +42,8 @@
                                                                pull(true){}
 
 
-void DeliveryRecord::discard(TransactionContext* ctxt, const std::string* const xid) const{
-    queue->dequeue(ctxt, msg, xid);
-}
-
-void DeliveryRecord::discard() const{
-    discard(0, 0);
+void DeliveryRecord::discard(TransactionContext* ctxt) const{
+    queue->dequeue(ctxt, msg);
 }
 
 bool DeliveryRecord::matches(uint64_t tag) const{

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/DeliveryRecord.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/DeliveryRecord.h Fri Mar 30 08:50:07 2007
@@ -46,8 +46,7 @@
             DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const std::string consumerTag, const uint64_t deliveryTag);
             DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const uint64_t deliveryTag);
             
-            void discard() const;
-            void discard(TransactionContext* ctxt, const std::string* const xid) const;
+            void discard(TransactionContext* ctxt = 0) const;
             bool matches(uint64_t tag) const;
             bool coveredBy(const AccumulatedAck* const range) const;
             void requeue() const;

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.cpp Fri Mar 30 08:50:07 2007
@@ -27,7 +27,7 @@
 
 LazyLoadedContent::~LazyLoadedContent()
 {
-    store->destroy(msg);
+    store->destroy(*msg);
 }
 
 LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* const _msg, uint64_t _expectedSize) : 
@@ -35,7 +35,7 @@
 
 void LazyLoadedContent::add(AMQContentBody::shared_ptr data)
 {
-    store->appendContent(msg, data->getData());
+    store->appendContent(*msg, data->getData());
 }
 
 uint32_t LazyLoadedContent::size()
@@ -50,13 +50,13 @@
         {            
             uint64_t remaining = expectedSize - offset;
             string data;
-            store->loadContent(msg, data, offset,
+            store->loadContent(*msg, data, offset,
                                remaining > framesize ? framesize : remaining);
             channel.send(new AMQContentBody(data));
         }
     } else {
         string data;
-        store->loadContent(msg, data, 0, expectedSize);  
+        store->loadContent(*msg, data, 0, expectedSize);  
         channel.send(new AMQContentBody(data));
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.h Fri Mar 30 08:50:07 2007
@@ -23,6 +23,7 @@
 
 #include <Content.h>
 #include <MessageStore.h>
+#include "BrokerMessageBase.h"
 
 namespace qpid {
     namespace broker {

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/Makefile.am Fri Mar 30 08:50:07 2007
@@ -61,13 +61,20 @@
   NameGenerator.h				\
   NullMessageStore.cpp				\
   NullMessageStore.h				\
+  Persistable.h					\
+  PersistableExchange.h				\
+  PersistableMessage.h				\
+  PersistableQueue.h				\
   Prefetch.h					\
   QueuePolicy.cpp				\
   QueuePolicy.h					\
   QueueRegistry.cpp				\
   QueueRegistry.h				\
-  RecoveryManager.cpp				\
+  RecoverableMessage.h                          \
+  RecoverableQueue.h                            \
   RecoveryManager.h				\
+  RecoveryManagerImpl.cpp			\
+  RecoveryManagerImpl.h				\
   Reference.cpp					\
   Reference.h					\
   ConnectionFactory.cpp				\

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageBuilder.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageBuilder.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageBuilder.cpp Fri Mar 30 08:50:07 2007
@@ -56,7 +56,7 @@
     }
     message->setHeader(header);
     if (stagingThreshold && header->getContentSize() >= stagingThreshold) {
-        store->stage(message.get());
+        store->stage(*message);
         message->releaseContent(store);
     } else {
         auto_ptr<Content> content(new InMemoryContent());

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h Fri Mar 30 08:50:07 2007
@@ -21,119 +21,108 @@
 #ifndef _MessageStore_
 #define _MessageStore_
 
-#include <BrokerMessage.h>
-#include <FieldTable.h>
-#include <RecoveryManager.h>
-#include <TransactionalStore.h>
+#include "PersistableExchange.h"
+#include "PersistableMessage.h"
+#include "PersistableQueue.h"
+#include "RecoveryManager.h"
+#include "TransactionalStore.h"
 
 namespace qpid {
-    namespace broker {
-        struct MessageStoreSettings
-        {
-            /**
-             * Messages whose content length is larger than this value
-             * will be staged (i.e. will have thier data written to
-             * disk as it arrives) and will load their data lazily. On
-             * recovery therefore, only the headers should be loaded.
-             */
-            uint64_t stagingThreshold;
-        };
-        /**
-         * An abstraction of the persistent storage for messages. (In
-         * all methods, any pointers/references to queues or messages
-         * are valid only for the duration of the call).
-         */
-        class MessageStore : public TransactionalStore{
-        public:
-            /**
-             * Record the existance of a durable queue
-             */
-            virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings) = 0;
-            /**
-             * Destroy a durable queue
-             */
-            virtual void destroy(const Queue& queue) = 0;
+namespace broker {
 
-            /**
-             * Request recovery of queue and message state from store
-             */
-            virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0) = 0;
-
-            /**
-             * Stores a messages before it has been enqueued
-             * (enqueueing automatically stores the message so this is
-             * only required if storage is required prior to that
-             * point). If the message has not yet been stored it will
-             * store the headers as well as any content passed in. A
-             * persistence id will be set on the message which can be
-             * used to load the content or to append to it.
-             */
-            virtual void stage(Message* const msg) = 0;
+/**
+ * An abstraction of the persistent storage for messages. (In
+ * all methods, any pointers/references to queues or messages
+ * are valid only for the duration of the call).
+ */
+class MessageStore : public TransactionalStore{
+public:
+    /**
+     * Record the existence of a durable queue
+     */
+    virtual void create(const PersistableQueue& queue) = 0;
+    /**
+     * Destroy a durable queue
+     */
+    virtual void destroy(const PersistableQueue& queue) = 0;
+    
+    /**
+     * Record the existence of a durable exchange
+     */
+    virtual void create(const PersistableExchange& exchange) = 0;
+    /**
+     * Destroy a durable exchange
+     */
+    virtual void destroy(const PersistableExchange& exchange) = 0;
+    
+    /**
+     * Request recovery of queue and message state from store
+     */
+    virtual void recover(RecoveryManager& queues) = 0;
+    
+    /**
+     * Stores a message before it has been enqueued
+     * (enqueueing automatically stores the message so this is
+     * only required if storage is required prior to that
+     * point). If the message has not yet been stored it will
+     * store the headers as well as any content passed in. A
+     * persistence id will be set on the message which can be
+     * used to load the content or to append to it.
+     */
+    virtual void stage(PersistableMessage& msg) = 0;
             
-            /**
-             * Destroys a previously staged message. This only needs
-             * to be called if the message is never enqueued. (Once
-             * enqueued, deletion will be automatic when the message
-             * is dequeued from all queues it was enqueued onto).
-             */
-            virtual void destroy(Message* const msg) = 0;
-
-            /**
-             * Appends content to a previously staged message
-             */
-            virtual void appendContent(Message* const msg, const std::string& data) = 0;
-
-            /**
-             * Loads (a section) of content data for the specified
-             * message (previously stored through a call to stage or
-             * enqueue) into data. The offset refers to the content
-             * only (i.e. an offset of 0 implies that the start of the
-             * content should be loaded, not the headers or related
-             * meta-data).
-             */
-            virtual void loadContent(Message* const msg, std::string& data, uint64_t offset, uint32_t length) = 0;
-
-            /**
-             * Enqueues a message, storing the message if it has not
-             * been previously stored and recording that the given
-             * message is on the given queue.
-             * 
-             * @param msg the message to enqueue
-             * @param queue the name of the queue onto which it is to be enqueued
-             * @param xid (a pointer to) an identifier of the
-             * distributed transaction in which the operation takes
-             * place or null for 'local' transactions
-             */
-            virtual void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const std::string * const xid) = 0;
-            /**
-             * Dequeues a message, recording that the given message is
-             * no longer on the given queue and deleting the message
-             * if it is no longer on any other queue.
-             * 
-             * @param msg the message to dequeue
-             * @param queue the name of th queue from which it is to be dequeued
-             * @param xid (a pointer to) an identifier of the
-             * distributed transaction in which the operation takes
-             * place or null for 'local' transactions
-             */
-            virtual void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const std::string * const xid) = 0;
-
-            /**
-             * Treat all enqueue/dequeues where this xid was specified as being prepared.
-             */
-            virtual void prepared(const std::string * const xid) = 0;
-            /**
-             * Treat all enqueue/dequeues where this xid was specified as being committed.
-             */
-            virtual void committed(const std::string * const xid) = 0;
-            /**
-             * Treat all enqueue/dequeues where this xid was specified as being aborted.
-             */
-            virtual void aborted(const std::string * const xid) = 0;
+    /**
+     * Destroys a previously staged message. This only needs
+     * to be called if the message is never enqueued. (Once
+     * enqueued, deletion will be automatic when the message
+     * is dequeued from all queues it was enqueued onto).
+     */
+    virtual void destroy(PersistableMessage& msg) = 0;
+
+    /**
+     * Appends content to a previously staged message
+     */
+    virtual void appendContent(PersistableMessage& msg, const std::string& data) = 0;
+    
+    /**
+     * Loads (a section) of content data for the specified
+     * message (previously stored through a call to stage or
+     * enqueue) into data. The offset refers to the content
+     * only (i.e. an offset of 0 implies that the start of the
+     * content should be loaded, not the headers or related
+     * meta-data).
+     */
+    virtual void loadContent(PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length) = 0;
+    
+    /**
+     * Enqueues a message, storing the message if it has not
+     * been previously stored and recording that the given
+     * message is on the given queue.
+     * 
+     * @param msg the message to enqueue
+     * @param queue the name of the queue onto which it is to be enqueued
+     * @param ctxt (a pointer to) the context of the transaction
+     * in which the operation takes place (this replaces the
+     * former xid parameter)
+     */
+    virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) = 0;
+    /**
+     * Dequeues a message, recording that the given message is
+     * no longer on the given queue and deleting the message
+     * if it is no longer on any other queue.
+     * 
+     * @param msg the message to dequeue
+     * @param queue the name of the queue from which it is to be dequeued
+     * @param ctxt (a pointer to) the context of the transaction
+     * in which the operation takes place (this replaces the
+     * former xid parameter)
+     */
+    virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) = 0;
+    
+    virtual ~MessageStore(){}
+};
 
-            virtual ~MessageStore(){}
-        };
-    }
+}
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp Fri Mar 30 08:50:07 2007
@@ -28,77 +28,82 @@
 {
 }
 
-void MessageStoreModule::create(const Queue& queue, const qpid::framing::FieldTable& settings)
+void MessageStoreModule::create(const PersistableQueue& queue)
 {
-    store->create(queue, settings);
+    store->create(queue);
 }
 
-void MessageStoreModule::destroy(const Queue& queue)
+void MessageStoreModule::destroy(const PersistableQueue& queue)
 {
     store->destroy(queue);
 }
 
-void MessageStoreModule::recover(RecoveryManager& registry, const MessageStoreSettings* const settings)
+void MessageStoreModule::create(const PersistableExchange& exchange)
 {
-    store->recover(registry, settings);
+    store->create(exchange);
 }
 
-void MessageStoreModule::stage(Message* const msg)
+void MessageStoreModule::destroy(const PersistableExchange& exchange)
+{
+    store->destroy(exchange);
+}
+
+void MessageStoreModule::recover(RecoveryManager& registry)
+{
+    store->recover(registry);
+}
+
+void MessageStoreModule::stage(PersistableMessage& msg)
 {
     store->stage(msg);
 }
 
-void MessageStoreModule::destroy(Message* const msg)
+void MessageStoreModule::destroy(PersistableMessage& msg)
 {
     store->destroy(msg);
 }
 
-void MessageStoreModule::appendContent(Message* const msg, const std::string& data)
+void MessageStoreModule::appendContent(PersistableMessage& msg, const std::string& data)
 {
     store->appendContent(msg, data);
 }
 
-void MessageStoreModule::loadContent(Message* const msg, string& data, uint64_t offset, uint32_t length)
+void MessageStoreModule::loadContent(PersistableMessage& msg, string& data, uint64_t offset, uint32_t length)
 {
     store->loadContent(msg, data, offset, length);
 }
 
-void MessageStoreModule::enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid)
+void MessageStoreModule::enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue)
 {
-    store->enqueue(ctxt, msg, queue, xid);
+    store->enqueue(ctxt, msg, queue);
 }
 
-void MessageStoreModule::dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid)
+void MessageStoreModule::dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue)
 {
-    store->dequeue(ctxt, msg, queue, xid);
+    store->dequeue(ctxt, msg, queue);
 }
 
-void MessageStoreModule::prepared(const string * const xid)
-{
-    store->prepared(xid);
-}
-
-void MessageStoreModule::committed(const string * const xid)
+std::auto_ptr<TransactionContext> MessageStoreModule::begin()
 {
-    store->committed(xid);
+    return store->begin();
 }
 
-void MessageStoreModule::aborted(const string * const xid)
+std::auto_ptr<TPCTransactionContext> MessageStoreModule::begin(const std::string& xid)
 {
-    store->aborted(xid);
+    return store->begin(xid);
 }
 
-std::auto_ptr<TransactionContext> MessageStoreModule::begin()
+void MessageStoreModule::prepare(TPCTransactionContext& txn)
 {
-    return store->begin();
+    store->prepare(txn);
 }
 
-void MessageStoreModule::commit(TransactionContext* ctxt)
+void MessageStoreModule::commit(TransactionContext& ctxt)
 {
     store->commit(ctxt);
 }
 
-void MessageStoreModule::abort(TransactionContext* ctxt)
+void MessageStoreModule::abort(TransactionContext& ctxt)
 {
     store->abort(ctxt);
 }

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h Fri Mar 30 08:50:07 2007
@@ -28,32 +28,39 @@
 #include <sys/Module.h>
 
 namespace qpid {
-    namespace broker {
-        /**
-         * A null implementation of the MessageStore interface
-         */
-        class MessageStoreModule : public MessageStore{
-            qpid::sys::Module<MessageStore> store;
-        public:
-            MessageStoreModule(const std::string& name);
-            void create(const Queue& queue, const qpid::framing::FieldTable& settings);
-            void destroy(const Queue& queue);
-            void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
-            void stage(Message* const msg);
-            void destroy(Message* const msg);
-            void appendContent(Message* const msg, const std::string& data);
-            void loadContent(Message* const msg, std::string& data, uint64_t offset, uint32_t length);
-            void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid);
-            void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid);
-            void prepared(const std::string * const xid);
-            void committed(const std::string * const xid);
-            void aborted(const std::string * const xid);
-            std::auto_ptr<TransactionContext> begin();
-            void commit(TransactionContext* ctxt);
-            void abort(TransactionContext* ctxt);
-            ~MessageStoreModule(){}
-        };
-    }
+namespace broker {
+
+/**
+ * A MessageStore that forwards each call to a store held in a sys::Module
+ */
+class MessageStoreModule : public MessageStore
+{
+    qpid::sys::Module<MessageStore> store;
+public:
+    MessageStoreModule(const std::string& name);
+
+    std::auto_ptr<TransactionContext> begin();
+    std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
+    void prepare(TPCTransactionContext& txn);
+    void commit(TransactionContext& txn);
+    void abort(TransactionContext& txn);
+
+    void create(const PersistableQueue& queue);
+    void destroy(const PersistableQueue& queue);
+    void create(const PersistableExchange& exchange);
+    void destroy(const PersistableExchange& exchange);
+    void recover(RecoveryManager& queues);
+    void stage(PersistableMessage& msg);
+    void destroy(PersistableMessage& msg);
+    void appendContent(PersistableMessage& msg, const std::string& data);
+    void loadContent(PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length);
+    void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
+    void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
+
+    ~MessageStoreModule(){}
+};
+
+}
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp Fri Mar 30 08:50:07 2007
@@ -21,7 +21,6 @@
 
 #include <NullMessageStore.h>
 
-#include <BrokerQueue.h>
 #include <RecoveryManager.h>
 
 #include <iostream>
@@ -30,75 +29,77 @@
 
 NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
 
-void NullMessageStore::create(const Queue& queue, const qpid::framing::FieldTable&)
+void NullMessageStore::create(const PersistableQueue& queue)
 {
     if (warn) std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
 }
 
-void NullMessageStore::destroy(const Queue& queue)
+void NullMessageStore::destroy(const PersistableQueue& queue)
 {
     if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
 }
 
-void NullMessageStore::recover(RecoveryManager&, const MessageStoreSettings* const)
+void NullMessageStore::create(const PersistableExchange&)
+{
+}
+
+void NullMessageStore::destroy(const PersistableExchange&)
+{
+}
+
+void NullMessageStore::recover(RecoveryManager&)
 {
     if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl;
 }
 
-void NullMessageStore::stage(Message* const)
+void NullMessageStore::stage(PersistableMessage&)
 {
     if (warn) std::cout << "WARNING: Can't stage message. Persistence not enabled." << std::endl;
 }
 
-void NullMessageStore::destroy(Message* const)
+void NullMessageStore::destroy(PersistableMessage&)
 {
     if (warn) std::cout << "WARNING: No need to destroy staged message. Persistence not enabled." << std::endl;
 }
 
-void NullMessageStore::appendContent(Message* const, const string&)
+void NullMessageStore::appendContent(PersistableMessage&, const string&)
 {
     if (warn) std::cout << "WARNING: Can't append content. Persistence not enabled." << std::endl;
 }
 
-void NullMessageStore::loadContent(Message* const, string&, uint64_t, uint32_t)
+void NullMessageStore::loadContent(PersistableMessage&, string&, uint64_t, uint32_t)
 {
     if (warn) std::cout << "WARNING: Can't load content. Persistence not enabled." << std::endl;
 }
 
-void NullMessageStore::enqueue(TransactionContext*, Message* const, const Queue& queue, const string * const)
+void NullMessageStore::enqueue(TransactionContext*, PersistableMessage&, const PersistableQueue& queue)
 {
     if (warn) std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl;
 }
 
-void NullMessageStore::dequeue(TransactionContext*, Message* const, const Queue& queue, const string * const)
+void NullMessageStore::dequeue(TransactionContext*, PersistableMessage&, const PersistableQueue& queue)
 {
     if (warn) std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl;
 }
 
-void NullMessageStore::prepared(const string * const)
-{
-    if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
-}
-
-void NullMessageStore::committed(const string * const)
+std::auto_ptr<TransactionContext> NullMessageStore::begin()
 {
-    if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
+    return std::auto_ptr<TransactionContext>();
 }
 
-void NullMessageStore::aborted(const string * const)
+std::auto_ptr<TPCTransactionContext> NullMessageStore::begin(const std::string&)
 {
-    if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
+    return std::auto_ptr<TPCTransactionContext>();
 }
 
-std::auto_ptr<TransactionContext> NullMessageStore::begin()
+void NullMessageStore::prepare(TPCTransactionContext&)
 {
-    return std::auto_ptr<TransactionContext>();
 }
 
-void NullMessageStore::commit(TransactionContext*)
+void NullMessageStore::commit(TransactionContext&)
 {
 }
 
-void NullMessageStore::abort(TransactionContext*)
+void NullMessageStore::abort(TransactionContext&)
 {
 }

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h Fri Mar 30 08:50:07 2007
@@ -26,33 +26,38 @@
 #include <BrokerQueue.h>
 
 namespace qpid {
-    namespace broker {
+namespace broker {
 
-        /**
-         * A null implementation of the MessageStore interface
-         */
-        class NullMessageStore : public MessageStore{
-            const bool warn;
-        public:
-            NullMessageStore(bool warn = false);
-            virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings);
-            virtual void destroy(const Queue& queue);
-            virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
-            virtual void stage(Message* const msg);
-            virtual void destroy(Message* const msg);
-            virtual void appendContent(Message* const msg, const std::string& data);
-            virtual void loadContent(Message* const msg, std::string& data, uint64_t offset, uint32_t length);
-            virtual void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid);
-            virtual void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid);
-            virtual void prepared(const std::string * const xid);
-            virtual void committed(const std::string * const xid);
-            virtual void aborted(const std::string * const xid);
-            virtual std::auto_ptr<TransactionContext> begin();
-            virtual void commit(TransactionContext* ctxt);
-            virtual void abort(TransactionContext* ctxt);
-            ~NullMessageStore(){}
-        };
-    }
+/**
+ * A null implementation of the MessageStore interface
+ */
+class NullMessageStore : public MessageStore
+{
+    const bool warn;
+public:
+    NullMessageStore(bool warn = false);
+
+    virtual std::auto_ptr<TransactionContext> begin();
+    virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
+    virtual void prepare(TPCTransactionContext& txn);
+    virtual void commit(TransactionContext& txn);
+    virtual void abort(TransactionContext& txn);
+
+    virtual void create(const PersistableQueue& queue);
+    virtual void destroy(const PersistableQueue& queue);
+    virtual void create(const PersistableExchange& exchange);
+    virtual void destroy(const PersistableExchange& exchange);
+    virtual void recover(RecoveryManager& queues);
+    virtual void stage(PersistableMessage& msg);
+    virtual void destroy(PersistableMessage& msg);
+    virtual void appendContent(PersistableMessage& msg, const std::string& data);
+    virtual void loadContent(PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length);
+    virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
+    virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
+    ~NullMessageStore(){}
+};
+
+}
 }
 
 

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/Persistable.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/Persistable.h?view=auto&rev=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/Persistable.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/Persistable.h Fri Mar 30 08:50:07 2007
@@ -0,0 +1,62 @@
+#ifndef _broker_Persistable_h
+#define _broker_Persistable_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "framing/amqp_types.h"
+#include "framing/Buffer.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Base class for all persistable objects
+ */
+class Persistable 
+{
+public:
+    /**
+     * Allows the store to attach its own identifier to this object
+     */
+    virtual void setPersistenceId(uint64_t id) = 0;
+    /**
+     * Returns any identifier the store may have attached to this
+     * object
+     */
+    virtual uint64_t getPersistenceId() const = 0;
+    /**
+     * Encodes the persistable state of this object into the supplied
+     * buffer
+     */
+    virtual void encode(framing::Buffer& buffer) const = 0;
+    /**
+     * @returns the size of the buffer needed to encode this object
+     */
+    virtual uint32_t encodedSize() const = 0;
+
+    virtual ~Persistable() {};
+};
+
+}}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/Persistable.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/Persistable.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableExchange.h?view=auto&rev=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableExchange.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableExchange.h Fri Mar 30 08:50:07 2007
@@ -0,0 +1,44 @@
+#ifndef _broker_PersistableExchange_h
+#define _broker_PersistableExchange_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include "Persistable.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * The interface exchanges must expose to the MessageStore in order to be
+ * persistable.
+ */
+class PersistableExchange : public Persistable
+{
+public:
+    virtual ~PersistableExchange() {};
+};
+
+}}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableExchange.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableExchange.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableMessage.h?view=auto&rev=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableMessage.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableMessage.h Fri Mar 30 08:50:07 2007
@@ -0,0 +1,53 @@
+#ifndef _broker_PersistableMessage_h
+#define _broker_PersistableMessage_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include "Persistable.h"
+#include "framing/amqp_types.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * The interface messages must expose to the MessageStore in order to
+ * be persistable.
+ */
+    class PersistableMessage : public Persistable
+{
+public:
+    typedef boost::shared_ptr<PersistableMessage> shared_ptr;
+
+    /**
+     * @returns the size of the headers when encoded
+     */
+    virtual uint32_t encodedHeaderSize() const = 0;
+
+    virtual ~PersistableMessage() {};
+};
+
+}}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableMessage.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableMessage.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableQueue.h?view=auto&rev=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableQueue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableQueue.h Fri Mar 30 08:50:07 2007
@@ -0,0 +1,45 @@
+#ifndef _broker_PersistableQueue_h
+#define _broker_PersistableQueue_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include "Persistable.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * The interface queues must expose to the MessageStore in order to be
+ * persistable.
+ */
+class PersistableQueue : public Persistable
+{
+public:
+    virtual const std::string& getName() const = 0;
+    virtual ~PersistableQueue() {};
+};
+
+}}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableQueue.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/PersistableQueue.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableMessage.h?view=auto&rev=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableMessage.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableMessage.h Fri Mar 30 08:50:07 2007
@@ -0,0 +1,57 @@
+#ifndef _broker_RecoverableMessage_h
+#define _broker_RecoverableMessage_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include "framing/amqp_types.h"
+#include "framing/Buffer.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Interface used to reload a message when the broker recovers.
+ */
+class RecoverableMessage
+{
+public:
+    typedef boost::shared_ptr<RecoverableMessage> shared_ptr;
+
+    /**
+     * Called by the store to find out whether it should load the
+     * content during recovery, or leave the message to load its own
+     * content as and when it requires it.
+     *
+     * @param available amount of content available (presumably the
+     * size of the stored content; confirm against the store)
+     * @returns true if the content of the message should be loaded
+     */
+    virtual bool loadContent(uint64_t available) = 0;
+
+    /**
+     * Loads the content held in the supplied buffer (may do checking
+     * of length as necessary)
+     */
+    virtual void decodeContent(framing::Buffer& buffer) = 0;
+
+    virtual ~RecoverableMessage() {}
+};
+
+}}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableMessage.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableMessage.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableQueue.h?view=auto&rev=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableQueue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableQueue.h Fri Mar 30 08:50:07 2007
@@ -0,0 +1,49 @@
+#ifndef _broker_RecoverableQueue_h
+#define _broker_RecoverableQueue_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "RecoverableMessage.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Interface used to put stored messages back onto a queue when the
+ * broker recovers.
+ */
+class RecoverableQueue
+{
+public:
+    typedef boost::shared_ptr<RecoverableQueue> shared_ptr;
+
+    /**
+     * Used during recovery to add a stored message back to the queue
+     *
+     * @param msg the recovered message to add
+     */
+    virtual void recover(RecoverableMessage::shared_ptr msg) = 0;
+
+    virtual ~RecoverableQueue() {}
+};
+
+}}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableQueue.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoverableQueue.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManager.h Fri Mar 30 08:50:07 2007
@@ -21,20 +21,20 @@
 #ifndef _RecoveryManager_
 #define _RecoveryManager_
 
-#include <ExchangeRegistry.h>
-#include <QueueRegistry.h>
+#include "RecoverableQueue.h"
+#include "RecoverableMessage.h"
+#include "framing/Buffer.h"
 
 namespace qpid {
 namespace broker {
 
     class RecoveryManager{
-        QueueRegistry& queues;
-        ExchangeRegistry& exchanges;
     public:
-        RecoveryManager(QueueRegistry& queues, ExchangeRegistry& exchanges);
-        ~RecoveryManager();
-        Queue::shared_ptr recoverQueue(const std::string& name);
-        Exchange::shared_ptr recoverExchange(const std::string& name, const std::string& type);
+        virtual ~RecoveryManager(){}
+        virtual void recoverExchange(framing::Buffer& buffer) = 0;
+        virtual RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer) = 0;
+        virtual RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer) = 0;
+        virtual void recoveryComplete() = 0;
     };
 
     

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.cpp?view=auto&rev=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.cpp Fri Mar 30 08:50:07 2007
@@ -0,0 +1,131 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <RecoveryManagerImpl.h>
+
+#include "BrokerMessage.h"
+#include "BrokerMessageMessage.h"
+#include "BrokerQueue.h"
+
+using namespace qpid;
+using namespace qpid::broker;
+using boost::dynamic_pointer_cast;
+
+
+//single octet type tags, see encodeMessageType()/decodeMessageType()
+namespace {
+const uint8_t BASIC = 1;
+const uint8_t MESSAGE = 2;
+}
+
+/**
+ * @param queueRegistry registry passed to Queue::decode() when a queue
+ * is recovered
+ * @param exchangeRegistry registry asked for the default exchange when
+ * a queue is recovered
+ * @param threshold staging threshold handed to every recovered message
+ * (0 means content is always loaded, see loadContent())
+ */
+RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& queueRegistry,
+                                         ExchangeRegistry& exchangeRegistry,
+                                         uint64_t threshold)
+    : queues(queueRegistry),
+      exchanges(exchangeRegistry),
+      stagingThreshold(threshold)
+{
+}
+
+RecoveryManagerImpl::~RecoveryManagerImpl()
+{
+}
+
+/**
+ * RecoverableMessage that wraps a broker Message and remembers the
+ * staging threshold it was created with.
+ */
+class RecoverableMessageImpl : public RecoverableMessage
+{
+public:
+    RecoverableMessageImpl(Message::shared_ptr& _msg, uint64_t _stagingThreshold)
+        : msg(_msg), stagingThreshold(_stagingThreshold) {}
+    ~RecoverableMessageImpl() {}
+
+    bool loadContent(uint64_t available);
+    void decodeContent(framing::Buffer& buffer);
+    /** Hands the wrapped message to the given queue's recover() */
+    void recover(Queue::shared_ptr queue);
+
+private:
+    Message::shared_ptr msg;
+    const uint64_t stagingThreshold;
+};
+
+/**
+ * RecoverableQueue that wraps a broker Queue.
+ */
+class RecoverableQueueImpl : public RecoverableQueue
+{
+public:
+    RecoverableQueueImpl(Queue::shared_ptr& _queue) : queue(_queue) {}
+    ~RecoverableQueueImpl() {}
+
+    void recover(RecoverableMessage::shared_ptr msg);
+
+private:
+    Queue::shared_ptr queue;
+};
+
+/**
+ * Exchange recovery is not implemented yet; the buffer is ignored.
+ */
+void RecoveryManagerImpl::recoverExchange(framing::Buffer& /*buffer*/)
+{
+    //TODO
+}
+
+/**
+ * Decodes a queue from the buffer, binds it to the default exchange
+ * (if there is one) under its own name and returns it wrapped as a
+ * RecoverableQueue.
+ */
+RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer)
+{
+    Queue::shared_ptr recovered = Queue::decode(queues, buffer);
+    try {
+        Exchange::shared_ptr defaultExchange = exchanges.getDefault();
+        if (defaultExchange) {
+            defaultExchange->bind(recovered, recovered->getName(), 0);
+        }
+    } catch (ChannelException&) {
+        //assume no default exchange has been declared
+    }
+    RecoverableQueue::shared_ptr wrapped(new RecoverableQueueImpl(recovered));
+    return wrapped;
+}
+
+/**
+ * Creates a MessageMessage or BasicMessage, depending on the type tag
+ * found in the buffer, decodes its header and returns it wrapped as a
+ * RecoverableMessage.
+ */
+RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer)
+{
+    //peek at type: the read is bracketed by record()/restore()
+    buffer.record();
+    Message* decoded = 0;
+    if (decodeMessageType(buffer) == MESSAGE) {
+        decoded = new MessageMessage();
+    } else {
+        decoded = new BasicMessage();
+    }
+    Message::shared_ptr message(decoded);
+    buffer.restore();
+
+    message->decodeHeader(buffer);
+    RecoverableMessage::shared_ptr wrapped(new RecoverableMessageImpl(message, stagingThreshold));
+    return wrapped;
+}
+
+/**
+ * Currently does nothing.
+ */
+void RecoveryManagerImpl::recoveryComplete()
+{
+    //TODO (finalise binding setup etc)
+}
+
+/**
+ * Reads one octet from the buffer and returns it as the message type.
+ */
+uint8_t RecoveryManagerImpl::decodeMessageType(framing::Buffer& buffer)
+{
+    const uint8_t type = buffer.getOctet();
+    return type;
+}
+
+/**
+ * Writes the one octet type tag for msg to the buffer: MESSAGE if msg
+ * is a MessageMessage, BASIC otherwise.
+ */
+void RecoveryManagerImpl::encodeMessageType(const Message& msg, framing::Buffer& buffer)
+{
+    const bool isMessageMessage = dynamic_cast<const MessageMessage*>(&msg) != 0;
+    if (isMessageMessage) {
+        buffer.putOctet(MESSAGE);
+    } else {
+        buffer.putOctet(BASIC);
+    }
+}
+
+/**
+ * @return the encoded size of the type tag, i.e. the single octet
+ * written by encodeMessageType()
+ */
+uint32_t RecoveryManagerImpl::encodedMessageTypeSize()
+{
+    const uint32_t typeTagSize = 1;
+    return typeTagSize;
+}
+
+/**
+ * @return true if no staging threshold is set (it is 0) or if
+ * available is below the threshold
+ */
+bool RecoverableMessageImpl::loadContent(uint64_t available)
+{
+    if (stagingThreshold == 0) {
+        return true;
+    }
+    return available < stagingThreshold;
+}
+
+/**
+ * Passes the buffer on to the wrapped message's decodeContent().
+ */
+void RecoverableMessageImpl::decodeContent(framing::Buffer& buffer)
+{
+    Message& wrapped = *msg;
+    wrapped.decodeContent(buffer);
+}
+
+/**
+ * Passes the wrapped message to the given queue's recover().
+ */
+void RecoverableMessageImpl::recover(Queue::shared_ptr queue)
+{
+    Queue& target = *queue;
+    target.recover(msg);
+}
+
+/**
+ * Adds a recovered message back onto the wrapped queue.
+ *
+ * @param msg has to be a RecoverableMessageImpl, as returned by
+ * RecoveryManagerImpl::recoverMessage()
+ * @throws qpid::Exception if msg is empty or is some other
+ * implementation of RecoverableMessage
+ */
+void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg)
+{
+    boost::shared_ptr<RecoverableMessageImpl> impl =
+        dynamic_pointer_cast<RecoverableMessageImpl>(msg);
+    if (!impl) {
+        //a failed cast gives an empty pointer, which must not be dereferenced
+        throw qpid::Exception("Cannot recover message: not a RecoverableMessageImpl");
+    }
+    impl->recover(queue);
+}

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.h?view=auto&rev=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.h Fri Mar 30 08:50:07 2007
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _RecoveryManagerImpl_
+#define _RecoveryManagerImpl_
+
+#include <list>
+#include "ExchangeRegistry.h"
+#include "QueueRegistry.h"
+#include "RecoveryManager.h"
+
+namespace qpid {
+namespace broker {
+
+    /**
+     * The RecoveryManager used by the broker: recovers queues and
+     * messages from their encoded form using the broker's queue and
+     * exchange registries.
+     */
+    class RecoveryManagerImpl : public RecoveryManager{
+    public:
+        RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, uint64_t stagingThreshold);
+        ~RecoveryManagerImpl();
+
+        void recoverExchange(framing::Buffer& buffer);
+        RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer);
+        RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer);
+        void recoveryComplete();
+
+        /** Reads the one octet message type tag from the buffer */
+        static uint8_t decodeMessageType(framing::Buffer& buffer);
+        /** Writes the one octet message type tag for msg to the buffer */
+        static void encodeMessageType(const Message& msg, framing::Buffer& buffer);
+        /** @return the encoded size of the message type tag */
+        static uint32_t encodedMessageTypeSize();
+
+    private:
+        QueueRegistry& queues;
+        ExchangeRegistry& exchanges;
+        const uint64_t stagingThreshold;
+    };
+
+    
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/broker/RecoveryManagerImpl.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TransactionalStore.h Fri Mar 30 08:50:07 2007
@@ -22,25 +22,35 @@
 #define _TransactionalStore_
 
 #include <memory>
+#include <string>
 
 namespace qpid {
-    namespace broker {
-        struct InvalidTransactionContextException : public std::exception {};
+namespace broker {
 
-        class TransactionContext{
-        public:
-            virtual ~TransactionContext(){}
-        };
-
-        class TransactionalStore{
-        public:
-            virtual std::auto_ptr<TransactionContext> begin() = 0;
-            virtual void commit(TransactionContext*) = 0;
-            virtual void abort(TransactionContext*) = 0;
-
-            virtual ~TransactionalStore(){}
-        };
-    }
+struct InvalidTransactionContextException : public std::exception {};
+
+class TransactionContext {
+public:
+    virtual ~TransactionContext(){}
+};
+
+class TPCTransactionContext : public TransactionContext {
+public:
+    virtual ~TPCTransactionContext(){}
+};
+
+class TransactionalStore {
+public:
+    virtual std::auto_ptr<TransactionContext> begin() = 0;
+    virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid) = 0;
+    virtual void prepare(TPCTransactionContext& txn) = 0;
+    virtual void commit(TransactionContext& txn) = 0;
+    virtual void abort(TransactionContext& txn) = 0;
+    
+    virtual ~TransactionalStore(){}
+};
+
+}
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.cpp Fri Mar 30 08:50:07 2007
@@ -25,8 +25,8 @@
 using std::mem_fun_ref;
 using namespace qpid::broker;
 
-TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked, const std::string* const _xid) : 
-    acked(_acked), unacked(_unacked), xid(_xid){
+TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) : 
+    acked(_acked), unacked(_unacked){
 
 }
 
@@ -35,7 +35,7 @@
         //dequeue all acked messages from their queues
         for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
             if (i->coveredBy(&acked)) {
-                i->discard(ctxt, xid);
+                i->discard(ctxt);
             }
         }
         return true;

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TxAck.h Fri Mar 30 08:50:07 2007
@@ -37,7 +37,6 @@
         class TxAck : public TxOp{
             AccumulatedAck& acked;
             std::list<DeliveryRecord>& unacked;
-            const std::string* const xid;
 
         public:
             /**
@@ -45,7 +44,7 @@
              * acks received
              * @param unacked the record of delivered messages
              */
-            TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked, const std::string* const xid = 0);
+            TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
             virtual bool prepare(TransactionContext* ctxt) throw();
             virtual void commit() throw();
             virtual void rollback() throw();

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TxBuffer.cpp Fri Mar 30 08:50:07 2007
@@ -29,11 +29,11 @@
     if(store) ctxt = store->begin();
     for(op_iterator i = ops.begin(); i < ops.end(); i++){
         if(!(*i)->prepare(ctxt.get())){
-            if(store) store->abort(ctxt.get());
+            if(store) store->abort(*ctxt);
             return false;
         }
     }
-    if(store) store->commit(ctxt.get());
+    if(store) store->commit(*ctxt);
     return true;
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.cpp Fri Mar 30 08:50:07 2007
@@ -22,11 +22,11 @@
 
 using namespace qpid::broker;
 
-TxPublish::TxPublish(Message::shared_ptr _msg, const std::string* const _xid) : msg(_msg), xid(_xid) {}
+TxPublish::TxPublish(Message::shared_ptr _msg) : msg(_msg) {}
 
 bool TxPublish::prepare(TransactionContext* ctxt) throw(){
     try{
-        for_each(queues.begin(), queues.end(), Prepare(ctxt, msg, xid));
+        for_each(queues.begin(), queues.end(), Prepare(ctxt, msg));
         return true;
     }catch(...){
         std::cout << "TxPublish::prepare() - Failed to prepare" << std::endl;
@@ -45,11 +45,11 @@
     queues.push_back(queue);
 }
 
-TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg, const string* const _xid) 
-    : ctxt(_ctxt), msg(_msg), xid(_xid){}
+TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg) 
+    : ctxt(_ctxt), msg(_msg){}
 
 void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){
-    queue->enqueue(ctxt, msg, xid);
+    queue->enqueue(ctxt, msg);
 }
 
 TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){}

Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.h?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/TxPublish.h Fri Mar 30 08:50:07 2007
@@ -46,9 +46,8 @@
             class Prepare{
                 TransactionContext* ctxt;
                 Message::shared_ptr& msg;
-                const std::string* const xid;
             public:
-                Prepare(TransactionContext* ctxt, Message::shared_ptr& msg, const std::string* const xid);
+                Prepare(TransactionContext* ctxt, Message::shared_ptr& msg);
                 void operator()(Queue::shared_ptr& queue);            
             };
 
@@ -60,11 +59,10 @@
             };
 
             Message::shared_ptr msg;
-            const std::string* const xid;
             std::list<Queue::shared_ptr> queues;
 
         public:
-            TxPublish(Message::shared_ptr msg, const std::string* const xid = 0);
+            TxPublish(Message::shared_ptr msg);
             virtual bool prepare(TransactionContext* ctxt) throw();
             virtual void commit() throw();
             virtual void rollback() throw();

Modified: incubator/qpid/trunk/qpid/cpp/tests/BrokerChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/BrokerChannelTest.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/BrokerChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/BrokerChannelTest.cpp Fri Mar 30 08:50:07 2007
@@ -68,7 +68,7 @@
         struct MethodCall
         {
             const string name;
-            Message* const msg;
+            PersistableMessage* msg;
             const string data;//only needed for appendContent
 
             void check(const MethodCall& other) const
@@ -92,7 +92,7 @@
             }
         }
 
-        void handle(const string& name, Message* msg, const string& data)
+        void handle(const string& name, PersistableMessage* msg, const string& data)
         {
             MethodCall call = {name, msg, data};
             handle(call);
@@ -102,25 +102,25 @@
 
         MockMessageStore() : expectMode(false) {}
 
-        void stage(Message* const msg)
+        void stage(PersistableMessage& msg)
         {
-            if(!expectMode) msg->setPersistenceId(1);
-            MethodCall call = {"stage", msg, ""};
+            if(!expectMode) msg.setPersistenceId(1);
+            MethodCall call = {"stage", &msg, ""};
             handle(call);
         }
 
-        void appendContent(Message* msg, const string& data)
+        void appendContent(PersistableMessage& msg, const string& data)
         {
-            MethodCall call = {"appendContent", msg, data};
+            MethodCall call = {"appendContent", &msg, data};
             handle(call);
         }
 
         // Don't hide overloads.
         using NullMessageStore::destroy;
         
-        void destroy(Message* msg)
+        void destroy(PersistableMessage& msg)
         {
-            MethodCall call = {"destroy", msg, ""};
+            MethodCall call = {"destroy", &msg, ""};
             handle(call);
         }
         
@@ -249,11 +249,11 @@
             MockChannel::basicGetBody());
 
         store.expect();
-        store.stage(msg);
+        store.stage(*msg);
         for (int i = 0; i < 3; i++) {
-            store.appendContent(msg, data[i]);
+            store.appendContent(*msg, data[i]);
         }
-        store.destroy(msg);
+        store.destroy(*msg);
         store.test();
 
         Exchange::shared_ptr exchange  =
@@ -304,8 +304,8 @@
         policy.update(settings);
         
         store.expect();
-        store.stage(msg3.get());
-        store.destroy(msg3.get());
+        store.stage(*msg3);
+        store.destroy(*msg3);
         store.test();
 
         Queue::shared_ptr queue(new Queue("my_queue", false, &store, 0));

Modified: incubator/qpid/trunk/qpid/cpp/tests/LazyLoadedContentTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/LazyLoadedContentTest.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/LazyLoadedContentTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/LazyLoadedContentTest.cpp Fri Mar 30 08:50:07 2007
@@ -50,7 +50,7 @@
     public:
         TestMessageStore(const string& _content) : content(_content) {}
 
-        void loadContent(Message* const, string& data, uint64_t offset, uint32_t length)
+        void loadContent(PersistableMessage&, string& data, uint64_t offset, uint32_t length)
         {
             if (offset + length <= content.size()) {
                 data = content.substr(offset, length);

Modified: incubator/qpid/trunk/qpid/cpp/tests/MessageBuilderTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/MessageBuilderTest.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/MessageBuilderTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/MessageBuilderTest.cpp Fri Mar 30 08:50:07 2007
@@ -51,33 +51,32 @@
         
       public:
 
-        void stage(Message* const msg)
+        void stage(PersistableMessage& msg)
         {
-            if (msg->getPersistenceId() == 0) {
-                header = new Buffer(msg->encodedHeaderSize());
-                msg->encodeHeader(*header);                
+            if (msg.getPersistenceId() == 0) {
+                header = new Buffer(msg.encodedSize());
+                msg.encode(*header);                
                 content = new Buffer(contentBufferSize);
-                msg->setPersistenceId(1);
+                msg.setPersistenceId(1);
             } else {
                 throw qpid::Exception("Message already staged!");
             }
         }
 
-        void appendContent(Message* msg, const string& data)
+        void appendContent(PersistableMessage& msg, const string& data)
         {
-            if (msg) {
+            if (msg.getPersistenceId() == 1) {
                 content->putRawData(data);
             } else {
                 throw qpid::Exception("Invalid message id!");
             }
         }
 
-        // Don't hide overloads.
         using NullMessageStore::destroy;
 
-        void destroy(BasicMessage* msg)
+        void destroy(PersistableMessage& msg)
         {
-            CPPUNIT_ASSERT(msg->getPersistenceId());
+            CPPUNIT_ASSERT(msg.getPersistenceId());
         }
 
         BasicMessage::shared_ptr getRestoredMessage()

Modified: incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp?view=diff&rev=524139&r1=524138&r2=524139
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp Fri Mar 30 08:50:07 2007
@@ -38,11 +38,11 @@
     class TestMessageStore : public NullMessageStore
     {
     public:
-        vector< std::pair<Message*, const string*> > dequeued;
+        vector<PersistableMessage*> dequeued;
 
-        void dequeue(TransactionContext*, Message* const msg, const Queue& /*queue*/, const string * const xid)
+        void dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& /*queue*/)
         {
-            dequeued.push_back(std::pair<Message*, const string*>(msg, xid));
+            dequeued.push_back(&msg);
         }
 
         TestMessageStore() : NullMessageStore() {}
@@ -50,7 +50,6 @@
     };
 
     CPPUNIT_TEST_SUITE(TxAckTest);
-    CPPUNIT_TEST(testPrepare2pc);
     CPPUNIT_TEST(testPrepare);
     CPPUNIT_TEST(testCommit);
     CPPUNIT_TEST_SUITE_END();
@@ -62,12 +61,11 @@
     vector<Message::shared_ptr> messages;
     list<DeliveryRecord> deliveries;
     TxAck op;
-    std::string xid;
 
 
 public:
 
-    TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries, &xid)
+    TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries)
     {
         for(int i = 0; i < 10; i++){
             Message::shared_ptr msg(
@@ -93,17 +91,7 @@
         CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size());
         int dequeued[] = {0, 1, 2, 3, 4, 6, 8};
         for (int i = 0; i < 7; i++) {
-            CPPUNIT_ASSERT_EQUAL(messages[dequeued[i]].get(), store.dequeued[i].first);
-        }
-    }
-
-    void testPrepare2pc()
-    {
-        xid = "abcdefg";
-        testPrepare();
-        const string expected(xid);
-        for (int i = 0; i < 7; i++) {
-            CPPUNIT_ASSERT_EQUAL(expected, *store.dequeued[i].second);
+            CPPUNIT_ASSERT_EQUAL((PersistableMessage*) messages[dequeued[i]].get(), store.dequeued[i]);
         }
     }