You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2009/07/08 18:10:29 UTC

svn commit: r792208 - in /qpid/trunk/qpid/cpp/src/qpid/broker: PersistableMessage.cpp PersistableMessage.h Queue.cpp Queue.h

Author: cctrieloff
Date: Wed Jul  8 16:10:29 2009
New Revision: 792208

URL: http://svn.apache.org/viewvc?rev=792208&view=rev
Log:
More tests and complete fix for svn791672 commit -- correct requeue

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=792208&r1=792207&r2=792208&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Wed Jul  8 16:10:29 2009
@@ -90,6 +90,16 @@
     }
 }
 
+bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){
+    if (store && (queue->getPersistenceId()!=0)) {
+        for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
+            PersistableQueue::shared_ptr q(i->lock());
+            if (q && q->getPersistenceId() == queue->getPersistenceId())  return true;
+        } 
+    }            
+    return false;
+}
+
 void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { 
     if (_store){
         sys::ScopedLock<sys::Mutex> l(storeLock);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=792208&r1=792207&r2=792208&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Wed Jul  8 16:10:29 2009
@@ -111,6 +111,8 @@
                                          MessageStore* _store);
 
     QPID_BROKER_EXTERN void dequeueAsync();
+    
+    bool isStoredOnQueue(PersistableQueue::shared_ptr queue);
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=792208&r1=792207&r2=792208&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Jul  8 16:10:29 2009
@@ -99,8 +99,7 @@
     eventMode(0),
     eventMgr(0),
     insertSeqNo(0),
-    broker(b),
-    lastForcedPosition(0)
+    broker(b)
 {
     if (parent != 0 && broker != 0)
     {
@@ -211,6 +210,14 @@
         msg.payload->enqueueComplete(); // mark the message as enqueued
         messages.push_front(msg);
         listeners.populate(copy);
+
+        // for persistLastNode - don't force a message twice to disk, but force it if no force before 
+        if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
+            msg.payload->forcePersistent();
+            if (msg.payload->isForcedPersistent() ){
+            	enqueue(0, msg.payload);
+            }
+        }
     }
     copy.notify();
 }
@@ -660,7 +667,6 @@
 void Queue::clearLastNodeFailure()
 {
     inLastNodeFailure = false;
-    lastForcedPosition = sequence;
 }
 
 void Queue::setLastNodeFailure()
@@ -669,19 +675,19 @@
         Mutex::ScopedLock locker(messageLock);
     	for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
             // don't force a message twice to disk.
-            if(i->position > lastForcedPosition) {
+            if(!i->payload->isStoredOnQueue(shared_from_this())) {
                 if (lastValueQueue) checkLvqReplace(*i);
                 i->payload->forcePersistent();
                 if (i->payload->isForcedPersistent() ){
             	    enqueue(0, i->payload);
                 }
-                lastForcedPosition = i->position;
             }
     	}
         inLastNodeFailure = true;
     }
 }
 
+
 // return true if store exists, 
 bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=792208&r1=792207&r2=792208&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed Jul  8 16:10:29 2009
@@ -106,7 +106,6 @@
             bool insertSeqNo;
             std::string seqNoKey;
             Broker* broker;
-            framing::SequenceNumber lastForcedPosition;
 
             void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
             void setPolicy(std::auto_ptr<QueuePolicy> policy);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org