You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/11/09 17:56:45 UTC
svn commit: r1407543 - in /qpid/trunk/qpid/cpp/src/qpid/sys:
AggregateOutput.cpp AggregateOutput.h
Author: aconway
Date: Fri Nov 9 16:56:44 2012
New Revision: 1407543
URL: http://svn.apache.org/viewvc?rev=1407543&view=rev
Log:
QPID-4430: HA QMF queue events do not propagate to backups under load (Jason Dillaman)
In a stress tests QMF events were not being propagated to backups. Discovered
that the inter-broker link had hundreds of thousands of enqueued OutputTasks --
representing only a few unique consumers. There should only be only a single
output task for a given consumer. This appears to have stalled the delivery of
QMF messages to the backup broker
Modified:
qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp?rev=1407543&r1=1407542&r2=1407543&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp Fri Nov 9 16:56:44 2012
@@ -51,6 +51,7 @@ bool AggregateOutput::doOutput() {
while (!tasks.empty()) {
OutputTask* t=tasks.front();
tasks.pop_front();
+ taskSet.erase(t);
bool didOutput;
{
// Allow concurrent call to addOutputTask.
@@ -59,7 +60,9 @@ bool AggregateOutput::doOutput() {
didOutput = t->doOutput();
}
if (didOutput) {
- tasks.push_back(t);
+ if (taskSet.insert(t).second) {
+ tasks.push_back(t);
+ }
return true;
}
}
@@ -68,12 +71,15 @@ bool AggregateOutput::doOutput() {
void AggregateOutput::addOutputTask(OutputTask* task) {
Mutex::ScopedLock l(lock);
- tasks.push_back(task);
+ if (taskSet.insert(task).second) {
+ tasks.push_back(task);
+ }
}
void AggregateOutput::removeOutputTask(OutputTask* task) {
Mutex::ScopedLock l(lock);
while (busy) lock.wait();
+ taskSet.erase(task);
tasks.erase(std::remove(tasks.begin(), tasks.end(), task), tasks.end());
}
@@ -81,6 +87,7 @@ void AggregateOutput::removeAll()
{
Mutex::ScopedLock l(lock);
while (busy) lock.wait();
+ taskSet.clear();
tasks.clear();
}
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h?rev=1407543&r1=1407542&r2=1407543&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h Fri Nov 9 16:56:44 2012
@@ -28,6 +28,7 @@
#include <algorithm>
#include <deque>
+#include <set>
namespace qpid {
namespace sys {
@@ -44,9 +45,11 @@ namespace sys {
class QPID_COMMON_CLASS_EXTERN AggregateOutput : public OutputTask, public OutputControl
{
typedef std::deque<OutputTask*> TaskList;
+ typedef std::set<OutputTask*> TaskSet;
Monitor lock;
TaskList tasks;
+ TaskSet taskSet;
bool busy;
OutputControl& control;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org