You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/12/14 17:25:49 UTC

svn commit: r604215 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/broker: Message.cpp Message.h MessageStore.h MessageStoreModule.cpp MessageStoreModule.h NullMessageStore.cpp NullMessageStore.h

Author: gsim
Date: Fri Dec 14 08:25:47 2007
New Revision: 604215

URL: http://svn.apache.org/viewvc?rev=604215&view=rev
Log:
Some fixes for 'flow to disk' (i.e. dropping message content from memory, and loading it from disk for delivery)


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=604215&r1=604214&r2=604215&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Dec 14 08:25:47 2007
@@ -28,6 +28,7 @@
 #include "qpid/framing/SendContent.h"
 #include "qpid/framing/SequenceNumber.h"
 #include "qpid/framing/TypeFilter.h"
+#include "qpid/log/Statement.h"
 
 using namespace qpid::broker;
 using namespace qpid::framing;
@@ -35,7 +36,18 @@
 
 TransferAdapter Message::TRANSFER;
 
-Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), publisher(0), adapter(0) {}
+Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), publisher(0), adapter(0) {}
+
+Message::~Message()
+{
+    if (staged) {
+        if (store) {
+            store->destroy(*this);
+        } else {
+            QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed");
+        }
+    }
+}
 
 std::string Message::getRoutingKey() const
 {
@@ -152,6 +164,7 @@
         if (!getPersistenceId()) {
             intrusive_ptr<PersistableMessage> pmsg(this);
             store->stage(pmsg);
+            staged = true;
         }
         //remove any content frames from the frameset
         frames.remove(TypeFilter<CONTENT_BODY>());

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=604215&r1=604214&r2=604215&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Dec 14 08:25:47 2007
@@ -48,7 +48,8 @@
     typedef boost::intrusive_ptr<Message> shared_ptr;
 
     Message(const framing::SequenceNumber& id = framing::SequenceNumber());
-            
+    ~Message();
+        
     uint64_t getPersistenceId() const { return persistenceId; }
     void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
 
@@ -127,6 +128,7 @@
     mutable uint64_t persistenceId;
     bool redelivered;
     bool loaded;
+    bool staged;
     ConnectionToken* publisher;
     mutable MessageAdapter* adapter;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?rev=604215&r1=604214&r2=604215&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Fri Dec 14 08:25:47 2007
@@ -104,7 +104,7 @@
      * enqueued, deletion will be automatic when the message
      * is dequeued from all queues it was enqueued onto).
      */
-    virtual void destroy(intrusive_ptr<PersistableMessage>& msg) = 0;
+    virtual void destroy(PersistableMessage& msg) = 0;
 
     /**
      * Appends content to a previously staged message

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?rev=604215&r1=604214&r2=604215&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Fri Dec 14 08:25:47 2007
@@ -78,7 +78,7 @@
     TRANSFER_EXCEPTION(store->stage(msg));
 }
 
-void MessageStoreModule::destroy(intrusive_ptr<PersistableMessage>& msg)
+void MessageStoreModule::destroy(PersistableMessage& msg)
 {
     TRANSFER_EXCEPTION(store->destroy(msg));
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?rev=604215&r1=604214&r2=604215&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Fri Dec 14 08:25:47 2007
@@ -56,7 +56,7 @@
                 const std::string& key, const framing::FieldTable& args);
     void recover(RecoveryManager& queues);
     void stage(intrusive_ptr<PersistableMessage>& msg);
-    void destroy(intrusive_ptr<PersistableMessage>& msg);
+    void destroy(PersistableMessage& msg);
     void appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data);
     void loadContent(const qpid::broker::PersistableQueue& queue, 
 	          intrusive_ptr<const PersistableMessage>& msg, std::string& data,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?rev=604215&r1=604214&r2=604215&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Fri Dec 14 08:25:47 2007
@@ -84,7 +84,7 @@
     QPID_LOG(info, "Can't stage message. Persistence not enabled.");
 }
 
-void NullMessageStore::destroy(intrusive_ptr<PersistableMessage>&)
+void NullMessageStore::destroy(PersistableMessage&)
 {
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?rev=604215&r1=604214&r2=604215&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Fri Dec 14 08:25:47 2007
@@ -57,7 +57,7 @@
                         const std::string& key, const framing::FieldTable& args);
     virtual void recover(RecoveryManager& queues);
     virtual void stage(intrusive_ptr<PersistableMessage>& msg);
-    virtual void destroy(intrusive_ptr<PersistableMessage>& msg);
+    virtual void destroy(PersistableMessage& msg);
     virtual void appendContent(intrusive_ptr<const PersistableMessage>& msg,
                                const std::string& data);
     virtual void loadContent(const qpid::broker::PersistableQueue& queue,