You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2008/09/23 20:43:08 UTC

svn commit: r698276 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/sys: AtomicValue_gcc.h rdma/RdmaIO.cpp rdma/RdmaIO.h

Author: astitcher
Date: Tue Sep 23 11:43:08 2008
New Revision: 698276

URL: http://svn.apache.org/viewvc?rev=698276&view=rev
Log:
Removed the state lock from the RdmaIO code

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AtomicValue_gcc.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AtomicValue_gcc.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AtomicValue_gcc.h?rev=698276&r1=698275&r2=698276&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AtomicValue_gcc.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AtomicValue_gcc.h Tue Sep 23 11:43:08 2008
@@ -57,7 +57,7 @@
     /** If current value == testval then set to newval. Returns true if the swap was performed. */    
     bool boolCompareAndSwap(T testval, T newval) { return __sync_bool_compare_and_swap(&value, testval, newval); }
 
-    T get() const { return const_cast<AtomicValue<T>*>(this)->fetchAndAdd(0); }
+    T get() const { return const_cast<AtomicValue<T>*>(this)->fetchAndAdd(static_cast<T>(0)); }
         
   private:
     T value;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp?rev=698276&r1=698275&r2=698276&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp Tue Sep 23 11:43:08 2008
@@ -99,14 +99,25 @@
 
     // Mark for deletion/Delete this object when we have no outstanding writes
     void AsynchIO::deferDelete() {
-        {
-        qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
-        if (outstandingWrites > 0 || state != IDLE) {
-            deleting = true;
+        State oldState;
+        State newState;
+        bool doReturn;
+        //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+        // It is safe to assign to deleting here as we either delete ourselves
+        // before leaving this function or deleting is set on exit
+        do {
+            newState = oldState = state.get();
+            doReturn = false;
+            if (outstandingWrites > 0 || oldState != IDLE) {
+                deleting = true;
+                doReturn = true;
+            } else{
+                newState = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor
+            }
+        } while (!state.boolCompareAndSwap(oldState, newState));
+        if (doReturn) {
             return;
         }
-        state = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor
-        }
         delete this;
     }
 
@@ -136,7 +147,7 @@
 
     // Mark now closed (so we don't accept any more writes or make any idle callbacks)
     void AsynchIO::queueWriteClose() {
-        qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+        // Don't think we actually need to lock here as transition is 1 way only to closed
         closed = true;
     }
 
@@ -144,130 +155,150 @@
         // As notifyPendingWrite can be called on an arbitrary thread it must check whether we are processing or not.
         // If we are then we just return as we know that  we will eventually do the idle callback anyway.
         //
-        {
-        qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+        // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
         // We can get here in any state (as the caller could be in any thread)
-        switch (state) {
-        case NOTIFY_WRITE:
-        case PENDING_NOTIFY:
-            // We only need to note a pending notify if we're already doing a notify as data processing
-            // is always followed by write notification processing
-            state = PENDING_NOTIFY;
-            return;
-        case PENDING_DATA:
-            return;
-        case DATA:
-            // Only need to return here as data processing will do the idleCallback itself anyway
+        State oldState;
+        State newState;
+        bool doReturn;
+        do {
+            newState = oldState = state.get();
+            doReturn = false;
+            switch (oldState) {
+            case NOTIFY_WRITE:
+            case PENDING_NOTIFY:
+                // We only need to note a pending notify if we're already doing a notify as data processing
+                // is always followed by write notification processing
+                newState = PENDING_NOTIFY;
+                doReturn = true;
+                break;
+            case PENDING_DATA:
+                doReturn = true;
+                break;
+            case DATA:
+                // Only need to return here as data processing will do the idleCallback itself anyway
+                doReturn = true;
+                break;
+            case IDLE:
+                newState = NOTIFY_WRITE;
+                break;
+            case DELETED:
+                assert(oldState!=DELETED);
+                doReturn = true;
+            };
+        } while (!state.boolCompareAndSwap(oldState, newState));
+        if (doReturn) {
             return;
-        case IDLE:
-            state = NOTIFY_WRITE;
-            break;
-        case DELETED:
-            assert(state!=DELETED);
-        }
         }
         
         doWriteCallback();
 
         // Keep track of what we need to do so that we can release the lock
-        enum {COMPLETION, NOTIFY} action;
-        {
-        qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+        enum {COMPLETION, NOTIFY, RETURN, EXIT} action;
         // If there was pending data whilst we were doing this, process it now
-        switch (state) {
-        case NOTIFY_WRITE:
-            state = IDLE;
-            return;
-        case PENDING_DATA:
-            action = COMPLETION;
-            break;
-        case PENDING_NOTIFY:
-            action = NOTIFY;
-            break;
-        default:
-            assert(state!=IDLE && state!=DATA && state!=DELETED);
-            return;
-        }
-        // Using NOTIFY_WRITE for both cases is a bit strange, but we're making sure we get the
+        //
+        // Using NOTIFY_WRITE for both NOTIFY & COMPLETION is a bit strange, but we're making sure we get the
         // correct result if we reenter notifyPendingWrite(), in which case we want to
         // end up in PENDING_NOTIFY (entering dataEvent doesn't matter as it only checks
         // not IDLE)
-        state = NOTIFY_WRITE;
-        }
         do {
+            //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+            do {
+                newState = oldState = state.get();
+                action = RETURN; // Anything but COMPLETION
+                switch (oldState) {
+                case NOTIFY_WRITE:
+                    newState = IDLE;
+                    action = (action == COMPLETION) ? EXIT : RETURN;
+                    break;
+                case PENDING_DATA:
+                    newState = NOTIFY_WRITE;
+                    action = COMPLETION;
+                    break;
+                case PENDING_NOTIFY:
+                    newState = NOTIFY_WRITE;
+                    action = NOTIFY;
+                    break;
+                default:
+                    assert(oldState!=IDLE && oldState!=DATA && oldState!=DELETED);
+                    action = RETURN;
+                }
+            } while (!state.boolCompareAndSwap(oldState, newState));
+
             // Note we only get here if we were in the PENDING_DATA or PENDING_NOTIFY state
             // so that we do need to process completions or notifications now
             switch (action) {
             case COMPLETION:
                 processCompletions();
+                // Fall through
             case NOTIFY:
                 doWriteCallback();
                 break;
-            }
-            {
-            qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
-            switch (state) {
-            case NOTIFY_WRITE:
-                state = IDLE;
-                goto exit;
-            case PENDING_DATA:
-                action = COMPLETION;
-                break;
-            case PENDING_NOTIFY:
-                action = NOTIFY;
-                break;
-            default:
-                assert(state!=IDLE && state!=DATA && state!=DELETED);
+            case RETURN:
+                return;
+            case EXIT:
+                // If we just processed completions we might need to delete ourselves
+                if (deleting && outstandingWrites == 0) {
+                    delete this;
+                }
                 return;
-            }
-            state = NOTIFY_WRITE;
             }
         } while (true);
-    exit:
-        // If we just processed completions we might need to delete ourselves
-        if (action == COMPLETION && deleting && outstandingWrites == 0) {
-            delete this;
-        }
     }
 
     void AsynchIO::dataEvent(qpid::sys::DispatchHandle&) {
         // Keep track of writable notifications
-        {
-        qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
-        // We're already processing a notification
-        switch (state) {
-        case IDLE:
-            break;
-        default:
-            state = PENDING_DATA;
+        // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+        State oldState;
+        State newState;
+        bool doReturn;
+        do {
+            newState = oldState = state.get();
+            doReturn = false;
+            // We're already processing a notification
+            switch (oldState) {
+            case IDLE:
+                newState  = DATA;
+                break;
+            default:
+                // Can't get here in DATA state as that would violate the serialisation rules
+                assert( oldState!=DATA );
+                newState = PENDING_DATA;
+                doReturn = true;
+            }
+        } while (!state.boolCompareAndSwap(oldState, newState));
+        if (doReturn) {
             return;
         }
-        // Can't get here in DATA state as that would violate the serialisation rules
-        assert( state==IDLE );
-        state  = DATA;
-        }
 
         processCompletions();
 
-        {
-        qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
-        assert( state==DATA );
-        state = NOTIFY_WRITE;
-        }
+        //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+        do {
+            newState = oldState = state.get();
+            assert( oldState==DATA );
+            newState = NOTIFY_WRITE;
+        } while (!state.boolCompareAndSwap(oldState, newState));
 
         do {
             doWriteCallback();
 
-            {
-            qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
-            if ( state==NOTIFY_WRITE ) {
-                state = IDLE;
+            // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+            bool doBreak;
+            do {
+                newState = oldState = state.get();
+                doBreak = false;
+                if ( oldState==NOTIFY_WRITE ) {
+                    newState = IDLE;
+                    doBreak = true;
+                } else {
+                    // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered
+                    assert( oldState==PENDING_NOTIFY );
+                    newState = NOTIFY_WRITE;
+                }
+            } while (!state.boolCompareAndSwap(oldState, newState));
+            if (doBreak) {
                 break;
             }
-            // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered
-            assert( state==PENDING_NOTIFY );
-            state = NOTIFY_WRITE;
-            }
         } while (true);
 
         // We might need to delete ourselves

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h?rev=698276&r1=698275&r2=698276&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h Tue Sep 23 11:43:08 2008
@@ -23,6 +23,7 @@
 
 #include "rdma_wrap.h"
 
+#include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/Dispatcher.h"
 #include "qpid/sys/Mutex.h"
 
@@ -53,8 +54,9 @@
         int outstandingWrites;
         bool closed; // TODO: Perhaps (probably) this state can be merged with the following...
         bool deleting; // TODO: Perhaps (probably) this state can be merged with the following...
-        enum { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DELETED } state;
-        qpid::sys::Mutex stateLock;
+        enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DELETED };
+        qpid::sys::AtomicValue<State> state;
+        //qpid::sys::Mutex stateLock;
         std::deque<Buffer*> bufferQueue;
         qpid::sys::Mutex bufferQueueLock;
         boost::ptr_deque<Buffer> buffers;