You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/02/26 18:18:47 UTC

svn commit: r748214 - in /qpid/trunk/qpid/cpp/src/qpid/broker: Queue.cpp Queue.h RecoveryManagerImpl.cpp

Author: gsim
Date: Thu Feb 26 17:18:46 2009
New Revision: 748214

URL: http://svn.apache.org/viewvc?rev=748214&view=rev
Log:
QPID-1695: Make LVQ persist durable messages


Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=748214&r1=748213&r2=748214&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Feb 26 17:18:46 2009
@@ -176,7 +176,7 @@
 
 
 void Queue::recover(boost::intrusive_ptr<Message>& msg){
-    push(msg);
+    push(msg, true);
     msg->enqueueComplete(); // mark the message as enqueued
     mgntEnqStats(msg);
 
@@ -545,7 +545,7 @@
     ++dequeueTracker;
 }
 
-void Queue::push(boost::intrusive_ptr<Message>& msg){
+void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
     QueueListeners::NotificationSet copy;
     {
         Mutex::ScopedLock locker(messageLock);   
@@ -566,7 +566,13 @@
                 boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this);
                 if (!old) old = i->second;
                 i->second->setReplacementMessage(msg,this);
-                dequeued(QueuedMessage(qm.queue, old, qm.position));
+                if (isRecovery) {
+                    //can't issue new requests for the store until
+                    //recovery is complete
+                    pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position));
+                } else {
+                    dequeue(0, QueuedMessage(qm.queue, old, qm.position));
+                }
             }		 
         }else {
             messages.push_back(qm);
@@ -664,7 +670,7 @@
         msg->addTraceId(traceId);
     }
 
-    if (msg->isPersistent() && store && !lastValueQueue) {
+    if (msg->isPersistent() && store) {
         msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
         boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
         store->enqueue(ctxt, pmsg, *this);
@@ -683,7 +689,7 @@
             dequeued(msg);
         }
     }
-    if (msg.payload->isPersistent() && store && !lastValueQueue) {
+    if (msg.payload->isPersistent() && store) {
         msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
         boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
         store->dequeue(ctxt, pmsg, *this);
@@ -976,3 +982,10 @@
 {
     eventMgr = &mgr;
 }
+
+void Queue::recoveryComplete()
+{
+    //process any pending dequeues
+    for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+    pendingDequeues.clear();
+}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=748214&r1=748213&r2=748214&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Thu Feb 26 17:18:46 2009
@@ -85,6 +85,7 @@
             std::vector<std::string> traceExclude;
             QueueListeners listeners;
             Messages messages;
+            Messages pendingDequeues;//used to avoid dequeuing during recovery
             LVQ lvq;
             mutable qpid::sys::Mutex consumerLock;
             mutable qpid::sys::Mutex messageLock;
@@ -101,7 +102,7 @@
             int eventMode;
             QueueEvents* eventMgr;
 
-            void push(boost::intrusive_ptr<Message>& msg);
+            void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
             void setPolicy(std::auto_ptr<QueuePolicy> policy);
             bool seek(QueuedMessage& msg, Consumer::shared_ptr position);
             bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
@@ -290,6 +291,11 @@
             void setPosition(framing::SequenceNumber pos);
             int getEventMode();
             void setQueueEventManager(QueueEvents&);
+
+            /**
+             * Notify queue that recovery has completed.
+             */
+            void recoveryComplete();
         };
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=748214&r1=748213&r2=748214&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Thu Feb 26 17:18:46 2009
@@ -149,7 +149,8 @@
 
 void RecoveryManagerImpl::recoveryComplete()
 {
-    //TODO (finalise binding setup etc)
+    //notify all queues
+    queues.eachQueue(boost::bind(&Queue::recoveryComplete, _1));
 }
 
 bool RecoverableMessageImpl::loadContent(uint64_t available)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org