You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2010/05/13 04:26:22 UTC

svn commit: r943771 - in /qpid/trunk/qpid/cpp/src/qpid/sys: RdmaIOPlugin.cpp rdma/RdmaClient.cpp rdma/RdmaServer.cpp rdma/rdma_factories.cpp rdma/rdma_factories.h rdma/rdma_wrap.cpp rdma/rdma_wrap.h

Author: astitcher
Date: Thu May 13 02:26:21 2010
New Revision: 943771

URL: http://svn.apache.org/viewvc?rev=943771&view=rev
Log:
Rearrange RDMA wrapper class code so that the interface and implementation
are better separated.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h
    qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=943771&r1=943770&r2=943771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Thu May 13 02:26:21 2010
@@ -26,6 +26,7 @@
 #include "qpid/framing/AMQP_HighestVersion.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/rdma/RdmaIO.h"
+#include "qpid/sys/rdma/rdma_exception.h"
 #include "qpid/sys/OutputControl.h"
 #include "qpid/sys/SecuritySettings.h"
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp?rev=943771&r1=943770&r2=943771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp Thu May 13 02:26:21 2010
@@ -19,6 +19,7 @@
  *
  */
 #include "qpid/sys/rdma/RdmaIO.h"
+#include "qpid/sys/rdma/rdma_exception.h"
 #include "qpid/sys/Time.h"
 
 #include <netdb.h>

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp?rev=943771&r1=943770&r2=943771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp Thu May 13 02:26:21 2010
@@ -19,6 +19,7 @@
  *
  */
 #include "qpid/sys/rdma/RdmaIO.h"
+#include "qpid/sys/rdma/rdma_exception.h"
 
 #include <arpa/inet.h>
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp?rev=943771&r1=943770&r2=943771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp Thu May 13 02:26:21 2010
@@ -20,45 +20,77 @@
  */
 #include "qpid/sys/rdma/rdma_factories.h"
 
+#include "qpid/sys/rdma/rdma_exception.h"
+
+
 namespace Rdma {
+    // Intentionally ignore return values for these functions
+    // - we can't do anything about them anyway
     void acker(::rdma_cm_event* e) throw () {
-        if (e)
-            // Intentionally ignore return value - we can't do anything about it here
-            (void) ::rdma_ack_cm_event(e);
+        if (e) (void) ::rdma_ack_cm_event(e);
     }
 
     void destroyEChannel(::rdma_event_channel* c) throw () {
-        if (c)
-            // Intentionally ignore return value - we can't do anything about it here
-            (void) ::rdma_destroy_event_channel(c);
+        if (c) (void) ::rdma_destroy_event_channel(c);
     }
 
     void destroyId(::rdma_cm_id* i) throw () {
-        if (i)
-            // Intentionally ignore return value - we can't do anything about it here
-            (void) ::rdma_destroy_id(i);
+        if (i) (void) ::rdma_destroy_id(i);
     }
 
     void deallocPd(::ibv_pd* p) throw () {
-        if (p)
-            // Intentionally ignore return value - we can't do anything about it here
-            (void) ::ibv_dealloc_pd(p);
+        if (p) (void) ::ibv_dealloc_pd(p);
     }
 
     void destroyCChannel(::ibv_comp_channel* c) throw () {
-        if (c)
-            // Intentionally ignore return value - we can't do anything about it here
-            (void) ::ibv_destroy_comp_channel(c);
+        if (c) (void) ::ibv_destroy_comp_channel(c);
     }
 
     void destroyCq(::ibv_cq* cq) throw () {
-        if (cq)
-            (void) ::ibv_destroy_cq(cq);
+        if (cq) (void) ::ibv_destroy_cq(cq);
     }
 
     void destroyQp(::ibv_qp* qp) throw () {
-        if (qp)
-            (void) ::ibv_destroy_qp(qp);
+        if (qp) (void) ::ibv_destroy_qp(qp);
+    }
+
+    boost::shared_ptr< ::rdma_cm_id > mkId(::rdma_cm_id* i) {
+        return boost::shared_ptr< ::rdma_cm_id >(i, destroyId);
+    }
+    
+    boost::shared_ptr< ::rdma_cm_event > mkEvent(::rdma_cm_event* e) {
+        return boost::shared_ptr< ::rdma_cm_event >(e, acker);
+    }
+    
+    boost::shared_ptr< ::ibv_qp > mkQp(::ibv_qp* qp) {
+    	return boost::shared_ptr< ::ibv_qp > (qp, destroyQp);
+    }
+
+    boost::shared_ptr< ::rdma_event_channel > mkEChannel() {
+        ::rdma_event_channel* c = CHECK_NULL(::rdma_create_event_channel());
+        return boost::shared_ptr< ::rdma_event_channel >(c, destroyEChannel);
     }
 
+    boost::shared_ptr< ::rdma_cm_id >
+    mkId(::rdma_event_channel* ec, void* context, ::rdma_port_space ps) {
+        ::rdma_cm_id* i;
+        CHECK(::rdma_create_id(ec, &i, context, ps));
+        return mkId(i);
+    }
+
+    boost::shared_ptr< ::ibv_pd > allocPd(::ibv_context* c) {
+        ::ibv_pd* pd = CHECK_NULL(ibv_alloc_pd(c));
+        return boost::shared_ptr< ::ibv_pd >(pd, deallocPd);
+    }
+
+    boost::shared_ptr< ::ibv_comp_channel > mkCChannel(::ibv_context* c) {
+        ::ibv_comp_channel* cc = CHECK_NULL(::ibv_create_comp_channel(c));
+        return boost::shared_ptr< ::ibv_comp_channel >(cc, destroyCChannel);
+    }
+
+    boost::shared_ptr< ::ibv_cq >
+    mkCq(::ibv_context* c, int cqe, void* context, ::ibv_comp_channel* cc) {
+        ::ibv_cq* cq = CHECK_NULL(ibv_create_cq(c, cqe, context, cc, 0));
+        return boost::shared_ptr< ::ibv_cq >(cq, destroyCq);
+    }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h?rev=943771&r1=943770&r2=943771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h Thu May 13 02:26:21 2010
@@ -21,49 +21,19 @@
 #ifndef RDMA_FACTORIES_H
 #define RDMA_FACTORIES_H
 
-#include "qpid/sys/rdma/rdma_exception.h"
-
 #include <rdma/rdma_cma.h>
 
 #include <boost/shared_ptr.hpp>
 
 namespace Rdma {
-    // These allow us to use simple shared_ptrs to do ref counting
-    void acker(::rdma_cm_event* e) throw ();
-    void destroyEChannel(::rdma_event_channel* c) throw ();
-    void destroyId(::rdma_cm_id* i) throw ();
-    void deallocPd(::ibv_pd* p) throw ();
-    void destroyCChannel(::ibv_comp_channel* c) throw ();
-    void destroyCq(::ibv_cq* cq) throw ();
-    void destroyQp(::ibv_qp* qp) throw ();
-
-    inline boost::shared_ptr< ::rdma_event_channel > mkEChannel() {
-        ::rdma_event_channel* c = CHECK_NULL(::rdma_create_event_channel());
-        return boost::shared_ptr< ::rdma_event_channel >(c, destroyEChannel);
-    }
-
-    inline boost::shared_ptr< ::rdma_cm_id >
-    mkId(::rdma_event_channel* ec, void* context, ::rdma_port_space ps) {
-        ::rdma_cm_id* i;
-        CHECK(::rdma_create_id(ec, &i, context, ps));
-        return boost::shared_ptr< ::rdma_cm_id >(i, destroyId);
-    }
-
-    inline boost::shared_ptr< ::ibv_pd > allocPd(::ibv_context* c) {
-        ::ibv_pd* pd = CHECK_NULL(ibv_alloc_pd(c));
-        return boost::shared_ptr< ::ibv_pd >(pd, deallocPd);
-    }
-
-    inline boost::shared_ptr< ::ibv_comp_channel > mkCChannel(::ibv_context* c) {
-        ::ibv_comp_channel* cc = CHECK_NULL(::ibv_create_comp_channel(c));
-        return boost::shared_ptr< ::ibv_comp_channel >(cc, destroyCChannel);
-    }
-
-    inline boost::shared_ptr< ::ibv_cq >
-    mkCq(::ibv_context* c, int cqe, void* context, ::ibv_comp_channel* cc) {
-        ::ibv_cq* cq = CHECK_NULL(ibv_create_cq(c, cqe, context, cc, 0));
-        return boost::shared_ptr< ::ibv_cq >(cq, destroyCq);
-    }
+    boost::shared_ptr< ::rdma_event_channel > mkEChannel();
+    boost::shared_ptr< ::rdma_cm_id > mkId(::rdma_event_channel* ec, void* context, ::rdma_port_space ps);
+    boost::shared_ptr< ::rdma_cm_id > mkId(::rdma_cm_id* i);
+    boost::shared_ptr< ::rdma_cm_event > mkEvent(::rdma_cm_event* e);
+    boost::shared_ptr< ::ibv_qp > mkQp(::ibv_qp* qp);
+    boost::shared_ptr< ::ibv_pd > allocPd(::ibv_context* c);
+    boost::shared_ptr< ::ibv_comp_channel > mkCChannel(::ibv_context* c);
+    boost::shared_ptr< ::ibv_cq > mkCq(::ibv_context* c, int cqe, void* context, ::ibv_comp_channel* cc);
 }
 
 #endif // RDMA_FACTORIES_H

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp?rev=943771&r1=943770&r2=943771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp Thu May 13 02:26:21 2010
@@ -21,6 +21,17 @@
 
 #include "qpid/sys/rdma/rdma_wrap.h"
 
+#include "qpid/sys/rdma/rdma_factories.h"
+#include "qpid/sys/rdma/rdma_exception.h"
+
+#include "qpid/sys/posix/PrivatePosix.h"
+
+#include <fcntl.h>
+#include <netdb.h>
+
+#include <iostream>
+#include <stdexcept>
+
 namespace Rdma {
     const ::rdma_conn_param DEFAULT_CONNECT_PARAM = {
         0,    // .private_data
@@ -39,20 +50,64 @@ namespace Rdma {
         return count;
     }
 
-    ::rdma_conn_param ConnectionEvent::getConnectionParam() const {
-        // It's badly documented, but it seems from the librdma source code that all the following
-        // event types have a valid param.conn
-        switch (event->event) {
-        case RDMA_CM_EVENT_CONNECT_REQUEST:
-        case RDMA_CM_EVENT_ESTABLISHED:
-        case RDMA_CM_EVENT_REJECTED:
-        case RDMA_CM_EVENT_DISCONNECTED:
-        case RDMA_CM_EVENT_CONNECT_ERROR:
-            return event->param.conn;
-        default:
-            ::rdma_conn_param p = {};
-            return p;
-        }
+    Buffer::Buffer(::ibv_pd* pd, char* const b, const int32_t s) :
+        bytes(b),
+        byteCount(s),
+        dataStart(0),
+        dataCount(0),
+        mr(CHECK_NULL(::ibv_reg_mr(
+        pd, bytes, byteCount,
+        ::IBV_ACCESS_LOCAL_WRITE)))
+    {}
+
+    Buffer::~Buffer() {
+        (void) ::ibv_dereg_mr(mr);
+        delete [] bytes;
+    }
+
+    QueuePairEvent::QueuePairEvent() :
+        dir(NONE)
+    {}
+
+    QueuePairEvent::QueuePairEvent(
+        const ::ibv_wc& w,
+        boost::shared_ptr< ::ibv_cq > c,
+        QueueDirection d) :
+        cq(c),
+        wc(w),
+        dir(d)
+    {
+        assert(dir != NONE);
+    }
+
+    QueuePairEvent::operator bool() const {
+        return dir != NONE;
+    }
+
+    bool QueuePairEvent::immPresent() const {
+        return wc.wc_flags & IBV_WC_WITH_IMM;
+    }
+
+    uint32_t QueuePairEvent::getImm() const {
+        return ntohl(wc.imm_data);
+    }
+
+    QueueDirection QueuePairEvent::getDirection() const {
+        return dir;
+    }
+
+    ::ibv_wc_opcode QueuePairEvent::getEventType() const {
+        return wc.opcode;
+    }
+
+    ::ibv_wc_status QueuePairEvent::getEventStatus() const {
+        return wc.status;
+    }
+
+    Buffer* QueuePairEvent::getBuffer() const {
+        Buffer* b = reinterpret_cast<Buffer*>(wc.wr_id);
+        b->dataCount = wc.byte_len;
+        return b;
     }
 
     QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) :
@@ -84,7 +139,7 @@ namespace Rdma {
         qp_attr.qp_type      = IBV_QPT_RC;
 
         CHECK(::rdma_create_qp(i.get(), pd.get(), &qp_attr));
-        qp = boost::shared_ptr< ::ibv_qp >(i->qp, destroyQp);
+        qp = mkQp(i->qp);
 
         // Set the qp context to this so we can find ourselves again
         qp->qp_context = this;
@@ -100,6 +155,62 @@ namespace Rdma {
         qp->qp_context = 0;
     }
 
+    // Create a buffer to use for writing
+    Buffer* QueuePair::createBuffer(int s) {
+        return new Buffer(pd.get(), new char[s], s);
+    }
+
+    // Make channel non-blocking by making
+    // associated fd nonblocking
+    void QueuePair::nonblocking() {
+        ::fcntl(cchannel->fd, F_SETFL, O_NONBLOCK);
+    }
+
+    // If we get EAGAIN because the channel has been set non blocking
+    // and we'd have to wait then return an empty event
+    QueuePair::intrusive_ptr QueuePair::getNextChannelEvent() {
+        // First find out which cq has the event
+        ::ibv_cq* cq;
+        void* ctx;
+        int rc = ::ibv_get_cq_event(cchannel.get(), &cq, &ctx);
+        if (rc == -1 && errno == EAGAIN)
+            return 0;
+        CHECK(rc);
+
+        // Batch acknowledge the event
+        if (cq == scq.get()) {
+            if (++outstandingSendEvents > DEFAULT_CQ_ENTRIES / 2) {
+                ::ibv_ack_cq_events(cq, outstandingSendEvents);
+                outstandingSendEvents = 0;
+            }
+        } else if (cq == rcq.get()) {
+            if (++outstandingRecvEvents > DEFAULT_CQ_ENTRIES / 2) {
+                ::ibv_ack_cq_events(cq, outstandingRecvEvents);
+                outstandingRecvEvents = 0;
+            }
+        }
+
+        return static_cast<QueuePair*>(ctx);
+    }
+
+    QueuePairEvent QueuePair::getNextEvent() {
+        ::ibv_wc w;
+        if (::ibv_poll_cq(scq.get(), 1, &w) == 1)
+            return QueuePairEvent(w, scq, SEND);
+        else if (::ibv_poll_cq(rcq.get(), 1, &w) == 1)
+            return QueuePairEvent(w, rcq, RECV);
+        else
+            return QueuePairEvent();
+    }
+
+    void QueuePair::notifyRecv() {
+        CHECK_IBV(ibv_req_notify_cq(rcq.get(), 0));
+    }
+
+    void QueuePair::notifySend() {
+        CHECK_IBV(ibv_req_notify_cq(scq.get(), 0));
+    }
+
     void QueuePair::postRecv(Buffer* buf) {
         ::ibv_recv_wr rwr = {};
         ::ibv_sge sge;
@@ -158,6 +269,229 @@ namespace Rdma {
         if (badswr)
             throw std::logic_error("ibv_post_send(): Bad swr");
     }
+
+    ConnectionEvent::ConnectionEvent(::rdma_cm_event* e) :
+        id((e->event != RDMA_CM_EVENT_CONNECT_REQUEST) ?
+                Connection::find(e->id) : new Connection(e->id)),
+        listen_id(Connection::find(e->listen_id)),
+        event(mkEvent(e))
+    {}
+
+    ConnectionEvent::operator bool() const {
+        return event;
+    }
+
+    ::rdma_cm_event_type ConnectionEvent::getEventType() const {
+        return event->event;
+    }
+
+    ::rdma_conn_param ConnectionEvent::getConnectionParam() const {
+        // It's badly documented, but it seems from the librdma source code that all the following
+        // event types have a valid param.conn
+        switch (event->event) {
+        case RDMA_CM_EVENT_CONNECT_REQUEST:
+        case RDMA_CM_EVENT_ESTABLISHED:
+        case RDMA_CM_EVENT_REJECTED:
+        case RDMA_CM_EVENT_DISCONNECTED:
+        case RDMA_CM_EVENT_CONNECT_ERROR:
+            return event->param.conn;
+        default:
+            ::rdma_conn_param p = {};
+            return p;
+        }
+    }
+
+    boost::intrusive_ptr<Connection> ConnectionEvent::getConnection () const {
+        return id;
+    }
+
+    boost::intrusive_ptr<Connection> ConnectionEvent::getListenId() const {
+        return listen_id;
+    }
+
+    // Wrap the passed in rdma_cm_id with a Connection
+    // this basically happens only on connection request
+    Connection::Connection(::rdma_cm_id* i) :
+        qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+        id(mkId(i)),
+        context(0)
+    {
+        impl->fd = id->channel->fd;
+
+        // Just overwrite the previous context as it will
+        // have come from the listening connection
+        if (i)
+            i->context = this;
+    }
+
+    Connection::Connection() :
+        qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+        channel(mkEChannel()),
+        id(mkId(channel.get(), this, RDMA_PS_TCP)),
+        context(0)
+    {
+        impl->fd = channel->fd;
+    }
+
+    Connection::~Connection() {
+        // Reset the id context in case someone else has it
+        id->context = 0;
+    }
+
+    void Connection::ensureQueuePair() {
+        assert(id.get());
+
+        // Only allocate a queue pair if there isn't one already
+        if (qp)
+            return;
+
+        qp = new QueuePair(id);
+    }
+
+    Connection::intrusive_ptr Connection::make() {
+        return new Connection();
+    }
+
+    Connection::intrusive_ptr Connection::find(::rdma_cm_id* i) {
+        if (!i)
+            return 0;
+        Connection* id = static_cast< Connection* >(i->context);
+        if (!id)
+            throw std::logic_error("Couldn't find existing Connection");
+        return id;
+    }
+
+    // Make channel non-blocking by making
+    // associated fd nonblocking
+    void Connection::nonblocking() {
+        assert(id.get());
+        ::fcntl(id->channel->fd, F_SETFL, O_NONBLOCK);
+    }
+
+    // If we get EAGAIN because the channel has been set non blocking
+    // and we'd have to wait then return an empty event
+    ConnectionEvent Connection::getNextEvent() {
+        assert(id.get());
+        ::rdma_cm_event* e;
+        int rc = ::rdma_get_cm_event(id->channel, &e);
+        if (GETERR(rc) == EAGAIN)
+            return ConnectionEvent();
+        CHECK(rc);
+        return ConnectionEvent(e);
+    }
+
+    void Connection::bind(const qpid::sys::SocketAddress& src_addr) const {
+        assert(id.get());
+        CHECK(::rdma_bind_addr(id.get(), getAddrInfo(src_addr).ai_addr));
+    }
+
+    void Connection::listen(int backlog) const {
+        assert(id.get());
+        CHECK(::rdma_listen(id.get(), backlog));
+    }
+
+    void Connection::resolve_addr(
+        const qpid::sys::SocketAddress& dst_addr,
+        int timeout_ms) const
+    {
+        assert(id.get());
+        CHECK(::rdma_resolve_addr(id.get(), 0, getAddrInfo(dst_addr).ai_addr, timeout_ms));
+    }
+
+    void Connection::resolve_route(int timeout_ms) const {
+        assert(id.get());
+        CHECK(::rdma_resolve_route(id.get(), timeout_ms));
+    }
+
+    void Connection::disconnect() const {
+        assert(id.get());
+        int rc = ::rdma_disconnect(id.get());
+        // iWarp doesn't let you disconnect an already disconnected connection,
+        // but Infiniband does, so it's okay to call rdma_disconnect() in
+        // response to a disconnect event; we may just get an EINVAL error
+        if (GETERR(rc) == EINVAL)
+            return;
+        CHECK(rc);
+    }
+
+    // TODO: Currently you can only connect with the default connection parameters
+    void Connection::connect(const void* data, size_t len) {
+        assert(id.get());
+        // Need to have a queue pair before we can connect
+        ensureQueuePair();
+
+        ::rdma_conn_param p = DEFAULT_CONNECT_PARAM;
+        p.private_data = data;
+        p.private_data_len = len;
+        CHECK(::rdma_connect(id.get(), &p));
+    }
+
+    void Connection::connect() {
+        connect(0, 0);
+    }
+
+    void Connection::accept(const ::rdma_conn_param& param, const void* data, size_t len) {
+        assert(id.get());
+        // Need to have a queue pair before we can accept
+        ensureQueuePair();
+
+        ::rdma_conn_param p = param;
+        p.private_data = data;
+        p.private_data_len = len;
+        CHECK(::rdma_accept(id.get(), &p));
+    }
+
+    void Connection::accept(const ::rdma_conn_param& param) {
+        accept(param, 0, 0);
+    }
+
+    void Connection::reject(const void* data, size_t len) const {
+        assert(id.get());
+        CHECK(::rdma_reject(id.get(), data, len));
+    }
+
+    void Connection::reject() const {
+        assert(id.get());
+        CHECK(::rdma_reject(id.get(), 0, 0));
+    }
+
+    QueuePair::intrusive_ptr Connection::getQueuePair() {
+        assert(id.get());
+
+        ensureQueuePair();
+
+        return qp;
+    }
+
+    std::string Connection::getLocalName() const {
+        ::sockaddr* addr = ::rdma_get_local_addr(id.get());
+        char hostName[NI_MAXHOST];
+        char portName[NI_MAXSERV];
+        CHECK_IBV(::getnameinfo(
+        addr, sizeof(::sockaddr_storage),
+        hostName, sizeof(hostName),
+        portName, sizeof(portName),
+        NI_NUMERICHOST | NI_NUMERICSERV));
+        std::string r(hostName);
+        r += ":";
+        r += portName;
+        return r;
+    }
+
+    std::string Connection::getPeerName() const {
+        ::sockaddr* addr = ::rdma_get_peer_addr(id.get());
+        char hostName[NI_MAXHOST];
+        char portName[NI_MAXSERV];
+        CHECK_IBV(::getnameinfo(
+        addr, sizeof(::sockaddr_storage),
+        hostName, sizeof(hostName),
+        portName, sizeof(portName),
+        NI_NUMERICHOST | NI_NUMERICSERV));
+        std::string r(hostName);
+        r += ":";
+        r += portName;
+        return r;
+    }
 }
 
 std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t) {

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h?rev=943771&r1=943770&r2=943771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h Thu May 13 02:26:21 2010
@@ -21,25 +21,19 @@
 #ifndef RDMA_WRAP_H
 #define RDMA_WRAP_H
 
-#include "qpid/sys/rdma/rdma_factories.h"
-
 #include <rdma/rdma_cma.h>
 
 #include "qpid/RefCounted.h"
 #include "qpid/sys/IOHandle.h"
-#include "qpid/sys/posix/PrivatePosix.h"
-
-#include <fcntl.h>
-
-#include <netdb.h>
 
-#include <vector>
-#include <algorithm>
-#include <iostream>
-#include <stdexcept>
 #include <boost/shared_ptr.hpp>
 #include <boost/intrusive_ptr.hpp>
 
+namespace qpid {
+namespace sys {
+    class SocketAddress;
+}}
+
 namespace Rdma {
     const int DEFAULT_TIMEOUT = 2000; // 2 secs
     const int DEFAULT_BACKLOG = 100;
@@ -57,20 +51,8 @@ namespace Rdma {
         int32_t dataStart;
         int32_t dataCount;
 
-        Buffer(::ibv_pd* pd, char* const b, const int32_t s) :
-            bytes(b),
-            byteCount(s),
-            dataStart(0),
-            dataCount(0),
-            mr(CHECK_NULL(::ibv_reg_mr(
-                pd, bytes, byteCount,
-                ::IBV_ACCESS_LOCAL_WRITE)))
-        {}
-
-        ~Buffer() {
-            (void) ::ibv_dereg_mr(mr);
-            delete [] bytes;
-        }
+        Buffer(::ibv_pd* pd, char* const b, const int32_t s);
+        ~Buffer();
 
     private:
         ::ibv_mr* mr;
@@ -91,57 +73,28 @@ namespace Rdma {
 
         friend class QueuePair;
 
-        QueuePairEvent() :
-            dir(NONE)
-        {}
-
+        QueuePairEvent();
         QueuePairEvent(
             const ::ibv_wc& w,
             boost::shared_ptr< ::ibv_cq > c,
-            QueueDirection d) :
-            cq(c),
-            wc(w),
-            dir(d)
-        {
-            assert(dir != NONE);
-        }
+            QueueDirection d);
 
     public:
-        operator bool() const {
-            return dir != NONE;
-        }
-
-        bool immPresent() const {
-            return wc.wc_flags & IBV_WC_WITH_IMM;
-        }
-        
-        uint32_t getImm() const {
-            return ntohl(wc.imm_data);
-        }
-
-        QueueDirection getDirection() const {
-            return dir;
-        }
-
-        ::ibv_wc_opcode getEventType() const {
-            return wc.opcode;
-        }
-
-        ::ibv_wc_status getEventStatus() const {
-            return wc.status;
-        }
-
-        Buffer* getBuffer() const {
-            Buffer* b = reinterpret_cast<Buffer*>(wc.wr_id);
-            b->dataCount = wc.byte_len;
-            return b;
-        }
+        operator bool() const;
+        bool immPresent() const;
+        uint32_t getImm() const;
+        QueueDirection getDirection() const;
+        ::ibv_wc_opcode getEventType() const;
+        ::ibv_wc_status getEventStatus() const;
+        Buffer* getBuffer() const;
     };
 
     // Wrapper for a queue pair - this has the functionality for
     // putting buffers on the receive queue and for sending buffers
     // to the other end of the connection.
     class QueuePair : public qpid::sys::IOHandle, public qpid::RefCounted {
+        friend class Connection;
+
         boost::shared_ptr< ::ibv_pd > pd;
         boost::shared_ptr< ::ibv_comp_channel > cchannel;
         boost::shared_ptr< ::ibv_cq > scq;
@@ -150,8 +103,6 @@ namespace Rdma {
         int outstandingSendEvents;
         int outstandingRecvEvents;
 
-        friend class Connection;
-
         QueuePair(boost::shared_ptr< ::rdma_cm_id > id);
         ~QueuePair();
 
@@ -159,52 +110,17 @@ namespace Rdma {
         typedef boost::intrusive_ptr<QueuePair> intrusive_ptr;
 
         // Create a buffer to use for writing
-        Buffer* createBuffer(int s) {
-            return new Buffer(pd.get(), new char[s], s);
-        }
+        Buffer* createBuffer(int s);
 
         // Make channel non-blocking by making
         // associated fd nonblocking
-        void nonblocking() {
-            ::fcntl(cchannel->fd, F_SETFL, O_NONBLOCK);
-        }
+        void nonblocking();
 
         // If we get EAGAIN because the channel has been set non blocking
         // and we'd have to wait then return an empty event
-        QueuePair::intrusive_ptr getNextChannelEvent() {
-            // First find out which cq has the event
-            ::ibv_cq* cq;
-            void* ctx;
-            int rc = ::ibv_get_cq_event(cchannel.get(), &cq, &ctx);
-            if (rc == -1 && errno == EAGAIN)
-                return 0;
-            CHECK(rc);
-
-            // Batch acknowledge the event
-            if (cq == scq.get()) {
-                if (++outstandingSendEvents > DEFAULT_CQ_ENTRIES / 2) {
-                    ::ibv_ack_cq_events(cq, outstandingSendEvents);
-                    outstandingSendEvents = 0;
-                }
-            } else if (cq == rcq.get()) {
-                if (++outstandingRecvEvents > DEFAULT_CQ_ENTRIES / 2) {
-                    ::ibv_ack_cq_events(cq, outstandingRecvEvents);
-                    outstandingRecvEvents = 0;
-                }
-            }
+        QueuePair::intrusive_ptr getNextChannelEvent();
 
-            return static_cast<QueuePair*>(ctx);
-        }
-
-        QueuePairEvent getNextEvent() {
-            ::ibv_wc w;
-            if (::ibv_poll_cq(scq.get(), 1, &w) == 1)
-                return QueuePairEvent(w, scq, SEND);
-            else if (::ibv_poll_cq(rcq.get(), 1, &w) == 1)
-                return QueuePairEvent(w, rcq, RECV);
-            else
-                return QueuePairEvent();
-        }
+        QueuePairEvent getNextEvent();
 
         void postRecv(Buffer* buf);
         void postSend(Buffer* buf);
@@ -227,23 +143,11 @@ namespace Rdma {
 
         // Default copy, assignment and destructor ok
     public:
-        operator bool() const {
-            return event;
-        }
-
-        ::rdma_cm_event_type getEventType() const {
-            return event->event;
-        }
-
+        operator bool() const;
+        ::rdma_cm_event_type getEventType() const;
         ::rdma_conn_param getConnectionParam() const;
-
-        boost::intrusive_ptr<Connection> getConnection () const {
-            return id;
-        }
-
-        boost::intrusive_ptr<Connection> getListenId() const {
-            return listen_id;
-        }
+        boost::intrusive_ptr<Connection> getConnection () const;
+        boost::intrusive_ptr<Connection> getListenId() const;
     };
 
     // For the moment this is a fairly simple wrapper for rdma_cm_id.
@@ -264,60 +168,17 @@ namespace Rdma {
 
         // Wrap the passed in rdma_cm_id with a Connection
         // this basically happens only on connection request
-        Connection(::rdma_cm_id* i) :
-            qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
-            id(i, destroyId),
-            context(0)
-        {
-            impl->fd = id->channel->fd;
-
-            // Just overwrite the previous context as it will
-            // have come from the listening connection
-            if (i)
-                i->context = this;
-        }
+        Connection(::rdma_cm_id* i);
+        Connection();
+        ~Connection();
 
-        Connection() :
-            qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
-            channel(mkEChannel()),
-            id(mkId(channel.get(), this, RDMA_PS_TCP)),
-            context(0)
-        {
-            impl->fd = channel->fd;
-	}
-
-        ~Connection() {
-            // Reset the id context in case someone else has it
-            id->context = 0;
-        }
-
-        // Default destructor fine
-
-        void ensureQueuePair() {
-            assert(id.get());
-
-            // Only allocate a queue pair if there isn't one already
-            if (qp)
-                return;
-
-            qp = new QueuePair(id);
-        }
+        void ensureQueuePair();
 
     public:
         typedef boost::intrusive_ptr<Connection> intrusive_ptr;
 
-        static intrusive_ptr make() {
-            return new Connection();
-        }
-
-        static intrusive_ptr find(::rdma_cm_id* i) {
-            if (!i)
-                return 0;
-            Connection* id = static_cast< Connection* >(i->context);
-            if (!id)
-                throw std::logic_error("Couldn't find existing Connection");
-            return id;
-        }
+        static intrusive_ptr make();
+        static intrusive_ptr find(::rdma_cm_id* i);
 
         template <typename T>
         void addContext(T* c) {
@@ -333,169 +194,48 @@ namespace Rdma {
 
         // Make channel non-blocking by making
         // associated fd nonblocking
-        void nonblocking() {
-            assert(id.get());
-            ::fcntl(id->channel->fd, F_SETFL, O_NONBLOCK);
-        }
+        void nonblocking();
 
         // If we get EAGAIN because the channel has been set non blocking
         // and we'd have to wait then return an empty event
-        ConnectionEvent getNextEvent() {
-            assert(id.get());
-            ::rdma_cm_event* e;
-            int rc = ::rdma_get_cm_event(id->channel, &e);
-            if (GETERR(rc) == EAGAIN)
-                return ConnectionEvent();
-            CHECK(rc);
-            return ConnectionEvent(e);
-        }
-
-        void bind(const qpid::sys::SocketAddress& src_addr) const {
-            assert(id.get());
-            CHECK(::rdma_bind_addr(id.get(), getAddrInfo(src_addr).ai_addr));
-        }
-
-        void listen(int backlog = DEFAULT_BACKLOG) const {
-            assert(id.get());
-            CHECK(::rdma_listen(id.get(), backlog));
-        }
+        ConnectionEvent getNextEvent();
 
+        void bind(const qpid::sys::SocketAddress& src_addr) const;
+        void listen(int backlog = DEFAULT_BACKLOG) const;
         void resolve_addr(
             const qpid::sys::SocketAddress& dst_addr,
-            int timeout_ms = DEFAULT_TIMEOUT) const
-        {
-            assert(id.get());
-            CHECK(::rdma_resolve_addr(id.get(), 0, getAddrInfo(dst_addr).ai_addr, timeout_ms));
-        }
-
-        void resolve_route(int timeout_ms = DEFAULT_TIMEOUT) const {
-            assert(id.get());
-            CHECK(::rdma_resolve_route(id.get(), timeout_ms));
-        }
-
-        void disconnect() const {
-            assert(id.get());
-            int rc = ::rdma_disconnect(id.get());
-            // iWarp doesn't let you disconnect a disconnected connection
-            // but Infiniband can do so it's okay to call rdma_disconnect()
-            // in response to a disconnect event, but we may get an error
-            if (GETERR(rc) == EINVAL)
-	        return;
-            CHECK(rc);
-        }
+            int timeout_ms = DEFAULT_TIMEOUT) const;
+        void resolve_route(int timeout_ms = DEFAULT_TIMEOUT) const;
+        void disconnect() const;
 
         // TODO: Currently you can only connect with the default connection parameters
-        void connect() {
-            assert(id.get());
-
-            // Need to have a queue pair before we can connect
-            ensureQueuePair();
-
-            ::rdma_conn_param p = DEFAULT_CONNECT_PARAM;
-            CHECK(::rdma_connect(id.get(), &p));
-        }
-
+        void connect(const void* data, size_t len);
+        void connect();
         template <typename T>
         void connect(const T* data) {
-            assert(id.get());
-            // Need to have a queue pair before we can connect
-            ensureQueuePair();
-
-            ::rdma_conn_param p = DEFAULT_CONNECT_PARAM;
-            p.private_data = data;
-            p.private_data_len = sizeof(T);
-            CHECK(::rdma_connect(id.get(), &p));
-        }
+            connect(data, sizeof(T));
+	}
 
         // TODO: Not sure how to default accept params - they come from the connection request
         // event
+        void accept(const ::rdma_conn_param& param, const void* data, size_t len);
+        void accept(const ::rdma_conn_param& param);
         template <typename T>
         void accept(const ::rdma_conn_param& param, const T* data) {
-            assert(id.get());
-            // Need to have a queue pair before we can accept
-            ensureQueuePair();
-
-            ::rdma_conn_param p = param;
-            p.private_data = data;
-            p.private_data_len = sizeof(T);
-            CHECK(::rdma_accept(id.get(), &p));
-        }
-
-        void accept(const ::rdma_conn_param& param) {
-            assert(id.get());
-            // Need to have a queue pair before we can accept
-            ensureQueuePair();
-
-            ::rdma_conn_param p = param;
-            p.private_data = 0;
-            p.private_data_len = 0;
-            CHECK(::rdma_accept(id.get(), &p));
+            accept(param, data, sizeof(T));
         }
 
+        void reject(const void* data, size_t len) const;
+        void reject() const;
         template <typename T>
         void reject(const T* data) const {
-            assert(id.get());
-            CHECK(::rdma_reject(id.get(), data, sizeof(T)));
-        }
-
-        void reject() const {
-            assert(id.get());
-            CHECK(::rdma_reject(id.get(), 0, 0));
+            reject(data, sizeof(T));
         }
 
-        QueuePair::intrusive_ptr getQueuePair() {
-            assert(id.get());
-
-            ensureQueuePair();
-
-            return qp;
-        }
-        
-        std::string getLocalName() const {
-            ::sockaddr* addr = ::rdma_get_local_addr(id.get());
-            char hostName[NI_MAXHOST];
-            char portName[NI_MAXSERV];
-            CHECK_IBV(::getnameinfo(
-                addr, sizeof(::sockaddr_storage),
-                hostName, sizeof(hostName),
-                portName, sizeof(portName),
-                NI_NUMERICHOST | NI_NUMERICSERV));
-            std::string r(hostName);
-            r += ":";
-            r += portName;
-            return r;
-        }
-        
-        std::string getPeerName() const {
-            ::sockaddr* addr = ::rdma_get_peer_addr(id.get());
-            char hostName[NI_MAXHOST];
-            char portName[NI_MAXSERV];
-            CHECK_IBV(::getnameinfo(
-                addr, sizeof(::sockaddr_storage),
-                hostName, sizeof(hostName),
-                portName, sizeof(portName),
-                NI_NUMERICHOST | NI_NUMERICSERV));
-            std::string r(hostName);
-            r += ":";
-            r += portName;
-            return r;
-        }
+        QueuePair::intrusive_ptr getQueuePair();
+        std::string getLocalName() const;
+        std::string getPeerName() const;
     };
-
-    inline void QueuePair::notifyRecv() {
-        CHECK_IBV(ibv_req_notify_cq(rcq.get(), 0));
-    }
-
-    inline void QueuePair::notifySend() {
-        CHECK_IBV(ibv_req_notify_cq(scq.get(), 0));
-    }
-
-    inline ConnectionEvent::ConnectionEvent(::rdma_cm_event* e) :
-        id((e->event != RDMA_CM_EVENT_CONNECT_REQUEST) ?
-                Connection::find(e->id) : new Connection(e->id)),
-        listen_id(Connection::find(e->listen_id)),
-        event(e, acker)
-    {}
 }
 
 std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org