You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2009/07/08 18:10:29 UTC
svn commit: r792208 - in /qpid/trunk/qpid/cpp/src/qpid/broker:
PersistableMessage.cpp PersistableMessage.h Queue.cpp Queue.h
Author: cctrieloff
Date: Wed Jul 8 16:10:29 2009
New Revision: 792208
URL: http://svn.apache.org/viewvc?rev=792208&view=rev
Log:
More tests and complete fix for svn791672 commit -- correct requeue
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=792208&r1=792207&r2=792208&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Wed Jul 8 16:10:29 2009
@@ -90,6 +90,16 @@
}
}
+bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){ // Returns true if some synclist entry resolves to a queue with the same persistence id as `queue`; false otherwise.
+ if (store && (queue->getPersistenceId()!=0)) { // search only when `store` is set and the queue's persistence id is non-zero (NOTE(review): 0 presumably means "not persisted" -- confirm)
+ for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { // NOTE(review): synclist is read here without taking storeLock, which enqueueAsync acquires when given a store -- confirm callers serialize access
+ PersistableQueue::shared_ptr q(i->lock()); // lock() may yield an empty pointer (NOTE(review): entries look like weak_ptr -- confirm); empty results are skipped by the check below
+ if (q && q->getPersistenceId() == queue->getPersistenceId()) return true; // match is by persistence id, not by pointer identity
+ }
+ }
+ return false; // no store, persistence id 0, or no matching synclist entry
+}
+
void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
if (_store){
sys::ScopedLock<sys::Mutex> l(storeLock);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=792208&r1=792207&r2=792208&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Wed Jul 8 16:10:29 2009
@@ -111,6 +111,8 @@
MessageStore* _store);
QPID_BROKER_EXTERN void dequeueAsync();
+
+ bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
};
}}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=792208&r1=792207&r2=792208&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Jul 8 16:10:29 2009
@@ -99,8 +99,7 @@
eventMode(0),
eventMgr(0),
insertSeqNo(0),
- broker(b),
- lastForcedPosition(0)
+ broker(b)
{
if (parent != 0 && broker != 0)
{
@@ -211,6 +210,14 @@
msg.payload->enqueueComplete(); // mark the message as enqueued
messages.push_front(msg);
listeners.populate(copy);
+
+ // for persistLastNode - don't force a message to disk twice, but do force it if it has not been forced before
+ if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
+ msg.payload->forcePersistent();
+ if (msg.payload->isForcedPersistent() ){
+ enqueue(0, msg.payload);
+ }
+ }
}
copy.notify();
}
@@ -660,7 +667,6 @@
void Queue::clearLastNodeFailure()
{
inLastNodeFailure = false;
- lastForcedPosition = sequence;
}
void Queue::setLastNodeFailure()
@@ -669,19 +675,19 @@
Mutex::ScopedLock locker(messageLock);
for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
// don't force a message twice to disk.
- if(i->position > lastForcedPosition) {
+ if(!i->payload->isStoredOnQueue(shared_from_this())) {
if (lastValueQueue) checkLvqReplace(*i);
i->payload->forcePersistent();
if (i->payload->isForcedPersistent() ){
enqueue(0, i->payload);
}
- lastForcedPosition = i->position;
}
}
inLastNodeFailure = true;
}
}
+
// return true if store exists,
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
{
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=792208&r1=792207&r2=792208&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed Jul 8 16:10:29 2009
@@ -106,7 +106,6 @@
bool insertSeqNo;
std::string seqNoKey;
Broker* broker;
- framing::SequenceNumber lastForcedPosition;
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org