You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2009/05/04 17:55:22 UTC
svn commit: r771338 - in /qpid/trunk/qpid/cpp/src: qpid/cluster/ qpid/sys/
qpid/sys/epoll/ qpid/sys/posix/ qpid/sys/windows/ tests/
Author: astitcher
Date: Mon May 4 15:55:21 2009
New Revision: 771338
URL: http://svn.apache.org/viewvc?rev=771338&view=rev
Log:
Refactored the DispatchHandle/Poller code to remove a long-standing
set of race conditions.
- Changed Poller naming for better clarity with
new semantics.
- Changed Poller semantics to avoid DispatchHandle
keeping so much state.
- Changed Poller so that it will never re-enable a
Handle until Poller::wait is called again on the same thread
that returned the Handle.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h
qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h
qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp Mon May 4 15:55:21 2009
@@ -36,6 +36,10 @@
)
{}
+PollerDispatch::~PollerDispatch() {
+ dispatchHandle.stopWatch();
+}
+
void PollerDispatch::start() {
dispatchHandle.startWatch(poller);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h Mon May 4 15:55:21 2009
@@ -37,6 +37,9 @@
public:
PollerDispatch(Cpg&, boost::shared_ptr<sys::Poller> poller,
boost::function<void()> onError) ;
+
+ ~PollerDispatch();
+
void start();
private:
@@ -47,7 +50,7 @@
Cpg& cpg;
boost::shared_ptr<sys::Poller> poller;
boost::function<void()> onError;
- sys::DispatchHandle dispatchHandle;
+ sys::DispatchHandleRef dispatchHandle;
};
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.cpp Mon May 4 15:55:21 2009
@@ -30,6 +30,16 @@
namespace qpid {
namespace sys {
+DispatchHandle::DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) :
+ PollerHandle(h),
+ readableCallback(rCb),
+ writableCallback(wCb),
+ disconnectedCallback(dCb),
+ state(IDLE)
+{
+}
+
+
DispatchHandle::~DispatchHandle() {
}
@@ -38,123 +48,56 @@
bool w = writableCallback;
ScopedLock<Mutex> lock(stateLock);
- assert(state == IDLE || state == DELAYED_IDLE);
-
- // If no callbacks set then do nothing (that is what we were asked to do!)
- // TODO: Maybe this should be an assert instead
- if (!r && !w) {
- switch (state) {
- case IDLE:
- state = INACTIVE;
- return;
- case DELAYED_IDLE:
- state = DELAYED_INACTIVE;
- return;
- default:
- assert(state == IDLE || state == DELAYED_IDLE);
- }
- }
-
- Poller::Direction d = r ?
- (w ? Poller::INOUT : Poller::INPUT) :
- Poller::OUTPUT;
+ assert(state == IDLE);
poller = poller0;
- poller->addFd(*this, d);
-
- switch (state) {
- case IDLE:
- state = r ?
- (w ? ACTIVE_RW : ACTIVE_R) :
- ACTIVE_W;
- return;
- case DELAYED_IDLE:
- state = r ?
- (w ? DELAYED_RW : DELAYED_R) :
- DELAYED_W;
- return;
- default:
- assert(state == IDLE || state == DELAYED_IDLE);
- }
+ poller->registerHandle(*this);
+ state = WAITING;
+ Poller::Direction dir = r ?
+ ( w ? Poller::INOUT : Poller::INPUT ) :
+ ( w ? Poller::OUTPUT : Poller::NONE );
+ poller->monitorHandle(*this, dir);
}
void DispatchHandle::rewatch() {
bool r = readableCallback;
bool w = writableCallback;
+ if (!r && !w) {
+ return;
+ }
+ Poller::Direction dir = r ?
+ ( w ? Poller::INOUT : Poller::INPUT ) :
+ ( w ? Poller::OUTPUT : Poller::NONE );
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_INACTIVE:
- state = r ?
- (w ? DELAYED_RW : DELAYED_R) :
- DELAYED_W;
- break;
- case DELAYED_DELETE:
- break;
- case INACTIVE:
- case ACTIVE_R:
- case ACTIVE_W: {
- assert(poller);
- Poller::Direction d = r ?
- (w ? Poller::INOUT : Poller::INPUT) :
- Poller::OUTPUT;
- poller->modFd(*this, d);
- state = r ?
- (w ? ACTIVE_RW : ACTIVE_R) :
- ACTIVE_W;
- break;
- }
- case DELAYED_RW:
- case ACTIVE_RW:
- // Don't need to do anything already waiting for readable/writable
+ case STOPPING:
+ case DELETING:
+ return;
+ default:
break;
- case ACTIVE_DELETE:
- assert(state != ACTIVE_DELETE);
}
+ assert(poller);
+ poller->monitorHandle(*this, dir);
}
void DispatchHandle::rewatchRead() {
if (!readableCallback) {
return;
}
-
+
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_R:
- case DELAYED_RW:
- case DELAYED_DELETE:
- break;
- case DELAYED_W:
- state = DELAYED_RW;
- break;
- case DELAYED_INACTIVE:
- state = DELAYED_R;
- break;
- case ACTIVE_R:
- case ACTIVE_RW:
- // Nothing to do: already waiting for readable
- break;
- case INACTIVE:
- assert(poller);
- poller->modFd(*this, Poller::INPUT);
- state = ACTIVE_R;
- break;
- case ACTIVE_W:
- assert(poller);
- poller->modFd(*this, Poller::INOUT);
- state = ACTIVE_RW;
+ case STOPPING:
+ case DELETING:
+ return;
+ default:
break;
- case ACTIVE_DELETE:
- assert(state != ACTIVE_DELETE);
}
+ assert(poller);
+ poller->monitorHandle(*this, Poller::INPUT);
}
void DispatchHandle::rewatchWrite() {
@@ -165,35 +108,14 @@
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_W:
- case DELAYED_RW:
- case DELAYED_DELETE:
- break;
- case DELAYED_R:
- state = DELAYED_RW;
- break;
- case DELAYED_INACTIVE:
- state = DELAYED_W;
- break;
- case INACTIVE:
- assert(poller);
- poller->modFd(*this, Poller::OUTPUT);
- state = ACTIVE_W;
- break;
- case ACTIVE_R:
- assert(poller);
- poller->modFd(*this, Poller::INOUT);
- state = ACTIVE_RW;
+ case STOPPING:
+ case DELETING:
+ return;
+ default:
break;
- case ACTIVE_W:
- case ACTIVE_RW:
- // Nothing to do: already waiting for writable
- break;
- case ACTIVE_DELETE:
- assert(state != ACTIVE_DELETE);
- }
+ }
+ assert(poller);
+ poller->monitorHandle(*this, Poller::OUTPUT);
}
void DispatchHandle::unwatchRead() {
@@ -204,34 +126,14 @@
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_R:
- state = DELAYED_INACTIVE;
- break;
- case DELAYED_RW:
- state = DELAYED_W;
- break;
- case DELAYED_W:
- case DELAYED_INACTIVE:
- case DELAYED_DELETE:
- break;
- case ACTIVE_R:
- assert(poller);
- poller->modFd(*this, Poller::NONE);
- state = INACTIVE;
- break;
- case ACTIVE_RW:
- assert(poller);
- poller->modFd(*this, Poller::OUTPUT);
- state = ACTIVE_W;
- break;
- case ACTIVE_W:
- case INACTIVE:
+ case STOPPING:
+ case DELETING:
+ return;
+ default:
break;
- case ACTIVE_DELETE:
- assert(state != ACTIVE_DELETE);
}
+ assert(poller);
+ poller->unmonitorHandle(*this, Poller::INPUT);
}
void DispatchHandle::unwatchWrite() {
@@ -242,95 +144,62 @@
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_W:
- state = DELAYED_INACTIVE;
- break;
- case DELAYED_RW:
- state = DELAYED_R;
- break;
- case DELAYED_R:
- case DELAYED_INACTIVE:
- case DELAYED_DELETE:
- break;
- case ACTIVE_W:
- assert(poller);
- poller->modFd(*this, Poller::NONE);
- state = INACTIVE;
- break;
- case ACTIVE_RW:
- assert(poller);
- poller->modFd(*this, Poller::INPUT);
- state = ACTIVE_R;
- break;
- case ACTIVE_R:
- case INACTIVE:
+ case STOPPING:
+ case DELETING:
+ return;
+ default:
break;
- case ACTIVE_DELETE:
- assert(state != ACTIVE_DELETE);
- }
+ }
+ assert(poller);
+ poller->unmonitorHandle(*this, Poller::OUTPUT);
}
void DispatchHandle::unwatch() {
ScopedLock<Mutex> lock(stateLock);
- switch (state) {
+ switch(state) {
case IDLE:
- case DELAYED_IDLE:
- break;
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_RW:
- case DELAYED_INACTIVE:
- state = DELAYED_INACTIVE;
- break;
- case DELAYED_DELETE:
- break;
+ case STOPPING:
+ case DELETING:
+ return;
default:
- assert(poller);
- poller->modFd(*this, Poller::NONE);
- state = INACTIVE;
break;
- case ACTIVE_DELETE:
- assert(state != ACTIVE_DELETE);
- }
+ }
+ assert(poller);
+ poller->unmonitorHandle(*this, Poller::INOUT);
}
void DispatchHandle::stopWatch() {
ScopedLock<Mutex> lock(stateLock);
switch (state) {
case IDLE:
- case DELAYED_IDLE:
- case DELAYED_DELETE:
+ assert(state != IDLE);
+ return;
+ case STOPPING:
+ assert(state != STOPPING);
return;
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_RW:
- case DELAYED_INACTIVE:
- state = DELAYED_IDLE;
+ case CALLING:
+ state = STOPPING;
break;
- default:
+ case WAITING:
state = IDLE;
break;
- case ACTIVE_DELETE:
- assert(state != ACTIVE_DELETE);
+ case DELETING:
+ return;
}
assert(poller);
- poller->delFd(*this);
+ poller->unregisterHandle(*this);
poller.reset();
}
-// If we are already in the IDLE state we can't do the callback as we might
-// race to delete and callback at the same time
-// TODO: might be able to fix this by adding a new state, but would make
-// the state machine even more complex
+// If we are in the IDLE/STOPPING state we can't do the callback as we've
+// not/no longer got the fd registered in any poller
void DispatchHandle::call(Callback iCb) {
assert(iCb);
ScopedLock<Mutex> lock(stateLock);
switch (state) {
case IDLE:
- case ACTIVE_DELETE:
- assert(false);
+ case STOPPING:
+ case DELETING:
return;
default:
interruptedCallbacks.push(iCb);
@@ -347,27 +216,24 @@
ScopedLock<Mutex> lock(stateLock);
// Ensure that we're no longer watching anything
switch (state) {
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_RW:
- case DELAYED_INACTIVE:
- assert(poller);
- poller->delFd(*this);
- poller.reset();
- // Fallthrough
- case DELAYED_IDLE:
- state = DELAYED_DELETE;
- // Fallthrough
- case DELAYED_DELETE:
- case ACTIVE_DELETE:
- return;
case IDLE:
+ state = DELETING;
break;
- default:
- state = ACTIVE_DELETE;
+ case STOPPING:
+ state = DELETING;
+ return;
+ case WAITING:
+ state = DELETING;
assert(poller);
(void) poller->interrupt(*this);
- poller->delFd(*this);
+ poller->unregisterHandle(*this);
+ return;
+ case CALLING:
+ state = DELETING;
+ assert(poller);
+ poller->unregisterHandle(*this);
+ return;
+ case DELETING:
return;
}
}
@@ -378,43 +244,28 @@
void DispatchHandle::processEvent(Poller::EventType type) {
CallbackQueue callbacks;
- // Note that we are now doing the callbacks
+ // Phase I
{
ScopedLock<Mutex> lock(stateLock);
- // Set up to wait for same events next time unless reset
switch(state) {
- case ACTIVE_R:
- state = DELAYED_R;
- break;
- case ACTIVE_W:
- state = DELAYED_W;
- break;
- case ACTIVE_RW:
- state = DELAYED_RW;
+ case IDLE:
+ // Can get here if a non connection thread stops watching
+ // whilst we were stuck in the above lock
+ return;
+ case WAITING:
+ state = CALLING;
break;
- case ACTIVE_DELETE:
+ case CALLING:
+ assert(state!=CALLING);
+ return;
+ case STOPPING:
+ assert(state!=STOPPING);
+ return;
+ case DELETING:
// Need to make sure we clean up any pending callbacks in this case
std::swap(callbacks, interruptedCallbacks);
goto saybyebye;
- // Can get here in idle if we are stopped in a different thread
- // just after we return with this handle in Poller::wait
- case IDLE:
- // Can get here in INACTIVE if a non connection thread unwatches
- // whilst we were stuck in the above lock
- case INACTIVE:
- // Can only get here in a DELAYED_* state in the rare case
- // that we're already here for reading and we get activated for
- // writing and we can write (it might be possible the other way
- // round too). In this case we're already processing the handle
- // in a different thread in this function so return right away
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_RW:
- case DELAYED_INACTIVE:
- case DELAYED_IDLE:
- case DELAYED_DELETE:
- return;
}
std::swap(callbacks, interruptedCallbacks);
@@ -434,10 +285,9 @@
readableCallback(*this);
writableCallback(*this);
break;
- case Poller::DISCONNECTED:
- {
+ case Poller::DISCONNECTED: {
ScopedLock<Mutex> lock(stateLock);
- state = DELAYED_INACTIVE;
+ poller->unmonitorHandle(*this, Poller::INOUT);
}
if (disconnectedCallback) {
disconnectedCallback(*this);
@@ -466,32 +316,20 @@
{
ScopedLock<Mutex> lock(stateLock);
- // If any of the callbacks re-enabled reading/writing then actually
- // do it now
switch (state) {
- case DELAYED_R:
- poller->modFd(*this, Poller::INPUT);
- state = ACTIVE_R;
- return;
- case DELAYED_W:
- poller->modFd(*this, Poller::OUTPUT);
- state = ACTIVE_W;
- return;
- case DELAYED_RW:
- poller->modFd(*this, Poller::INOUT);
- state = ACTIVE_RW;
- return;
- case DELAYED_INACTIVE:
- state = INACTIVE;
- return;
- case DELAYED_IDLE:
- state = IDLE;
- return;
- default:
- // This should be impossible
- assert(false);
+ case IDLE:
+ assert(state!=IDLE);
+ return;
+ case STOPPING:
+ state = IDLE;
+ return;
+ case WAITING:
+ assert(state!=WAITING);
+ return;
+ case CALLING:
+ state = WAITING;
return;
- case DELAYED_DELETE:
+ case DELETING:
break;
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/DispatchHandle.h Mon May 4 15:55:21 2009
@@ -64,10 +64,11 @@
Poller::shared_ptr poller;
Mutex stateLock;
enum {
- IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW,
- ACTIVE_DELETE,
- DELAYED_IDLE, DELAYED_INACTIVE, DELAYED_R, DELAYED_W, DELAYED_RW,
- DELAYED_DELETE
+ IDLE,
+ STOPPING,
+ WAITING,
+ CALLING,
+ DELETING
} state;
public:
@@ -83,14 +84,7 @@
*@param wCb Callback called when the handle is writable.
*@param dCb Callback called when the handle is disconnected.
*/
- QPID_COMMON_EXTERN DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) :
- PollerHandle(h),
- readableCallback(rCb),
- writableCallback(wCb),
- disconnectedCallback(dCb),
- state(IDLE)
- {}
-
+ QPID_COMMON_EXTERN DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb);
QPID_COMMON_EXTERN ~DispatchHandle();
/** Add this DispatchHandle to the poller to be watched. */
@@ -122,7 +116,6 @@
QPID_COMMON_EXTERN void call(Callback iCb);
protected:
- /** Override to get extra processing done when the DispatchHandle is deleted. */
QPID_COMMON_EXTERN void doDelete();
private:
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h Mon May 4 15:55:21 2009
@@ -94,10 +94,10 @@
// Poller run loop
QPID_COMMON_EXTERN void run();
- QPID_COMMON_EXTERN void addFd(PollerHandle& handle, Direction dir);
- QPID_COMMON_EXTERN void delFd(PollerHandle& handle);
- QPID_COMMON_EXTERN void modFd(PollerHandle& handle, Direction dir);
- QPID_COMMON_EXTERN void rearmFd(PollerHandle& handle);
+ QPID_COMMON_EXTERN void registerHandle(PollerHandle& handle);
+ QPID_COMMON_EXTERN void unregisterHandle(PollerHandle& handle);
+ QPID_COMMON_EXTERN void monitorHandle(PollerHandle& handle, Direction dir);
+ QPID_COMMON_EXTERN void unmonitorHandle(PollerHandle& handle, Direction dir);
QPID_COMMON_EXTERN Event wait(Duration timeout = TIME_INFINITE);
};
@@ -108,6 +108,7 @@
class PollerHandlePrivate;
class PollerHandle {
friend class Poller;
+ friend class PollerPrivate;
friend struct Poller::Event;
PollerHandlePrivate* const impl;
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Mon May 4 15:55:21 2009
@@ -46,6 +46,7 @@
class PollerHandlePrivate {
friend class Poller;
+ friend class PollerPrivate;
friend class PollerHandle;
enum FDStat {
@@ -55,6 +56,7 @@
HUNGUP,
MONITORED_HUNGUP,
INTERRUPTED,
+ INTERRUPTED_HUNGUP,
DELETED
};
@@ -76,7 +78,9 @@
}
void setActive() {
- stat = (stat == HUNGUP) ? MONITORED_HUNGUP : MONITORED;
+ stat = (stat == HUNGUP || stat == INTERRUPTED_HUNGUP)
+ ? MONITORED_HUNGUP
+ : MONITORED;
}
bool isInactive() const {
@@ -96,7 +100,10 @@
}
bool isHungup() const {
- return stat == MONITORED_HUNGUP || stat == HUNGUP;
+ return
+ stat == MONITORED_HUNGUP ||
+ stat == HUNGUP ||
+ stat == INTERRUPTED_HUNGUP;
}
void setHungup() {
@@ -105,11 +112,13 @@
}
bool isInterrupted() const {
- return stat == INTERRUPTED;
+ return stat == INTERRUPTED || stat == INTERRUPTED_HUNGUP;
}
void setInterrupted() {
- stat = INTERRUPTED;
+ stat = (stat == MONITORED_HUNGUP || stat == HUNGUP)
+ ? INTERRUPTED_HUNGUP
+ : INTERRUPTED;
}
bool isDeleted() const {
@@ -131,13 +140,13 @@
if (impl->isDeleted()) {
return;
}
+ impl->pollerHandle = 0;
if (impl->isInterrupted()) {
impl->setDeleted();
return;
}
- if (impl->isActive()) {
- impl->setDeleted();
- }
+ assert(impl->isIdle());
+ impl->setDeleted();
}
PollerHandleDeletionManager.markForDeletion(impl);
}
@@ -256,8 +265,13 @@
~PollerPrivate() {
// It's probably okay to ignore any errors here as there can't be data loss
::close(epollFd);
+
+ // Need to put the interruptHandle in idle state to delete it
+ static_cast<PollerHandle&>(interruptHandle).impl->setIdle();
}
-
+
+ void resetMode(PollerHandlePrivate& handle);
+
void interrupt() {
::epoll_event epe;
// Use EPOLLONESHOT so we only wake a single thread
@@ -279,74 +293,122 @@
PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable;
int PollerPrivate::alwaysReadableFd = alwaysReadable.getFD();
-void Poller::addFd(PollerHandle& handle, Direction dir) {
+void Poller::registerHandle(PollerHandle& handle) {
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
- ::epoll_event epe;
- int op;
+ assert(eh.isIdle());
- if (eh.isIdle()) {
- op = EPOLL_CTL_ADD;
- epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
- } else {
- assert(eh.isActive());
- op = EPOLL_CTL_MOD;
- epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir);
- }
+ ::epoll_event epe;
+ epe.events = ::EPOLLONESHOT;
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd, &epe));
- // Record monitoring state of this fd
- eh.events = epe.events;
eh.setActive();
}
-void Poller::delFd(PollerHandle& handle) {
+void Poller::unregisterHandle(PollerHandle& handle) {
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
assert(!eh.isIdle());
+
int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0);
// Ignore EBADF since deleting a nonexistent fd has the overall required result!
// And allows the case where a sloppy program closes the fd and then does the delFd()
if (rc == -1 && errno != EBADF) {
QPID_POSIX_CHECK(rc);
}
+
eh.setIdle();
}
-// modFd is equivalent to delFd followed by addFd
-void Poller::modFd(PollerHandle& handle, Direction dir) {
+void PollerPrivate::resetMode(PollerHandlePrivate& eh) {
+ PollerHandle* ph;
+ {
+ ScopedLock<Mutex> l(eh.lock);
+ assert(!eh.isActive());
+
+ if (eh.isIdle() || eh.isDeleted()) {
+ return;
+ }
+
+ if (eh.events==0) {
+ eh.setActive();
+ return;
+ }
+
+ if (!eh.isInterrupted()) {
+ ::epoll_event epe;
+ epe.events = eh.events | ::EPOLLONESHOT;
+ epe.data.u64 = 0; // Keep valgrind happy
+ epe.data.ptr = &eh;
+
+ QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+
+ eh.setActive();
+ return;
+ }
+ ph = eh.pollerHandle;
+ }
+
+ PollerHandlePrivate& ihp = *static_cast<PollerHandle&>(interruptHandle).impl;
+ ScopedLock<Mutex> l(ihp.lock);
+ interruptHandle.addHandle(*ph);
+ ihp.setActive();
+ interrupt();
+}
+
+void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
assert(!eh.isIdle());
+ ::__uint32_t oldEvents = eh.events;
+ eh.events |= PollerPrivate::directionToEpollEvent(dir);
+
+ // If no change nothing more to do - avoid unnecessary system call
+ if (oldEvents==eh.events) {
+ return;
+ }
+
+ // If we're not actually listening wait till we are to perform change
+ if (!eh.isActive()) {
+ return;
+ }
+
::epoll_event epe;
- epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
+ epe.events = eh.events | ::EPOLLONESHOT;
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
-
- // Record monitoring state of this fd
- eh.events = epe.events;
- eh.setActive();
}
-void Poller::rearmFd(PollerHandle& handle) {
+void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
- assert(eh.isInactive());
+ assert(!eh.isIdle());
+ ::__uint32_t oldEvents = eh.events;
+ eh.events &= ~PollerPrivate::directionToEpollEvent(dir);
+
+ // If no change nothing more to do - avoid unnecessary system call
+ if (oldEvents==eh.events) {
+ return;
+ }
+
+ // If we're not actually listening wait till we are to perform change
+ if (!eh.isActive()) {
+ return;
+ }
+
::epoll_event epe;
- epe.events = eh.events;
+ epe.events = eh.events | ::EPOLLONESHOT;
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
-
- eh.setActive();
}
void Poller::shutdown() {
@@ -368,14 +430,25 @@
{
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
- if (!eh.isActive()) {
+ if (eh.isIdle() || eh.isDeleted()) {
return false;
}
+
+ if (eh.isInterrupted()) {
+ return true;
+ }
+
+ // Stop monitoring handle for read or write
::epoll_event epe;
epe.events = 0;
epe.data.u64 = 0; // Keep valgrind happy
QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
- eh.setInterrupted();
+
+ if (eh.isInactive()) {
+ eh.setInterrupted();
+ return true;
+ }
+ eh.setInterrupted();
}
PollerPrivate::InterruptHandle& ih = impl->interruptHandle;
@@ -414,6 +487,7 @@
}
Poller::Event Poller::wait(Duration timeout) {
+ static __thread PollerHandlePrivate* lastReturnedHandle = 0;
epoll_event epe;
int timeoutMs = (timeout == TIME_INFINITE) ? -1 : timeout / TIME_MSEC;
AbsTime targetTimeout =
@@ -421,6 +495,11 @@
FAR_FUTURE :
AbsTime(now(), timeout);
+ if (lastReturnedHandle) {
+ impl->resetMode(*lastReturnedHandle);
+ lastReturnedHandle = 0;
+ }
+
// Repeat until we weren't interrupted by signal
do {
PollerHandleDeletionManager.markAllUnusedInThisThread();
@@ -460,12 +539,19 @@
}
}
if (wrappedHandle) {
- ScopedLock<Mutex> l(wrappedHandle->impl->lock);
- if (!wrappedHandle->impl->isDeleted()) {
- wrappedHandle->impl->setInactive();
+ PollerHandlePrivate& eh = *wrappedHandle->impl;
+ {
+ ScopedLock<Mutex> l(eh.lock);
+ if (!eh.isDeleted()) {
+ if (!eh.isIdle()) {
+ eh.setInactive();
+ }
+ lastReturnedHandle = &eh;
+ assert(eh.pollerHandle == wrappedHandle);
return Event(wrappedHandle, INTERRUPTED);
}
- PollerHandleDeletionManager.markForDeletion(wrappedHandle->impl);
+ }
+ PollerHandleDeletionManager.markForDeletion(&eh);
}
continue;
}
@@ -482,6 +568,7 @@
// the handle could have gone inactive since we left the epoll_wait
if (eh.isActive()) {
PollerHandle* handle = eh.pollerHandle;
+ assert(handle);
// If the connection has been hungup we could still be readable
// (just not writable), allow us to readable until we get here again
@@ -493,15 +580,8 @@
} else {
eh.setInactive();
}
+ lastReturnedHandle = &eh;
return Event(handle, PollerPrivate::epollToDirection(epe.events));
- } else if (eh.isDeleted()) {
- // The handle has been deleted whilst still active and so must be removed
- // from the poller
- int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0);
- // Ignore EBADF since it's quite likely that we could race with closing the fd
- if (rc == -1 && errno != EBADF) {
- QPID_POSIX_CHECK(rc);
- }
}
}
// We only get here if one of the following:
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Mon May 4 15:55:21 2009
@@ -75,6 +75,7 @@
class AsynchAcceptorPrivate {
public:
AsynchAcceptorPrivate(const Socket& s, AsynchAcceptor::Callback callback);
+ ~AsynchAcceptorPrivate();
void start(Poller::shared_ptr poller);
private:
@@ -109,6 +110,10 @@
s.setNonblocking();
}
+AsynchAcceptorPrivate::~AsynchAcceptorPrivate() {
+ handle.stopWatch();
+}
+
void AsynchAcceptorPrivate::start(Poller::shared_ptr poller) {
handle.startWatch(poller);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp Mon May 4 15:55:21 2009
@@ -146,7 +146,7 @@
}
void AsynchAcceptorPrivate::start(Poller::shared_ptr poller) {
- poller->addFd(PollerHandle(socket), Poller::INPUT);
+ poller->monitorHandle(PollerHandle(socket), Poller::INPUT);
restart ();
}
@@ -426,7 +426,7 @@
void AsynchIO::start(Poller::shared_ptr poller0) {
poller = poller0;
- poller->addFd(PollerHandle(socket), Poller::INPUT);
+ poller->monitorHandle(PollerHandle(socket), Poller::INPUT);
if (writeQueue.size() > 0) // Already have data queued for write
notifyPendingWrite();
startReading();
@@ -471,7 +471,7 @@
boost::bind(&AsynchIO::completion, this, _1));
IOHandle h(hp);
PollerHandle ph(h);
- poller->addFd(ph, Poller::OUTPUT);
+ poller->monitorHandle(ph, Poller::OUTPUT);
}
void AsynchIO::queueWriteClose() {
@@ -559,7 +559,7 @@
callback);
IOHandle h(hp);
PollerHandle ph(h);
- poller->addFd(ph, Poller::INPUT);
+ poller->monitorHandle(ph, Poller::INPUT);
}
/**
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp Mon May 4 15:55:21 2009
@@ -122,7 +122,7 @@
} while (true);
}
-void Poller::addFd(PollerHandle& handle, Direction dir) {
+void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
HANDLE h = (HANDLE)(handle.impl->fd);
if (h != INVALID_HANDLE_VALUE) {
HANDLE iocpHandle = ::CreateIoCompletionPort (h, impl->iocp, 0, 0);
@@ -146,9 +146,9 @@
}
// All no-ops...
-void Poller::delFd(PollerHandle& handle) {}
-void Poller::modFd(PollerHandle& handle, Direction dir) {}
-void Poller::rearmFd(PollerHandle& handle) {}
+void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {}
+void Poller::registerHandle(PollerHandle& handle) {}
+void Poller::unregisterHandle(PollerHandle& handle) {}
Poller::Event Poller::wait(Duration timeout) {
DWORD timeoutMs = 0;
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp Mon May 4 15:55:21 2009
@@ -76,15 +76,15 @@
if (!armed)
return;
- // addFd will queue a completion for the IOCP; when it's handled, a
+ // monitorHandle will queue a completion for the IOCP; when it's handled, a
// poller thread will call back to dispatch() below.
PollerHandle ph(*this);
- poller->addFd(ph, Poller::INPUT);
+ poller->monitorHandle(ph, Poller::INPUT);
}
void PollableConditionPrivate::dispatch(AsynchIoResult *result)
{
- delete result; // Poller::addFd() allocates this
+ delete result; // Poller::monitorHandle() allocates this
cb(parent);
}
Modified: qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp?rev=771338&r1=771337&r2=771338&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/PollerTest.cpp Mon May 4 15:55:21 2009
@@ -111,7 +111,8 @@
PollerHandle h0(f0);
PollerHandle h1(f1);
- poller->addFd(h0, Poller::INOUT);
+ poller->registerHandle(h0);
+ poller->monitorHandle(h0, Poller::INOUT);
// h0 should be writable
Poller::Event event = poller->wait();
@@ -123,19 +124,16 @@
cout << "Wrote(0): " << bytesWritten << " bytes\n";
// Wait for 500ms - h0 no longer writable
- poller->rearmFd(h0);
event = poller->wait(500000000);
assert(event.handle == 0);
// Test we can read it all now
- poller->addFd(h1, Poller::INOUT);
+ poller->registerHandle(h1);
+ poller->monitorHandle(h1, Poller::INOUT);
event = poller->wait();
assert(event.handle == &h1);
assert(event.type == Poller::READ_WRITABLE);
- // Can't interrupt, it's not active
- assert(poller->interrupt(h1) == false);
-
bytesRead = readALot(sv[1]);
assert(bytesRead == bytesWritten);
cout << "Read(1): " << bytesRead << " bytes\n";
@@ -145,39 +143,52 @@
event = poller->wait();
assert(event.handle == &h0);
assert(event.type == Poller::INTERRUPTED);
- assert(poller->interrupt(h0) == false);
// Test multiple interrupts
- poller->rearmFd(h0);
- poller->rearmFd(h1);
assert(poller->interrupt(h0) == true);
assert(poller->interrupt(h1) == true);
- // Make sure we can't interrupt them again
- assert(poller->interrupt(h0) == false);
- assert(poller->interrupt(h1) == false);
+ // Make sure we can interrupt them again
+ assert(poller->interrupt(h0) == true);
+ assert(poller->interrupt(h1) == true);
- // Make sure that they both come out in the correct order
+ // Make sure that they both come out
event = poller->wait();
- assert(event.handle == &h0);
assert(event.type == Poller::INTERRUPTED);
- assert(poller->interrupt(h0) == false);
+ assert(event.handle == &h0 || event.handle == &h1);
+ if (event.handle == &h0) {
+ event = poller->wait();
+ assert(event.type == Poller::INTERRUPTED);
+ assert(event.handle == &h1);
+ } else {
+ event = poller->wait();
+ assert(event.type == Poller::INTERRUPTED);
+ assert(event.handle == &h0);
+ }
+
+ poller->unmonitorHandle(h1, Poller::INOUT);
+
event = poller->wait();
- assert(event.handle == &h1);
- assert(event.type == Poller::INTERRUPTED);
- assert(poller->interrupt(h1) == false);
+ assert(event.handle == &h0);
+ assert(event.type == Poller::WRITABLE);
- // At this point h1 should have been disabled from the poller
- // (as it was just returned) and h0 can write again
- poller->rearmFd(h0);
+ // We didn't write anything so it should still be writable
event = poller->wait();
assert(event.handle == &h0);
assert(event.type == Poller::WRITABLE);
- // Now both the handles should be disabled
+ poller->unmonitorHandle(h0, Poller::INOUT);
+
event = poller->wait(500000000);
assert(event.handle == 0);
+ poller->unregisterHandle(h1);
+ poller->unregisterHandle(h0);
+
+ // Make sure we can't interrupt them now
+ assert(poller->interrupt(h0) == false);
+ assert(poller->interrupt(h1) == false);
+
// Test shutdown
poller->shutdown();
event = poller->wait();
@@ -188,9 +199,6 @@
assert(event.handle == 0);
assert(event.type == Poller::SHUTDOWN);
- poller->delFd(h1);
- poller->delFd(h0);
-
return 0;
} catch (exception& e) {
cout << "Caught exception " << e.what() << "\n";
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org