You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2010/05/13 04:26:22 UTC
svn commit: r943771 - in /qpid/trunk/qpid/cpp/src/qpid/sys: RdmaIOPlugin.cpp
rdma/RdmaClient.cpp rdma/RdmaServer.cpp rdma/rdma_factories.cpp
rdma/rdma_factories.h rdma/rdma_wrap.cpp rdma/rdma_wrap.h
Author: astitcher
Date: Thu May 13 02:26:21 2010
New Revision: 943771
URL: http://svn.apache.org/viewvc?rev=943771&view=rev
Log:
Rearrange RDMA wrapper class code so that the interface and implementation
are better separated.
Modified:
qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h
qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=943771&r1=943770&r2=943771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Thu May 13 02:26:21 2010
@@ -26,6 +26,7 @@
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/rdma/RdmaIO.h"
+#include "qpid/sys/rdma/rdma_exception.h"
#include "qpid/sys/OutputControl.h"
#include "qpid/sys/SecuritySettings.h"
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp?rev=943771&r1=943770&r2=943771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp Thu May 13 02:26:21 2010
@@ -19,6 +19,7 @@
*
*/
#include "qpid/sys/rdma/RdmaIO.h"
+#include "qpid/sys/rdma/rdma_exception.h"
#include "qpid/sys/Time.h"
#include <netdb.h>
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp?rev=943771&r1=943770&r2=943771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp Thu May 13 02:26:21 2010
@@ -19,6 +19,7 @@
*
*/
#include "qpid/sys/rdma/RdmaIO.h"
+#include "qpid/sys/rdma/rdma_exception.h"
#include <arpa/inet.h>
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp?rev=943771&r1=943770&r2=943771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp Thu May 13 02:26:21 2010
@@ -20,45 +20,77 @@
*/
#include "qpid/sys/rdma/rdma_factories.h"
+#include "qpid/sys/rdma/rdma_exception.h"
+
+
namespace Rdma {
+ // Intentionally ignore return values for these functions
+ // - we can't do anything about then anyway
void acker(::rdma_cm_event* e) throw () {
- if (e)
- // Intentionally ignore return value - we can't do anything about it here
- (void) ::rdma_ack_cm_event(e);
+ if (e) (void) ::rdma_ack_cm_event(e);
}
void destroyEChannel(::rdma_event_channel* c) throw () {
- if (c)
- // Intentionally ignore return value - we can't do anything about it here
- (void) ::rdma_destroy_event_channel(c);
+ if (c) (void) ::rdma_destroy_event_channel(c);
}
void destroyId(::rdma_cm_id* i) throw () {
- if (i)
- // Intentionally ignore return value - we can't do anything about it here
- (void) ::rdma_destroy_id(i);
+ if (i) (void) ::rdma_destroy_id(i);
}
void deallocPd(::ibv_pd* p) throw () {
- if (p)
- // Intentionally ignore return value - we can't do anything about it here
- (void) ::ibv_dealloc_pd(p);
+ if (p) (void) ::ibv_dealloc_pd(p);
}
void destroyCChannel(::ibv_comp_channel* c) throw () {
- if (c)
- // Intentionally ignore return value - we can't do anything about it here
- (void) ::ibv_destroy_comp_channel(c);
+ if (c) (void) ::ibv_destroy_comp_channel(c);
}
void destroyCq(::ibv_cq* cq) throw () {
- if (cq)
- (void) ::ibv_destroy_cq(cq);
+ if (cq) (void) ::ibv_destroy_cq(cq);
}
void destroyQp(::ibv_qp* qp) throw () {
- if (qp)
- (void) ::ibv_destroy_qp(qp);
+ if (qp) (void) ::ibv_destroy_qp(qp);
+ }
+
+ boost::shared_ptr< ::rdma_cm_id > mkId(::rdma_cm_id* i) {
+ return boost::shared_ptr< ::rdma_cm_id >(i, destroyId);
+ }
+
+ boost::shared_ptr< ::rdma_cm_event > mkEvent(::rdma_cm_event* e) {
+ return boost::shared_ptr< ::rdma_cm_event >(e, acker);
+ }
+
+ boost::shared_ptr< ::ibv_qp > mkQp(::ibv_qp* qp) {
+ return boost::shared_ptr< ::ibv_qp > (qp, destroyQp);
+ }
+
+ boost::shared_ptr< ::rdma_event_channel > mkEChannel() {
+ ::rdma_event_channel* c = CHECK_NULL(::rdma_create_event_channel());
+ return boost::shared_ptr< ::rdma_event_channel >(c, destroyEChannel);
}
+ boost::shared_ptr< ::rdma_cm_id >
+ mkId(::rdma_event_channel* ec, void* context, ::rdma_port_space ps) {
+ ::rdma_cm_id* i;
+ CHECK(::rdma_create_id(ec, &i, context, ps));
+ return mkId(i);
+ }
+
+ boost::shared_ptr< ::ibv_pd > allocPd(::ibv_context* c) {
+ ::ibv_pd* pd = CHECK_NULL(ibv_alloc_pd(c));
+ return boost::shared_ptr< ::ibv_pd >(pd, deallocPd);
+ }
+
+ boost::shared_ptr< ::ibv_comp_channel > mkCChannel(::ibv_context* c) {
+ ::ibv_comp_channel* cc = CHECK_NULL(::ibv_create_comp_channel(c));
+ return boost::shared_ptr< ::ibv_comp_channel >(cc, destroyCChannel);
+ }
+
+ boost::shared_ptr< ::ibv_cq >
+ mkCq(::ibv_context* c, int cqe, void* context, ::ibv_comp_channel* cc) {
+ ::ibv_cq* cq = CHECK_NULL(ibv_create_cq(c, cqe, context, cc, 0));
+ return boost::shared_ptr< ::ibv_cq >(cq, destroyCq);
+ }
}
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h?rev=943771&r1=943770&r2=943771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h Thu May 13 02:26:21 2010
@@ -21,49 +21,19 @@
#ifndef RDMA_FACTORIES_H
#define RDMA_FACTORIES_H
-#include "qpid/sys/rdma/rdma_exception.h"
-
#include <rdma/rdma_cma.h>
#include <boost/shared_ptr.hpp>
namespace Rdma {
- // These allow us to use simple shared_ptrs to do ref counting
- void acker(::rdma_cm_event* e) throw ();
- void destroyEChannel(::rdma_event_channel* c) throw ();
- void destroyId(::rdma_cm_id* i) throw ();
- void deallocPd(::ibv_pd* p) throw ();
- void destroyCChannel(::ibv_comp_channel* c) throw ();
- void destroyCq(::ibv_cq* cq) throw ();
- void destroyQp(::ibv_qp* qp) throw ();
-
- inline boost::shared_ptr< ::rdma_event_channel > mkEChannel() {
- ::rdma_event_channel* c = CHECK_NULL(::rdma_create_event_channel());
- return boost::shared_ptr< ::rdma_event_channel >(c, destroyEChannel);
- }
-
- inline boost::shared_ptr< ::rdma_cm_id >
- mkId(::rdma_event_channel* ec, void* context, ::rdma_port_space ps) {
- ::rdma_cm_id* i;
- CHECK(::rdma_create_id(ec, &i, context, ps));
- return boost::shared_ptr< ::rdma_cm_id >(i, destroyId);
- }
-
- inline boost::shared_ptr< ::ibv_pd > allocPd(::ibv_context* c) {
- ::ibv_pd* pd = CHECK_NULL(ibv_alloc_pd(c));
- return boost::shared_ptr< ::ibv_pd >(pd, deallocPd);
- }
-
- inline boost::shared_ptr< ::ibv_comp_channel > mkCChannel(::ibv_context* c) {
- ::ibv_comp_channel* cc = CHECK_NULL(::ibv_create_comp_channel(c));
- return boost::shared_ptr< ::ibv_comp_channel >(cc, destroyCChannel);
- }
-
- inline boost::shared_ptr< ::ibv_cq >
- mkCq(::ibv_context* c, int cqe, void* context, ::ibv_comp_channel* cc) {
- ::ibv_cq* cq = CHECK_NULL(ibv_create_cq(c, cqe, context, cc, 0));
- return boost::shared_ptr< ::ibv_cq >(cq, destroyCq);
- }
+ boost::shared_ptr< ::rdma_event_channel > mkEChannel();
+ boost::shared_ptr< ::rdma_cm_id > mkId(::rdma_event_channel* ec, void* context, ::rdma_port_space ps);
+ boost::shared_ptr< ::rdma_cm_id > mkId(::rdma_cm_id* i);
+ boost::shared_ptr< ::rdma_cm_event > mkEvent(::rdma_cm_event* e);
+ boost::shared_ptr< ::ibv_qp > mkQp(::ibv_qp* qp);
+ boost::shared_ptr< ::ibv_pd > allocPd(::ibv_context* c);
+ boost::shared_ptr< ::ibv_comp_channel > mkCChannel(::ibv_context* c);
+ boost::shared_ptr< ::ibv_cq > mkCq(::ibv_context* c, int cqe, void* context, ::ibv_comp_channel* cc);
}
#endif // RDMA_FACTORIES_H
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp?rev=943771&r1=943770&r2=943771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp Thu May 13 02:26:21 2010
@@ -21,6 +21,17 @@
#include "qpid/sys/rdma/rdma_wrap.h"
+#include "qpid/sys/rdma/rdma_factories.h"
+#include "qpid/sys/rdma/rdma_exception.h"
+
+#include "qpid/sys/posix/PrivatePosix.h"
+
+#include <fcntl.h>
+#include <netdb.h>
+
+#include <iostream>
+#include <stdexcept>
+
namespace Rdma {
const ::rdma_conn_param DEFAULT_CONNECT_PARAM = {
0, // .private_data
@@ -39,20 +50,64 @@ namespace Rdma {
return count;
}
- ::rdma_conn_param ConnectionEvent::getConnectionParam() const {
- // It's badly documented, but it seems from the librdma source code that all the following
- // event types have a valid param.conn
- switch (event->event) {
- case RDMA_CM_EVENT_CONNECT_REQUEST:
- case RDMA_CM_EVENT_ESTABLISHED:
- case RDMA_CM_EVENT_REJECTED:
- case RDMA_CM_EVENT_DISCONNECTED:
- case RDMA_CM_EVENT_CONNECT_ERROR:
- return event->param.conn;
- default:
- ::rdma_conn_param p = {};
- return p;
- }
+ Buffer::Buffer(::ibv_pd* pd, char* const b, const int32_t s) :
+ bytes(b),
+ byteCount(s),
+ dataStart(0),
+ dataCount(0),
+ mr(CHECK_NULL(::ibv_reg_mr(
+ pd, bytes, byteCount,
+ ::IBV_ACCESS_LOCAL_WRITE)))
+ {}
+
+ Buffer::~Buffer() {
+ (void) ::ibv_dereg_mr(mr);
+ delete [] bytes;
+ }
+
+ QueuePairEvent::QueuePairEvent() :
+ dir(NONE)
+ {}
+
+ QueuePairEvent::QueuePairEvent(
+ const ::ibv_wc& w,
+ boost::shared_ptr< ::ibv_cq > c,
+ QueueDirection d) :
+ cq(c),
+ wc(w),
+ dir(d)
+ {
+ assert(dir != NONE);
+ }
+
+ QueuePairEvent::operator bool() const {
+ return dir != NONE;
+ }
+
+ bool QueuePairEvent::immPresent() const {
+ return wc.wc_flags & IBV_WC_WITH_IMM;
+ }
+
+ uint32_t QueuePairEvent::getImm() const {
+ return ntohl(wc.imm_data);
+ }
+
+ QueueDirection QueuePairEvent::getDirection() const {
+ return dir;
+ }
+
+ ::ibv_wc_opcode QueuePairEvent::getEventType() const {
+ return wc.opcode;
+ }
+
+ ::ibv_wc_status QueuePairEvent::getEventStatus() const {
+ return wc.status;
+ }
+
+ Buffer* QueuePairEvent::getBuffer() const {
+ Buffer* b = reinterpret_cast<Buffer*>(wc.wr_id);
+ b->dataCount = wc.byte_len;
+ return b;
}
QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) :
@@ -84,7 +139,7 @@ namespace Rdma {
qp_attr.qp_type = IBV_QPT_RC;
CHECK(::rdma_create_qp(i.get(), pd.get(), &qp_attr));
- qp = boost::shared_ptr< ::ibv_qp >(i->qp, destroyQp);
+ qp = mkQp(i->qp);
// Set the qp context to this so we can find ourselves again
qp->qp_context = this;
@@ -100,6 +155,62 @@ namespace Rdma {
qp->qp_context = 0;
}
+ // Create a buffer to use for writing
+ Buffer* QueuePair::createBuffer(int s) {
+ return new Buffer(pd.get(), new char[s], s);
+ }
+
+ // Make channel non-blocking by making
+ // associated fd nonblocking
+ void QueuePair::nonblocking() {
+ ::fcntl(cchannel->fd, F_SETFL, O_NONBLOCK);
+ }
+
+ // If we get EAGAIN because the channel has been set non blocking
+ // and we'd have to wait then return an empty event
+ QueuePair::intrusive_ptr QueuePair::getNextChannelEvent() {
+ // First find out which cq has the event
+ ::ibv_cq* cq;
+ void* ctx;
+ int rc = ::ibv_get_cq_event(cchannel.get(), &cq, &ctx);
+ if (rc == -1 && errno == EAGAIN)
+ return 0;
+ CHECK(rc);
+
+ // Batch acknowledge the event
+ if (cq == scq.get()) {
+ if (++outstandingSendEvents > DEFAULT_CQ_ENTRIES / 2) {
+ ::ibv_ack_cq_events(cq, outstandingSendEvents);
+ outstandingSendEvents = 0;
+ }
+ } else if (cq == rcq.get()) {
+ if (++outstandingRecvEvents > DEFAULT_CQ_ENTRIES / 2) {
+ ::ibv_ack_cq_events(cq, outstandingRecvEvents);
+ outstandingRecvEvents = 0;
+ }
+ }
+
+ return static_cast<QueuePair*>(ctx);
+ }
+
+ QueuePairEvent QueuePair::getNextEvent() {
+ ::ibv_wc w;
+ if (::ibv_poll_cq(scq.get(), 1, &w) == 1)
+ return QueuePairEvent(w, scq, SEND);
+ else if (::ibv_poll_cq(rcq.get(), 1, &w) == 1)
+ return QueuePairEvent(w, rcq, RECV);
+ else
+ return QueuePairEvent();
+ }
+
+ void QueuePair::notifyRecv() {
+ CHECK_IBV(ibv_req_notify_cq(rcq.get(), 0));
+ }
+
+ void QueuePair::notifySend() {
+ CHECK_IBV(ibv_req_notify_cq(scq.get(), 0));
+ }
+
void QueuePair::postRecv(Buffer* buf) {
::ibv_recv_wr rwr = {};
::ibv_sge sge;
@@ -158,6 +269,229 @@ namespace Rdma {
if (badswr)
throw std::logic_error("ibv_post_send(): Bad swr");
}
+
+ ConnectionEvent::ConnectionEvent(::rdma_cm_event* e) :
+ id((e->event != RDMA_CM_EVENT_CONNECT_REQUEST) ?
+ Connection::find(e->id) : new Connection(e->id)),
+ listen_id(Connection::find(e->listen_id)),
+ event(mkEvent(e))
+ {}
+
+ ConnectionEvent::operator bool() const {
+ return event;
+ }
+
+ ::rdma_cm_event_type ConnectionEvent::getEventType() const {
+ return event->event;
+ }
+
+ ::rdma_conn_param ConnectionEvent::getConnectionParam() const {
+ // It's badly documented, but it seems from the librdma source code that all the following
+ // event types have a valid param.conn
+ switch (event->event) {
+ case RDMA_CM_EVENT_CONNECT_REQUEST:
+ case RDMA_CM_EVENT_ESTABLISHED:
+ case RDMA_CM_EVENT_REJECTED:
+ case RDMA_CM_EVENT_DISCONNECTED:
+ case RDMA_CM_EVENT_CONNECT_ERROR:
+ return event->param.conn;
+ default:
+ ::rdma_conn_param p = {};
+ return p;
+ }
+ }
+
+ boost::intrusive_ptr<Connection> ConnectionEvent::getConnection () const {
+ return id;
+ }
+
+ boost::intrusive_ptr<Connection> ConnectionEvent::getListenId() const {
+ return listen_id;
+ }
+
+ // Wrap the passed in rdma_cm_id with a Connection
+ // this basically happens only on connection request
+ Connection::Connection(::rdma_cm_id* i) :
+ qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+ id(mkId(i)),
+ context(0)
+ {
+ impl->fd = id->channel->fd;
+
+ // Just overwrite the previous context as it will
+ // have come from the listening connection
+ if (i)
+ i->context = this;
+ }
+
+ Connection::Connection() :
+ qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+ channel(mkEChannel()),
+ id(mkId(channel.get(), this, RDMA_PS_TCP)),
+ context(0)
+ {
+ impl->fd = channel->fd;
+ }
+
+ Connection::~Connection() {
+ // Reset the id context in case someone else has it
+ id->context = 0;
+ }
+
+ void Connection::ensureQueuePair() {
+ assert(id.get());
+
+ // Only allocate a queue pair if there isn't one already
+ if (qp)
+ return;
+
+ qp = new QueuePair(id);
+ }
+
+ Connection::intrusive_ptr Connection::make() {
+ return new Connection();
+ }
+
+ Connection::intrusive_ptr Connection::find(::rdma_cm_id* i) {
+ if (!i)
+ return 0;
+ Connection* id = static_cast< Connection* >(i->context);
+ if (!id)
+ throw std::logic_error("Couldn't find existing Connection");
+ return id;
+ }
+
+ // Make channel non-blocking by making
+ // associated fd nonblocking
+ void Connection::nonblocking() {
+ assert(id.get());
+ ::fcntl(id->channel->fd, F_SETFL, O_NONBLOCK);
+ }
+
+ // If we get EAGAIN because the channel has been set non blocking
+ // and we'd have to wait then return an empty event
+ ConnectionEvent Connection::getNextEvent() {
+ assert(id.get());
+ ::rdma_cm_event* e;
+ int rc = ::rdma_get_cm_event(id->channel, &e);
+ if (GETERR(rc) == EAGAIN)
+ return ConnectionEvent();
+ CHECK(rc);
+ return ConnectionEvent(e);
+ }
+
+ void Connection::bind(const qpid::sys::SocketAddress& src_addr) const {
+ assert(id.get());
+ CHECK(::rdma_bind_addr(id.get(), getAddrInfo(src_addr).ai_addr));
+ }
+
+ void Connection::listen(int backlog) const {
+ assert(id.get());
+ CHECK(::rdma_listen(id.get(), backlog));
+ }
+
+ void Connection::resolve_addr(
+ const qpid::sys::SocketAddress& dst_addr,
+ int timeout_ms) const
+ {
+ assert(id.get());
+ CHECK(::rdma_resolve_addr(id.get(), 0, getAddrInfo(dst_addr).ai_addr, timeout_ms));
+ }
+
+ void Connection::resolve_route(int timeout_ms) const {
+ assert(id.get());
+ CHECK(::rdma_resolve_route(id.get(), timeout_ms));
+ }
+
+ void Connection::disconnect() const {
+ assert(id.get());
+ int rc = ::rdma_disconnect(id.get());
+ // iWarp doesn't let you disconnect a disconnected connection
+ // but Infiniband can do so it's okay to call rdma_disconnect()
+ // in response to a disconnect event, but we may get an error
+ if (GETERR(rc) == EINVAL)
+ return;
+ CHECK(rc);
+ }
+
+ // TODO: Currently you can only connect with the default connection parameters
+ void Connection::connect(const void* data, size_t len) {
+ assert(id.get());
+ // Need to have a queue pair before we can connect
+ ensureQueuePair();
+
+ ::rdma_conn_param p = DEFAULT_CONNECT_PARAM;
+ p.private_data = data;
+ p.private_data_len = len;
+ CHECK(::rdma_connect(id.get(), &p));
+ }
+
+ void Connection::connect() {
+ connect(0, 0);
+ }
+
+ void Connection::accept(const ::rdma_conn_param& param, const void* data, size_t len) {
+ assert(id.get());
+ // Need to have a queue pair before we can accept
+ ensureQueuePair();
+
+ ::rdma_conn_param p = param;
+ p.private_data = data;
+ p.private_data_len = len;
+ CHECK(::rdma_accept(id.get(), &p));
+ }
+
+ void Connection::accept(const ::rdma_conn_param& param) {
+ accept(param, 0, 0);
+ }
+
+ void Connection::reject(const void* data, size_t len) const {
+ assert(id.get());
+ CHECK(::rdma_reject(id.get(), data, len));
+ }
+
+ void Connection::reject() const {
+ assert(id.get());
+ CHECK(::rdma_reject(id.get(), 0, 0));
+ }
+
+ QueuePair::intrusive_ptr Connection::getQueuePair() {
+ assert(id.get());
+
+ ensureQueuePair();
+
+ return qp;
+ }
+
+ std::string Connection::getLocalName() const {
+ ::sockaddr* addr = ::rdma_get_local_addr(id.get());
+ char hostName[NI_MAXHOST];
+ char portName[NI_MAXSERV];
+ CHECK_IBV(::getnameinfo(
+ addr, sizeof(::sockaddr_storage),
+ hostName, sizeof(hostName),
+ portName, sizeof(portName),
+ NI_NUMERICHOST | NI_NUMERICSERV));
+ std::string r(hostName);
+ r += ":";
+ r += portName;
+ return r;
+ }
+
+ std::string Connection::getPeerName() const {
+ ::sockaddr* addr = ::rdma_get_peer_addr(id.get());
+ char hostName[NI_MAXHOST];
+ char portName[NI_MAXSERV];
+ CHECK_IBV(::getnameinfo(
+ addr, sizeof(::sockaddr_storage),
+ hostName, sizeof(hostName),
+ portName, sizeof(portName),
+ NI_NUMERICHOST | NI_NUMERICSERV));
+ std::string r(hostName);
+ r += ":";
+ r += portName;
+ return r;
+ }
}
std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t) {
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h?rev=943771&r1=943770&r2=943771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h Thu May 13 02:26:21 2010
@@ -21,25 +21,19 @@
#ifndef RDMA_WRAP_H
#define RDMA_WRAP_H
-#include "qpid/sys/rdma/rdma_factories.h"
-
#include <rdma/rdma_cma.h>
#include "qpid/RefCounted.h"
#include "qpid/sys/IOHandle.h"
-#include "qpid/sys/posix/PrivatePosix.h"
-
-#include <fcntl.h>
-
-#include <netdb.h>
-#include <vector>
-#include <algorithm>
-#include <iostream>
-#include <stdexcept>
#include <boost/shared_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
+namespace qpid {
+namespace sys {
+ class SocketAddress;
+}}
+
namespace Rdma {
const int DEFAULT_TIMEOUT = 2000; // 2 secs
const int DEFAULT_BACKLOG = 100;
@@ -57,20 +51,8 @@ namespace Rdma {
int32_t dataStart;
int32_t dataCount;
- Buffer(::ibv_pd* pd, char* const b, const int32_t s) :
- bytes(b),
- byteCount(s),
- dataStart(0),
- dataCount(0),
- mr(CHECK_NULL(::ibv_reg_mr(
- pd, bytes, byteCount,
- ::IBV_ACCESS_LOCAL_WRITE)))
- {}
-
- ~Buffer() {
- (void) ::ibv_dereg_mr(mr);
- delete [] bytes;
- }
+ Buffer(::ibv_pd* pd, char* const b, const int32_t s);
+ ~Buffer();
private:
::ibv_mr* mr;
@@ -91,57 +73,28 @@ namespace Rdma {
friend class QueuePair;
- QueuePairEvent() :
- dir(NONE)
- {}
-
+ QueuePairEvent();
QueuePairEvent(
const ::ibv_wc& w,
boost::shared_ptr< ::ibv_cq > c,
- QueueDirection d) :
- cq(c),
- wc(w),
- dir(d)
- {
- assert(dir != NONE);
- }
+ QueueDirection d);
public:
- operator bool() const {
- return dir != NONE;
- }
-
- bool immPresent() const {
- return wc.wc_flags & IBV_WC_WITH_IMM;
- }
-
- uint32_t getImm() const {
- return ntohl(wc.imm_data);
- }
-
- QueueDirection getDirection() const {
- return dir;
- }
-
- ::ibv_wc_opcode getEventType() const {
- return wc.opcode;
- }
-
- ::ibv_wc_status getEventStatus() const {
- return wc.status;
- }
-
- Buffer* getBuffer() const {
- Buffer* b = reinterpret_cast<Buffer*>(wc.wr_id);
- b->dataCount = wc.byte_len;
- return b;
- }
+ operator bool() const;
+ bool immPresent() const;
+ uint32_t getImm() const;
+ QueueDirection getDirection() const;
+ ::ibv_wc_opcode getEventType() const;
+ ::ibv_wc_status getEventStatus() const;
+ Buffer* getBuffer() const;
};
// Wrapper for a queue pair - this has the functionality for
// putting buffers on the receive queue and for sending buffers
// to the other end of the connection.
class QueuePair : public qpid::sys::IOHandle, public qpid::RefCounted {
+ friend class Connection;
+
boost::shared_ptr< ::ibv_pd > pd;
boost::shared_ptr< ::ibv_comp_channel > cchannel;
boost::shared_ptr< ::ibv_cq > scq;
@@ -150,8 +103,6 @@ namespace Rdma {
int outstandingSendEvents;
int outstandingRecvEvents;
- friend class Connection;
-
QueuePair(boost::shared_ptr< ::rdma_cm_id > id);
~QueuePair();
@@ -159,52 +110,17 @@ namespace Rdma {
typedef boost::intrusive_ptr<QueuePair> intrusive_ptr;
// Create a buffer to use for writing
- Buffer* createBuffer(int s) {
- return new Buffer(pd.get(), new char[s], s);
- }
+ Buffer* createBuffer(int s);
// Make channel non-blocking by making
// associated fd nonblocking
- void nonblocking() {
- ::fcntl(cchannel->fd, F_SETFL, O_NONBLOCK);
- }
+ void nonblocking();
// If we get EAGAIN because the channel has been set non blocking
// and we'd have to wait then return an empty event
- QueuePair::intrusive_ptr getNextChannelEvent() {
- // First find out which cq has the event
- ::ibv_cq* cq;
- void* ctx;
- int rc = ::ibv_get_cq_event(cchannel.get(), &cq, &ctx);
- if (rc == -1 && errno == EAGAIN)
- return 0;
- CHECK(rc);
-
- // Batch acknowledge the event
- if (cq == scq.get()) {
- if (++outstandingSendEvents > DEFAULT_CQ_ENTRIES / 2) {
- ::ibv_ack_cq_events(cq, outstandingSendEvents);
- outstandingSendEvents = 0;
- }
- } else if (cq == rcq.get()) {
- if (++outstandingRecvEvents > DEFAULT_CQ_ENTRIES / 2) {
- ::ibv_ack_cq_events(cq, outstandingRecvEvents);
- outstandingRecvEvents = 0;
- }
- }
+ QueuePair::intrusive_ptr getNextChannelEvent();
- return static_cast<QueuePair*>(ctx);
- }
-
- QueuePairEvent getNextEvent() {
- ::ibv_wc w;
- if (::ibv_poll_cq(scq.get(), 1, &w) == 1)
- return QueuePairEvent(w, scq, SEND);
- else if (::ibv_poll_cq(rcq.get(), 1, &w) == 1)
- return QueuePairEvent(w, rcq, RECV);
- else
- return QueuePairEvent();
- }
+ QueuePairEvent getNextEvent();
void postRecv(Buffer* buf);
void postSend(Buffer* buf);
@@ -227,23 +143,11 @@ namespace Rdma {
// Default copy, assignment and destructor ok
public:
- operator bool() const {
- return event;
- }
-
- ::rdma_cm_event_type getEventType() const {
- return event->event;
- }
-
+ operator bool() const;
+ ::rdma_cm_event_type getEventType() const;
::rdma_conn_param getConnectionParam() const;
-
- boost::intrusive_ptr<Connection> getConnection () const {
- return id;
- }
-
- boost::intrusive_ptr<Connection> getListenId() const {
- return listen_id;
- }
+ boost::intrusive_ptr<Connection> getConnection () const;
+ boost::intrusive_ptr<Connection> getListenId() const;
};
// For the moment this is a fairly simple wrapper for rdma_cm_id.
@@ -264,60 +168,17 @@ namespace Rdma {
// Wrap the passed in rdma_cm_id with a Connection
// this basically happens only on connection request
- Connection(::rdma_cm_id* i) :
- qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
- id(i, destroyId),
- context(0)
- {
- impl->fd = id->channel->fd;
-
- // Just overwrite the previous context as it will
- // have come from the listening connection
- if (i)
- i->context = this;
- }
+ Connection(::rdma_cm_id* i);
+ Connection();
+ ~Connection();
- Connection() :
- qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
- channel(mkEChannel()),
- id(mkId(channel.get(), this, RDMA_PS_TCP)),
- context(0)
- {
- impl->fd = channel->fd;
- }
-
- ~Connection() {
- // Reset the id context in case someone else has it
- id->context = 0;
- }
-
- // Default destructor fine
-
- void ensureQueuePair() {
- assert(id.get());
-
- // Only allocate a queue pair if there isn't one already
- if (qp)
- return;
-
- qp = new QueuePair(id);
- }
+ void ensureQueuePair();
public:
typedef boost::intrusive_ptr<Connection> intrusive_ptr;
- static intrusive_ptr make() {
- return new Connection();
- }
-
- static intrusive_ptr find(::rdma_cm_id* i) {
- if (!i)
- return 0;
- Connection* id = static_cast< Connection* >(i->context);
- if (!id)
- throw std::logic_error("Couldn't find existing Connection");
- return id;
- }
+ static intrusive_ptr make();
+ static intrusive_ptr find(::rdma_cm_id* i);
template <typename T>
void addContext(T* c) {
@@ -333,169 +194,48 @@ namespace Rdma {
// Make channel non-blocking by making
// associated fd nonblocking
- void nonblocking() {
- assert(id.get());
- ::fcntl(id->channel->fd, F_SETFL, O_NONBLOCK);
- }
+ void nonblocking();
// If we get EAGAIN because the channel has been set non blocking
// and we'd have to wait then return an empty event
- ConnectionEvent getNextEvent() {
- assert(id.get());
- ::rdma_cm_event* e;
- int rc = ::rdma_get_cm_event(id->channel, &e);
- if (GETERR(rc) == EAGAIN)
- return ConnectionEvent();
- CHECK(rc);
- return ConnectionEvent(e);
- }
-
- void bind(const qpid::sys::SocketAddress& src_addr) const {
- assert(id.get());
- CHECK(::rdma_bind_addr(id.get(), getAddrInfo(src_addr).ai_addr));
- }
-
- void listen(int backlog = DEFAULT_BACKLOG) const {
- assert(id.get());
- CHECK(::rdma_listen(id.get(), backlog));
- }
+ ConnectionEvent getNextEvent();
+ void bind(const qpid::sys::SocketAddress& src_addr) const;
+ void listen(int backlog = DEFAULT_BACKLOG) const;
void resolve_addr(
const qpid::sys::SocketAddress& dst_addr,
- int timeout_ms = DEFAULT_TIMEOUT) const
- {
- assert(id.get());
- CHECK(::rdma_resolve_addr(id.get(), 0, getAddrInfo(dst_addr).ai_addr, timeout_ms));
- }
-
- void resolve_route(int timeout_ms = DEFAULT_TIMEOUT) const {
- assert(id.get());
- CHECK(::rdma_resolve_route(id.get(), timeout_ms));
- }
-
- void disconnect() const {
- assert(id.get());
- int rc = ::rdma_disconnect(id.get());
- // iWarp doesn't let you disconnect a disconnected connection
- // but Infiniband can do so it's okay to call rdma_disconnect()
- // in response to a disconnect event, but we may get an error
- if (GETERR(rc) == EINVAL)
- return;
- CHECK(rc);
- }
+ int timeout_ms = DEFAULT_TIMEOUT) const;
+ void resolve_route(int timeout_ms = DEFAULT_TIMEOUT) const;
+ void disconnect() const;
// TODO: Currently you can only connect with the default connection parameters
- void connect() {
- assert(id.get());
-
- // Need to have a queue pair before we can connect
- ensureQueuePair();
-
- ::rdma_conn_param p = DEFAULT_CONNECT_PARAM;
- CHECK(::rdma_connect(id.get(), &p));
- }
-
+ void connect(const void* data, size_t len);
+ void connect();
template <typename T>
void connect(const T* data) {
- assert(id.get());
- // Need to have a queue pair before we can connect
- ensureQueuePair();
-
- ::rdma_conn_param p = DEFAULT_CONNECT_PARAM;
- p.private_data = data;
- p.private_data_len = sizeof(T);
- CHECK(::rdma_connect(id.get(), &p));
- }
+ connect(data, sizeof(T));
+ }
// TODO: Not sure how to default accept params - they come from the connection request
// event
+ void accept(const ::rdma_conn_param& param, const void* data, size_t len);
+ void accept(const ::rdma_conn_param& param);
template <typename T>
void accept(const ::rdma_conn_param& param, const T* data) {
- assert(id.get());
- // Need to have a queue pair before we can accept
- ensureQueuePair();
-
- ::rdma_conn_param p = param;
- p.private_data = data;
- p.private_data_len = sizeof(T);
- CHECK(::rdma_accept(id.get(), &p));
- }
-
- void accept(const ::rdma_conn_param& param) {
- assert(id.get());
- // Need to have a queue pair before we can accept
- ensureQueuePair();
-
- ::rdma_conn_param p = param;
- p.private_data = 0;
- p.private_data_len = 0;
- CHECK(::rdma_accept(id.get(), &p));
+ accept(param, data, sizeof(T));
}
+ void reject(const void* data, size_t len) const;
+ void reject() const;
template <typename T>
void reject(const T* data) const {
- assert(id.get());
- CHECK(::rdma_reject(id.get(), data, sizeof(T)));
- }
-
- void reject() const {
- assert(id.get());
- CHECK(::rdma_reject(id.get(), 0, 0));
+ reject(data, sizeof(T));
}
- QueuePair::intrusive_ptr getQueuePair() {
- assert(id.get());
-
- ensureQueuePair();
-
- return qp;
- }
-
- std::string getLocalName() const {
- ::sockaddr* addr = ::rdma_get_local_addr(id.get());
- char hostName[NI_MAXHOST];
- char portName[NI_MAXSERV];
- CHECK_IBV(::getnameinfo(
- addr, sizeof(::sockaddr_storage),
- hostName, sizeof(hostName),
- portName, sizeof(portName),
- NI_NUMERICHOST | NI_NUMERICSERV));
- std::string r(hostName);
- r += ":";
- r += portName;
- return r;
- }
-
- std::string getPeerName() const {
- ::sockaddr* addr = ::rdma_get_peer_addr(id.get());
- char hostName[NI_MAXHOST];
- char portName[NI_MAXSERV];
- CHECK_IBV(::getnameinfo(
- addr, sizeof(::sockaddr_storage),
- hostName, sizeof(hostName),
- portName, sizeof(portName),
- NI_NUMERICHOST | NI_NUMERICSERV));
- std::string r(hostName);
- r += ":";
- r += portName;
- return r;
- }
+ QueuePair::intrusive_ptr getQueuePair();
+ std::string getLocalName() const;
+ std::string getPeerName() const;
};
-
- inline void QueuePair::notifyRecv() {
- CHECK_IBV(ibv_req_notify_cq(rcq.get(), 0));
- }
-
- inline void QueuePair::notifySend() {
- CHECK_IBV(ibv_req_notify_cq(scq.get(), 0));
- }
-
- inline ConnectionEvent::ConnectionEvent(::rdma_cm_event* e) :
- id((e->event != RDMA_CM_EVENT_CONNECT_REQUEST) ?
- Connection::find(e->id) : new Connection(e->id)),
- listen_id(Connection::find(e->listen_id)),
- event(e, acker)
- {}
}
std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org