You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2007/11/06 20:43:55 UTC
svn commit: r592530 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/broker:
Message.cpp Message.h MessageBuilder.cpp PersistableMessage.h
Author: cctrieloff
Date: Tue Nov 6 11:43:54 2007
New Revision: 592530
URL: http://svn.apache.org/viewvc?rev=592530&view=rev
Log:
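- clean up between base & subclasses: Message drops its own `store` member and store-based isContentReleased() in favour of the `store` pointer and a new explicit `contentReleased` flag in PersistableMessage; MessageBuilder no longer calls store->stage() itself before releaseContent(), which stages the message when it has no persistence id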
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=592530&r1=592529&r2=592530&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Tue Nov 6 11:43:54 2007
@@ -36,7 +36,7 @@
TransferAdapter Message::TRANSFER;
PublishAdapter Message::PUBLISH;
-Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), publisher(0), store(0), adapter(0) {}
+Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), publisher(0), adapter(0) {}
std::string Message::getRoutingKey() const
{
@@ -131,12 +131,15 @@
void Message::releaseContent(MessageStore* _store)
{
- store = _store;
+ if (!store){
+ store = _store;
+ }
if (!getPersistenceId()) {
store->stage(*this);
}
//remove any content frames from the frameset
frames.remove(TypeFilter(CONTENT_BODY));
+ setContentReleased();
}
void Message::sendContent(framing::FrameHandler& out, uint16_t maxFrameSize) const
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=592530&r1=592529&r2=592530&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Tue Nov 6 11:43:54 2007
@@ -125,14 +125,12 @@
mutable uint64_t persistenceId;
bool redelivered;
ConnectionToken* publisher;
- MessageStore* store;
mutable MessageAdapter* adapter;
static TransferAdapter TRANSFER;
static PublishAdapter PUBLISH;
MessageAdapter& getAdapter() const;
- bool isContentReleased() const { return store; }
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?rev=592530&r1=592529&r2=592530&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Tue Nov 6 11:43:54 2007
@@ -54,7 +54,6 @@
message->getFrames().append(frame);
//have we reached the staging limit? if so stage message and release content
if (state == CONTENT && stagingThreshold && message->getFrames().getContentSize() >= stagingThreshold) {
- store->stage(*message);
message->releaseContent(store);
staging = true;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=592530&r1=592529&r2=592530&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Tue Nov 6 11:43:54 2007
@@ -66,6 +66,10 @@
typedef std::list<PersistableQueue*> syncList;
syncList synclist;
MessageStore* store;
+ bool contentReleased;
+
+ inline void setContentReleased() {contentReleased = true; }
+
public:
typedef boost::shared_ptr<PersistableMessage> shared_ptr;
@@ -79,11 +83,14 @@
PersistableMessage():
asyncEnqueueCounter(0),
asyncDequeueCounter(0),
- store(0)
+ store(0),
+ contentReleased(false)
{}
void flush();
+ inline bool isContentReleased()const {return contentReleased; }
+
inline void waitForEnqueueComplete() {
sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
while (asyncEnqueueCounter > 0) {