You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2008/09/19 16:15:54 UTC

svn commit: r697101 - in /incubator/qpid/trunk/qpid/cpp/src/qpid: client/RdmaConnector.cpp sys/RdmaIOPlugin.cpp sys/rdma/RdmaIO.cpp sys/rdma/RdmaIO.h

Author: astitcher
Date: Fri Sep 19 07:15:54 2008
New Revision: 697101

URL: http://svn.apache.org/viewvc?rev=697101&view=rev
Log:
RDMA bugfixes:
- Changed Rdma connection creation to allocate all necessary buffer
  memory immediately. As a result, no buffer allocations (which can fail)
  happen later, so a connection that has been accepted won't subsequently
  fail because of a lack of locked memory.
- Fixed connection logic so that we reject a new connection if we can't create the
  necessary handlers, rather than killing the entire broker (this includes the case
  where there is not enough locked memory).

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp?rev=697101&r1=697100&r2=697101&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp Fri Sep 19 07:15:54 2008
@@ -305,6 +305,7 @@
     Mutex::ScopedLock l(lock);
     identifier = id;
     aio = a;
+    assert(aio->bufferAvailable());
     newBuffer();
 }
 void RdmaConnector::Writer::handle(framing::AMQFrame& frame) { 
@@ -346,7 +347,7 @@
     if (lastEof==0)
         return;
     size_t bytesWritten = 0;
-    while (aio->writable() && !frames.empty()) {
+    while (aio->writable() && aio->bufferAvailable() && !frames.empty()) {
         const AMQFrame* frame = &frames.front();        
         uint32_t size = frame->size();
         while (size <= encode.available()) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=697101&r1=697100&r2=697101&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Fri Sep 19 07:15:54 2008
@@ -96,7 +96,7 @@
 
 void RdmaIOHandler::write(const framing::ProtocolInitiation& data)
 {
-    QPID_LOG(debug, "SENT [" << identifier << "] INIT(" << data << ")");
+    QPID_LOG(debug, "Rdma: SENT [" << identifier << "] INIT(" << data << ")");
     Rdma::Buffer* buff = aio->getBuffer();
     framing::Buffer out(buff->bytes, buff->byteCount);
     data.encode(out);
@@ -113,7 +113,9 @@
 }
 
 void RdmaIOHandler::idle(Rdma::AsynchIO&) {
-    if (!aio->writable()) {
+    // TODO: Shouldn't need this test as idle() should only ever be called when
+    // the connection is writable anyway
+    if ( !(aio->writable() && aio->bufferAvailable()) ) {
         return;
     }
     if (isClient && codec == 0) {
@@ -138,7 +140,7 @@
 }
 
 void RdmaIOHandler::full(Rdma::AsynchIO&) {
-    QPID_LOG(debug, "buffer full [" << identifier << "]");
+    QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]");
 }
 
 // The logic here is subtly different from TCP as RDMA is message oriented
@@ -163,7 +165,7 @@
         framing::ProtocolInitiation protocolInit;
         if (protocolInit.decode(in)) {
             decoded = in.getPosition();
-            QPID_LOG(debug, "RECV [" << identifier << "] INIT(" << protocolInit << ")");
+            QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")");
             try {
                 codec = factory->create(protocolInit.getVersion(), *this, identifier);
                 if (!codec) {
@@ -231,19 +233,28 @@
 
 bool RdmaIOProtocolFactory::request(Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp,
         ConnectionCodec::Factory* f) {
-    RdmaIOHandler* async = new RdmaIOHandler(ci, f);
-    Rdma::AsynchIO* aio =
-        new Rdma::AsynchIO(ci->getQueuePair(),
-            cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES,
-            boost::bind(&RdmaIOHandler::readbuff, async, _1, _2),
-            boost::bind(&RdmaIOHandler::idle, async, _1),
-            0, // boost::bind(&RdmaIOHandler::full, async, _1),
-            boost::bind(&RdmaIOHandler::error, async, _1));
-    async->init(aio);
-
-    // Record aio so we can get it back from a connection
-    ci->addContext(async);
-    return true;
+    try {
+        RdmaIOHandler* async = new RdmaIOHandler(ci, f);
+        Rdma::AsynchIO* aio =
+            new Rdma::AsynchIO(ci->getQueuePair(),
+                cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES,
+                boost::bind(&RdmaIOHandler::readbuff, async, _1, _2),
+                boost::bind(&RdmaIOHandler::idle, async, _1),
+                0, // boost::bind(&RdmaIOHandler::full, async, _1),
+                boost::bind(&RdmaIOHandler::error, async, _1));
+        async->init(aio);
+    
+        // Record aio so we can get it back from a connection
+        ci->addContext(async);
+        return true;
+    } catch (const Rdma::Exception& e) {
+        QPID_LOG(error, "Rdma: Cannot accept new connection (Rdma excepion): " << e.what());
+    } catch (const std::exception& e) {
+        QPID_LOG(error, "Rdma: Cannot accept new connection (unknown exception): " << e.what());
+    }
+
+    // If we get here we caught an exception so reject connection
+    return false;
 }
 
 void RdmaIOProtocolFactory::connectionError(Rdma::Connection::intrusive_ptr&, Rdma::ErrorType) {
@@ -312,7 +323,7 @@
     string port = ss.str();
     int n = ::getaddrinfo(host.c_str(), port.c_str(), &hints, &res);
     if (n<0) {
-        throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
+        throw Exception(QPID_MSG("Rdma: Cannot resolve " << host << ": " << ::gai_strerror(n)));
     }
 
     Rdma::Connector c(

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp?rev=697101&r1=697100&r2=697101&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp Fri Sep 19 07:15:54 2008
@@ -62,11 +62,21 @@
 
         // Prepost some recv buffers before we go any further
         for (int i = 0; i<recvBufferCount; ++i) {
+            // Allocate recv buffer
             Buffer* b = qp->createBuffer(bufferSize);
             buffers.push_front(b);
             b->dataCount = b->byteCount;
             qp->postRecv(b);
         }
+            
+        for (int i = 0; i<xmitBufferCount; ++i) {
+            // Allocate xmit buffer
+            Buffer* b = qp->createBuffer(bufferSize);
+            buffers.push_front(b);
+            bufferQueue.push_front(b);
+            b->dataCount = 0;
+            b->dataStart = 0;
+        }
     }
 
     AsynchIO::~AsynchIO() {
@@ -378,18 +388,12 @@
 
     Buffer* AsynchIO::getBuffer() {
         qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
-        if (bufferQueue.empty()) {
-            Buffer* b = qp->createBuffer(bufferSize);
-            buffers.push_front(b);
-            return b;
-        } else {
-            Buffer* b = bufferQueue.front();
-            bufferQueue.pop_front();
-            b->dataCount = 0;
-            b->dataStart = 0;
-            return b;
-        }
-
+        assert(!bufferQueue.empty());
+        Buffer* b = bufferQueue.front();
+        bufferQueue.pop_front();
+        b->dataCount = 0;
+        b->dataStart = 0;
+        return b;
     }
     
     void AsynchIO::returnBuffer(Buffer* b) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h?rev=697101&r1=697100&r2=697101&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h Fri Sep 19 07:15:54 2008
@@ -65,6 +65,9 @@
         ErrorCallback errorCallback;
 
     public:
+        // TODO: Instead of specifying a buffer size specify the amount of memory the AsynchIO class can use
+        // for buffers both read and write (allocate half to each up front) and fail if we cannot allocate that much
+        // locked memory
         AsynchIO(
             QueuePair::intrusive_ptr q,
             int size,
@@ -78,6 +81,7 @@
 
         void start(qpid::sys::Poller::shared_ptr poller);
         bool writable() const;
+        bool bufferAvailable() const;
         void queueWrite(Buffer* buff);
         void notifyPendingWrite();
         void queueWriteClose();
@@ -109,6 +113,9 @@
         return outstandingWrites;
     }
 
+    inline bool AsynchIO::bufferAvailable() const {
+        return !bufferQueue.empty();
+    }
     // These are the parameters necessary to start the conversation
     // * Each peer HAS to allocate buffers of the size of the maximum receive from its peer
     // * Each peer HAS to know the initial "credit" it has for transmitting to its peer