You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2010/05/18 23:41:25 UTC

svn commit: r945904 - in /qpid/trunk/qpid/cpp/src/qpid: client/RdmaConnector.cpp sys/RdmaIOPlugin.cpp sys/rdma/RdmaClient.cpp sys/rdma/RdmaIO.cpp sys/rdma/RdmaIO.h sys/rdma/RdmaServer.cpp

Author: astitcher
Date: Tue May 18 21:41:25 2010
New Revision: 945904

URL: http://svn.apache.org/viewvc?rev=945904&view=rev
Log:
Fix RDMA for upstream changes, which now require shutdown notification to be
handled differently from before

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
    qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp?rev=945904&r1=945903&r2=945904&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp Tue May 18 21:41:25 2010
@@ -75,14 +75,12 @@ class RdmaConnector : public Connector, 
     framing::OutputHandler* output;
 
     Rdma::AsynchIO* aio;
+    std::auto_ptr<Rdma::Connector> acon;
     sys::Poller::shared_ptr poller;
     std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
 
     ~RdmaConnector();
 
-    void handleClosed();
-    bool closeInternal();
-
     // Callbacks
     void connected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&);
     void connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType);
@@ -92,7 +90,9 @@ class RdmaConnector : public Connector, 
     void readbuff(Rdma::AsynchIO&, Rdma::Buffer*);
     void writebuff(Rdma::AsynchIO&);
     void writeDataBlock(const framing::AMQDataBlock& data);
-    void eof(Rdma::AsynchIO&);
+    void dataError(Rdma::AsynchIO&);
+    void drained();
+    void stopped(Rdma::AsynchIO* aio=0);
 
     std::string identifier;
 
@@ -153,26 +153,33 @@ RdmaConnector::RdmaConnector(Poller::sha
     QPID_LOG(debug, "RdmaConnector created for " << version);
 }
 
+namespace {
+    void deleteAsynchIO(Rdma::AsynchIO& aio) {
+        delete &aio;
+    }
+}
+
 RdmaConnector::~RdmaConnector() {
+    QPID_LOG(debug, "~RdmaConnector " << identifier);
     close();
-    if (aio) aio->deferDelete();
+    if (aio) aio->stop(deleteAsynchIO);
 }
 
 void RdmaConnector::connect(const std::string& host, int port){
     Mutex::ScopedLock l(pollingLock);
     assert(!polling);
 
-    Rdma::Connector* c = new Rdma::Connector(
+    acon.reset(new Rdma::Connector(
         Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES),
         boost::bind(&RdmaConnector::connected, this, poller, _1, _2),
         boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2),
         boost::bind(&RdmaConnector::disconnected, this, poller, _1),
-        boost::bind(&RdmaConnector::rejected, this, poller, _1, _2));
+        boost::bind(&RdmaConnector::rejected, this, poller, _1, _2)));
 
     polling = true;
 
     SocketAddress sa(host, boost::lexical_cast<std::string>(port));
-    c->start(poller, sa);
+    acon->start(poller, sa);
 }
 
 // The following only gets run when connected
@@ -184,7 +191,7 @@ void RdmaConnector::connected(Poller::sh
         boost::bind(&RdmaConnector::readbuff, this, _1, _2),
         boost::bind(&RdmaConnector::writebuff, this, _1),
         0, // write buffers full
-        boost::bind(&RdmaConnector::eof, this, _1)); // data error - just close connection
+        boost::bind(&RdmaConnector::dataError, this, _1));
 
     identifier = str(format("[%1% %2%]") % ci->getLocalName() % ci->getPeerName());
     ProtocolInitiation init(version);
@@ -194,31 +201,70 @@ void RdmaConnector::connected(Poller::sh
 }
 
 void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType) {
-    QPID_LOG(trace, "Connection Error " << identifier);
-    eof(*aio);
+    QPID_LOG(debug, "Connection Error " << identifier);
+    {
+    Mutex::ScopedLock l(pollingLock);
+    // If we're closed already then we'll get to drain() anyway
+    if (!polling) return;
+    polling = false;
+    }
+    stopped();
 }
 
 void RdmaConnector::disconnected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&) {
-    eof(*aio);
+    QPID_LOG(debug, "Connection disconnected " << identifier);
+    {
+    Mutex::ScopedLock l(pollingLock);
+    // If we're closed already then we'll get to drain() anyway
+    if (!polling) return;
+    polling = false;
+    }
+    drained();
 }
 
 void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams& cp) {
-    QPID_LOG(trace, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize);
-    eof(*aio);
+    QPID_LOG(debug, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize);
+    {
+    Mutex::ScopedLock l(pollingLock);
+    // If we're closed already then we'll get to drain() anyway
+    if (!polling) return;
+    polling = false;
+    }
+    stopped();
 }
 
-bool RdmaConnector::closeInternal() {
+void RdmaConnector::dataError(Rdma::AsynchIO&) {
+    QPID_LOG(debug, "Data Error " << identifier);
+    {
     Mutex::ScopedLock l(pollingLock);
-    bool ret = polling;
+    // If we're closed already then we'll get to drain() anyway
+    if (!polling) return;
     polling = false;
-    if (ret) {
-        if (aio) aio->queueWriteClose();
     }
-    return ret;
+    drained();
+}
+
+void RdmaConnector::stopped(Rdma::AsynchIO* aio) {
+    delete aio;
+    if (shutdownHandler) {
+        shutdownHandler->shutdown();
+    }
+}
+
+void RdmaConnector::drained() {
+    QPID_LOG(debug, "RdmaConnector::drained " << identifier);
+    if (aio) {
+        aio->stop(boost::bind(&RdmaConnector::stopped, this, aio));
+        aio = 0;
+    }
 }
 
 void RdmaConnector::close() {
-    closeInternal();
+    QPID_LOG(debug, "RdmaConnector::close " << identifier);
+    Mutex::ScopedLock l(pollingLock);
+    if (!polling) return;
+    polling = false;
+    if (aio) aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this));
 }
 
 void RdmaConnector::setInputHandler(InputHandler* handler){
@@ -259,11 +305,6 @@ void RdmaConnector::send(AMQFrame& frame
     if (notifyWrite && polling) aio->notifyPendingWrite();
 }
 
-void RdmaConnector::handleClosed() {
-    if (closeInternal() && shutdownHandler)
-        shutdownHandler->shutdown();
-}
-
 // Called in IO thread. (write idle routine)
 // This is NOT only called in response to previously calling notifyPendingWrite
 void RdmaConnector::writebuff(Rdma::AsynchIO&) {
@@ -340,10 +381,6 @@ void RdmaConnector::writeDataBlock(const
     aio->queueWrite(buff);
 }
 
-void RdmaConnector::eof(Rdma::AsynchIO&) {
-    handleClosed();
-}
-
 void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
 {
     securityLayer = sl;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=945904&r1=945903&r2=945904&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Tue May 18 21:41:25 2010
@@ -74,6 +74,7 @@ class RdmaIOHandler : public OutputContr
     void full(Rdma::AsynchIO& aio);
     void idle(Rdma::AsynchIO& aio);
     void error(Rdma::AsynchIO& aio);
+    void drained(Rdma::AsynchIO& aio);
 };
 
 RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr& c, qpid::sys::ConnectionCodec::Factory* f) :
@@ -89,12 +90,18 @@ void RdmaIOHandler::init(Rdma::AsynchIO*
     aio = a;
 }
 
+namespace {
+    void deleteAsynchIO(Rdma::AsynchIO& aio) {
+        delete &aio;
+    }
+}
+
 RdmaIOHandler::~RdmaIOHandler() {
     if (codec)
         codec->closed();
     delete codec;
 
-    aio->deferDelete();
+    aio->stop(deleteAsynchIO);
 }
 
 void RdmaIOHandler::write(const framing::ProtocolInitiation& data)
@@ -108,7 +115,7 @@ void RdmaIOHandler::write(const framing:
 }
 
 void RdmaIOHandler::close() {
-    aio->queueWriteClose();
+    aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this, _1));
 }
 
 // TODO: Dummy implementation, need to fill this in for heartbeat timeout to work
@@ -133,7 +140,7 @@ void RdmaIOHandler::idle(Rdma::AsynchIO&
         aio->queueWrite(buff);
     }
     if (codec->isClosed())
-        aio->queueWriteClose();
+        aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this, _1));
 }
 
 void RdmaIOHandler::initProtocolOut() {
@@ -149,6 +156,9 @@ void RdmaIOHandler::error(Rdma::AsynchIO
     close();
 }
 
+void RdmaIOHandler::drained(Rdma::AsynchIO&) {
+}
+
 void RdmaIOHandler::full(Rdma::AsynchIO&) {
     QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]");
 }
@@ -176,7 +186,7 @@ void RdmaIOHandler::readbuff(Rdma::Async
     }catch(const std::exception& e){
         QPID_LOG(error, e.what());
         readError = true;
-        aio->queueWriteClose();
+        aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this, _1));
     }
 }
 
@@ -195,7 +205,7 @@ void RdmaIOHandler::initProtocolIn(Rdma:
             // send valid version header & close connection.
             write(framing::ProtocolInitiation(framing::highestProtocolVersion));
             readError = true;
-            aio->queueWriteClose();
+            aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this, _1));
         }
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp?rev=945904&r1=945903&r2=945904&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp Tue May 18 21:41:25 2010
@@ -117,6 +117,10 @@ void idle(Poller::shared_ptr p, Rdma::As
     }
 }
 
+void drained(Rdma::AsynchIO&) {
+    cout << "Drained:\n";
+}
+
 void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) {
     cout << "Connected\n";
     Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair();

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp?rev=945904&r1=945903&r2=945904&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp Tue May 18 21:41:25 2010
@@ -49,8 +49,7 @@ namespace Rdma {
         recvBufferCount(rCount),
         xmitBufferCount(xCredit),
         outstandingWrites(0),
-        closed(false),
-        deleting(false),
+        draining(false),
         state(IDLE),
         readCallback(rc),
         idleCallback(ic),
@@ -85,8 +84,11 @@ namespace Rdma {
         if ( outstandingWrites>0 )
             QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished");
 
-        // Turn off callbacks (before doing the deletes)
-        dataHandle.stopWatch();
+        // Turn off callbacks if necessary (before doing the deletes)
+        if (state.get() != SHUTDOWN) {
+            QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue whilst not shutdown");
+            dataHandle.stopWatch();
+        }
 
         // The buffers ptr_deque automatically deletes all the buffers we've allocated
         // TODO: It might turn out to be more efficient in high connection loads to reuse the
@@ -99,27 +101,58 @@ namespace Rdma {
     }
 
     // Mark for deletion/Delete this object when we have no outstanding writes
-    void AsynchIO::deferDelete() {
+    void AsynchIO::stop(NotifyCallback nc) {
+        State oldState;
+        State newState;
+        bool doReturn;
+        //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+        do {
+            newState = oldState = state.get();
+            doReturn = false;
+            if (outstandingWrites > 0 || (oldState != IDLE && oldState != DRAINED)) {
+                doReturn = true;
+                break;
+            }
+
+            newState = SHUTDOWN;
+
+        } while (!state.boolCompareAndSwap(oldState, newState));
+        if (doReturn) {
+            notifyCallback = nc;
+            return;
+        }
+        dataHandle.stopWatch();
+        // Callback, but don't store it - SHUTDOWN state means callback has been called
+        // we *are* allowed to delete the AsynchIO in this callback, so we have to return immediately
+        // after the callback
+        nc(*this);
+    }
+
+    // Mark writing closed (so we don't accept any more writes or make any idle callbacks)
+    void AsynchIO::drainWriteQueue(NotifyCallback nc) {
+        draining = true;
+
         State oldState;
         State newState;
         bool doReturn;
         //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
-        // It is safe to assign to deleting here as we either delete ourselves
-        // before leaving this function or deleting is set on exit
         do {
             newState = oldState = state.get();
             doReturn = false;
-            if (outstandingWrites > 0 || oldState != IDLE) {
-                deleting = true;
+            if (oldState != IDLE)  {
                 doReturn = true;
-            } else{
-                newState = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor
+                break;
+            }
+
+            if (outstandingWrites == 0) {
+                newState = DRAINED;
             }
         } while (!state.boolCompareAndSwap(oldState, newState));
         if (doReturn) {
+            notifyCallback = nc;
             return;
         }
-        delete this;
+        nc(*this);
     }
 
     void AsynchIO::queueWrite(Buffer* buff) {
@@ -146,12 +179,6 @@ namespace Rdma {
         }
     }
 
-    // Mark now closed (so we don't accept any more writes or make any idle callbacks)
-    void AsynchIO::queueWriteClose() {
-        // Don't think we actually need to lock here as transition is 1 way only to closed
-        closed = true;
-    }
-
     void AsynchIO::notifyPendingWrite() {
         // As notifyPendingWrite can be called on an arbitrary thread it must check whether we are processing or not.
         // If we are then we just return as we know that  we will eventually do the idle callback anyway.
@@ -182,8 +209,13 @@ namespace Rdma {
             case IDLE:
                 newState = NOTIFY_WRITE;
                 break;
-            case DELETED:
-                assert(oldState!=DELETED);
+            case SHUTDOWN:
+                // This is not allowed - we can't make any more writes as we shut the connection down.
+                assert(oldState!=SHUTDOWN);
+                doReturn = true;
+            case DRAINED:
+                // This is not allowed - we can't make any more writes as we're draining the write queue.
+                assert(oldState!=DRAINED);
                 doReturn = true;
             };
         } while (!state.boolCompareAndSwap(oldState, newState));
@@ -220,7 +252,7 @@ namespace Rdma {
                     action = NOTIFY;
                     break;
                 default:
-                    assert(oldState!=IDLE && oldState!=DATA && oldState!=DELETED);
+                    assert(oldState!=IDLE && oldState!=DATA && oldState!=SHUTDOWN);
                     action = RETURN;
                 }
             } while (!state.boolCompareAndSwap(oldState, newState));
@@ -238,8 +270,8 @@ namespace Rdma {
                 return;
             case EXIT:
                 // If we just processed completions we might need to delete ourselves
-                if (deleting && outstandingWrites == 0) {
-                    delete this;
+                if (notifyCallback && outstandingWrites == 0) {
+                    doStoppedCallback();
                 }
                 return;
             }
@@ -260,6 +292,8 @@ namespace Rdma {
             case IDLE:
                 newState  = DATA;
                 break;
+            case DRAINED:
+                break;
             default:
                 // Can't get here in DATA state as that would violate the serialisation rules
                 assert( oldState!=DATA );
@@ -276,35 +310,45 @@ namespace Rdma {
         //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
         do {
             newState = oldState = state.get();
-            assert( oldState==DATA );
-            newState = NOTIFY_WRITE;
+            switch (oldState) {
+            case DATA:
+                newState = NOTIFY_WRITE;
+                break;
+            case DRAINED:
+                break;
+            default:
+                assert( oldState==DATA || oldState==DRAINED);
+            }
         } while (!state.boolCompareAndSwap(oldState, newState));
 
-        do {
+        while (newState==NOTIFY_WRITE) {
             doWriteCallback();
 
             // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
-            bool doBreak;
             do {
                 newState = oldState = state.get();
-                doBreak = false;
                 if ( oldState==NOTIFY_WRITE ) {
                     newState = IDLE;
-                    doBreak = true;
                 } else {
-                    // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered
+                    // Can't get DATA/PENDING_DATA/DRAINED here as dataEvent cannot be reentered
                     assert( oldState==PENDING_NOTIFY );
                     newState = NOTIFY_WRITE;
                 }
             } while (!state.boolCompareAndSwap(oldState, newState));
-            if (doBreak) {
-                break;
+        }
+
+        // If we've got all the write confirmations and we're draining
+        if (draining) {
+            if (outstandingWrites == 0) {
+                 doDrainedCallback();
+                 draining = false;
             }
-        } while (true);
+            return;
+        }
 
         // We might need to delete ourselves
-        if (deleting && outstandingWrites == 0) {
-            delete this;
+        if (notifyCallback && outstandingWrites == 0) {
+            doStoppedCallback();
         }
     }
 
@@ -418,6 +462,29 @@ namespace Rdma {
         }
     }
 
+    void AsynchIO::doDrainedCallback() {
+        NotifyCallback nc;
+        nc.swap(notifyCallback);
+        // Transition unconditionally to DRAINED
+        State oldState;
+        do {
+            oldState = state.get();
+        } while (!state.boolCompareAndSwap(oldState, DRAINED));
+        nc(*this);
+    }
+
+    void AsynchIO::doStoppedCallback() {
+        dataHandle.stopWatch();
+        NotifyCallback nc;
+        nc.swap(notifyCallback);
+        // Transition unconditionally to SHUTDOWN
+        State oldState;
+        do {
+            oldState = state.get();
+        } while (!state.boolCompareAndSwap(oldState, SHUTDOWN));
+        nc(*this);
+    }
+
     Buffer* AsynchIO::getBuffer() {
         qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
         assert(!bufferQueue.empty());
@@ -444,12 +511,13 @@ namespace Rdma {
         errorCallback(errc),
         disconnectedCallback(dc)
     {
+        QPID_LOG(debug, "RDMA: ci=" << ci << ": Creating ConnectionManager");
         ci->nonblocking();
     }
 
     ConnectionManager::~ConnectionManager()
     {
-        handle.stopWatch();
+        QPID_LOG(debug, "RDMA: ci=" << ci << ": Deleting ConnectionManager");
     }
 
     void ConnectionManager::start(Poller::shared_ptr poller, const qpid::sys::SocketAddress& addr) {

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h?rev=945904&r1=945903&r2=945904&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h Tue May 18 21:41:25 2010
@@ -43,8 +43,9 @@ namespace Rdma {
     {
         typedef boost::function1<void, AsynchIO&> ErrorCallback;
         typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback;
-        typedef boost::function1<void, AsynchIO&>  IdleCallback;
-        typedef boost::function2<void, AsynchIO&, Buffer*>  FullCallback;
+        typedef boost::function1<void, AsynchIO&> IdleCallback;
+        typedef boost::function2<void, AsynchIO&, Buffer*> FullCallback;
+        typedef boost::function1<void, AsynchIO&> NotifyCallback;
 
         QueuePair::intrusive_ptr qp;
         qpid::sys::DispatchHandleRef dataHandle;
@@ -54,9 +55,8 @@ namespace Rdma {
         int recvBufferCount;
         int xmitBufferCount;
         int outstandingWrites;
-        bool closed; // TODO: Perhaps (probably) this state can be merged with the following...
-        bool deleting; // TODO: Perhaps (probably) this state can be merged with the following...
-        enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DELETED };
+        bool draining; // TODO: Perhaps (probably) this state can be merged with the following...
+        enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DRAINED, SHUTDOWN };
         qpid::sys::AtomicValue<State> state;
         //qpid::sys::Mutex stateLock;
         std::deque<Buffer*> bufferQueue;
@@ -67,6 +67,7 @@ namespace Rdma {
         IdleCallback idleCallback;
         FullCallback fullCallback;
         ErrorCallback errorCallback;
+        NotifyCallback notifyCallback;
 
     public:
         // TODO: Instead of specifying a buffer size specify the amount of memory the AsynchIO class can use
@@ -82,22 +83,20 @@ namespace Rdma {
             FullCallback fc,
             ErrorCallback ec
         );
+        ~AsynchIO();
 
         void start(qpid::sys::Poller::shared_ptr poller);
         bool writable() const;
         bool bufferAvailable() const;
         void queueWrite(Buffer* buff);
         void notifyPendingWrite();
-        void queueWriteClose();
-        void deferDelete();
+        void drainWriteQueue(NotifyCallback);
+        void stop(NotifyCallback);
         int incompletedWrites() const;
         Buffer* getBuffer();
         void returnBuffer(Buffer*);
 
     private:
-        // Don't let anyone else delete us to make sure there can't be outstanding callbacks
-        ~AsynchIO();
-
         // Constants for the peer-peer command messages
         // These are sent in the high bits if the imm data of an rdma message
         // The low bits are used to send the credit
@@ -107,10 +106,12 @@ namespace Rdma {
         void dataEvent(qpid::sys::DispatchHandle& handle);
         void processCompletions();
         void doWriteCallback();
+        void doStoppedCallback();
+        void doDrainedCallback();
     };
 
     inline bool AsynchIO::writable() const {
-        return (!closed && outstandingWrites < xmitBufferCount && xmitCredit > 0);
+        return (!draining && outstandingWrites < xmitBufferCount && xmitCredit > 0);
     }
 
     inline int AsynchIO::incompletedWrites() const {
@@ -146,7 +147,7 @@ namespace Rdma {
 
     class ConnectionManager {
         Connection::intrusive_ptr ci;
-        qpid::sys::DispatchHandle handle;
+        qpid::sys::DispatchHandleRef handle;
 
     protected:
         ErrorCallback errorCallback;
@@ -160,7 +161,7 @@ namespace Rdma {
 
         virtual ~ConnectionManager();
 
-        void start(qpid::sys::Poller::shared_ptr polle, const qpid::sys::SocketAddress& addrr);
+        void start(qpid::sys::Poller::shared_ptr poller, const qpid::sys::SocketAddress& addr);
 
     private:
         void event(qpid::sys::DispatchHandle& handle);

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp?rev=945904&r1=945903&r2=945904&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp Tue May 18 21:41:25 2010
@@ -86,10 +86,14 @@ void full(ConRec* cr, Rdma::AsynchIO&, R
     cr->queuedWrites.push(buf);
 }
 
+void drained(Rdma::AsynchIO&) {
+    cout << "Drained:\n";
+}
+
 void disconnected(Rdma::Connection::intrusive_ptr& ci) {
     ConRec* cr = ci->getContext<ConRec>();
     cr->connection->disconnect();
-    cr->data->queueWriteClose();
+    cr->data->drainWriteQueue(drained);
     delete cr;
     cout << "Disconnected: " << cr << "\n";
 }
@@ -98,7 +102,7 @@ void connectionError(Rdma::Connection::i
     ConRec* cr = ci->getContext<ConRec>();
     cr->connection->disconnect();
     if (cr) {
-        cr->data->queueWriteClose();
+        cr->data->drainWriteQueue(drained);
         delete cr;
     }
     cout << "Connection error: " << cr << "\n";



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org