You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2010/05/18 23:41:25 UTC
svn commit: r945904 - in /qpid/trunk/qpid/cpp/src/qpid:
client/RdmaConnector.cpp sys/RdmaIOPlugin.cpp sys/rdma/RdmaClient.cpp
sys/rdma/RdmaIO.cpp sys/rdma/RdmaIO.h sys/rdma/RdmaServer.cpp
Author: astitcher
Date: Tue May 18 21:41:25 2010
New Revision: 945904
URL: http://svn.apache.org/viewvc?rev=945904&view=rev
Log:
Fix RDMA to match upstream changes, which now require shutdown notification
to be delivered differently from before
Modified:
qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp?rev=945904&r1=945903&r2=945904&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp Tue May 18 21:41:25 2010
@@ -75,14 +75,12 @@ class RdmaConnector : public Connector,
framing::OutputHandler* output;
Rdma::AsynchIO* aio;
+ std::auto_ptr<Rdma::Connector> acon;
sys::Poller::shared_ptr poller;
std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
~RdmaConnector();
- void handleClosed();
- bool closeInternal();
-
// Callbacks
void connected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&);
void connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType);
@@ -92,7 +90,9 @@ class RdmaConnector : public Connector,
void readbuff(Rdma::AsynchIO&, Rdma::Buffer*);
void writebuff(Rdma::AsynchIO&);
void writeDataBlock(const framing::AMQDataBlock& data);
- void eof(Rdma::AsynchIO&);
+ void dataError(Rdma::AsynchIO&);
+ void drained();
+ void stopped(Rdma::AsynchIO* aio=0);
std::string identifier;
@@ -153,26 +153,33 @@ RdmaConnector::RdmaConnector(Poller::sha
QPID_LOG(debug, "RdmaConnector created for " << version);
}
+namespace {
+ void deleteAsynchIO(Rdma::AsynchIO& aio) {
+ delete &aio;
+ }
+}
+
RdmaConnector::~RdmaConnector() {
+ QPID_LOG(debug, "~RdmaConnector " << identifier);
close();
- if (aio) aio->deferDelete();
+ if (aio) aio->stop(deleteAsynchIO);
}
void RdmaConnector::connect(const std::string& host, int port){
Mutex::ScopedLock l(pollingLock);
assert(!polling);
- Rdma::Connector* c = new Rdma::Connector(
+ acon.reset(new Rdma::Connector(
Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES),
boost::bind(&RdmaConnector::connected, this, poller, _1, _2),
boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2),
boost::bind(&RdmaConnector::disconnected, this, poller, _1),
- boost::bind(&RdmaConnector::rejected, this, poller, _1, _2));
+ boost::bind(&RdmaConnector::rejected, this, poller, _1, _2)));
polling = true;
SocketAddress sa(host, boost::lexical_cast<std::string>(port));
- c->start(poller, sa);
+ acon->start(poller, sa);
}
// The following only gets run when connected
@@ -184,7 +191,7 @@ void RdmaConnector::connected(Poller::sh
boost::bind(&RdmaConnector::readbuff, this, _1, _2),
boost::bind(&RdmaConnector::writebuff, this, _1),
0, // write buffers full
- boost::bind(&RdmaConnector::eof, this, _1)); // data error - just close connection
+ boost::bind(&RdmaConnector::dataError, this, _1));
identifier = str(format("[%1% %2%]") % ci->getLocalName() % ci->getPeerName());
ProtocolInitiation init(version);
@@ -194,31 +201,70 @@ void RdmaConnector::connected(Poller::sh
}
void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType) {
- QPID_LOG(trace, "Connection Error " << identifier);
- eof(*aio);
+ QPID_LOG(debug, "Connection Error " << identifier);
+ {
+ Mutex::ScopedLock l(pollingLock);
+ // If we're closed already then we'll get to drained() anyway
+ if (!polling) return;
+ polling = false;
+ }
+ stopped();
}
void RdmaConnector::disconnected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&) {
- eof(*aio);
+ QPID_LOG(debug, "Connection disconnected " << identifier);
+ {
+ Mutex::ScopedLock l(pollingLock);
+ // If we're closed already then we'll get to drained() anyway
+ if (!polling) return;
+ polling = false;
+ }
+ drained();
}
void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams& cp) {
- QPID_LOG(trace, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize);
- eof(*aio);
+ QPID_LOG(debug, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize);
+ {
+ Mutex::ScopedLock l(pollingLock);
+ // If we're closed already then we'll get to drained() anyway
+ if (!polling) return;
+ polling = false;
+ }
+ stopped();
}
-bool RdmaConnector::closeInternal() {
+void RdmaConnector::dataError(Rdma::AsynchIO&) {
+ QPID_LOG(debug, "Data Error " << identifier);
+ {
Mutex::ScopedLock l(pollingLock);
- bool ret = polling;
+ // If we're closed already then we'll get to drained() anyway
+ if (!polling) return;
polling = false;
- if (ret) {
- if (aio) aio->queueWriteClose();
}
- return ret;
+ drained();
+}
+
+void RdmaConnector::stopped(Rdma::AsynchIO* aio) {
+ delete aio;
+ if (shutdownHandler) {
+ shutdownHandler->shutdown();
+ }
+}
+
+void RdmaConnector::drained() {
+ QPID_LOG(debug, "RdmaConnector::drained " << identifier);
+ if (aio) {
+ aio->stop(boost::bind(&RdmaConnector::stopped, this, aio));
+ aio = 0;
+ }
}
void RdmaConnector::close() {
- closeInternal();
+ QPID_LOG(debug, "RdmaConnector::close " << identifier);
+ Mutex::ScopedLock l(pollingLock);
+ if (!polling) return;
+ polling = false;
+ if (aio) aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this));
}
void RdmaConnector::setInputHandler(InputHandler* handler){
@@ -259,11 +305,6 @@ void RdmaConnector::send(AMQFrame& frame
if (notifyWrite && polling) aio->notifyPendingWrite();
}
-void RdmaConnector::handleClosed() {
- if (closeInternal() && shutdownHandler)
- shutdownHandler->shutdown();
-}
-
// Called in IO thread. (write idle routine)
// This is NOT only called in response to previously calling notifyPendingWrite
void RdmaConnector::writebuff(Rdma::AsynchIO&) {
@@ -340,10 +381,6 @@ void RdmaConnector::writeDataBlock(const
aio->queueWrite(buff);
}
-void RdmaConnector::eof(Rdma::AsynchIO&) {
- handleClosed();
-}
-
void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
{
securityLayer = sl;
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=945904&r1=945903&r2=945904&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Tue May 18 21:41:25 2010
@@ -74,6 +74,7 @@ class RdmaIOHandler : public OutputContr
void full(Rdma::AsynchIO& aio);
void idle(Rdma::AsynchIO& aio);
void error(Rdma::AsynchIO& aio);
+ void drained(Rdma::AsynchIO& aio);
};
RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr& c, qpid::sys::ConnectionCodec::Factory* f) :
@@ -89,12 +90,18 @@ void RdmaIOHandler::init(Rdma::AsynchIO*
aio = a;
}
+namespace {
+ void deleteAsynchIO(Rdma::AsynchIO& aio) {
+ delete &aio;
+ }
+}
+
RdmaIOHandler::~RdmaIOHandler() {
if (codec)
codec->closed();
delete codec;
- aio->deferDelete();
+ aio->stop(deleteAsynchIO);
}
void RdmaIOHandler::write(const framing::ProtocolInitiation& data)
@@ -108,7 +115,7 @@ void RdmaIOHandler::write(const framing:
}
void RdmaIOHandler::close() {
- aio->queueWriteClose();
+ aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this, _1));
}
// TODO: Dummy implementation, need to fill this in for heartbeat timeout to work
@@ -133,7 +140,7 @@ void RdmaIOHandler::idle(Rdma::AsynchIO&
aio->queueWrite(buff);
}
if (codec->isClosed())
- aio->queueWriteClose();
+ aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this, _1));
}
void RdmaIOHandler::initProtocolOut() {
@@ -149,6 +156,9 @@ void RdmaIOHandler::error(Rdma::AsynchIO
close();
}
+void RdmaIOHandler::drained(Rdma::AsynchIO&) {
+}
+
void RdmaIOHandler::full(Rdma::AsynchIO&) {
QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]");
}
@@ -176,7 +186,7 @@ void RdmaIOHandler::readbuff(Rdma::Async
}catch(const std::exception& e){
QPID_LOG(error, e.what());
readError = true;
- aio->queueWriteClose();
+ aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this, _1));
}
}
@@ -195,7 +205,7 @@ void RdmaIOHandler::initProtocolIn(Rdma:
// send valid version header & close connection.
write(framing::ProtocolInitiation(framing::highestProtocolVersion));
readError = true;
- aio->queueWriteClose();
+ aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this, _1));
}
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp?rev=945904&r1=945903&r2=945904&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp Tue May 18 21:41:25 2010
@@ -117,6 +117,10 @@ void idle(Poller::shared_ptr p, Rdma::As
}
}
+void drained(Rdma::AsynchIO&) {
+ cout << "Drained:\n";
+}
+
void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) {
cout << "Connected\n";
Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair();
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp?rev=945904&r1=945903&r2=945904&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp Tue May 18 21:41:25 2010
@@ -49,8 +49,7 @@ namespace Rdma {
recvBufferCount(rCount),
xmitBufferCount(xCredit),
outstandingWrites(0),
- closed(false),
- deleting(false),
+ draining(false),
state(IDLE),
readCallback(rc),
idleCallback(ic),
@@ -85,8 +84,11 @@ namespace Rdma {
if ( outstandingWrites>0 )
QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished");
- // Turn off callbacks (before doing the deletes)
- dataHandle.stopWatch();
+ // Turn off callbacks if necessary (before doing the deletes)
+ if (state.get() != SHUTDOWN) {
+ QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue whilst not shutdown");
+ dataHandle.stopWatch();
+ }
// The buffers ptr_deque automatically deletes all the buffers we've allocated
// TODO: It might turn out to be more efficient in high connection loads to reuse the
@@ -99,27 +101,58 @@ namespace Rdma {
}
// Mark for deletion/Delete this object when we have no outstanding writes
- void AsynchIO::deferDelete() {
+ void AsynchIO::stop(NotifyCallback nc) {
+ State oldState;
+ State newState;
+ bool doReturn;
+ //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ do {
+ newState = oldState = state.get();
+ doReturn = false;
+ if (outstandingWrites > 0 || (oldState != IDLE && oldState != DRAINED)) {
+ doReturn = true;
+ break;
+ }
+
+ newState = SHUTDOWN;
+
+ } while (!state.boolCompareAndSwap(oldState, newState));
+ if (doReturn) {
+ notifyCallback = nc;
+ return;
+ }
+ dataHandle.stopWatch();
+ // Callback, but don't store it - SHUTDOWN state means callback has been called
+ // we *are* allowed to delete the AsynchIO in this callback, so we have to return immediately
+ // after the callback
+ nc(*this);
+ }
+
+ // Mark writing closed (so we don't accept any more writes or make any idle callbacks)
+ void AsynchIO::drainWriteQueue(NotifyCallback nc) {
+ draining = true;
+
State oldState;
State newState;
bool doReturn;
//qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- // It is safe to assign to deleting here as we either delete ourselves
- // before leaving this function or deleting is set on exit
do {
newState = oldState = state.get();
doReturn = false;
- if (outstandingWrites > 0 || oldState != IDLE) {
- deleting = true;
+ if (oldState != IDLE) {
doReturn = true;
- } else{
- newState = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor
+ break;
+ }
+
+ if (outstandingWrites == 0) {
+ newState = DRAINED;
}
} while (!state.boolCompareAndSwap(oldState, newState));
if (doReturn) {
+ notifyCallback = nc;
return;
}
- delete this;
+ nc(*this);
}
void AsynchIO::queueWrite(Buffer* buff) {
@@ -146,12 +179,6 @@ namespace Rdma {
}
}
- // Mark now closed (so we don't accept any more writes or make any idle callbacks)
- void AsynchIO::queueWriteClose() {
- // Don't think we actually need to lock here as transition is 1 way only to closed
- closed = true;
- }
-
void AsynchIO::notifyPendingWrite() {
// As notifyPendingWrite can be called on an arbitrary thread it must check whether we are processing or not.
// If we are then we just return as we know that we will eventually do the idle callback anyway.
@@ -182,8 +209,13 @@ namespace Rdma {
case IDLE:
newState = NOTIFY_WRITE;
break;
- case DELETED:
- assert(oldState!=DELETED);
+ case SHUTDOWN:
+ // This is not allowed - we can't make any more writes as we shut the connection down.
+ assert(oldState!=SHUTDOWN);
+ doReturn = true;
+ case DRAINED:
+ // This is not allowed - we can't make any more writes as we're draining the write queue.
+ assert(oldState!=DRAINED);
doReturn = true;
};
} while (!state.boolCompareAndSwap(oldState, newState));
@@ -220,7 +252,7 @@ namespace Rdma {
action = NOTIFY;
break;
default:
- assert(oldState!=IDLE && oldState!=DATA && oldState!=DELETED);
+ assert(oldState!=IDLE && oldState!=DATA && oldState!=SHUTDOWN);
action = RETURN;
}
} while (!state.boolCompareAndSwap(oldState, newState));
@@ -238,8 +270,8 @@ namespace Rdma {
return;
case EXIT:
// If we just processed completions we might need to delete ourselves
- if (deleting && outstandingWrites == 0) {
- delete this;
+ if (notifyCallback && outstandingWrites == 0) {
+ doStoppedCallback();
}
return;
}
@@ -260,6 +292,8 @@ namespace Rdma {
case IDLE:
newState = DATA;
break;
+ case DRAINED:
+ break;
default:
// Can't get here in DATA state as that would violate the serialisation rules
assert( oldState!=DATA );
@@ -276,35 +310,45 @@ namespace Rdma {
//qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
do {
newState = oldState = state.get();
- assert( oldState==DATA );
- newState = NOTIFY_WRITE;
+ switch (oldState) {
+ case DATA:
+ newState = NOTIFY_WRITE;
+ break;
+ case DRAINED:
+ break;
+ default:
+ assert( oldState==DATA || oldState==DRAINED);
+ }
} while (!state.boolCompareAndSwap(oldState, newState));
- do {
+ while (newState==NOTIFY_WRITE) {
doWriteCallback();
// qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- bool doBreak;
do {
newState = oldState = state.get();
- doBreak = false;
if ( oldState==NOTIFY_WRITE ) {
newState = IDLE;
- doBreak = true;
} else {
- // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered
+ // Can't get DATA/PENDING_DATA/DRAINED here as dataEvent cannot be reentered
assert( oldState==PENDING_NOTIFY );
newState = NOTIFY_WRITE;
}
} while (!state.boolCompareAndSwap(oldState, newState));
- if (doBreak) {
- break;
+ }
+
+ // If we've got all the write confirmations and we're draining
+ if (draining) {
+ if (outstandingWrites == 0) {
+ doDrainedCallback();
+ draining = false;
}
- } while (true);
+ return;
+ }
// We might need to delete ourselves
- if (deleting && outstandingWrites == 0) {
- delete this;
+ if (notifyCallback && outstandingWrites == 0) {
+ doStoppedCallback();
}
}
@@ -418,6 +462,29 @@ namespace Rdma {
}
}
+ void AsynchIO::doDrainedCallback() {
+ NotifyCallback nc;
+ nc.swap(notifyCallback);
+ // Transition unconditionally to DRAINED
+ State oldState;
+ do {
+ oldState = state.get();
+ } while (!state.boolCompareAndSwap(oldState, DRAINED));
+ nc(*this);
+ }
+
+ void AsynchIO::doStoppedCallback() {
+ dataHandle.stopWatch();
+ NotifyCallback nc;
+ nc.swap(notifyCallback);
+ // Transition unconditionally to SHUTDOWN
+ State oldState;
+ do {
+ oldState = state.get();
+ } while (!state.boolCompareAndSwap(oldState, SHUTDOWN));
+ nc(*this);
+ }
+
Buffer* AsynchIO::getBuffer() {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
assert(!bufferQueue.empty());
@@ -444,12 +511,13 @@ namespace Rdma {
errorCallback(errc),
disconnectedCallback(dc)
{
+ QPID_LOG(debug, "RDMA: ci=" << ci << ": Creating ConnectionManager");
ci->nonblocking();
}
ConnectionManager::~ConnectionManager()
{
- handle.stopWatch();
+ QPID_LOG(debug, "RDMA: ci=" << ci << ": Deleting ConnectionManager");
}
void ConnectionManager::start(Poller::shared_ptr poller, const qpid::sys::SocketAddress& addr) {
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h?rev=945904&r1=945903&r2=945904&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h Tue May 18 21:41:25 2010
@@ -43,8 +43,9 @@ namespace Rdma {
{
typedef boost::function1<void, AsynchIO&> ErrorCallback;
typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback;
- typedef boost::function1<void, AsynchIO&> IdleCallback;
- typedef boost::function2<void, AsynchIO&, Buffer*> FullCallback;
+ typedef boost::function1<void, AsynchIO&> IdleCallback;
+ typedef boost::function2<void, AsynchIO&, Buffer*> FullCallback;
+ typedef boost::function1<void, AsynchIO&> NotifyCallback;
QueuePair::intrusive_ptr qp;
qpid::sys::DispatchHandleRef dataHandle;
@@ -54,9 +55,8 @@ namespace Rdma {
int recvBufferCount;
int xmitBufferCount;
int outstandingWrites;
- bool closed; // TODO: Perhaps (probably) this state can be merged with the following...
- bool deleting; // TODO: Perhaps (probably) this state can be merged with the following...
- enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DELETED };
+ bool draining; // TODO: Perhaps (probably) this state can be merged with the following...
+ enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DRAINED, SHUTDOWN };
qpid::sys::AtomicValue<State> state;
//qpid::sys::Mutex stateLock;
std::deque<Buffer*> bufferQueue;
@@ -67,6 +67,7 @@ namespace Rdma {
IdleCallback idleCallback;
FullCallback fullCallback;
ErrorCallback errorCallback;
+ NotifyCallback notifyCallback;
public:
// TODO: Instead of specifying a buffer size specify the amount of memory the AsynchIO class can use
@@ -82,22 +83,20 @@ namespace Rdma {
FullCallback fc,
ErrorCallback ec
);
+ ~AsynchIO();
void start(qpid::sys::Poller::shared_ptr poller);
bool writable() const;
bool bufferAvailable() const;
void queueWrite(Buffer* buff);
void notifyPendingWrite();
- void queueWriteClose();
- void deferDelete();
+ void drainWriteQueue(NotifyCallback);
+ void stop(NotifyCallback);
int incompletedWrites() const;
Buffer* getBuffer();
void returnBuffer(Buffer*);
private:
- // Don't let anyone else delete us to make sure there can't be outstanding callbacks
- ~AsynchIO();
-
// Constants for the peer-peer command messages
// These are sent in the high bits if the imm data of an rdma message
// The low bits are used to send the credit
@@ -107,10 +106,12 @@ namespace Rdma {
void dataEvent(qpid::sys::DispatchHandle& handle);
void processCompletions();
void doWriteCallback();
+ void doStoppedCallback();
+ void doDrainedCallback();
};
inline bool AsynchIO::writable() const {
- return (!closed && outstandingWrites < xmitBufferCount && xmitCredit > 0);
+ return (!draining && outstandingWrites < xmitBufferCount && xmitCredit > 0);
}
inline int AsynchIO::incompletedWrites() const {
@@ -146,7 +147,7 @@ namespace Rdma {
class ConnectionManager {
Connection::intrusive_ptr ci;
- qpid::sys::DispatchHandle handle;
+ qpid::sys::DispatchHandleRef handle;
protected:
ErrorCallback errorCallback;
@@ -160,7 +161,7 @@ namespace Rdma {
virtual ~ConnectionManager();
- void start(qpid::sys::Poller::shared_ptr polle, const qpid::sys::SocketAddress& addrr);
+ void start(qpid::sys::Poller::shared_ptr poller, const qpid::sys::SocketAddress& addr);
private:
void event(qpid::sys::DispatchHandle& handle);
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp?rev=945904&r1=945903&r2=945904&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp Tue May 18 21:41:25 2010
@@ -86,10 +86,14 @@ void full(ConRec* cr, Rdma::AsynchIO&, R
cr->queuedWrites.push(buf);
}
+void drained(Rdma::AsynchIO&) {
+ cout << "Drained:\n";
+}
+
void disconnected(Rdma::Connection::intrusive_ptr& ci) {
ConRec* cr = ci->getContext<ConRec>();
cr->connection->disconnect();
- cr->data->queueWriteClose();
+ cr->data->drainWriteQueue(drained);
delete cr;
cout << "Disconnected: " << cr << "\n";
}
@@ -98,7 +102,7 @@ void connectionError(Rdma::Connection::i
ConRec* cr = ci->getContext<ConRec>();
cr->connection->disconnect();
if (cr) {
- cr->data->queueWriteClose();
+ cr->data->drainWriteQueue(drained);
delete cr;
}
cout << "Connection error: " << cr << "\n";
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org