You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2010/06/14 16:50:33 UTC

svn commit: r954496 - in /qpid/trunk/qpid/cpp/src/qpid: client/RdmaConnector.cpp sys/RdmaIOPlugin.cpp sys/rdma/RdmaClient.cpp sys/rdma/RdmaIO.cpp sys/rdma/RdmaServer.cpp sys/rdma/rdma_wrap.cpp sys/rdma/rdma_wrap.h

Author: astitcher
Date: Mon Jun 14 14:50:33 2010
New Revision: 954496

URL: http://svn.apache.org/viewvc?rev=954496&view=rev
Log:
Combine Rdma::Buffer with the ibv_sge needed to send it

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h

Modified: qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp?rev=954496&r1=954495&r2=954496&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp Mon Jun 14 14:50:33 2010
@@ -326,10 +326,9 @@ void RdmaConnector::writebuff(Rdma::Asyn
     Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
     if (codec->canEncode()) {
         std::auto_ptr<BufferBase> buffer = std::auto_ptr<BufferBase>(aio->getBuffer());
-        size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
+        size_t encoded = codec->encode(buffer->bytes(), buffer->byteCount());
 
-        buffer->dataStart = 0;
-        buffer->dataCount = encoded;
+        buffer->dataCount(encoded);
         aio->queueWrite(buffer.release());
     }
 }
@@ -362,7 +361,7 @@ size_t RdmaConnector::encode(const char*
 
 void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
     Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
-    codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
+    codec->decode(buff->bytes(), buff->dataCount());
 }
 
 size_t RdmaConnector::decode(const char* buffer, size_t size) 
@@ -386,9 +385,9 @@ size_t RdmaConnector::decode(const char*
 
 void RdmaConnector::writeDataBlock(const AMQDataBlock& data) {
     Rdma::Buffer* buff = aio->getBuffer();
-    framing::Buffer out(buff->bytes, buff->byteCount);
+    framing::Buffer out(buff->bytes(), buff->byteCount());
     data.encode(out);
-    buff->dataCount = data.encodedSize();
+    buff->dataCount(data.encodedSize());
     aio->queueWrite(buff);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=954496&r1=954495&r2=954496&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Mon Jun 14 14:50:33 2010
@@ -108,9 +108,9 @@ void RdmaIOHandler::write(const framing:
 {
     QPID_LOG(debug, "Rdma: SENT [" << identifier << "] INIT(" << data << ")");
     Rdma::Buffer* buff = aio->getBuffer();
-    framing::Buffer out(buff->bytes, buff->byteCount);
+    framing::Buffer out(buff->bytes(), buff->byteCount());
     data.encode(out);
-    buff->dataCount = data.encodedSize();
+    buff->dataCount(data.encodedSize());
     aio->queueWrite(buff);
 }
 
@@ -135,8 +135,8 @@ void RdmaIOHandler::idle(Rdma::AsynchIO&
     if (codec == 0) return;
     if (codec->canEncode()) {
         Rdma::Buffer* buff = aio->getBuffer();
-        size_t encoded=codec->encode(buff->bytes, buff->byteCount);
-        buff->dataCount = encoded;
+        size_t encoded=codec->encode(buff->bytes(), buff->byteCount());
+        buff->dataCount(encoded);
         aio->queueWrite(buff);
     }
     if (codec->isClosed())
@@ -178,7 +178,7 @@ void RdmaIOHandler::readbuff(Rdma::Async
     size_t decoded = 0;
     try {
         if (codec) {
-            decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
+            decoded = codec->decode(buff->bytes(), buff->dataCount());
         }else{
             // Need to start protocol processing
             initProtocolIn(buff);
@@ -191,7 +191,7 @@ void RdmaIOHandler::readbuff(Rdma::Async
 }
 
 void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) {
-    framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+    framing::Buffer in(buff->bytes(), buff->dataCount());
     framing::ProtocolInitiation protocolInit;
     size_t decoded = 0;
     if (protocolInit.decode(in)) {

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp?rev=954496&r1=954495&r2=954496&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp Mon Jun 14 14:50:33 2010
@@ -67,8 +67,8 @@ vector<char> testString;
 void write(Rdma::AsynchIO& aio) {
     while (aio.writable() && aio.bufferAvailable() && smsgs < target) {
         Rdma::Buffer* b = aio.getBuffer();
-        std::copy(testString.begin(), testString.end(), b->bytes);
-        b->dataCount = msgsize;
+        std::copy(testString.begin(), testString.end(), b->bytes());
+        b->dataCount(msgsize);
         aio.queueWrite(b);
         ++smsgs;
         sbytes += msgsize;
@@ -81,7 +81,7 @@ void dataError(Rdma::AsynchIO&) {
 
 void data(Poller::shared_ptr p, Rdma::AsynchIO& aio, Rdma::Buffer* b) {
     ++rmsgs;
-    rbytes += b->dataCount;
+    rbytes += b->dataCount();
 
     // When all messages have been recvd stop
     if (rmsgs < target) {
@@ -99,7 +99,7 @@ void full(Rdma::AsynchIO& a, Rdma::Buffe
 
     // Don't need to keep buffer just adjust the counts
     --smsgs;
-    sbytes -= b->dataCount;
+    sbytes -= b->dataCount();
 
     // Give buffer back
     a.returnBuffer(b);

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp?rev=954496&r1=954495&r2=954496&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp Mon Jun 14 14:50:33 2010
@@ -65,7 +65,6 @@ namespace Rdma {
             // Allocate recv buffer
             Buffer* b = qp->createBuffer(bufferSize);
             buffers.push_front(b);
-            b->dataCount = b->byteCount;
             qp->postRecv(b);
         }
 
@@ -74,8 +73,6 @@ namespace Rdma {
             Buffer* b = qp->createBuffer(bufferSize);
             buffers.push_front(b);
             bufferQueue.push_front(b);
-            b->dataCount = 0;
-            b->dataStart = 0;
         }
     }
 
@@ -410,8 +407,6 @@ namespace Rdma {
                 }
 
                 // At this point the buffer has been consumed so put it back on the recv queue
-                b->dataStart = 0;
-                b->dataCount = 0;
                 qp->postRecv(b);
 
                 // Received another message
@@ -425,8 +420,8 @@ namespace Rdma {
                     if (writable()) {
                         Buffer* ob = getBuffer();
                         // Have to send something as adapters hate it when you try to transfer 0 bytes
-                        *reinterpret_cast< uint32_t* >(ob->bytes) = htonl(recvCredit);
-                        ob->dataCount = sizeof(uint32_t);
+                        *reinterpret_cast< uint32_t* >(ob->bytes()) = htonl(recvCredit);
+                        ob->dataCount(sizeof(uint32_t));
 
                         int creditSent = recvCredit & ~FlagsMask;
                         qp->postSend(creditSent | IgnoreData, ob);
@@ -498,16 +493,12 @@ namespace Rdma {
         assert(!bufferQueue.empty());
         Buffer* b = bufferQueue.front();
         bufferQueue.pop_front();
-        b->dataCount = 0;
-        b->dataStart = 0;
         return b;
     }
 
     void AsynchIO::returnBuffer(Buffer* b) {
         qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
         bufferQueue.push_front(b);
-        b->dataCount = 0;
-        b->dataStart = 0;
     }
 
     ConnectionManager::ConnectionManager(

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp?rev=954496&r1=954495&r2=954496&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp Mon Jun 14 14:50:33 2010
@@ -70,8 +70,8 @@ void idle(ConRec* cr, Rdma::AsynchIO& a)
 void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) {
     // Echo data back
     Rdma::Buffer* buf = a.getBuffer();
-    std::copy(b->bytes+b->dataStart, b->bytes+b->dataStart+b->dataCount, buf->bytes);
-    buf->dataCount = b->dataCount;
+    std::copy(b->bytes(), b->bytes()+b->dataCount(), buf->bytes());
+    buf->dataCount(b->dataCount());
     if (cr->queuedWrites.empty()) {
         // If can't write then full will be called and push buffer on back of queue
         a.queueWrite(buf);

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp?rev=954496&r1=954495&r2=954496&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp Mon Jun 14 14:50:33 2010
@@ -50,19 +50,20 @@ namespace Rdma {
         return count;
     }
 
-    Buffer::Buffer(::ibv_pd* pd, char* const b, const int32_t s) :
-        bytes(b),
-        byteCount(s),
-        dataStart(0),
-        dataCount(0),
+    Buffer::Buffer(::ibv_pd* pd, const int32_t s) :
+        bufferSize(s),
         mr(CHECK_NULL(::ibv_reg_mr(
-        pd, bytes, byteCount,
+        pd, new char[s], s,
         ::IBV_ACCESS_LOCAL_WRITE)))
-    {}
+    {
+        sge.addr = (uintptr_t) mr->addr;
+        sge.length = 0;
+        sge.lkey = mr->lkey;
+    }
 
     Buffer::~Buffer() {
         (void) ::ibv_dereg_mr(mr);
-        delete [] bytes;
+        delete [] bytes();
     }
 
     QueuePairEvent::QueuePairEvent() :
@@ -106,7 +107,7 @@ namespace Rdma {
 
     Buffer* QueuePairEvent::getBuffer() const {
         Buffer* b = reinterpret_cast<Buffer*>(wc.wr_id);
-        b->dataCount = wc.byte_len;
+        b->dataCount(wc.byte_len);
         return b;
     }
 
@@ -157,7 +158,7 @@ namespace Rdma {
 
     // Create a buffer to use for writing
     Buffer* QueuePair::createBuffer(int s) {
-        return new Buffer(pd.get(), new char[s], s);
+        return new Buffer(pd.get(), s);
     }
 
     // Make channel non-blocking by making
@@ -213,14 +214,11 @@ namespace Rdma {
 
     void QueuePair::postRecv(Buffer* buf) {
         ::ibv_recv_wr rwr = {};
-        ::ibv_sge sge;
-
-        sge.addr = (uintptr_t) buf->bytes+buf->dataStart;
-        sge.length = buf->dataCount;
-        sge.lkey = buf->mr->lkey;
 
         rwr.wr_id = reinterpret_cast<uint64_t>(buf);
-        rwr.sg_list = &sge;
+        // We are given the whole buffer
+        buf->dataCount(buf->byteCount());
+        rwr.sg_list = &buf->sge;
         rwr.num_sge = 1;
 
         ::ibv_recv_wr* badrwr = 0;
@@ -231,16 +229,11 @@ namespace Rdma {
 
     void QueuePair::postSend(Buffer* buf) {
         ::ibv_send_wr swr = {};
-        ::ibv_sge sge;
-
-        sge.addr = (uintptr_t) buf->bytes+buf->dataStart;
-        sge.length = buf->dataCount;
-        sge.lkey = buf->mr->lkey;
 
         swr.wr_id = reinterpret_cast<uint64_t>(buf);
         swr.opcode = IBV_WR_SEND;
         swr.send_flags = IBV_SEND_SIGNALED;
-        swr.sg_list = &sge;
+        swr.sg_list = &buf->sge;
         swr.num_sge = 1;
 
         ::ibv_send_wr* badswr = 0;
@@ -251,17 +244,12 @@ namespace Rdma {
 
     void QueuePair::postSend(uint32_t imm, Buffer* buf) {
         ::ibv_send_wr swr = {};
-        ::ibv_sge sge;
-
-        sge.addr = (uintptr_t) buf->bytes+buf->dataStart;
-        sge.length = buf->dataCount;
-        sge.lkey = buf->mr->lkey;
-        swr.send_flags = IBV_SEND_SIGNALED;        
 
         swr.wr_id = reinterpret_cast<uint64_t>(buf);
         swr.imm_data = htonl(imm);
         swr.opcode = IBV_WR_SEND_WITH_IMM;
-        swr.sg_list = &sge;
+        swr.send_flags = IBV_SEND_SIGNALED;
+        swr.sg_list = &buf->sge;
         swr.num_sge = 1;
 
         ::ibv_send_wr* badswr = 0;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h?rev=954496&r1=954495&r2=954496&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h Mon Jun 14 14:50:33 2010
@@ -45,19 +45,38 @@ namespace Rdma {
 
     struct Buffer {
         friend class QueuePair;
+        friend class QueuePairEvent;
 
-        char* const bytes;
-        const int32_t byteCount;
-        int32_t dataStart;
-        int32_t dataCount;
+        char* bytes() const;
+        int32_t byteCount() const;
+        int32_t dataCount() const;
+        void dataCount(int32_t);
 
-        Buffer(::ibv_pd* pd, char* const b, const int32_t s);
+        Buffer(::ibv_pd* pd, const int32_t s);
         ~Buffer();
 
     private:
+        const int32_t bufferSize;
         ::ibv_mr* mr;
+        ::ibv_sge sge;
     };
 
+    inline char* Buffer::bytes() const {
+      return (char*) sge.addr;
+    }
+
+    inline int32_t Buffer::byteCount() const {
+        return bufferSize;
+    }
+
+    inline int32_t Buffer::dataCount() const {
+        return sge.length;
+    }
+
+    inline void Buffer::dataCount(int32_t s) {
+        sge.length = s;
+    }
+
     class Connection;
 
     enum QueueDirection {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org