You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2010/10/12 18:05:16 UTC

svn commit: r1021822 - in /qpid/trunk/qpid/cpp/src/qpid/sys/rdma: RdmaIO.cpp RdmaIO.h

Author: astitcher
Date: Tue Oct 12 16:05:15 2010
New Revision: 1021822

URL: http://svn.apache.org/viewvc?rev=1021822&view=rev
Log:
Rewrite Rdma::AsynchIO to use deferred code rather than a state machine.
This eliminates a lot of difficult-to-understand, error-prone
state machine code.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp?rev=1021822&r1=1021821&r2=1021822&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp Tue Oct 12 16:05:15 2010
@@ -54,7 +54,8 @@ namespace Rdma {
         readCallback(rc),
         idleCallback(ic),
         fullCallback(fc),
-        errorCallback(ec)
+        errorCallback(ec),
+        pendingWriteAction(boost::bind(&AsynchIO::doWriteCallback, this))
     {
         qp->nonblocking();
         qp->notifyRecv();
@@ -73,7 +74,7 @@ namespace Rdma {
             QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished");
 
         // Turn off callbacks if necessary (before doing the deletes)
-        if (state.get() != SHUTDOWN) {
+        if (state.get() != STOPPED) {
             QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue whilst not shutdown");
             dataHandle.stopWatch();
         }
@@ -88,30 +89,9 @@ namespace Rdma {
 
     // Mark for deletion/Delete this object when we have no outstanding writes
     void AsynchIO::stop(NotifyCallback nc) {
-        State oldState;
-        State newState;
-        bool doReturn;
-        //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
-        do {
-            newState = oldState = state.get();
-            doReturn = true;
-            if (oldState == IDLE || oldState == DRAINED) {
-                doReturn = false;
-                newState = SHUTDOWN;
-            }
-        } while (!state.boolCompareAndSwap(oldState, newState));
-        
-        // Ensure we can't get any more callbacks (except for the stopped callback)
-        dataHandle.stopWatch();
-
-        if (doReturn) {
-            notifyCallback = nc;
-            return;
-        }
-        // Callback, but don't store it - SHUTDOWN state means callback has been called
-        // we *are* allowed to delete the AsynchIO in this callback, so we have to return immediately
-        // after the callback
-        nc(*this);
+        state = STOPPED;
+        notifyCallback = nc;
+        dataHandle.call(boost::bind(&AsynchIO::doStoppedCallback, this));
     }
 
     namespace {
@@ -130,31 +110,8 @@ namespace Rdma {
 
     // Mark writing closed (so we don't accept any more writes or make any idle callbacks)
     void AsynchIO::drainWriteQueue(NotifyCallback nc) {
-        State oldState;
-        State newState;
-        bool doReturn;
-        //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
-        do {
-            newState = oldState = state.get();
-            doReturn = true;
-            switch (oldState) {
-            case IDLE:
-                if (outstandingWrites == 0) {
-                    doReturn = false;
-                    newState = DRAINED;
-                    break;
-                }
-                /*FALLTHRU*/
-            default:
-                draining = true;
-                break;
-            }
-        } while (!state.boolCompareAndSwap(oldState, newState));
-        if (doReturn) {
-            notifyCallback = nc;
-            return;
-        }
-        nc(*this);
+        draining = true;
+        notifyCallback = nc;
     }
 
     void AsynchIO::queueWrite(Buffer* buff) {
@@ -183,167 +140,15 @@ namespace Rdma {
     }
 
     void AsynchIO::notifyPendingWrite() {
-        // As notifyPendingWrite can be called on an arbitrary thread it must check whether we are processing or not.
-        // If we are then we just return as we know that  we will eventually do the idle callback anyway.
-        //
-        // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
-        // We can get here in any state (as the caller could be in any thread)
-        State oldState;
-        State newState;
-        bool doReturn;
-        do {
-            newState = oldState = state.get();
-            doReturn = false;
-            switch (oldState) {
-            case NOTIFY_WRITE:
-            case PENDING_NOTIFY:
-                // We only need to note a pending notify if we're already doing a notify as data processing
-                // is always followed by write notification processing
-                newState = PENDING_NOTIFY;
-                doReturn = true;
-                break;
-            case PENDING_DATA:
-                doReturn = true;
-                break;
-            case DATA:
-                // Only need to return here as data processing will do the idleCallback itself anyway
-                doReturn = true;
-                break;
-            case IDLE:
-                newState = NOTIFY_WRITE;
-                break;
-            case SHUTDOWN:
-                // We can get here because it is too hard to eliminate all races of stop() and notifyPendingWrite()
-                // just do nothing.
-                doReturn = true;
-            case DRAINED:
-                // This is not allowed - we can't make any more writes as we're draining the write queue.
-                assert(oldState!=DRAINED);
-                doReturn = true;
-            };
-        } while (!state.boolCompareAndSwap(oldState, newState));
-        if (doReturn) {
-            return;
-        }
-
-        doWriteCallback();
-
-        // Keep track of what we need to do so that we can release the lock
-        enum {COMPLETION, NOTIFY, RETURN, EXIT} action;
-        // If there was pending data whilst we were doing this, process it now
-        //
-        // Using NOTIFY_WRITE for both NOTIFY & COMPLETION is a bit strange, but we're making sure we get the
-        // correct result if we reenter notifyPendingWrite(), in which case we want to
-        // end up in PENDING_NOTIFY (entering dataEvent doesn't matter as it only checks
-        // not IDLE)
-        do {
-            //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
-            do {
-                newState = oldState = state.get();
-                action = RETURN; // Anything but COMPLETION
-                switch (oldState) {
-                case NOTIFY_WRITE:
-                    newState = IDLE;
-                    action = (action == COMPLETION) ? EXIT : RETURN;
-                    break;
-                case PENDING_DATA:
-                    newState = NOTIFY_WRITE;
-                    action = COMPLETION;
-                    break;
-                case PENDING_NOTIFY:
-                    newState = NOTIFY_WRITE;
-                    action = NOTIFY;
-                    break;
-                default:
-                    assert(oldState!=IDLE && oldState!=DATA && oldState!=SHUTDOWN);
-                    action = RETURN;
-                }
-            } while (!state.boolCompareAndSwap(oldState, newState));
-
-            // Note we only get here if we were in the PENDING_DATA or PENDING_NOTIFY state
-            // so that we do need to process completions or notifications now
-            switch (action) {
-            case COMPLETION:
-                processCompletions();
-                // Fall through
-            case NOTIFY:
-                doWriteCallback();
-                break;
-            case RETURN:
-                return;
-            case EXIT:
-                // If we just processed completions we might need to delete ourselves
-                // TODO: XXX: can we delete ourselves correctly in notifyPendingWrite()?
-                checkDrainedStopped();
-                return;
-            }
-        } while (true);
+        dataHandle.call(pendingWriteAction);
     }
 
     void AsynchIO::dataEvent() {
-        // Keep track of writable notifications
-        // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
-        State oldState;
-        State newState;
-        bool doReturn;
-        do {
-            newState = oldState = state.get();
-            doReturn = false;
-            // We're already processing a notification
-            switch (oldState) {
-            case IDLE:
-                newState  = DATA;
-                break;
-            case SHUTDOWN:
-                doReturn = true;
-                // Fallthru
-            case DRAINED:
-                break;
-            default:
-                // Can't get here in DATA state as that would violate the serialisation rules
-                assert( oldState!=DATA );
-                newState = PENDING_DATA;
-                doReturn = true;
-            }
-        } while (!state.boolCompareAndSwap(oldState, newState));
-        if (doReturn) {
-            return;
-        }
-
+        if (state.get() == STOPPED) return;
+        
         processCompletions();
 
-        //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
-        do {
-            newState = oldState = state.get();
-            switch (oldState) {
-            case DATA:
-                newState = NOTIFY_WRITE;
-                break;
-            case DRAINED:
-                break;
-            default:
-                assert( oldState==DATA || oldState==DRAINED);
-            }
-        } while (!state.boolCompareAndSwap(oldState, newState));
-
-        while (newState==NOTIFY_WRITE) {
-            doWriteCallback();
-
-            // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
-            do {
-                newState = oldState = state.get();
-                if ( oldState==NOTIFY_WRITE ) {
-                    newState = IDLE;
-                } else {
-                    // Can't get DATA/PENDING_DATA/DRAINED here as dataEvent cannot be reentered
-                    assert( oldState==PENDING_NOTIFY );
-                    newState = NOTIFY_WRITE;
-                }
-            } while (!state.boolCompareAndSwap(oldState, newState));
-        }
-
-        // We might delete ourselves in here so return immediately
-	checkDrainedStopped();
+        doWriteCallback();
     }
 
     void AsynchIO::processCompletions() {
@@ -471,46 +276,30 @@ namespace Rdma {
                 return;
             }
         }
+        
+        checkDrained();
     }
 
-    void AsynchIO::checkDrainedStopped() {
+    void AsynchIO::checkDrained() {
         // If we've got all the write confirmations and we're draining
         // We might get deleted in the drained callback so return immediately
         if (draining) {
             if (outstandingWrites == 0) {
                  draining = false;
-                 doDrainedCallback();
+                 NotifyCallback nc;
+                 nc.swap(notifyCallback);
+                 nc(*this);
             }
             return;
         }
-
-        // We might need to delete ourselves
-        if (notifyCallback) {
-            doStoppedCallback();
-        }
     }
-
-    void AsynchIO::doDrainedCallback() {
-        NotifyCallback nc;
-        nc.swap(notifyCallback);
-        // Transition unconditionally to DRAINED
-        State oldState;
-        do {
-            oldState = state.get();
-            assert(oldState==IDLE);
-        } while (!state.boolCompareAndSwap(oldState, DRAINED));
-        nc(*this);
-    }
-
+    
     void AsynchIO::doStoppedCallback() {
+        // Ensure we can't get any more callbacks (except for the stopped callback)
+        dataHandle.stopWatch();
+
         NotifyCallback nc;
         nc.swap(notifyCallback);
-        // Transition unconditionally to SHUTDOWN
-        State oldState;
-        do {
-            oldState = state.get();
-            assert(oldState==IDLE);
-        } while (!state.boolCompareAndSwap(oldState, SHUTDOWN));
         nc(*this);
     }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h?rev=1021822&r1=1021821&r2=1021822&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h Tue Oct 12 16:05:15 2010
@@ -50,10 +50,9 @@ namespace Rdma {
         int recvBufferCount;
         int xmitBufferCount;
         int outstandingWrites;
-        bool draining; // TODO: Perhaps (probably) this state can be merged with the following...
-        enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DRAINED, SHUTDOWN };
+        bool draining;
+        enum State {IDLE, STOPPED};
         qpid::sys::AtomicValue<State> state;
-        //qpid::sys::Mutex stateLock;
         QueuePair::intrusive_ptr qp;
         qpid::sys::DispatchHandleRef dataHandle;
 
@@ -62,6 +61,7 @@ namespace Rdma {
         FullCallback fullCallback;
         ErrorCallback errorCallback;
         NotifyCallback notifyCallback;
+        qpid::sys::DispatchHandle::Callback pendingWriteAction;
 
     public:
         typedef boost::function1<void, AsynchIO&> RequestCallback;
@@ -103,9 +103,8 @@ namespace Rdma {
         void dataEvent();
         void processCompletions();
         void doWriteCallback();
-        void checkDrainedStopped();
+        void checkDrained();
         void doStoppedCallback();
-        void doDrainedCallback();
     };
 
     // We're only writable if:



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org