You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/11/06 17:45:31 UTC

svn commit: r592485 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/sys: Dispatcher.cpp Dispatcher.h

Author: gsim
Date: Tue Nov  6 08:45:30 2007
New Revision: 592485

URL: http://svn.apache.org/viewvc?rev=592485&view=rev
Log:
Temporary fix to issue that results in an assertion from Dispatcher.cpp. Where an interest in write is signalled just as a readable event is triggered it is possible for a writeable (or read-writeable) event to be triggered before the earlier event is processed. This change ensures they are processed serially by queueing them up for the first thread to handle.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp?rev=592485&r1=592484&r2=592485&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp Tue Nov  6 08:45:30 2007
@@ -43,7 +43,10 @@
 
         // If can read/write then dispatch appropriate callbacks        
         if (h) {
-            h->dispatchCallbacks(event.type);
+            //TODO: this is a temporary fix to ensure that if two
+            //events are being processed concurrently, the first thread
+            //will call dispatchCallbacks serially for each one
+            h->handle(event.type);
         } else {
             // Handle shutdown
             switch (event.type) {
@@ -422,8 +425,59 @@
     case DELAYED_DELETE:
         break;
     }
-    }            
-    delete this;
+    }      
+    //TODO: this is a temporary fix to mark the handle as deleted,
+    //but delay deletion until the handle loop below ends
+    deleted = true;
+}
+
+/**
+ * TODO: The following are part of a temporary fix to ensure that
+ * where a new event is generated for the same handle while an
+ * earlier one is still being processed (due to an interest in
+ * writeability being declared) the events are processed serially
+ * by the first thread.
+ */
+void DispatchHandle::handle(Poller::EventType type)
+{
+    if (start(type)) {
+        dispatchCallbacks(type);
+        drain();
+        if (deleted) delete this;
+    }       
+}
+
+bool DispatchHandle::start(Poller::EventType type) 
+{
+    Mutex::ScopedLock l(processLock);
+    if (processing) {
+        events.push(type);
+        return false;
+    } else {
+        processing = true;
+        return true;
+    }
+}
+
+void DispatchHandle::drain()
+{
+    Poller::EventType type;
+    while (next(type)) {
+        dispatchCallbacks(type);
+    }
+}
+
+bool DispatchHandle::next(Poller::EventType& type)
+{
+    Mutex::ScopedLock l(processLock);
+    if (events.empty()) {
+        processing = false; 
+        return false;          
+    } else {
+        type = events.front();
+        events.pop();
+        return true;
+    }
 }
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h?rev=592485&r1=592484&r2=592485&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h Tue Nov  6 08:45:30 2007
@@ -27,6 +27,7 @@
 #include "Mutex.h"
 
 #include <memory>
+#include <queue>
 #include <boost/function.hpp>
 
 #include <assert.h>
@@ -53,13 +54,33 @@
         DELAYED_DELETE
     } state;
 
+    /**
+     * TODO: The following are part of a temporary fix to ensure that
+     * where a new event is generated for the same handle while an
+     * earlier one is still being processed (due to an interest in
+     * writeability being declared) the events are processed serially
+     * by the first thread.
+     */
+    Mutex processLock;
+    bool processing;
+    bool deleted;
+    std::queue<Poller::EventType> events;
+
+    bool start(Poller::EventType type);
+    void handle(Poller::EventType type);
+    void drain();
+    bool next(Poller::EventType& type);
+    /**************************************************************/
+
 public:
     DispatchHandle(const Socket& s, Callback rCb, Callback wCb, Callback dCb) :
       PollerHandle(s),
       readableCallback(rCb),
       writableCallback(wCb),
       disconnectedCallback(dCb),
-      state(IDLE)
+      state(IDLE),
+      processing(false),
+      deleted(false)
     {}
 
     ~DispatchHandle();