You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2007/11/08 18:45:58 UTC
svn commit: r593237 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/sys:
Dispatcher.cpp Dispatcher.h
Author: astitcher
Date: Thu Nov 8 09:45:54 2007
New Revision: 593237
URL: http://svn.apache.org/viewvc?rev=593237&view=rev
Log:
Improved fix for the race condition where a read and a write compete for the
same handle. This can happen if we have just received a read event and then,
before handling it, we start watching for write events and get one immediately.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp?rev=593237&r1=593236&r2=593237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp Thu Nov 8 09:45:54 2007
@@ -46,7 +46,7 @@
//TODO: this is a temporary fix to ensure that if two
//events are being processed concurrently, the first thread
//will call dispatchCallbacks serially for each one
- h->handle(event.type);
+ h->dispatchCallbacks(event.type);
} else {
// Handle shutdown
switch (event.type) {
@@ -363,6 +363,18 @@
case ACTIVE_RW:
state = DELAYED_RW;
break;
+ // We can only get here in a DELAYED_* state in the rare case
+ // that one thread is already here handling a read event when the
+ // handle is activated for writing and is immediately writable (the
+ // reverse may be possible too). The handle is then already being
+ // processed by another thread in this function, so return right away.
+ case DELAYED_R:
+ case DELAYED_W:
+ case DELAYED_RW:
+ case DELAYED_INACTIVE:
+ case DELAYED_IDLE:
+ case DELAYED_DELETE:
+ return;
default:
assert(false);
}
@@ -426,58 +438,7 @@
break;
}
}
- //TODO: this is a temporary fix to mark the handle as deleted,
- //but delay deletion until the handle loop below ends
- deleted = true;
-}
-
-/**
- * TODO: The following are part of a temporary fix to ensure that
- * where a new event is generated for the same handle while an
- * earlier one is still being processed (due to an interest in
- * writeability being declared) the events are processed serially
- * by the first thread.
- */
-void DispatchHandle::handle(Poller::EventType type)
-{
- if (start(type)) {
- dispatchCallbacks(type);
- drain();
- if (deleted) delete this;
- }
-}
-
-bool DispatchHandle::start(Poller::EventType type)
-{
- Mutex::ScopedLock l(processLock);
- if (processing) {
- events.push(type);
- return false;
- } else {
- processing = true;
- return true;
- }
-}
-
-void DispatchHandle::drain()
-{
- Poller::EventType type;
- while (next(type)) {
- dispatchCallbacks(type);
- }
-}
-
-bool DispatchHandle::next(Poller::EventType& type)
-{
- Mutex::ScopedLock l(processLock);
- if (events.empty()) {
- processing = false;
- return false;
- } else {
- type = events.front();
- events.pop();
- return true;
- }
+ delete this;
}
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h?rev=593237&r1=593236&r2=593237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h Thu Nov 8 09:45:54 2007
@@ -54,33 +54,13 @@
DELAYED_DELETE
} state;
- /**
- * TODO: The following are part of a temporary fix to ensure that
- * where a new event is generated for the same handle while an
- * earlier one is still being processed (due to an interest in
- * writeability being declared) the events are processed serially
- * by the first thread.
- */
- Mutex processLock;
- bool processing;
- bool deleted;
- std::queue<Poller::EventType> events;
-
- bool start(Poller::EventType type);
- void handle(Poller::EventType type);
- void drain();
- bool next(Poller::EventType& type);
- /**************************************************************/
-
public:
DispatchHandle(const Socket& s, Callback rCb, Callback wCb, Callback dCb) :
PollerHandle(s),
readableCallback(rCb),
writableCallback(wCb),
disconnectedCallback(dCb),
- state(IDLE),
- processing(false),
- deleted(false)
+ state(IDLE)
{}
~DispatchHandle();