You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/12/14 17:25:49 UTC
svn commit: r604215 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/broker:
Message.cpp Message.h MessageStore.h MessageStoreModule.cpp
MessageStoreModule.h NullMessageStore.cpp NullMessageStore.h
Author: gsim
Date: Fri Dec 14 08:25:47 2007
New Revision: 604215
URL: http://svn.apache.org/viewvc?rev=604215&view=rev
Log:
Some fixes for 'flow to disk' (i.e. dropping message content from memory, and loading it from disk for delivery)
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=604215&r1=604214&r2=604215&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Dec 14 08:25:47 2007
@@ -28,6 +28,7 @@
#include "qpid/framing/SendContent.h"
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/TypeFilter.h"
+#include "qpid/log/Statement.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -35,7 +36,18 @@
TransferAdapter Message::TRANSFER;
-Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), publisher(0), adapter(0) {}
+Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), publisher(0), adapter(0) {}
+
+Message::~Message()
+{
+ if (staged) {
+ if (store) {
+ store->destroy(*this);
+ } else {
+ QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed");
+ }
+ }
+}
std::string Message::getRoutingKey() const
{
@@ -152,6 +164,7 @@
if (!getPersistenceId()) {
intrusive_ptr<PersistableMessage> pmsg(this);
store->stage(pmsg);
+ staged = true;
}
//remove any content frames from the frameset
frames.remove(TypeFilter<CONTENT_BODY>());
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=604215&r1=604214&r2=604215&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Dec 14 08:25:47 2007
@@ -48,7 +48,8 @@
typedef boost::intrusive_ptr<Message> shared_ptr;
Message(const framing::SequenceNumber& id = framing::SequenceNumber());
-
+ ~Message();
+
uint64_t getPersistenceId() const { return persistenceId; }
void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
@@ -127,6 +128,7 @@
mutable uint64_t persistenceId;
bool redelivered;
bool loaded;
+ bool staged;
ConnectionToken* publisher;
mutable MessageAdapter* adapter;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?rev=604215&r1=604214&r2=604215&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Fri Dec 14 08:25:47 2007
@@ -104,7 +104,7 @@
* enqueued, deletion will be automatic when the message
* is dequeued from all queues it was enqueued onto).
*/
- virtual void destroy(intrusive_ptr<PersistableMessage>& msg) = 0;
+ virtual void destroy(PersistableMessage& msg) = 0;
/**
* Appends content to a previously staged message
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?rev=604215&r1=604214&r2=604215&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Fri Dec 14 08:25:47 2007
@@ -78,7 +78,7 @@
TRANSFER_EXCEPTION(store->stage(msg));
}
-void MessageStoreModule::destroy(intrusive_ptr<PersistableMessage>& msg)
+void MessageStoreModule::destroy(PersistableMessage& msg)
{
TRANSFER_EXCEPTION(store->destroy(msg));
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?rev=604215&r1=604214&r2=604215&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Fri Dec 14 08:25:47 2007
@@ -56,7 +56,7 @@
const std::string& key, const framing::FieldTable& args);
void recover(RecoveryManager& queues);
void stage(intrusive_ptr<PersistableMessage>& msg);
- void destroy(intrusive_ptr<PersistableMessage>& msg);
+ void destroy(PersistableMessage& msg);
void appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data);
void loadContent(const qpid::broker::PersistableQueue& queue,
intrusive_ptr<const PersistableMessage>& msg, std::string& data,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?rev=604215&r1=604214&r2=604215&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Fri Dec 14 08:25:47 2007
@@ -84,7 +84,7 @@
QPID_LOG(info, "Can't stage message. Persistence not enabled.");
}
-void NullMessageStore::destroy(intrusive_ptr<PersistableMessage>&)
+void NullMessageStore::destroy(PersistableMessage&)
{
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?rev=604215&r1=604214&r2=604215&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Fri Dec 14 08:25:47 2007
@@ -57,7 +57,7 @@
const std::string& key, const framing::FieldTable& args);
virtual void recover(RecoveryManager& queues);
virtual void stage(intrusive_ptr<PersistableMessage>& msg);
- virtual void destroy(intrusive_ptr<PersistableMessage>& msg);
+ virtual void destroy(PersistableMessage& msg);
virtual void appendContent(intrusive_ptr<const PersistableMessage>& msg,
const std::string& data);
virtual void loadContent(const qpid::broker::PersistableQueue& queue,