You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/11/09 17:56:45 UTC

svn commit: r1407543 - in /qpid/trunk/qpid/cpp/src/qpid/sys: AggregateOutput.cpp AggregateOutput.h

Author: aconway
Date: Fri Nov  9 16:56:44 2012
New Revision: 1407543

URL: http://svn.apache.org/viewvc?rev=1407543&view=rev
Log:
QPID-4430: HA QMF queue events do not propagate to backups under load (Jason Dillaman)

In a stress test, QMF events were not being propagated to backups.  We discovered
that the inter-broker link had hundreds of thousands of enqueued OutputTasks --
representing only a few unique consumers.  There should be only a single
output task for a given consumer.  This appears to have stalled the delivery of
QMF messages to the backup broker.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp?rev=1407543&r1=1407542&r2=1407543&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp Fri Nov  9 16:56:44 2012
@@ -51,6 +51,7 @@ bool AggregateOutput::doOutput() {
     while (!tasks.empty()) {
         OutputTask* t=tasks.front();
         tasks.pop_front();
+        taskSet.erase(t);
         bool didOutput;
         {
             // Allow concurrent call to addOutputTask.
@@ -59,7 +60,9 @@ bool AggregateOutput::doOutput() {
             didOutput = t->doOutput();
         }
         if (didOutput) {
-            tasks.push_back(t);
+            if (taskSet.insert(t).second) {
+                tasks.push_back(t);
+            }
             return true;
         }
     }
@@ -68,12 +71,15 @@ bool AggregateOutput::doOutput() {
   
 void AggregateOutput::addOutputTask(OutputTask* task) {
     Mutex::ScopedLock l(lock);
-    tasks.push_back(task);
+    if (taskSet.insert(task).second) {
+        tasks.push_back(task);
+    }
 }
 
 void AggregateOutput::removeOutputTask(OutputTask* task) {
     Mutex::ScopedLock l(lock);
     while (busy) lock.wait();
+    taskSet.erase(task);
     tasks.erase(std::remove(tasks.begin(), tasks.end(), task), tasks.end());
 }
   
@@ -81,6 +87,7 @@ void AggregateOutput::removeAll()
 {
     Mutex::ScopedLock l(lock);
     while (busy) lock.wait();
+    taskSet.clear();
     tasks.clear();
 }
   

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h?rev=1407543&r1=1407542&r2=1407543&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h Fri Nov  9 16:56:44 2012
@@ -28,6 +28,7 @@
 
 #include <algorithm>
 #include <deque>
+#include <set>
 
 namespace qpid {
 namespace sys {
@@ -44,9 +45,11 @@ namespace sys {
 class QPID_COMMON_CLASS_EXTERN AggregateOutput : public OutputTask, public OutputControl
 {
     typedef std::deque<OutputTask*> TaskList;
+    typedef std::set<OutputTask*> TaskSet;
 
     Monitor lock;
     TaskList tasks;
+    TaskSet taskSet;
     bool busy;
     OutputControl& control;
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org