You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/02/03 12:37:02 UTC
svn commit: r1066782 - in /qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker:
QueueFlowLimit.cpp QueueFlowLimit.h
Author: kgiusti
Date: Thu Feb 3 11:37:02 2011
New Revision: 1066782
URL: http://svn.apache.org/viewvc?rev=1066782&view=rev
Log:
release all pending msgs when flow enabled
Modified:
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1066782&r1=1066781&r2=1066782&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Thu Feb 3 11:37:02 2011
@@ -136,9 +136,9 @@ bool QueueFlowLimit::consume(const Queue
QPID_LOG(error, "Queue \"" << queueName << "\": has enqueued a msg twice: " << msg.position);
}
- if (flowStopped || !pendingFlow.empty()) {
+ if (flowStopped || !index.empty()) {
msg.payload->getReceiveCompletion().startCompleter(); // don't complete until flow resumes
- pendingFlow.push_back(msg.payload);
+ //pendingFlow.push_back(msg.payload);
index.insert(msg.payload);
}
@@ -176,33 +176,30 @@ bool QueueFlowLimit::replenish(const Que
QPID_LOG(info, "Queue \"" << queueName << "\": has drained below the flow control resume level. Producer flow control deactivated." );
}
- if (!flowStopped && !pendingFlow.empty()) {
- // if msg is flow controlled, release it.
- std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload);
- if (itr != index.end()) {
- (*itr)->getReceiveCompletion().finishCompleter();
- index.erase(itr);
- // stupid:
- std::list< boost::intrusive_ptr<Message> >::iterator itr2 = find(pendingFlow.begin(),
- pendingFlow.end(),
- msg.payload);
- if (itr2 == pendingFlow.end()) {
- QPID_LOG(error, "Queue \"" << queueName << "\": indexed msg missing in list: " << msg.position);
- } else {
- pendingFlow.erase(itr2);
+ if (!index.empty()) {
+ if (!flowStopped) {
+ // flow enabled - release all pending msgs
+ while (!index.empty()) {
+ std::set< boost::intrusive_ptr<Message> >::iterator itr = index.begin();
+ (*itr)->getReceiveCompletion().finishCompleter();
+ index.erase(itr);
}
- }
-
- // for now, just release the oldest also
- if (!pendingFlow.empty()) {
- pendingFlow.front()->getReceiveCompletion().finishCompleter();
- itr = index.find(pendingFlow.front());
- if (itr == index.end()) {
- QPID_LOG(error, "Queue \"" << queueName << "\": msg missing in index: " << pendingFlow.front());
- } else {
+ } else {
+ // even if flow controlled, we must release this msg as it is being dequeued
+ std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload);
+ if (itr != index.end()) { // this msg is flow controlled, release it:
+ (*itr)->getReceiveCompletion().finishCompleter();
index.erase(itr);
+ //// stupid: (hopefully this is the next pending msg)
+ //std::list< boost::intrusive_ptr<Message> >::iterator itr2 = find(pendingFlow.begin(),
+ // pendingFlow.end(),
+ // msg.payload);
+ //if (itr2 == pendingFlow.end()) {
+ // QPID_LOG(error, "Queue \"" << queueName << "\": indexed msg missing in list: " << msg.position);
+ //} else {
+ // pendingFlow.erase(itr2);
+ //}
}
- pendingFlow.pop_front();
}
}
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h?rev=1066782&r1=1066781&r2=1066782&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h Thu Feb 3 11:37:02 2011
@@ -85,8 +85,9 @@ class QueueFlowLimit
protected:
// msgs waiting for flow to become available.
- std::list< boost::intrusive_ptr<Message> > pendingFlow; // ordered, oldest @front
std::set< boost::intrusive_ptr<Message> > index;
+ // KAG: is this necessary? Not if we release all pending when level < low (?)
+ // std::list< boost::intrusive_ptr<Message> > pendingFlow; // ordered, oldest @front
qpid::sys::Mutex pendingFlowLock;
QueueFlowLimit(Queue *queue,
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org