You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2008/09/23 20:43:08 UTC
svn commit: r698276 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/sys:
AtomicValue_gcc.h rdma/RdmaIO.cpp rdma/RdmaIO.h
Author: astitcher
Date: Tue Sep 23 11:43:08 2008
New Revision: 698276
URL: http://svn.apache.org/viewvc?rev=698276&view=rev
Log:
Removed the state lock from the RdmaIO code
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AtomicValue_gcc.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AtomicValue_gcc.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AtomicValue_gcc.h?rev=698276&r1=698275&r2=698276&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AtomicValue_gcc.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AtomicValue_gcc.h Tue Sep 23 11:43:08 2008
@@ -57,7 +57,7 @@
/** If current value == testval then set to newval. Returns true if the swap was performed. */
bool boolCompareAndSwap(T testval, T newval) { return __sync_bool_compare_and_swap(&value, testval, newval); }
- T get() const { return const_cast<AtomicValue<T>*>(this)->fetchAndAdd(0); }
+ T get() const { return const_cast<AtomicValue<T>*>(this)->fetchAndAdd(static_cast<T>(0)); }
private:
T value;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp?rev=698276&r1=698275&r2=698276&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp Tue Sep 23 11:43:08 2008
@@ -99,14 +99,25 @@
// Mark for deletion/Delete this object when we have no outstanding writes
void AsynchIO::deferDelete() {
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- if (outstandingWrites > 0 || state != IDLE) {
- deleting = true;
+ State oldState;
+ State newState;
+ bool doReturn;
+ //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ // It is safe to assign to deleting here as we either delete ourselves
+ // before leaving this function or deleting is set on exit
+ do {
+ newState = oldState = state.get();
+ doReturn = false;
+ if (outstandingWrites > 0 || oldState != IDLE) {
+ deleting = true;
+ doReturn = true;
+ } else{
+ newState = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor
+ }
+ } while (!state.boolCompareAndSwap(oldState, newState));
+ if (doReturn) {
return;
}
- state = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor
- }
delete this;
}
@@ -136,7 +147,7 @@
// Mark now closed (so we don't accept any more writes or make any idle callbacks)
void AsynchIO::queueWriteClose() {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ // Don't think we actually need to lock here as transition is 1 way only to closed
closed = true;
}
@@ -144,130 +155,150 @@
// As notifyPendingWrite can be called on an arbitrary thread it must check whether we are processing or not.
// If we are then we just return as we know that we will eventually do the idle callback anyway.
//
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
// We can get here in any state (as the caller could be in any thread)
- switch (state) {
- case NOTIFY_WRITE:
- case PENDING_NOTIFY:
- // We only need to note a pending notify if we're already doing a notify as data processing
- // is always followed by write notification processing
- state = PENDING_NOTIFY;
- return;
- case PENDING_DATA:
- return;
- case DATA:
- // Only need to return here as data processing will do the idleCallback itself anyway
+ State oldState;
+ State newState;
+ bool doReturn;
+ do {
+ newState = oldState = state.get();
+ doReturn = false;
+ switch (oldState) {
+ case NOTIFY_WRITE:
+ case PENDING_NOTIFY:
+ // We only need to note a pending notify if we're already doing a notify as data processing
+ // is always followed by write notification processing
+ newState = PENDING_NOTIFY;
+ doReturn = true;
+ break;
+ case PENDING_DATA:
+ doReturn = true;
+ break;
+ case DATA:
+ // Only need to return here as data processing will do the idleCallback itself anyway
+ doReturn = true;
+ break;
+ case IDLE:
+ newState = NOTIFY_WRITE;
+ break;
+ case DELETED:
+ assert(oldState!=DELETED);
+ doReturn = true;
+ };
+ } while (!state.boolCompareAndSwap(oldState, newState));
+ if (doReturn) {
return;
- case IDLE:
- state = NOTIFY_WRITE;
- break;
- case DELETED:
- assert(state!=DELETED);
- }
}
doWriteCallback();
// Keep track of what we need to do so that we can release the lock
- enum {COMPLETION, NOTIFY} action;
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ enum {COMPLETION, NOTIFY, RETURN, EXIT} action;
// If there was pending data whilst we were doing this, process it now
- switch (state) {
- case NOTIFY_WRITE:
- state = IDLE;
- return;
- case PENDING_DATA:
- action = COMPLETION;
- break;
- case PENDING_NOTIFY:
- action = NOTIFY;
- break;
- default:
- assert(state!=IDLE && state!=DATA && state!=DELETED);
- return;
- }
- // Using NOTIFY_WRITE for both cases is a bit strange, but we're making sure we get the
+ //
+ // Using NOTIFY_WRITE for both NOTIFY & COMPLETION is a bit strange, but we're making sure we get the
// correct result if we reenter notifyPendingWrite(), in which case we want to
// end up in PENDING_NOTIFY (entering dataEvent doesn't matter as it only checks
// not IDLE)
- state = NOTIFY_WRITE;
- }
do {
+ //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ do {
+ newState = oldState = state.get();
+ action = RETURN; // Anything but COMPLETION
+ switch (oldState) {
+ case NOTIFY_WRITE:
+ newState = IDLE;
+ action = (action == COMPLETION) ? EXIT : RETURN;
+ break;
+ case PENDING_DATA:
+ newState = NOTIFY_WRITE;
+ action = COMPLETION;
+ break;
+ case PENDING_NOTIFY:
+ newState = NOTIFY_WRITE;
+ action = NOTIFY;
+ break;
+ default:
+ assert(oldState!=IDLE && oldState!=DATA && oldState!=DELETED);
+ action = RETURN;
+ }
+ } while (!state.boolCompareAndSwap(oldState, newState));
+
// Note we only get here if we were in the PENDING_DATA or PENDING_NOTIFY state
// so that we do need to process completions or notifications now
switch (action) {
case COMPLETION:
processCompletions();
+ // Fall through
case NOTIFY:
doWriteCallback();
break;
- }
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- switch (state) {
- case NOTIFY_WRITE:
- state = IDLE;
- goto exit;
- case PENDING_DATA:
- action = COMPLETION;
- break;
- case PENDING_NOTIFY:
- action = NOTIFY;
- break;
- default:
- assert(state!=IDLE && state!=DATA && state!=DELETED);
+ case RETURN:
+ return;
+ case EXIT:
+ // If we just processed completions we might need to delete ourselves
+ if (deleting && outstandingWrites == 0) {
+ delete this;
+ }
return;
- }
- state = NOTIFY_WRITE;
}
} while (true);
- exit:
- // If we just processed completions we might need to delete ourselves
- if (action == COMPLETION && deleting && outstandingWrites == 0) {
- delete this;
- }
}
void AsynchIO::dataEvent(qpid::sys::DispatchHandle&) {
// Keep track of writable notifications
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- // We're already processing a notification
- switch (state) {
- case IDLE:
- break;
- default:
- state = PENDING_DATA;
+ // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ State oldState;
+ State newState;
+ bool doReturn;
+ do {
+ newState = oldState = state.get();
+ doReturn = false;
+ // We're already processing a notification
+ switch (oldState) {
+ case IDLE:
+ newState = DATA;
+ break;
+ default:
+ // Can't get here in DATA state as that would violate the serialisation rules
+ assert( oldState!=DATA );
+ newState = PENDING_DATA;
+ doReturn = true;
+ }
+ } while (!state.boolCompareAndSwap(oldState, newState));
+ if (doReturn) {
return;
}
- // Can't get here in DATA state as that would violate the serialisation rules
- assert( state==IDLE );
- state = DATA;
- }
processCompletions();
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- assert( state==DATA );
- state = NOTIFY_WRITE;
- }
+ //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ do {
+ newState = oldState = state.get();
+ assert( oldState==DATA );
+ newState = NOTIFY_WRITE;
+ } while (!state.boolCompareAndSwap(oldState, newState));
do {
doWriteCallback();
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- if ( state==NOTIFY_WRITE ) {
- state = IDLE;
+ // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ bool doBreak;
+ do {
+ newState = oldState = state.get();
+ doBreak = false;
+ if ( oldState==NOTIFY_WRITE ) {
+ newState = IDLE;
+ doBreak = true;
+ } else {
+ // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered
+ assert( oldState==PENDING_NOTIFY );
+ newState = NOTIFY_WRITE;
+ }
+ } while (!state.boolCompareAndSwap(oldState, newState));
+ if (doBreak) {
break;
}
- // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered
- assert( state==PENDING_NOTIFY );
- state = NOTIFY_WRITE;
- }
} while (true);
// We might need to delete ourselves
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h?rev=698276&r1=698275&r2=698276&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h Tue Sep 23 11:43:08 2008
@@ -23,6 +23,7 @@
#include "rdma_wrap.h"
+#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Mutex.h"
@@ -53,8 +54,9 @@
int outstandingWrites;
bool closed; // TODO: Perhaps (probably) this state can be merged with the following...
bool deleting; // TODO: Perhaps (probably) this state can be merged with the following...
- enum { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DELETED } state;
- qpid::sys::Mutex stateLock;
+ enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DELETED };
+ qpid::sys::AtomicValue<State> state;
+ //qpid::sys::Mutex stateLock;
std::deque<Buffer*> bufferQueue;
qpid::sys::Mutex bufferQueueLock;
boost::ptr_deque<Buffer> buffers;