You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/02/03 12:37:02 UTC

svn commit: r1066782 - in /qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker: QueueFlowLimit.cpp QueueFlowLimit.h

Author: kgiusti
Date: Thu Feb  3 11:37:02 2011
New Revision: 1066782

URL: http://svn.apache.org/viewvc?rev=1066782&view=rev
Log:
release all pending msgs when flow enabled

Modified:
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1066782&r1=1066781&r2=1066782&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Thu Feb  3 11:37:02 2011
@@ -136,9 +136,9 @@ bool QueueFlowLimit::consume(const Queue
         QPID_LOG(error, "Queue \"" << queueName << "\": has enqueued a msg twice: " << msg.position);
     }
 
-    if (flowStopped || !pendingFlow.empty()) {
+    if (flowStopped || !index.empty()) {
         msg.payload->getReceiveCompletion().startCompleter();    // don't complete until flow resumes
-        pendingFlow.push_back(msg.payload);
+        //pendingFlow.push_back(msg.payload);
         index.insert(msg.payload);
     }
 
@@ -176,33 +176,30 @@ bool QueueFlowLimit::replenish(const Que
         QPID_LOG(info, "Queue \"" << queueName << "\": has drained below the flow control resume level. Producer flow control deactivated." );
     }
 
-    if (!flowStopped && !pendingFlow.empty()) {
-        // if msg is flow controlled, release it.
-        std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload);
-        if (itr != index.end()) {
-            (*itr)->getReceiveCompletion().finishCompleter();
-            index.erase(itr);
-            // stupid:
-            std::list< boost::intrusive_ptr<Message> >::iterator itr2 = find(pendingFlow.begin(),
-                                                                             pendingFlow.end(),
-                                                                             msg.payload);
-            if (itr2 == pendingFlow.end()) {
-                QPID_LOG(error, "Queue \"" << queueName << "\": indexed msg missing in list: " << msg.position);
-            } else {
-                pendingFlow.erase(itr2);
+    if (!index.empty()) {
+        if (!flowStopped) {
+            // flow enabled - release all pending msgs
+            while (!index.empty()) {
+                std::set< boost::intrusive_ptr<Message> >::iterator itr = index.begin();
+                (*itr)->getReceiveCompletion().finishCompleter();
+                index.erase(itr);
             }
-        }
-
-        // for now, just release the oldest also
-        if (!pendingFlow.empty()) {
-            pendingFlow.front()->getReceiveCompletion().finishCompleter();
-            itr = index.find(pendingFlow.front());
-            if (itr == index.end()) {
-                QPID_LOG(error, "Queue \"" << queueName << "\": msg missing in index: " << pendingFlow.front());
-            } else {
+        } else {
+            // even if flow controlled, we must release this msg as it is being dequeued
+            std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload);
+            if (itr != index.end()) {       // this msg is flow controlled, release it:
+                (*itr)->getReceiveCompletion().finishCompleter();
                 index.erase(itr);
+                //// stupid: (hopefully this is the next pending msg)
+                //std::list< boost::intrusive_ptr<Message> >::iterator itr2 = find(pendingFlow.begin(),
+                //                                                                 pendingFlow.end(),
+                //                                                                 msg.payload);
+                //if (itr2 == pendingFlow.end()) {
+                //    QPID_LOG(error, "Queue \"" << queueName << "\": indexed msg missing in list: " << msg.position);
+                //} else {
+                //    pendingFlow.erase(itr2);
+                //}
             }
-            pendingFlow.pop_front();
         }
     }
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h?rev=1066782&r1=1066781&r2=1066782&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h Thu Feb  3 11:37:02 2011
@@ -85,8 +85,9 @@ class QueueFlowLimit
 
  protected:
     // msgs waiting for flow to become available.
-    std::list< boost::intrusive_ptr<Message> > pendingFlow;     // ordered, oldest @front
     std::set< boost::intrusive_ptr<Message> > index;
+    // KAG: is this necessary?  Not if we release all pending when level < low (?)
+    // std::list< boost::intrusive_ptr<Message> > pendingFlow;     // ordered, oldest @front
     qpid::sys::Mutex pendingFlowLock;
 
     QueueFlowLimit(Queue *queue,



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org