You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2008/09/11 08:16:21 UTC
svn commit: r694143 - in /incubator/qpid/trunk/qpid/cpp: ./ src/
src/qpid/client/ src/qpid/sys/ src/qpid/sys/rdma/
Author: astitcher
Date: Wed Sep 10 23:16:19 2008
New Revision: 694143
URL: http://svn.apache.org/viewvc?rev=694143&view=rev
Log:
Implementation of AMQP over RDMA protocols (Infiniband)
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
Modified:
incubator/qpid/trunk/qpid/cpp/configure.ac
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/acl.mk
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h
Modified: incubator/qpid/trunk/qpid/cpp/configure.ac
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/configure.ac?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/configure.ac (original)
+++ incubator/qpid/trunk/qpid/cpp/configure.ac Wed Sep 10 23:16:19 2008
@@ -278,7 +278,7 @@
;;
esac],
[
- with_RDMA=no
+ with_RDMA=yes
AC_CHECK_LIB([ibverbs],[ibv_create_qp],,[with_RDMA=no])
AC_CHECK_LIB([rdmacm],[rdma_create_id],,[with_RDMA=no])
AC_CHECK_HEADERS([infiniband/verbs.h],,[with_RDMA=no])
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Wed Sep 10 23:16:19 2008
@@ -127,6 +127,7 @@
qpid/sys/rdma/rdma_factories.cpp \
qpid/sys/rdma/RdmaIO.cpp \
qpid/sys/rdma/RdmaIO.h \
+ qpid/sys/rdma/rdma_wrap.cpp \
qpid/sys/rdma/rdma_wrap.h
librdmawrap_la_LIBADD = \
libqpidcommon.la \
@@ -139,11 +140,31 @@
librdmawrap_la_LDFLAGS = \
-no-undefined
+rdma_la_SOURCES = \
+ qpid/sys/RdmaIOPlugin.cpp
+rdma_la_LIBADD = \
+ libqpidbroker.la \
+ librdmawrap.la
+rdma_la_LDFLAGS = $(PLUGINLDFLAGS)
+rdma_la_CXXFLAGS = \
+ $(AM_CXXFLAGS) -Wno-missing-field-initializers
+dmodule_LTLIBRARIES += \
+ rdma.la
+
+rdmaconnector_la_SOURCES = \
+ qpid/client/RdmaConnector.cpp
+rdmaconnector_la_LIBADD = \
+ libqpidclient.la \
+ librdmawrap.la
+rdmaconnector_la_LDFLAGS = $(PLUGINLDFLAGS)
+rdmaconnector_la_CXXFLAGS = \
+ $(AM_CXXFLAGS) -Wno-missing-field-initializers
+cmodule_LTLIBRARIES += \
+ rdmaconnector.la
+
# RDMA test/sample programs
noinst_PROGRAMS += RdmaServer RdmaClient
RdmaServer_SOURCES = qpid/sys/rdma/RdmaServer.cpp
-RdmaServer_CXXFLAGS = \
- $(AM_CXXFLAGS) -Wno-missing-field-initializers
RdmaServer_LDADD = \
librdmawrap.la libqpidcommon.la
RdmaClient_SOURCES = qpid/sys/rdma/RdmaClient.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/acl.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/acl.mk?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/acl.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/acl.mk Wed Sep 10 23:16:19 2008
@@ -12,5 +12,5 @@
qpid/acl/AclReader.cpp \
qpid/acl/AclReader.h
-acl_la_LIBADD= libqpidbroker.la
+acl_la_LIBADD = libqpidbroker.la
acl_la_LDFLAGS = $(PLUGINLDFLAGS)
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp?rev=694143&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp Wed Sep 10 23:16:19 2008
@@ -0,0 +1,427 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Connector.h"
+
+#include "Bounds.h"
+#include "ConnectionImpl.h"
+#include "ConnectionSettings.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Time.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/sys/rdma/RdmaIO.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/Msg.h"
+
+#include <iostream>
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
+
+// This stuff needs to be abstracted out of here to a platform-specific file
+#include <netdb.h>
+
+namespace qpid {
+namespace client {
+
+using namespace qpid::sys;
+using namespace qpid::framing;
+using boost::format;
+using boost::str;
+
+/**
+ * Client-side Connector that carries AMQP frames over an RDMA connection
+ * using Rdma::AsynchIO. A factory for it is registered with
+ * Connector::registerFactory under the names "rdma" and "ib".
+ */
+class RdmaConnector : public Connector, private sys::Runnable
+{
+ // NOTE(review): declared but not referenced anywhere in this file.
+ struct Buff;
+
+ /** Batch up frames for writing to aio. */
+ class Writer : public framing::FrameHandler {
+ typedef Rdma::Buffer BufferBase;
+ typedef std::deque<framing::AMQFrame> Frames;
+
+ const uint16_t maxFrameSize;
+ // Guards all writer state; taken by init(), handle() and write().
+ sys::Mutex lock;
+ // Set by init(); null until the connection is established.
+ Rdma::AsynchIO* aio;
+ // Buffer currently being filled by 'encode'.
+ BufferBase* buffer;
+ // Frames queued by handle() and not yet encoded.
+ Frames frames;
+ size_t lastEof; // Position after last EOF in frames
+ // Encoder positioned over buffer->bytes.
+ framing::Buffer encode;
+ // Number of frames encoded into the current buffer (for logging).
+ size_t framesEncoded;
+ std::string identifier;
+ // Optional; reduced by the number of bytes written in write().
+ Bounds* bounds;
+
+ void writeOne();
+ void newBuffer();
+
+ public:
+
+ Writer(uint16_t maxFrameSize, Bounds*);
+ ~Writer();
+ void init(std::string id, Rdma::AsynchIO*);
+ void handle(framing::AMQFrame&);
+ void write(Rdma::AsynchIO&);
+ };
+
+ const uint16_t maxFrameSize;
+ framing::ProtocolVersion version;
+ // True once the peer's protocol header has been consumed by readbuff().
+ bool initiated;
+
+ // Guards 'polling' and 'joined'.
+ sys::Mutex pollingLock;
+ // True while the receiver thread's poller is running.
+ bool polling;
+ // True when there is no receiver thread left to join.
+ bool joined;
+
+ sys::ShutdownHandler* shutdownHandler;
+ framing::InputHandler* input;
+ // NOTE(review): 'initialiser' and 'output' are not used anywhere in this file.
+ framing::InitiationHandler* initialiser;
+ framing::OutputHandler* output;
+
+ Writer writer;
+
+ // Thread running run(); started by connect().
+ sys::Thread receiver;
+
+ // Created in connected(); null until then. NOTE(review): never deleted in this file.
+ Rdma::AsynchIO* aio;
+ sys::Poller::shared_ptr poller;
+
+ ~RdmaConnector();
+
+ void run();
+ void handleClosed();
+ bool closeInternal();
+
+ // Callbacks
+ void connected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&);
+ void connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType);
+ void disconnected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&);
+ void rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&);
+
+ void readbuff(Rdma::AsynchIO&, Rdma::Buffer*);
+ void writebuff(Rdma::AsynchIO&);
+ void writeDataBlock(const framing::AMQDataBlock& data);
+ void eof(Rdma::AsynchIO&);
+
+ // "[local peer]" names; set in connected().
+ std::string identifier;
+
+ ConnectionImpl* impl;
+
+ void connect(const std::string& host, int port);
+ void close();
+ void send(framing::AMQFrame& frame);
+
+ void setInputHandler(framing::InputHandler* handler);
+ void setShutdownHandler(sys::ShutdownHandler* handler);
+ sys::ShutdownHandler* getShutdownHandler() const;
+ framing::OutputHandler* getOutputHandler();
+ const std::string& getIdentifier() const;
+
+public:
+ RdmaConnector(framing::ProtocolVersion pVersion,
+ const ConnectionSettings&,
+ ConnectionImpl*);
+};
+
+// Static registration: when this module is loaded, make the RDMA connector
+// available under the "rdma" and "ib" protocol names.
+namespace {
+    /** Factory handed to Connector::registerFactory. */
+    Connector* createRdmaConnector(framing::ProtocolVersion version,
+                                   const ConnectionSettings& settings,
+                                   ConnectionImpl* connection)
+    {
+        return new RdmaConnector(version, settings, connection);
+    }
+
+    struct RdmaConnectorRegistration {
+        RdmaConnectorRegistration() {
+            Connector::registerFactory("rdma", &createRdmaConnector);
+            Connector::registerFactory("ib", &createRdmaConnector);
+        }
+    } rdmaConnectorRegistration;
+}
+
+
+/**
+ * Construct an unconnected RDMA connector.
+ * @param ver      protocol version to announce in the protocol header
+ * @param settings supplies maxFrameSize
+ * @param cimpl    owning connection; also used as the writer's Bounds
+ */
+RdmaConnector::RdmaConnector(ProtocolVersion ver,
+                             const ConnectionSettings& settings,
+                             ConnectionImpl* cimpl)
+    : maxFrameSize(settings.maxFrameSize),
+      version(ver),
+      initiated(false),
+      polling(false),
+      joined(true),
+      shutdownHandler(0),
+      // Handler pointers start null instead of indeterminate.
+      input(0),
+      initialiser(0),
+      output(0),
+      writer(maxFrameSize, cimpl),
+      aio(0),
+      impl(cimpl)
+{
+    QPID_LOG(debug, "RdmaConnector created for " << version);
+}
+
+// Stop polling and join the receiver thread if still running.
+RdmaConnector::~RdmaConnector() {
+    close();
+}
+
+/**
+ * Resolve host:port (IPv4) and start an asynchronous RDMA connect. The result is
+ * delivered through connected()/connectionError()/disconnected()/rejected()
+ * on the poller run by the receiver thread started here.
+ * @throws Exception if the host cannot be resolved.
+ */
+void RdmaConnector::connect(const std::string& host, int port){
+    Mutex::ScopedLock l(pollingLock);
+    assert(!polling);
+    assert(joined);
+    poller = Poller::shared_ptr(new Poller);
+
+    // This stuff needs to be abstracted out of here to a platform-specific file
+    // getaddrinfo requires every hints field it does not use to be zero/null.
+    ::addrinfo hints = ::addrinfo();
+    hints.ai_family = AF_INET;
+    hints.ai_socktype = SOCK_STREAM;
+    ::addrinfo* res = 0;
+    int n = ::getaddrinfo(host.c_str(), boost::lexical_cast<std::string>(port).c_str(), &hints, &res);
+    // Any non-zero result is a failure; POSIX does not guarantee it is negative.
+    if (n != 0) {
+        throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
+    }
+
+    // NOTE(review): the connector is never deleted in this file; it has to
+    // outlive this call, so ownership is left as before.
+    Rdma::Connector* c = 0;
+    try {
+        c = new Rdma::Connector(
+            *res->ai_addr,
+            Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES),
+            boost::bind(&RdmaConnector::connected, this, poller, _1, _2),
+            boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2),
+            boost::bind(&RdmaConnector::disconnected, this, poller, _1),
+            boost::bind(&RdmaConnector::rejected, this, poller, _1, _2));
+        c->start(poller);
+    } catch (...) {
+        ::freeaddrinfo(res);
+        throw;
+    }
+    // The resolver result was previously leaked.
+    ::freeaddrinfo(res);
+
+    polling = true;
+    joined = false;
+    receiver = Thread(this);
+}
+
+// The following only gets run when connected.
+// Create the AsynchIO for the new connection, hand it to the writer and send
+// the AMQP protocol header.
+void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) {
+    aio = new Rdma::AsynchIO(ci->getQueuePair(),
+        cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES,
+        boost::bind(&RdmaConnector::readbuff, this, _1, _2),
+        boost::bind(&RdmaConnector::writebuff, this, _1),
+        0, // write buffers full
+        boost::bind(&RdmaConnector::eof, this, _1)); // data error - just close connection
+    aio->start(poller);
+
+    identifier = str(format("[%1% %2%]") % ci->getLocalName() % ci->getPeerName());
+    writer.init(identifier, aio);
+    ProtocolInitiation init(version);
+    writeDataBlock(init);
+}
+
+// Connection-level failure callbacks. aio is only created in connected(), so
+// it can still be null here (rejected() in particular is the alternative to
+// connected()). Dereferencing it was undefined behaviour, so these call
+// handleClosed() directly, which is all eof() does.
+void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType) {
+    QPID_LOG(trace, "Connection Error " << identifier);
+    handleClosed();
+}
+
+void RdmaConnector::disconnected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&) {
+    handleClosed();
+}
+
+void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams& cp) {
+    QPID_LOG(trace, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize);
+    handleClosed();
+}
+
+// Stop polling (if still polling) and join the receiver thread.
+// Returns the value 'polling' had on entry, i.e. true only for the call that
+// actually shut the poller down.
+bool RdmaConnector::closeInternal() {
+ bool ret;
+ {
+ Mutex::ScopedLock l(pollingLock);
+ ret = polling;
+ if (polling) {
+ polling = false;
+ poller->shutdown();
+ }
+ // Nothing to join if already joined, and a thread cannot join itself
+ // (this is reached from the receiver thread via handleClosed()).
+ if (joined || receiver.id() == Thread::current().id()) {
+ return ret;
+ }
+ joined = true;
+ }
+
+ // Join outside pollingLock.
+ receiver.join();
+ return ret;
+}
+
+// Public close: same as closeInternal() but discards its result.
+void RdmaConnector::close() {
+ closeInternal();
+}
+
+// Handler receiving decoded frames from readbuff().
+void RdmaConnector::setInputHandler(InputHandler* handler){
+    this->input = handler;
+}
+
+// Handler notified by handleClosed() when the connection goes away.
+void RdmaConnector::setShutdownHandler(ShutdownHandler* handler){
+    this->shutdownHandler = handler;
+}
+
+// The connector itself is the output handler; send() queues frames.
+OutputHandler* RdmaConnector::getOutputHandler(){
+    return this;
+}
+
+sys::ShutdownHandler* RdmaConnector::getShutdownHandler() const {
+    return this->shutdownHandler;
+}
+
+const std::string& RdmaConnector::getIdentifier() const {
+    return this->identifier;
+}
+
+// Queue a frame; the writer requests a write at the end of each frameset.
+void RdmaConnector::send(AMQFrame& frame) {
+    writer.handle(frame);
+}
+
+// Close, and notify the shutdown handler only if this call stopped polling.
+void RdmaConnector::handleClosed() {
+    const bool stoppedPolling = closeInternal();
+    if (stoppedPolling && shutdownHandler != 0) {
+        shutdownHandler->shutdown();
+    }
+}
+
+// Writer starts detached; init() attaches it to an AsynchIO.
+RdmaConnector::Writer::Writer(uint16_t s, Bounds* b) :
+    maxFrameSize(s),
+    aio(0),
+    buffer(0),
+    lastEof(0),
+    // Previously left uninitialised until the first newBuffer().
+    framesEncoded(0),
+    bounds(b)
+{
+}
+
+// Return the buffer currently being filled, if one was ever obtained.
+RdmaConnector::Writer::~Writer() {
+    if (aio && buffer)
+        aio->returnBuffer(buffer);
+}
+
+// Attach to the connection's AsynchIO and take the first buffer to encode into.
+void RdmaConnector::Writer::init(std::string id, Rdma::AsynchIO* a) {
+    Mutex::ScopedLock l(lock);
+    aio = a;
+    identifier = id;
+    newBuffer();
+}
+
+// Queue a frame. A write is only requested once a frameset (an "assembly" in
+// 0-10 terminology) is complete; lastEof then covers every queued frame.
+void RdmaConnector::Writer::handle(framing::AMQFrame& frame) {
+    Mutex::ScopedLock l(lock);
+    frames.push_back(frame);
+    const bool endOfFrameset = frame.getEof();
+    if (endOfFrameset) {
+        lastEof = frames.size();
+        QPID_LOG(debug, "Requesting write: lastEof=" << lastEof);
+        aio->notifyPendingWrite();
+    }
+    QPID_LOG(trace, "SENT " << identifier << ": " << frame);
+}
+
+// Queue the current buffer (everything encoded so far) and start a new one.
+void RdmaConnector::Writer::writeOne() {
+    assert(buffer);
+    QPID_LOG(trace, "Write buffer " << encode.getPosition()
+             << " bytes " << framesEncoded << " frames ");
+    framesEncoded = 0;
+
+    Rdma::Buffer* const full = buffer;
+    full->dataStart = 0;
+    full->dataCount = encode.getPosition();
+    aio->queueWrite(full);
+    newBuffer();
+}
+
+// Take a buffer from the AsynchIO and point the encoder at it.
+void RdmaConnector::Writer::newBuffer() {
+    buffer = aio->getBuffer();
+    framesEncoded = 0;
+    encode = framing::Buffer(buffer->bytes, buffer->byteCount);
+}
+
+// Called in IO thread. (write idle routine)
+// This is NOT only called in response to previously calling notifyPendingWrite
+//
+// Only complete framesets are sent: the first lastEof queued frames end with an
+// EOF frame, and anything after them belongs to a frameset handle() is still
+// building. Previously every queued frame was encoded, so framesEncoded could
+// exceed lastEof and 'lastEof -= framesEncoded' wrapped the size_t.
+void RdmaConnector::Writer::write(Rdma::AsynchIO&) {
+    Mutex::ScopedLock l(lock);
+    assert(buffer);
+    // If nothing to do return immediately
+    if (lastEof==0)
+        return;
+    size_t bytesWritten = 0;
+    while (aio->writable() && lastEof > 0) {
+        // Pack as many complete-frameset frames as fit into the current buffer.
+        while (lastEof > 0) {
+            const AMQFrame& frame = frames.front();
+            uint32_t size = frame.size();
+            if (size > encode.available())
+                break;
+            frame.encode(encode);
+            frames.pop_front();
+            --lastEof;
+            ++framesEncoded;
+            bytesWritten += size;
+        }
+        writeOne();
+    }
+    if (bounds) bounds->reduce(bytesWritten);
+}
+
+// Read callback. The first buffer may start with the peer's protocol header;
+// after that, every complete frame in the buffer goes to the input handler.
+void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
+    framing::Buffer in(buff->bytes + buff->dataStart, buff->dataCount);
+
+    if (!initiated) {
+        framing::ProtocolInitiation peerInit;
+        const bool gotInit = peerInit.decode(in);
+        if (gotInit) {
+            //TODO: check the version is correct
+            QPID_LOG(debug, "RECV " << identifier << " INIT(" << peerInit << ")");
+        }
+        initiated = true;
+    }
+
+    for (AMQFrame frame; frame.decode(in); ) {
+        QPID_LOG(trace, "RECV " << identifier << ": " << frame);
+        input->received(frame);
+    }
+}
+
+// Write-idle callback from the AsynchIO: let the writer flush queued frames.
+void RdmaConnector::writebuff(Rdma::AsynchIO& aio_) {
+    writer.write(aio_);
+}
+
+// Encode one data block into its own buffer and queue it (in this file it is
+// only called from connected() to send the protocol header).
+void RdmaConnector::writeDataBlock(const AMQDataBlock& data) {
+    Rdma::Buffer* buff = aio->getBuffer();
+    framing::Buffer out(buff->bytes, buff->byteCount);
+    data.encode(out);
+    // Payload starts at offset 0; set it explicitly as Writer::writeOne does,
+    // instead of relying on getBuffer() having reset it.
+    buff->dataStart = 0;
+    buff->dataCount = data.size();
+    aio->queueWrite(buff);
+}
+
+// Data error / end of stream: close the connection.
+void RdmaConnector::eof(Rdma::AsynchIO&) {
+    handleClosed();
+}
+
+// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
+// will never be called
+//
+// Body of the receiver thread started by connect(): dispatch poller events
+// until the poller is shut down. If dispatching throws, the error is logged and
+// the connection is closed.
+// NOTE(review): 'polling' is set to false before handleClosed(), so
+// closeInternal() returns false and shutdownHandler is NOT notified on this path.
+// Confirm whether that is intended.
+void RdmaConnector::run(){
+ // Keep the connection impl in memory until run() completes.
+ //GRS: currently the ConnectionImpls destructor is where the Io thread is joined
+ //boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this();
+ //assert(protect);
+ try {
+ Dispatcher d(poller);
+
+ //aio->start(poller);
+ d.run();
+ //aio->queueForDeletion();
+ } catch (const std::exception& e) {
+ {
+ // We're no longer polling
+ Mutex::ScopedLock l(pollingLock);
+ polling = false;
+ }
+ QPID_LOG(error, e.what());
+ handleClosed();
+ }
+}
+
+
+}} // namespace qpid::client
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Wed Sep 10 23:16:19 2008
@@ -137,8 +137,8 @@
void SessionImpl::detach() //call with lock held
{
if (state == ATTACHED) {
- proxy.detach(id.getName());
setState(DETACHING);
+ // State becomes DETACHING before the detach is sent.
+ // NOTE(review): presumably so a reply handled right after the send already sees DETACHING; confirm.
+ proxy.detach(id.getName());
}
}
@@ -630,7 +630,8 @@
inline void SessionImpl::waitFor(State s) //call with lock held
{
// We can be DETACHED at any time
- state.waitFor(States(s, DETACHED));
+ // When waiting for DETACHED itself, wait on that single state rather than States(DETACHED, DETACHED).
+ if (s == DETACHED) state.waitFor(DETACHED);
+ else state.waitFor(States(s, DETACHED));
check();
}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=694143&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Wed Sep 10 23:16:19 2008
@@ -0,0 +1,327 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ProtocolFactory.h"
+
+#include "qpid/Plugin.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/framing/AMQP_HighestVersion.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/rdma/RdmaIO.h"
+#include "qpid/sys/OutputControl.h"
+
+#include <boost/bind.hpp>
+#include <memory>
+
+#include <netdb.h>
+
+using std::auto_ptr;
+using std::string;
+using std::stringstream;
+
+namespace qpid {
+namespace sys {
+
+// Broker-side per-connection handler: connects an Rdma::AsynchIO to a
+// ConnectionCodec and serves as that codec's OutputControl.
+class RdmaIOHandler : public OutputControl {
+ Rdma::Connection::intrusive_ptr connection;
+ // Peer name, used in log messages and passed to the codec factory.
+ std::string identifier;
+ // Set by init(); owned via deferDelete() in the destructor.
+ Rdma::AsynchIO* aio;
+ ConnectionCodec::Factory* factory;
+ // Created once the protocol header is exchanged; deleted in the destructor.
+ ConnectionCodec* codec;
+ // Once set, further received data is ignored.
+ bool readError;
+ // True for outgoing connections: we send the protocol header first (see idle()).
+ bool isClient;
+
+ void write(const framing::ProtocolInitiation&);
+
+ public:
+ RdmaIOHandler(Rdma::Connection::intrusive_ptr& c, ConnectionCodec::Factory* f);
+ ~RdmaIOHandler();
+ void init(Rdma::AsynchIO* a);
+ void start(Poller::shared_ptr poller) {aio->start(poller);}
+
+ void setClient() { isClient = true; }
+
+ // Output side
+ void close();
+ void activateOutput();
+
+ // Input side
+ void readbuff(Rdma::AsynchIO& aio, Rdma::Buffer* buff);
+
+ // Notifications
+ void full(Rdma::AsynchIO& aio);
+ void idle(Rdma::AsynchIO& aio);
+ void error(Rdma::AsynchIO& aio);
+};
+
+// Create a handler for connection c; init() must be called to attach the AsynchIO.
+RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr& c, qpid::sys::ConnectionCodec::Factory* f) :
+    connection(c),
+    identifier(c->getPeerName()),
+    // Null until init(); previously left indeterminate.
+    aio(0),
+    factory(f),
+    codec(0),
+    readError(false),
+    isClient(false)
+{
+}
+
+void RdmaIOHandler::init(Rdma::AsynchIO* a) {
+    aio = a;
+}
+
+// Tell the codec the connection closed, free it, and hand the AsynchIO to its
+// own deferred-deletion mechanism.
+RdmaIOHandler::~RdmaIOHandler() {
+    if (codec)
+        codec->closed();
+    delete codec;
+
+    // Guard against destruction before init() attached an AsynchIO.
+    if (aio)
+        aio->deferDelete();
+}
+
+// Encode a protocol header into its own buffer and queue it.
+void RdmaIOHandler::write(const framing::ProtocolInitiation& data)
+{
+    QPID_LOG(debug, "SENT [" << identifier << "] INIT(" << data << ")");
+    Rdma::Buffer* buff = aio->getBuffer();
+    framing::Buffer out(buff->bytes, buff->byteCount);
+    data.encode(out);
+    // Payload starts at offset 0; set it explicitly rather than relying on
+    // getBuffer() having reset it.
+    buff->dataStart = 0;
+    buff->dataCount = data.size();
+    aio->queueWrite(buff);
+}
+
+// OutputControl: ask the AsynchIO to close after queued writes.
+void RdmaIOHandler::close() {
+    aio->queueWriteClose();
+}
+
+// OutputControl: request a write-idle callback so idle() can encode output.
+void RdmaIOHandler::activateOutput() {
+    aio->notifyPendingWrite();
+}
+
+// Write-idle callback. If writable:
+//  - client side with no codec yet: create it and send our protocol header;
+//  - otherwise encode whatever the codec has ready into one buffer.
+// Queue a write-close once the codec reports it is closed.
+void RdmaIOHandler::idle(Rdma::AsynchIO&) {
+    if (!aio->writable()) {
+        return;
+    }
+    if (isClient && codec == 0) {
+        codec = factory->create(*this, identifier);
+        write(framing::ProtocolInitiation(codec->getVersion()));
+        return;
+    }
+    if (codec == 0) return;
+    if (codec->canEncode()) {
+        // Try and get a queued buffer if not then construct new one
+        Rdma::Buffer* buff = aio->getBuffer();
+        size_t encoded=codec->encode(buff->bytes, buff->byteCount);
+        // Encoded data starts at offset 0 of the buffer.
+        buff->dataStart = 0;
+        buff->dataCount = encoded;
+        aio->queueWrite(buff);
+    }
+    if (codec->isClosed())
+        aio->queueWriteClose();
+}
+
+void RdmaIOHandler::error(Rdma::AsynchIO&) {
+ close();
+}
+
+void RdmaIOHandler::full(Rdma::AsynchIO&) {
+ QPID_LOG(debug, "buffer full [" << identifier << "]");
+}
+
+// The logic here is subtly different from TCP as RDMA is message oriented
+// so we define that an RDMA message is a frame - in this case there is no putting back
+// of any message remainder - there shouldn't be any. And what we read here can't be
+// smaller than a frame
+//
+// Before a codec exists, the buffer is decoded as a ProtocolInitiation and the
+// factory is asked for a codec for that version. If the factory returns none,
+// our highest supported version is sent back and the connection is closed.
+// Any exception from the factory or codec marks readError and closes the
+// connection; later buffers are then ignored.
+// NOTE(review): 'decoded' is assigned but never read in this function, and a
+// buffer whose protocol header fails to decode is silently ignored.
+void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
+ if (readError) {
+ return;
+ }
+ size_t decoded = 0;
+ if (codec) { // Already initiated
+ try {
+ decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
+ }catch(const std::exception& e){
+ QPID_LOG(error, e.what());
+ readError = true;
+ aio->queueWriteClose();
+ }
+ }else{
+ framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+ framing::ProtocolInitiation protocolInit;
+ if (protocolInit.decode(in)) {
+ decoded = in.getPosition();
+ QPID_LOG(debug, "RECV [" << identifier << "] INIT(" << protocolInit << ")");
+ try {
+ codec = factory->create(protocolInit.getVersion(), *this, identifier);
+ if (!codec) {
+ //TODO: may still want to revise this...
+ //send valid version header & close connection.
+ write(framing::ProtocolInitiation(framing::highestProtocolVersion));
+ readError = true;
+ aio->queueWriteClose();
+ }
+ } catch (const std::exception& e) {
+ QPID_LOG(error, e.what());
+ readError = true;
+ aio->queueWriteClose();
+ }
+ }
+ }
+}
+
+// ProtocolFactory that accepts (and can initiate) AMQP connections over RDMA.
+// One RdmaIOHandler is attached to each connection as its context.
+class RdmaIOProtocolFactory : public ProtocolFactory {
+ // Created by accept().
+ auto_ptr<Rdma::Listener> listener;
+ const uint16_t listeningPort;
+
+ public:
+ // NOTE(review): port is int16_t here but stored as uint16_t; ports above
+ // 32767 rely on the narrowing round-tripping.
+ RdmaIOProtocolFactory(int16_t port, int backlog);
+ void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
+ void connect(Poller::shared_ptr, const string& host, int16_t port, ConnectionCodec::Factory*, ConnectFailedCallback);
+
+ uint16_t getPort() const;
+ string getHost() const;
+
+ private:
+ // Rdma listener/connector callbacks.
+ bool request(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&, ConnectionCodec::Factory*);
+ void established(Poller::shared_ptr, Rdma::Connection::intrusive_ptr&);
+ void connected(Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&, ConnectionCodec::Factory*);
+ void connectionError(Rdma::Connection::intrusive_ptr&, Rdma::ErrorType);
+ void disconnected(Rdma::Connection::intrusive_ptr&);
+ void rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&);
+};
+
+// Static instance to initialise plugin: registers an RDMA protocol factory
+// with the broker, listening on the broker's configured port.
+static class RdmaIOPlugin : public Plugin {
+    void earlyInitialize(Target&) {
+        // Nothing to do here.
+    }
+
+    void initialize(Target& target) {
+        // Only provide to a Broker
+        broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
+        if (!broker)
+            return;
+        const broker::Broker::Options& opts = broker->getOptions();
+        ProtocolFactory::shared_ptr protocol(new RdmaIOProtocolFactory(opts.port, opts.connectionBacklog));
+        QPID_LOG(info, "Listening on RDMA port " << protocol->getPort());
+        broker->registerProtocolFactory(protocol);
+    }
+} rdmaPlugin;
+
+// backlog is currently unused.
+RdmaIOProtocolFactory::RdmaIOProtocolFactory(int16_t port, int /*backlog*/)
+    : listeningPort(port)
+{
+}
+
+// Connection established: start IO on the handler that request() attached.
+void RdmaIOProtocolFactory::established(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) {
+    ci->getContext<RdmaIOHandler>()->start(poller);
+}
+
+// Connection request: build the handler and its AsynchIO, attach the handler
+// to the connection as its context, and accept (always returns true).
+bool RdmaIOProtocolFactory::request(Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp,
+                                    ConnectionCodec::Factory* f) {
+    RdmaIOHandler* handler = new RdmaIOHandler(ci, f);
+    handler->init(new Rdma::AsynchIO(ci->getQueuePair(),
+        cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES,
+        boost::bind(&RdmaIOHandler::readbuff, handler, _1, _2),
+        boost::bind(&RdmaIOHandler::idle, handler, _1),
+        0, // boost::bind(&RdmaIOHandler::full, handler, _1),
+        boost::bind(&RdmaIOHandler::error, handler, _1)));
+
+    // Record the handler so it can be got back from the connection
+    ci->addContext(handler);
+    return true;
+}
+
+void RdmaIOProtocolFactory::connectionError(Rdma::Connection::intrusive_ptr&, Rdma::ErrorType) {
+    // Nothing to do here.
+}
+
+// If we've got a connection already tear it down, otherwise ignore.
+void RdmaIOProtocolFactory::disconnected(Rdma::Connection::intrusive_ptr& ci) {
+    RdmaIOHandler* handler = ci->getContext<RdmaIOHandler>();
+    if (!handler)
+        return;
+    handler->close();
+    delete handler;
+}
+
+// The port is fixed at construction, so no locking is needed.
+uint16_t RdmaIOProtocolFactory::getPort() const {
+    return listeningPort;
+}
+
+// The listening host is not reported yet; always an empty string.
+string RdmaIOProtocolFactory::getHost() const {
+    //return listener.getSockname();
+    return string();
+}
+
+// Start listening for RDMA connections on listeningPort on all local IPv4
+// addresses. Accepted connections get codecs from 'fact'.
+void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
+    // Value-initialise so sin_zero is zeroed (it was left indeterminate).
+    ::sockaddr_in sin = ::sockaddr_in();
+    sin.sin_family = AF_INET;
+    sin.sin_port = htons(listeningPort);
+    // Network byte order, like the port (INADDR_ANY is 0, so the value is unchanged).
+    sin.sin_addr.s_addr = htonl(INADDR_ANY);
+
+    listener.reset(
+        new Rdma::Listener(reinterpret_cast<const ::sockaddr&>(sin),
+            Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES),
+            boost::bind(&RdmaIOProtocolFactory::established, this, poller, _1),
+            boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2),
+            boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1),
+            boost::bind(&RdmaIOProtocolFactory::request, this, _1, _2, fact)));
+
+    listener->start(poller);
+}
+
+// Only used for outgoing connections (in federation)
+void RdmaIOProtocolFactory::rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&) {
+    // Nothing to do here.
+}
+
+// Do the same as connection request and established but mark a client too
+void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp,
+                                      ConnectionCodec::Factory* f) {
+    // request() always returns true; the result is deliberately ignored.
+    static_cast<void>(request(ci, cp, f));
+    ci->getContext<RdmaIOHandler>()->setClient();
+    established(poller, ci);
+}
+
+/**
+ * Outgoing (federation) connection: resolve host:p (IPv4) and start an
+ * asynchronous RDMA connect. On success connected() creates the client-side
+ * handler. ConnectFailedCallback is not used yet.
+ * @throws Exception if the host cannot be resolved.
+ */
+void RdmaIOProtocolFactory::connect(
+    Poller::shared_ptr poller,
+    const std::string& host, int16_t p,
+    ConnectionCodec::Factory* f,
+    ConnectFailedCallback)
+{
+    ::addrinfo *res = 0;
+    ::addrinfo hints = {};
+    hints.ai_family = AF_INET;
+    hints.ai_socktype = SOCK_STREAM;
+    stringstream ss; ss << p;
+    string port = ss.str();
+    int n = ::getaddrinfo(host.c_str(), port.c_str(), &hints, &res);
+    // Any non-zero result is a failure; POSIX does not guarantee it is negative.
+    if (n != 0) {
+        throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
+    }
+
+    // The connector must be started and must outlive this call. Previously it
+    // was a never-started stack local destroyed on return, so no connection was
+    // ever attempted.
+    // TODO: give the connector an owner so it can eventually be released.
+    Rdma::Connector* c = 0;
+    try {
+        c = new Rdma::Connector(
+            *res->ai_addr,
+            Rdma::ConnectionParams(8000, Rdma::DEFAULT_WR_ENTRIES),
+            boost::bind(&RdmaIOProtocolFactory::connected, this, poller, _1, _2, f),
+            boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2),
+            boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1),
+            boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2));
+        c->start(poller);
+    } catch (...) {
+        ::freeaddrinfo(res);
+        throw;
+    }
+    // The resolver result was previously leaked.
+    ::freeaddrinfo(res);
+}
+
+}} // namespace qpid::sys
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp Wed Sep 10 23:16:19 2008
@@ -50,8 +50,6 @@
int64_t sbytes = 0;
int64_t rmsgs = 0;
int64_t rbytes = 0;
-int64_t cmsgs = 0;
-int writable = true;
int target = 1000000;
int msgsize = 200;
@@ -62,17 +60,15 @@
vector<char> testString;
+// Queue test messages while the AsynchIO reports it is writable, stopping once 'target' messages have been sent.
void write(Rdma::AsynchIO& aio) {
- if ((cmsgs - rmsgs) < Rdma::DEFAULT_WR_ENTRIES/2) {
- while (writable) {
- if (smsgs >= target)
- return;
- Rdma::Buffer* b = aio.getBuffer();
- std::copy(testString.begin(), testString.end(), b->bytes);
- b->dataCount = msgsize;
- aio.queueWrite(b);
- ++smsgs;
- sbytes += b->byteCount;
- }
+ while (aio.writable()) {
+ if (smsgs >= target)
+ return;
+ Rdma::Buffer* b = aio.getBuffer();
+ std::copy(testString.begin(), testString.end(), b->bytes);
+ b->dataCount = msgsize;
+ aio.queueWrite(b);
+ ++smsgs;
+ sbytes += msgsize;
}
}
@@ -82,39 +78,46 @@
+// Read callback: count the message; keep sending until 'target' is reached, then record the
+// full-test duration and stop the poller once no writes are outstanding.
void data(Poller::shared_ptr p, Rdma::AsynchIO& aio, Rdma::Buffer* b) {
++rmsgs;
- rbytes += b->byteCount;
+ rbytes += b->dataCount;
// When all messages have been recvd stop
if (rmsgs < target) {
write(aio);
} else {
fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now()));
- if (cmsgs >= target)
+ if (aio.incompletedWrites() == 0)
p->shutdown();
}
}
-void full(Rdma::AsynchIO&) {
- writable = false;
+// Full callback: undo the send accounting for buffer b and give it back.
+// NOTE(review): presumably b is a buffer the AsynchIO could not queue; confirm.
+void full(Rdma::AsynchIO& a, Rdma::Buffer* b) {
+ // Warn as we shouldn't get here anymore
+ cerr << "!";
+
+ // Don't need to keep buffer just adjust the counts
+ --smsgs;
+ sbytes -= b->dataCount;
+
+ // Give buffer back
+ a.returnBuffer(b);
}
+// Idle callback: keep sending until 'target' is reached; afterwards record the send duration
+// and stop the poller once all messages are received and no writes are outstanding.
void idle(Poller::shared_ptr p, Rdma::AsynchIO& aio) {
- writable = true;
- ++cmsgs;
if (smsgs < target) {
write(aio);
} else {
sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now()));
- if (rmsgs >= target && cmsgs >= target)
+ if (rmsgs >= target && aio.incompletedWrites() == 0)
p->shutdown();
}
}
-void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) {
+void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) {
cout << "Connected\n";
Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair();
- Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(), msgsize,
+ Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(),
+ cp.maxRecvBufferSize, cp.initialXmitCredit , Rdma::DEFAULT_WR_ENTRIES,
boost::bind(&data, poller, _1, _2),
boost::bind(&idle, poller, _1),
&full,
@@ -131,12 +134,12 @@
p->shutdown();
}
+// Connection failure callbacks: report the failure and shut the poller down.
-void connectionError(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) {
+void connectionError(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&, const Rdma::ErrorType) {
cout << "Connection error\n";
p->shutdown();
}
-void rejected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) {
+void rejected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&) {
cout << "Connection rejected\n";
p->shutdown();
}
@@ -164,7 +167,7 @@
// Make a random message of that size
testString.resize(msgsize);
for (int i = 0; i < msgsize; ++i) {
- testString[i] = 32 + rand() & 0x3f;
+ testString[i] = 32 + (rand() & 0x3f);
}
try {
@@ -173,10 +176,11 @@
Rdma::Connector c(
*res->ai_addr,
- boost::bind(&connected, p, _1),
- boost::bind(&connectionError, p, _1),
+ Rdma::ConnectionParams(msgsize, Rdma::DEFAULT_WR_ENTRIES),
+ boost::bind(&connected, p, _1, _2),
+ boost::bind(&connectionError, p, _1, _2),
boost::bind(&disconnected, p, _1),
- boost::bind(&rejected, p, _1));
+ boost::bind(&rejected, p, _1, _2));
c.start(p);
d.run();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp Wed Sep 10 23:16:19 2008
@@ -20,13 +20,21 @@
*/
#include "RdmaIO.h"
+#include "qpid/log/Statement.h"
+
+
#include <iostream>
#include <boost/bind.hpp>
+using qpid::sys::DispatchHandle;
+using qpid::sys::Poller;
+
namespace Rdma {
AsynchIO::AsynchIO(
QueuePair::intrusive_ptr q,
- int s,
+ int size,
+ int xCredit,
+ int rCount,
ReadCallback rc,
IdleCallback ic,
FullCallback fc,
@@ -34,10 +42,15 @@
) :
qp(q),
dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this, _1), 0, 0),
- bufferSize(s),
- recvBufferCount(DEFAULT_WR_ENTRIES),
- xmitBufferCount(DEFAULT_WR_ENTRIES),
+ bufferSize(size),
+ recvCredit(0),
+ xmitCredit(xCredit),
+ recvBufferCount(rCount),
+ xmitBufferCount(xCredit),
outstandingWrites(0),
+ closed(false),
+ deleting(false),
+ state(IDLE),
readCallback(rc),
idleCallback(ic),
fullCallback(fc),
@@ -57,72 +70,232 @@
}
+ // Private destructor: instances are released through deferDelete() or the
+ // self-delete at the end of event processing, never deleted by users.
AsynchIO::~AsynchIO() {
+ // Warn if we are deleting whilst there are still unreclaimed write buffers
+ if ( outstandingWrites>0 )
+ QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished");
+
+ // Turn off callbacks (before doing the deletes)
+ dataHandle.stopWatch();
+
// The buffers ptr_deque automatically deletes all the buffers we've allocated
+ // TODO: It might turn out to be more efficient in high connection loads to reuse the
+ // buffers rather than having to reregister them all the time (this would be straightforward if all
+ // connections have the same buffer size and harder otherwise)
}
+ // Start dispatching this queue pair's completion-channel events to
+ // dataEvent() on the given poller.
void AsynchIO::start(Poller::shared_ptr poller) {
dataHandle.startWatch(poller);
}
- // TODO: Currently we don't prevent write buffer overrun we just advise
- // when to stop writing.
- void AsynchIO::queueWrite(Buffer* buff) {
- qp->postSend(buff);
- ++outstandingWrites;
- if (outstandingWrites >= xmitBufferCount) {
- fullCallback(*this);
+ // Mark for deletion/Delete this object when we have no outstanding writes
+ // If idle with no outstanding writes, delete now. Otherwise only set
+ // `deleting`; the actual delete is left to code that checks that flag later.
+ // dataEvent() always checks it. notifyPendingWrite() checks it only after
+ // processing completions.
+ void AsynchIO::deferDelete() {
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ if (outstandingWrites > 0 || state != IDLE) {
+ deleting = true;
+ return;
}
+ state = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor
+ }
+ delete this;
}
- void AsynchIO::notifyPendingWrite() {
- // Just perform the idle callback (if possible)
- if (outstandingWrites < xmitBufferCount) {
- idleCallback(*this);
+ // Send buff to the peer if writable(). Any receive credit we owe the peer is
+ // piggybacked in the immediate data. The flag bits are masked off, so the
+ // peer still treats the message as carrying application data.
+ // When not writable, buff goes to the full callback, or back to the free
+ // pool if no full callback was given.
+ void AsynchIO::queueWrite(Buffer* buff) {
+ // Make sure we don't overrun our available buffers
+ // either at our end or the known available at the peers end
+ if (writable()) {
+ // TODO: We might want to batch up sending credit
+ if (recvCredit > 0) {
+ int creditSent = recvCredit & ~FlagsMask;
+ qp->postSend(creditSent, buff);
+ recvCredit -= creditSent;
+ } else {
+ qp->postSend(buff);
+ }
+ // One more send awaiting completion, one less unit of peer credit
+ ++outstandingWrites;
+ --xmitCredit;
+ } else {
+ if (fullCallback) {
+ fullCallback(*this, buff);
+ } else {
+ QPID_LOG(error, "RDMA: qp=" << qp << ": Write queue full, but no callback, throwing buffer away");
+ returnBuffer(buff);
+ }
}
}
+ // Mark now closed (so we don't accept any more writes or make any idle callbacks)
+ // Setting `closed` makes writable() false. As a result doWriteCallback() stops
+ // calling the idle callback, and queueWrite() passes buffers to the full
+ // callback (or discards them) instead of posting them. Nothing is freed here.
void AsynchIO::queueWriteClose() {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ closed = true;
}
- Buffer* AsynchIO::getBuffer() {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
- if (bufferQueue.empty()) {
- Buffer* b = qp->createBuffer(bufferSize);
- buffers.push_front(b);
- b->dataCount = 0;
- return b;
- } else {
- Buffer* b = bufferQueue.front();
- bufferQueue.pop_front();
- b->dataCount = 0;
- b->dataStart = 0;
- return b;
+ // Request a write opportunity (idle callback) from any thread.
+ // stateLock serialises this with dataEvent(). If processing is already under
+ // way, the request is only recorded (PENDING_NOTIFY) or dropped, because that
+ // processing ends with a write callback anyway. If we are IDLE, we run the
+ // write callback here, then drain any work that arrived meanwhile:
+ // - PENDING_DATA: process completions, then write callback.
+ // - PENDING_NOTIFY: write callback only.
+ void AsynchIO::notifyPendingWrite() {
+ // As notifyPendingWrite can be called on an arbitrary thread it must check whether we are processing or not.
+ // If we are then we just return as we know that we will eventually do the idle callback anyway.
+ //
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ // We can get here in any state (as the caller could be in any thread)
+ switch (state) {
+ case NOTIFY_WRITE:
+ case PENDING_NOTIFY:
+ // We only need to note a pending notify if we're already doing a notify as data processing
+ // is always followed by write notification processing
+ state = PENDING_NOTIFY;
+ return;
+ case PENDING_DATA:
+ return;
+ case DATA:
+ // Only need to return here as data processing will do the idleCallback itself anyway
+ return;
+ case IDLE:
+ state = NOTIFY_WRITE;
+ break;
+ case DELETED:
+ // NOTE(review): if NDEBUG is defined, this assert compiles away and
+ // control leaves the switch. doWriteCallback() then runs on an
+ // object already marked DELETED. Consider returning here.
+ assert(state!=DELETED);
}
+ }
+
+ doWriteCallback();
+ // Keep track of what we need to do so that we can release the lock
+ enum {COMPLETION, NOTIFY} action;
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ // If there was pending data whilst we were doing this, process it now
+ switch (state) {
+ case NOTIFY_WRITE:
+ // NOTE(review): this early return skips the `deleting` check.
+ // Suppose deferDelete() ran during the doWriteCallback() above. It
+ // only set `deleting` (state was not IDLE), so nothing deletes the
+ // object here, unlike at the end of dataEvent().
+ state = IDLE;
+ return;
+ case PENDING_DATA:
+ action = COMPLETION;
+ break;
+ case PENDING_NOTIFY:
+ action = NOTIFY;
+ break;
+ default:
+ assert(state!=IDLE && state!=DATA && state!=DELETED);
+ return;
+ }
+ // Using NOTIFY_WRITE for both cases is a bit strange, but we're making sure we get the
+ // correct result if we reenter notifyPendingWrite(), in which case we want to
+ // end up in PENDING_NOTIFY (entering dataEvent doesn't matter as it only checks
+ // not IDLE)
+ state = NOTIFY_WRITE;
+ }
+ do {
+ // Note we only get here if we were in the PENDING_DATA or PENDING_NOTIFY state
+ // so that we do need to process completions or notifications now
+ switch (action) {
+ case COMPLETION:
+ processCompletions();
+ // Deliberate fall-through: like dataEvent(), completion processing
+ // is always followed by a write callback
+ case NOTIFY:
+ doWriteCallback();
+ break;
+ }
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ switch (state) {
+ case NOTIFY_WRITE:
+ state = IDLE;
+ goto exit;
+ case PENDING_DATA:
+ action = COMPLETION;
+ break;
+ case PENDING_NOTIFY:
+ action = NOTIFY;
+ break;
+ default:
+ assert(state!=IDLE && state!=DATA && state!=DELETED);
+ return;
+ }
+ state = NOTIFY_WRITE;
+ }
+ } while (true);
+ exit:
+ // If we just processed completions we might need to delete ourselves
+ // NOTE(review): when the last action was NOTIFY, a pending deferDelete() is
+ // also not acted on here. `deleting` and `outstandingWrites` are read after
+ // stateLock has been released.
+ if (action == COMPLETION && deleting && outstandingWrites == 0) {
+ delete this;
+ }
}
- void AsynchIO::dataEvent(DispatchHandle&) {
+ // Poller callback for completion-channel activity.
+ // If another event or notification is already being handled, just record
+ // PENDING_DATA so that handler drains it. Otherwise, process completions,
+ // then keep offering write callbacks until no further notify is pending.
+ // Finally, honour a deferred deleteDelete() request.
+ void AsynchIO::dataEvent(qpid::sys::DispatchHandle&) {
+ // Keep track of writable notifications
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ // We're already processing a notification
+ // NOTE(review): the default branch also overwrites DELETED with
+ // PENDING_DATA.
+ switch (state) {
+ case IDLE:
+ break;
+ default:
+ state = PENDING_DATA;
+ return;
+ }
+ // Can't get here in DATA state as that would violate the serialisation rules
+ assert( state==IDLE );
+ state = DATA;
+ }
+
+ processCompletions();
+
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ assert( state==DATA );
+ state = NOTIFY_WRITE;
+ }
+
+ do {
+ doWriteCallback();
+
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ if ( state==NOTIFY_WRITE ) {
+ state = IDLE;
+ break;
+ }
+ // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered
+ assert( state==PENDING_NOTIFY );
+ state = NOTIFY_WRITE;
+ }
+ } while (true);
+
+ // We might need to delete ourselves
+ // NOTE(review): `deleting` and `outstandingWrites` are read after stateLock
+ // is released. State is already IDLE, so another thread may re-enter before
+ // this delete. Confirm that cannot happen.
+ if (deleting && outstandingWrites == 0) {
+ delete this;
+ }
+ }
+
+ void AsynchIO::processCompletions() {
QueuePair::intrusive_ptr q = qp->getNextChannelEvent();
+ // Re-enable notification for queue:
+ // This needs to happen before we could do anything that could generate more work completion
+ // events (ie the callbacks etc. in the following).
+ // This can't make us reenter this code as the handle attached to the completion queue will still be
+ // disabled by the poller until we leave this code
+ qp->notifyRecv();
+ qp->notifySend();
+
+ int recvEvents = 0;
+ int sendEvents = 0;
+
// If no event do nothing
if (!q)
return;
assert(q == qp);
- // Re-enable notification for queue
- qp->notifySend();
- qp->notifyRecv();
-
// Repeat until no more events
do {
QueuePairEvent e(qp->getNextEvent());
if (!e)
- return;
+ break;
::ibv_wc_status status = e.getEventStatus();
if (status != IBV_WC_SUCCESS) {
errorCallback(*this);
+ // TODO: Probably need to flush queues at this point
return;
}
@@ -131,46 +304,144 @@
Buffer* b = e.getBuffer();
QueueDirection dir = e.getDirection();
if (dir == RECV) {
- readCallback(*this, b);
+ ++recvEvents;
+
+ // Get our xmitCredit if it was sent
+ bool dataPresent = true;
+ if (e.immPresent() ) {
+ xmitCredit += (e.getImm() & ~FlagsMask);
+ dataPresent = ((e.getImm() & IgnoreData) == 0);
+ }
+
+ // if there was no data sent then the message was only to update our credit
+ if ( dataPresent ) {
+ readCallback(*this, b);
+ }
+
// At this point the buffer has been consumed so put it back on the recv queue
+ b->dataStart = 0;
+ b->dataCount = 0;
qp->postRecv(b);
+
+ // Received another message
+ ++recvCredit;
+
+ // Send recvCredit if it is large enough (it will have got this large because we've not sent anything recently)
+ if (recvCredit > recvBufferCount/2) {
+ // TODO: This should use RDMA write with imm as there might not ever be a buffer to receive this message
+ // but this is a little unlikely, as to get in this state we have to have received messages without sending any
+ // for a while so its likely we've received an credit update from the far side.
+ if (writable()) {
+ Buffer* ob = getBuffer();
+ // Have to send something as adapters hate it when you try to transfer 0 bytes
+ *reinterpret_cast< uint32_t* >(ob->bytes) = htonl(recvCredit);
+ ob->dataCount = sizeof(uint32_t);
+
+ int creditSent = recvCredit & ~FlagsMask;
+ qp->postSend(creditSent | IgnoreData, ob);
+ recvCredit -= creditSent;
+ ++outstandingWrites;
+ --xmitCredit;
+ } else {
+ QPID_LOG(warning, "RDMA: qp=" << qp << ": Unable to send unsolicited credit");
+ }
+ }
} else {
+ ++sendEvents;
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
bufferQueue.push_front(b);
}
--outstandingWrites;
- // TODO: maybe don't call idle unless we're low on write buffers
- idleCallback(*this);
}
} while (true);
+
+ // Not sure if this is expected or not
+ if (recvEvents == 0 && sendEvents == 0) {
+ QPID_LOG(debug, "RDMA: qp=" << qp << ": Got channel event with no recv/send completions");
+ }
+ }
+
+ // Offer the application write opportunities (via the idle callback) for as
+ // long as we remain writable. Stop as soon as one call posts nothing, i.e.
+ // leaves xmitCredit unchanged.
+ void AsynchIO::doWriteCallback() {
+ // TODO: maybe don't call idle unless we're low on write buffers
+ for (int creditBefore = xmitCredit; writable(); creditBefore = xmitCredit) {
+ idleCallback(*this);
+ if (xmitCredit == creditBefore) {
+ QPID_LOG(debug, "RDMA: qp=" << qp << ": Called for data, but got none: xmitCredit=" << xmitCredit);
+ return;
+ }
+ }
}
+ // Hand out an empty transmit buffer. Reuse one from the free pool when
+ // available, otherwise create a new one. Both paths clear
+ // dataStart/dataCount, so callers always get the same starting state that
+ // returnBuffer() leaves in pooled buffers.
+ Buffer* AsynchIO::getBuffer() {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
+ Buffer* b;
+ if (bufferQueue.empty()) {
+ b = qp->createBuffer(bufferSize);
+ // The buffers ptr_deque owns (and eventually deletes) every buffer we create
+ buffers.push_front(b);
+ } else {
+ b = bufferQueue.front();
+ bufferQueue.pop_front();
+ }
+ b->dataCount = 0;
+ b->dataStart = 0;
+ return b;
+ }
+
+ // Give a buffer back to the free pool, emptied, for reuse by getBuffer().
+ // The buffer is private to the caller until it is pushed, so it can be reset
+ // before taking the lock.
+ void AsynchIO::returnBuffer(Buffer* b) {
+ b->dataStart = 0;
+ b->dataCount = 0;
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
+ bufferQueue.push_front(b);
+ }
+
+ // Common base of Listener and Connector. It owns the rdma_cm connection and
+ // a DispatchHandle that routes that connection's events to event().
+ ConnectionManager::ConnectionManager(
+ ErrorCallback errc,
+ DisconnectedCallback dc
+ ) :
+ ci(Connection::make()),
+ handle(*ci, boost::bind(&ConnectionManager::event, this, _1), 0, 0),
+ errorCallback(errc),
+ disconnectedCallback(dc)
+ {
+ ci->nonblocking();
+ }
+
+ // Let the subclass begin connecting (Listener: bind + listen; Connector:
+ // resolve_addr), then start watching the connection's events on poller.
+ void ConnectionManager::start(Poller::shared_ptr poller) {
+ startConnection(ci);
+ handle.startWatch(poller);
+ }
+
+ // Hand each connection-manager event to the subclass's handler.
+ void ConnectionManager::event(DispatchHandle&) {
+ connectionEvent(ci);
+ }
+
Listener::Listener(
const sockaddr& src,
- ConnectedCallback cc,
+ const ConnectionParams& cp,
+ EstablishedCallback ec,
ErrorCallback errc,
DisconnectedCallback dc,
ConnectionRequestCallback crc
) :
+ ConnectionManager(errc, dc),
src_addr(src),
- ci(Connection::make()),
- handle(*ci, boost::bind(&Listener::connectionEvent, this, _1), 0, 0),
- connectedCallback(cc),
- errorCallback(errc),
- disconnectedCallback(dc),
- connectionRequestCallback(crc)
+ checkConnectionParams(cp),
+ connectionRequestCallback(crc),
+ establishedCallback(ec)
{
- ci->nonblocking();
}
- void Listener::start(Poller::shared_ptr poller) {
+ void Listener::startConnection(Connection::intrusive_ptr ci) {
ci->bind(src_addr);
ci->listen();
- handle.startWatch(poller);
}
- void Listener::connectionEvent(DispatchHandle&) {
+ void Listener::connectionEvent(Connection::intrusive_ptr ci) {
ConnectionEvent e(ci->getNextEvent());
// If (for whatever reason) there was no event do nothing
@@ -181,65 +452,75 @@
// you get from CONNECT_REQUEST has the same context info
// as its parent listening rdma_cm_id
::rdma_cm_event_type eventType = e.getEventType();
+ ::rdma_conn_param conn_param = e.getConnectionParam();
Rdma::Connection::intrusive_ptr id = e.getConnection();
switch (eventType) {
case RDMA_CM_EVENT_CONNECT_REQUEST: {
- bool accept = true;
- // Extract connection parameters and private data from event
- ::rdma_conn_param conn_param = e.getConnectionParam();
+ // Make sure peer has sent params we can use
+ if (!conn_param.private_data || conn_param.private_data_len < sizeof(ConnectionParams)) {
+ id->reject();
+ break;
+ }
+ ConnectionParams cp = *static_cast<const ConnectionParams*>(conn_param.private_data);
+
+ // Reject if requested msg size is bigger than we allow
+ if (cp.maxRecvBufferSize > checkConnectionParams.maxRecvBufferSize) {
+ id->reject(&checkConnectionParams);
+ break;
+ }
+ bool accept = true;
if (connectionRequestCallback)
- //TODO: pass private data to callback (and accept new private data for accept somehow)
- accept = connectionRequestCallback(id);
+ accept = connectionRequestCallback(id, cp);
+
if (accept) {
// Accept connection
- id->accept(conn_param);
+ cp.initialXmitCredit = checkConnectionParams.initialXmitCredit;
+ id->accept(conn_param, &cp);
} else {
- //Reject connection
+ // Reject connection
id->reject();
}
-
break;
}
case RDMA_CM_EVENT_ESTABLISHED:
- connectedCallback(id);
+ establishedCallback(id);
break;
case RDMA_CM_EVENT_DISCONNECTED:
disconnectedCallback(id);
break;
case RDMA_CM_EVENT_CONNECT_ERROR:
- errorCallback(id);
+ errorCallback(id, CONNECT_ERROR);
break;
default:
- std::cerr << "Warning: unexpected response to listen - " << eventType << "\n";
+ // Unexpected response
+ errorCallback(id, UNKNOWN);
+ //std::cerr << "Warning: unexpected response to listen - " << eventType << "\n";
}
}
Connector::Connector(
const sockaddr& dst,
+ const ConnectionParams& cp,
ConnectedCallback cc,
ErrorCallback errc,
DisconnectedCallback dc,
RejectedCallback rc
) :
+ ConnectionManager(errc, dc),
dst_addr(dst),
- ci(Connection::make()),
- handle(*ci, boost::bind(&Connector::connectionEvent, this, _1), 0, 0),
- connectedCallback(cc),
- errorCallback(errc),
- disconnectedCallback(dc),
- rejectedCallback(rc)
+ connectionParams(cp),
+ rejectedCallback(rc),
+ connectedCallback(cc)
{
- ci->nonblocking();
}
- void Connector::start(Poller::shared_ptr poller) {
+ void Connector::startConnection(Connection::intrusive_ptr ci) {
ci->resolve_addr(dst_addr);
- handle.startWatch(poller);
}
- void Connector::connectionEvent(DispatchHandle&) {
+ void Connector::connectionEvent(Connection::intrusive_ptr ci) {
ConnectionEvent e(ci->getNextEvent());
// If (for whatever reason) there was no event do nothing
@@ -247,6 +528,8 @@
return;
::rdma_cm_event_type eventType = e.getEventType();
+ ::rdma_conn_param conn_param = e.getConnectionParam();
+ Rdma::Connection::intrusive_ptr id = e.getConnection();
switch (eventType) {
case RDMA_CM_EVENT_ADDR_RESOLVED:
// RESOLVE_ADDR
@@ -254,38 +537,46 @@
break;
case RDMA_CM_EVENT_ADDR_ERROR:
// RESOLVE_ADDR
- errorCallback(ci);
+ errorCallback(ci, ADDR_ERROR);
break;
case RDMA_CM_EVENT_ROUTE_RESOLVED:
// RESOLVE_ROUTE:
- ci->connect();
+ ci->connect(&connectionParams);
break;
case RDMA_CM_EVENT_ROUTE_ERROR:
// RESOLVE_ROUTE:
- errorCallback(ci);
+ errorCallback(ci, ROUTE_ERROR);
break;
case RDMA_CM_EVENT_CONNECT_ERROR:
// CONNECTING
- errorCallback(ci);
+ errorCallback(ci, CONNECT_ERROR);
break;
case RDMA_CM_EVENT_UNREACHABLE:
// CONNECTING
- errorCallback(ci);
+ errorCallback(ci, UNREACHABLE);
break;
- case RDMA_CM_EVENT_REJECTED:
+ case RDMA_CM_EVENT_REJECTED: {
// CONNECTING
- rejectedCallback(ci);
+ // Extract private data from event
+ assert(conn_param.private_data && conn_param.private_data_len >= sizeof(ConnectionParams));
+ ConnectionParams cp = *static_cast<const ConnectionParams*>(conn_param.private_data);
+ rejectedCallback(ci, cp);
break;
- case RDMA_CM_EVENT_ESTABLISHED:
+ }
+ case RDMA_CM_EVENT_ESTABLISHED: {
// CONNECTING
- connectedCallback(ci);
+ // Extract private data from event
+ assert(conn_param.private_data && conn_param.private_data_len >= sizeof(ConnectionParams));
+ ConnectionParams cp = *static_cast<const ConnectionParams*>(conn_param.private_data);
+ connectedCallback(ci, cp);
break;
+ }
case RDMA_CM_EVENT_DISCONNECTED:
// ESTABLISHED
disconnectedCallback(ci);
break;
default:
- std::cerr << "Warning: unexpected event in connect: " << eventType << "\n";
+ QPID_LOG(warning, "RDMA: Unexpected event in connect: " << eventType);
}
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h Wed Sep 10 23:16:19 2008
@@ -32,32 +32,29 @@
#include <boost/ptr_container/ptr_deque.hpp>
#include <deque>
-using qpid::sys::DispatchHandle;
-using qpid::sys::Poller;
-
namespace Rdma {
class Connection;
- typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ConnectedCallback;
- typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ErrorCallback;
- typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> DisconnectedCallback;
- typedef boost::function1<bool, Rdma::Connection::intrusive_ptr&> ConnectionRequestCallback;
- typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> RejectedCallback;
-
class AsynchIO
{
typedef boost::function1<void, AsynchIO&> ErrorCallback;
typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback;
typedef boost::function1<void, AsynchIO&> IdleCallback;
- typedef boost::function1<void, AsynchIO&> FullCallback;
+ typedef boost::function2<void, AsynchIO&, Buffer*> FullCallback;
QueuePair::intrusive_ptr qp;
- DispatchHandle dataHandle;
+ qpid::sys::DispatchHandleRef dataHandle;
int bufferSize;
+ int recvCredit;
+ int xmitCredit;
int recvBufferCount;
int xmitBufferCount;
int outstandingWrites;
+ bool closed; // TODO: Perhaps (probably) this state can be merged with the following...
+ bool deleting; // TODO: Perhaps (probably) this state can be merged with the following...
+ enum { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DELETED } state;
+ qpid::sys::Mutex stateLock;
std::deque<Buffer*> bufferQueue;
qpid::sys::Mutex bufferQueueLock;
boost::ptr_deque<Buffer> buffers;
@@ -70,70 +67,145 @@
public:
AsynchIO(
QueuePair::intrusive_ptr q,
- int s,
+ int size,
+ int xCredit,
+ int rCount,
ReadCallback rc,
IdleCallback ic,
FullCallback fc,
ErrorCallback ec
);
- ~AsynchIO();
- void start(Poller::shared_ptr poller);
+ void start(qpid::sys::Poller::shared_ptr poller);
+ bool writable() const;
void queueWrite(Buffer* buff);
void notifyPendingWrite();
void queueWriteClose();
+ void deferDelete();
+ int incompletedWrites() const;
Buffer* getBuffer();
+ void returnBuffer(Buffer*);
private:
- void dataEvent(DispatchHandle& handle);
+ // Don't let anyone else delete us to make sure there can't be outstanding callbacks
+ ~AsynchIO();
+
+ // Constants for the peer-peer command messages
+ // These are sent in the high bits if the imm data of an rdma message
+ // The low bits are used to send the credit
+ const static int FlagsMask = 0x10000000; // Mask for all flag bits - be sure to update this if you add more command bits
+ const static int IgnoreData = 0x10000000; // Message contains no application data
+
+ void dataEvent(qpid::sys::DispatchHandle& handle);
+ void processCompletions();
+ void doWriteCallback();
};
- class Listener
- {
- sockaddr src_addr;
+ // A write may be posted only when all of these hold:
+ // - the AsynchIO is not closed;
+ // - a local transmit slot is free (outstandingWrites < xmitBufferCount);
+ // - the peer has granted us credit.
+ // NOTE(review): queueWriteClose() sets `closed` under stateLock, but it is
+ // read here without the lock. Confirm callers are serialised with that call.
+ inline bool AsynchIO::writable() const {
+ return (!closed && outstandingWrites < xmitBufferCount && xmitCredit > 0);
+ }
+
+ // Number of posted sends whose completions have not been processed yet.
+ inline int AsynchIO::incompletedWrites() const {
+ return outstandingWrites;
+ }
+
+ // These are the parameters necessary to start the conversation
+ // * Each peer HAS to allocate buffers of the size of the maximum receive from its peer
+ // * Each peer HAS to know the initial "credit" it has for transmitting to its peer
+ // The struct is exchanged byte-for-byte as rdma_cm private data:
+ // - the Connector sends it with connect();
+ // - the Listener reads it from CONNECT_REQUEST and sends one back with
+ //   accept(), or with reject() when the size is too large.
+ // NOTE(review): the ints travel in host byte order and host int size, so
+ // both peers must agree on both. Confirm this for mixed-architecture setups.
+ struct ConnectionParams {
+ int maxRecvBufferSize;
+ int initialXmitCredit ;
+
+ ConnectionParams(int s, int c) :
+ maxRecvBufferSize(s),
+ initialXmitCredit(c)
+ {}
+ };
+
+ // Reason passed to an ErrorCallback:
+ // - ADDR_ERROR, ROUTE_ERROR, UNREACHABLE: the Connector's address
+ //   resolution, route resolution and connect phases.
+ // - CONNECT_ERROR: RDMA_CM_EVENT_CONNECT_ERROR on either side.
+ // - UNKNOWN: an event the Listener does not otherwise handle.
+ enum ErrorType {
+ ADDR_ERROR,
+ ROUTE_ERROR,
+ CONNECT_ERROR,
+ UNREACHABLE,
+ UNKNOWN
+ };
+
+ // Callbacks common to Listener and Connector (stored in ConnectionManager).
+ typedef boost::function2<void, Rdma::Connection::intrusive_ptr&, ErrorType> ErrorCallback;
+ typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> DisconnectedCallback;
+
+ class ConnectionManager {
Connection::intrusive_ptr ci;
- DispatchHandle handle;
- ConnectedCallback connectedCallback;
+ qpid::sys::DispatchHandle handle;
+
+ protected:
ErrorCallback errorCallback;
DisconnectedCallback disconnectedCallback;
+
+ public:
+ ConnectionManager(
+ ErrorCallback errc,
+ DisconnectedCallback dc
+ );
+
+ virtual ~ConnectionManager() {}
+
+ void start(qpid::sys::Poller::shared_ptr poller);
+
+ private:
+ void event(qpid::sys::DispatchHandle& handle);
+
+ virtual void startConnection(Connection::intrusive_ptr ci) = 0;
+ virtual void connectionEvent(Connection::intrusive_ptr ci) = 0;
+ };
+
+ typedef boost::function2<bool, Rdma::Connection::intrusive_ptr&, const ConnectionParams&> ConnectionRequestCallback;
+ typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> EstablishedCallback;
+
+ class Listener : public ConnectionManager
+ {
+ sockaddr src_addr;
+ ConnectionParams checkConnectionParams;
ConnectionRequestCallback connectionRequestCallback;
+ EstablishedCallback establishedCallback;
public:
Listener(
const sockaddr& src,
- ConnectedCallback cc,
+ const ConnectionParams& cp,
+ EstablishedCallback ec,
ErrorCallback errc,
DisconnectedCallback dc,
ConnectionRequestCallback crc = 0
);
- void start(Poller::shared_ptr poller);
private:
- void connectionEvent(DispatchHandle& handle);
+ void startConnection(Connection::intrusive_ptr ci);
+ void connectionEvent(Connection::intrusive_ptr ci);
};
- class Connector
+ typedef boost::function2<void, Rdma::Connection::intrusive_ptr&, const ConnectionParams&> RejectedCallback;
+ typedef boost::function2<void, Rdma::Connection::intrusive_ptr&, const ConnectionParams&> ConnectedCallback;
+
+ class Connector : public ConnectionManager
{
sockaddr dst_addr;
- Connection::intrusive_ptr ci;
- DispatchHandle handle;
- ConnectedCallback connectedCallback;
- ErrorCallback errorCallback;
- DisconnectedCallback disconnectedCallback;
+ ConnectionParams connectionParams;
RejectedCallback rejectedCallback;
+ ConnectedCallback connectedCallback;
public:
Connector(
const sockaddr& dst,
+ const ConnectionParams& cp,
ConnectedCallback cc,
ErrorCallback errc,
DisconnectedCallback dc,
RejectedCallback rc = 0
);
- void start(Poller::shared_ptr poller);
private:
- void connectionEvent(DispatchHandle& handle);
+ void startConnection(Connection::intrusive_ptr ci);
+ void connectionEvent(Connection::intrusive_ptr ci);
};
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp Wed Sep 10 23:16:19 2008
@@ -42,12 +42,10 @@
struct ConRec {
Rdma::Connection::intrusive_ptr connection;
Rdma::AsynchIO* data;
- bool writable;
queue<Rdma::Buffer*> queuedWrites;
ConRec(Rdma::Connection::intrusive_ptr c) :
- connection(c),
- writable(true)
+ connection(c)
{}
};
@@ -55,50 +53,53 @@
cout << "Data error:\n";
}
+// Drain this connection's backlog of echo buffers, but only while the
+// AsynchIO can take each one directly. If queueWrite() hit the full callback,
+// the buffer would go to the back of the queue and messages would be reordered.
+void idle(ConRec* cr, Rdma::AsynchIO& a) {
+ while (a.writable()) {
+ if (cr->queuedWrites.empty())
+ break;
+ Rdma::Buffer* next = cr->queuedWrites.front();
+ cr->queuedWrites.pop();
+ a.queueWrite(next);
+ }
+}
+
void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) {
// Echo data back
Rdma::Buffer* buf = a.getBuffer();
std::copy(b->bytes+b->dataStart, b->bytes+b->dataStart+b->dataCount, buf->bytes);
buf->dataCount = b->dataCount;
- if (cr->queuedWrites.empty() && cr->writable) {
+ if (cr->queuedWrites.empty()) {
+ // If can't write then full will be called and push buffer on back of queue
a.queueWrite(buf);
} else {
cr->queuedWrites.push(buf);
+ // Try to empty queue
+ idle(cr, a);
}
}
-void full(ConRec* cr, Rdma::AsynchIO&) {
- cr->writable = false;
-}
-
-void idle(ConRec* cr, Rdma::AsynchIO& a) {
- cr->writable = true;
- while (!cr->queuedWrites.empty() && cr->writable) {
- Rdma::Buffer* buf = cr->queuedWrites.front();
- cr->queuedWrites.pop();
- a.queueWrite(buf);
- }
+void full(ConRec* cr, Rdma::AsynchIO&, Rdma::Buffer* buf) {
+ cr->queuedWrites.push(buf);
}
void disconnected(Rdma::Connection::intrusive_ptr& ci) {
ConRec* cr = ci->getContext<ConRec>();
cr->connection->disconnect();
- delete cr->data;
+ cr->data->queueWriteClose();
delete cr;
cout << "Disconnected: " << cr << "\n";
}
-void connectionError(Rdma::Connection::intrusive_ptr& ci) {
+void connectionError(Rdma::Connection::intrusive_ptr& ci, Rdma::ErrorType) {
ConRec* cr = ci->getContext<ConRec>();
cr->connection->disconnect();
if (cr) {
- delete cr->data;
+ cr->data->queueWriteClose();
delete cr;
}
cout << "Connection error: " << cr << "\n";
}
-bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) {
+bool connectionRequest(Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) {
cout << "Incoming connection: ";
// For fun reject alternate connection attempts
@@ -109,10 +110,11 @@
if (x) {
ConRec* cr = new ConRec(ci);
Rdma::AsynchIO* aio =
- new Rdma::AsynchIO(ci->getQueuePair(), 8000,
+ new Rdma::AsynchIO(ci->getQueuePair(),
+ cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES,
boost::bind(data, cr, _1, _2),
boost::bind(idle, cr, _1),
- boost::bind(full, cr, _1),
+ boost::bind(full, cr, _1, _2),
dataError);
ci->addContext(cr);
cr->data = aio;
@@ -149,6 +151,7 @@
Dispatcher d(p);
Rdma::Listener a((const sockaddr&)(sin),
+ Rdma::ConnectionParams(16384, Rdma::DEFAULT_WR_ENTRIES),
boost::bind(connected, p, _1),
connectionError,
disconnected,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp Wed Sep 10 23:16:19 2008
@@ -56,4 +56,9 @@
(void) ::ibv_destroy_cq(cq);
}
+ // shared_ptr deleter for queue pairs. Like destroyCq(), it accepts null and
+ // ignores the result of the verbs destroy call.
+ void destroyQp(::ibv_qp* qp) throw () {
+ if (!qp)
+ return;
+ static_cast<void>(::ibv_destroy_qp(qp));
+ }
+
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h Wed Sep 10 23:16:19 2008
@@ -35,6 +35,7 @@
void deallocPd(::ibv_pd* p) throw ();
void destroyCChannel(::ibv_comp_channel* c) throw ();
void destroyCq(::ibv_cq* cq) throw ();
+ void destroyQp(::ibv_qp* qp) throw ();
inline boost::shared_ptr< ::rdma_event_channel > mkEChannel() {
return
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp?rev=694143&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp Wed Sep 10 23:16:19 2008
@@ -0,0 +1,155 @@
+#include "rdma_wrap.h"
+
+namespace Rdma {
+ // Default connection parameters. The single definition lives here;
+ // rdma_wrap.h only declares it extern. The initialiser is positional, so the
+ // values must stay in ::rdma_conn_param's field order, as annotated.
+ const ::rdma_conn_param DEFAULT_CONNECT_PARAM = {
+ 0, // .private_data
+ 0, // .private_data_len
+ 4, // .responder_resources
+ 4, // .initiator_depth
+ 0, // .flow_control
+ 5, // .retry_count
+ 7 // .rnr_retry_count
+ };
+
+ // Return the connection parameters (including any private data) carried by
+ // this event. The librdmacm documentation is vague here. Its source code
+ // suggests param.conn is valid only for the event types tested below, so
+ // every other event gets a zero-initialised struct.
+ ::rdma_conn_param ConnectionEvent::getConnectionParam() const {
+ const ::rdma_cm_event_type type = event->event;
+ const bool carriesConnParam =
+ type == RDMA_CM_EVENT_CONNECT_REQUEST ||
+ type == RDMA_CM_EVENT_ESTABLISHED ||
+ type == RDMA_CM_EVENT_REJECTED ||
+ type == RDMA_CM_EVENT_DISCONNECTED ||
+ type == RDMA_CM_EVENT_CONNECT_ERROR;
+ if (carriesConnParam)
+ return event->param.conn;
+ ::rdma_conn_param empty = {};
+ return empty;
+ }
+
+ // Build a reliable-connected (IBV_QPT_RC) queue pair on the device behind
+ // rdma_cm id i. Steps:
+ // - allocate a protection domain and one completion channel;
+ // - create separate send and receive CQs on that channel;
+ // - create the QP.
+ // The channel's fd becomes this IOHandle's fd, so the poller watches
+ // completion notifications for both queues through it.
+ QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) :
+ qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+ pd(allocPd(i->verbs)),
+ cchannel(mkCChannel(i->verbs)),
+ scq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())),
+ rcq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())),
+ outstandingSendEvents(0),
+ outstandingRecvEvents(0)
+ {
+ impl->fd = cchannel->fd;
+
+ // Set cq context to this QueuePair object so we can find
+ // ourselves again
+ scq->cq_context = this;
+ rcq->cq_context = this;
+
+ ::ibv_qp_init_attr qp_attr = {};
+
+ // TODO: make a default struct for this
+ // NOTE(review): both queue depths are fixed at DEFAULT_WR_ENTRIES. AsynchIO,
+ // however, takes its receive count and transmit credit as parameters, and
+ // callers must not exceed these depths.
+ qp_attr.cap.max_send_wr = DEFAULT_WR_ENTRIES;
+ qp_attr.cap.max_send_sge = 4;
+ qp_attr.cap.max_recv_wr = DEFAULT_WR_ENTRIES;
+ qp_attr.cap.max_recv_sge = 4;
+
+ qp_attr.send_cq = scq.get();
+ qp_attr.recv_cq = rcq.get();
+ qp_attr.qp_type = IBV_QPT_RC;
+
+ CHECK(::rdma_create_qp(i.get(), pd.get(), &qp_attr));
+ // NOTE(review): the QP is created via rdma_create_qp() but released via
+ // destroyQp(), i.e. ibv_destroy_qp(), which leaves i->qp untouched.
+ // Confirm whether rdma_destroy_qp() is the intended pairing.
+ qp = boost::shared_ptr< ::ibv_qp >(i->qp, destroyQp);
+
+ // Set the qp context to this so we can find ourselves again
+ qp->qp_context = this;
+ }
+
+ // Acknowledge any completion events not yet acked on each CQ. This runs
+ // before the member shared_ptrs release the QP, then the CQs, channel and PD
+ // (reverse declaration order).
+ QueuePair::~QueuePair() {
+ if (outstandingSendEvents > 0)
+ ::ibv_ack_cq_events(scq.get(), outstandingSendEvents);
+ if (outstandingRecvEvents > 0)
+ ::ibv_ack_cq_events(rcq.get(), outstandingRecvEvents);
+
+ // Reset back pointer in case someone else has the qp
+ qp->qp_context = 0;
+ }
+
+ // Post buf as a receive buffer. The receive area is
+ // [bytes+dataStart, bytes+dataStart+dataCount), and wr_id carries the Buffer*.
+ // Throws on failure.
+ // NOTE(review): AsynchIO::processCompletions sets dataStart = dataCount = 0
+ // just before reposting a consumed receive buffer. That posts a zero-length
+ // receive. Confirm whether the buffer's full capacity should be posted instead.
+ void QueuePair::postRecv(Buffer* buf) {
+ ::ibv_recv_wr rwr = {};
+ ::ibv_sge sge;
+
+ sge.addr = (uintptr_t) buf->bytes+buf->dataStart;
+ sge.length = buf->dataCount;
+ sge.lkey = buf->mr->lkey;
+
+ rwr.wr_id = reinterpret_cast<uint64_t>(buf);
+ rwr.sg_list = &sge;
+ rwr.num_sge = 1;
+
+ ::ibv_recv_wr* badrwr = 0;
+ CHECK_IBV(::ibv_post_recv(qp.get(), &rwr, &badrwr));
+ if (badrwr)
+ throw std::logic_error("ibv_post_recv(): Bad rwr");
+ }
+
+ // Send buf's bytes [dataStart, dataStart+dataCount) as a plain, signalled
+ // SEND. wr_id carries the Buffer* itself. Throws if the post fails.
+ void QueuePair::postSend(Buffer* buf) {
+ ::ibv_sge segment;
+ segment.addr = reinterpret_cast<uintptr_t>(buf->bytes + buf->dataStart);
+ segment.length = buf->dataCount;
+ segment.lkey = buf->mr->lkey;
+
+ ::ibv_send_wr request = {};
+ request.wr_id = reinterpret_cast<uint64_t>(buf);
+ request.sg_list = &segment;
+ request.num_sge = 1;
+ request.opcode = IBV_WR_SEND;
+ request.send_flags = IBV_SEND_SIGNALED;
+
+ ::ibv_send_wr* failed = 0;
+ CHECK_IBV(::ibv_post_send(qp.get(), &request, &failed));
+ if (failed)
+ throw std::logic_error("ibv_post_send(): Bad swr");
+ }
+
+ // Like postSend(buf), but sent as SEND_WITH_IMM. imm is converted to network
+ // byte order here; the receiver reads it back with QueuePairEvent::getImm().
+ void QueuePair::postSend(uint32_t imm, Buffer* buf) {
+ ::ibv_sge segment;
+ segment.addr = reinterpret_cast<uintptr_t>(buf->bytes + buf->dataStart);
+ segment.length = buf->dataCount;
+ segment.lkey = buf->mr->lkey;
+
+ ::ibv_send_wr request = {};
+ request.wr_id = reinterpret_cast<uint64_t>(buf);
+ request.sg_list = &segment;
+ request.num_sge = 1;
+ request.opcode = IBV_WR_SEND_WITH_IMM;
+ request.imm_data = htonl(imm);
+ request.send_flags = IBV_SEND_SIGNALED;
+
+ ::ibv_send_wr* failed = 0;
+ CHECK_IBV(::ibv_post_send(qp.get(), &request, &failed));
+ if (failed)
+ throw std::logic_error("ibv_post_send(): Bad swr");
+ }
+}
+
+// Write an rdma_cm event type by its enumerator name (used, for example, when
+// logging unexpected connection events). Any value not listed below is written
+// numerically; without this, nothing at all would be written for it.
+std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t) {
+# define CHECK_TYPE(t) case t: o << #t; break;
+ switch(t) {
+ CHECK_TYPE(RDMA_CM_EVENT_ADDR_RESOLVED)
+ CHECK_TYPE(RDMA_CM_EVENT_ADDR_ERROR)
+ CHECK_TYPE(RDMA_CM_EVENT_ROUTE_RESOLVED)
+ CHECK_TYPE(RDMA_CM_EVENT_ROUTE_ERROR)
+ CHECK_TYPE(RDMA_CM_EVENT_CONNECT_REQUEST)
+ CHECK_TYPE(RDMA_CM_EVENT_CONNECT_RESPONSE)
+ CHECK_TYPE(RDMA_CM_EVENT_CONNECT_ERROR)
+ CHECK_TYPE(RDMA_CM_EVENT_UNREACHABLE)
+ CHECK_TYPE(RDMA_CM_EVENT_REJECTED)
+ CHECK_TYPE(RDMA_CM_EVENT_ESTABLISHED)
+ CHECK_TYPE(RDMA_CM_EVENT_DISCONNECTED)
+ CHECK_TYPE(RDMA_CM_EVENT_DEVICE_REMOVAL)
+ CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_JOIN)
+ CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_ERROR)
+ default:
+ o << "UNKNOWN_RDMA_CM_EVENT(" << static_cast<int>(t) << ")";
+ break;
+ }
+# undef CHECK_TYPE
+ return o;
+}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h Wed Sep 10 23:16:19 2008
@@ -31,6 +31,8 @@
#include <fcntl.h>
+#include <netdb.h>
+
#include <vector>
#include <algorithm>
#include <iostream>
@@ -43,15 +45,7 @@
const int DEFAULT_BACKLOG = 100;
const int DEFAULT_CQ_ENTRIES = 256;
const int DEFAULT_WR_ENTRIES = 64;
- const ::rdma_conn_param DEFAULT_CONNECT_PARAM = {
- 0, // .private_data
- 0, // .private_data_len
- 4, // .responder_resources
- 4, // .initiator_depth
- 0, // .flow_control
- 5, // .retry_count
- 7 // .rnr_retry_count
- };
+ extern const ::rdma_conn_param DEFAULT_CONNECT_PARAM;
struct Buffer {
friend class QueuePair;
@@ -115,6 +109,14 @@
return dir != NONE;
}
+ // True when IBV_WC_WITH_IMM is set in the completion's wc_flags,
+ // i.e. the completion carries immediate data.
+ bool immPresent() const {
+ return (wc.wc_flags & IBV_WC_WITH_IMM) != 0;
+ }
+
+ // Immediate data from the completion, converted from network to host
+ // byte order. NOTE(review): presumably only meaningful when
+ // immPresent() returns true.
+ uint32_t getImm() const {
+ uint32_t const networkOrder = wc.imm_data;
+ return ntohl(networkOrder);
+ }
+
QueueDirection getDirection() const {
return dir;
}
@@ -137,15 +139,12 @@
// Wrapper for a queue pair - this has the functionality for
// putting buffers on the receive queue and for sending buffers
// to the other end of the connection.
- //
- // Currently QueuePairs are contained inside Connections and have no
- // separate lifetime
class QueuePair : public qpid::sys::IOHandle, public qpid::RefCounted {
boost::shared_ptr< ::ibv_pd > pd;
boost::shared_ptr< ::ibv_comp_channel > cchannel;
boost::shared_ptr< ::ibv_cq > scq;
boost::shared_ptr< ::ibv_cq > rcq;
- boost::shared_ptr< ::rdma_cm_id > id;
+ boost::shared_ptr< ::ibv_qp > qp;
int outstandingSendEvents;
int outstandingRecvEvents;
@@ -207,6 +206,7 @@
void postRecv(Buffer* buf);
void postSend(Buffer* buf);
+ void postSend(uint32_t imm, Buffer* buf);
void notifyRecv();
void notifySend();
};
@@ -233,14 +233,7 @@
return event->event;
}
- ::rdma_conn_param getConnectionParam() const {
- if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST) {
- return event->param.conn;
- } else {
- ::rdma_conn_param p = {};
- return p;
- }
- }
+ ::rdma_conn_param getConnectionParam() const;
boost::intrusive_ptr<Connection> getConnection () const {
return id;
@@ -291,6 +284,11 @@
impl->fd = channel->fd;
}
+ // Clear the context pointer stored on the rdma_cm_id, in case other
+ // code still holds the id after this Connection is gone.
+ // NOTE(review): assumes id is non-null for every Connection.
+ ~Connection() {
+ ::rdma_cm_id* const cmId = id.get();
+ cmId->context = 0;
+ }
+
// Default destructor fine
void ensureQueuePair() {
@@ -445,52 +443,38 @@
return qp;
}
+
+ // Local address of this connection's cm id, formatted as "host:port"
+ // with numeric host and port.
+ std::string getLocalName() const {
+ return formatAddress(::rdma_get_local_addr(id.get()));
+ }
+
+ // Peer address of this connection's cm id, formatted as "host:port"
+ // with numeric host and port.
+ std::string getPeerName() const {
+ return formatAddress(::rdma_get_peer_addr(id.get()));
+ }
+
+private:
+ // Shared implementation of getLocalName()/getPeerName(): render `addr`
+ // as "host:port" via getnameinfo() with NI_NUMERICHOST|NI_NUMERICSERV,
+ // so no name-service lookup is performed. Failures go through CHECK_IBV.
+ // NOTE(review): getnameinfo() returns EAI_* codes rather than errno
+ // values; confirm CHECK_IBV reports them sensibly (gai_strerror() is the
+ // matching decoder).
+ static std::string formatAddress(const ::sockaddr* addr) {
+ // POSIX expects the length of the actual address, not of the storage
+ // that holds it. Unknown families keep the previous length.
+ ::socklen_t addrLen;
+ switch (addr->sa_family) {
+ case AF_INET: addrLen = sizeof(::sockaddr_in); break;
+ case AF_INET6: addrLen = sizeof(::sockaddr_in6); break;
+ default: addrLen = sizeof(::sockaddr_storage); break;
+ }
+ char hostName[NI_MAXHOST];
+ char portName[NI_MAXSERV];
+ CHECK_IBV(::getnameinfo(
+ addr, addrLen,
+ hostName, sizeof(hostName),
+ portName, sizeof(portName),
+ NI_NUMERICHOST | NI_NUMERICSERV));
+ std::string r(hostName);
+ r += ":";
+ r += portName;
+ return r;
+ }
};
- inline QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) :
- qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
- pd(allocPd(i->verbs)),
- cchannel(mkCChannel(i->verbs)),
- scq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())),
- rcq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())),
- id(i),
- outstandingSendEvents(0),
- outstandingRecvEvents(0)
- {
- impl->fd = cchannel->fd;
-
- // Set cq context to this QueuePair object so we can find
- // ourselves again
- scq->cq_context = this;
- rcq->cq_context = this;
-
- ::ibv_qp_init_attr qp_attr = {};
-
- // TODO: make a default struct for this
- qp_attr.cap.max_send_wr = DEFAULT_WR_ENTRIES;
- qp_attr.cap.max_send_sge = 4;
- qp_attr.cap.max_recv_wr = DEFAULT_WR_ENTRIES;
- qp_attr.cap.max_recv_sge = 4;
-
- qp_attr.send_cq = scq.get();
- qp_attr.recv_cq = rcq.get();
- qp_attr.qp_type = IBV_QPT_RC;
-
- CHECK(::rdma_create_qp(id.get(), pd.get(), &qp_attr));
-
- // Set the qp context to this so we can find ourselves again
- id->qp->qp_context = this;
- }
-
- inline QueuePair::~QueuePair() {
- if (outstandingSendEvents > 0)
- ::ibv_ack_cq_events(scq.get(), outstandingSendEvents);
- if (outstandingRecvEvents > 0)
- ::ibv_ack_cq_events(rcq.get(), outstandingRecvEvents);
-
- ::rdma_destroy_qp(id.get());
- }
-
inline void QueuePair::notifyRecv() {
CHECK_IBV(ibv_req_notify_cq(rcq.get(), 0));
}
@@ -499,44 +483,6 @@
CHECK_IBV(ibv_req_notify_cq(scq.get(), 0));
}
- inline void QueuePair::postRecv(Buffer* buf) {
- ::ibv_recv_wr rwr = {};
- ::ibv_sge sge;
-
- sge.addr = (uintptr_t) buf->bytes+buf->dataStart;
- sge.length = buf->dataCount;
- sge.lkey = buf->mr->lkey;
-
- rwr.wr_id = reinterpret_cast<uint64_t>(buf);
- rwr.sg_list = &sge;
- rwr.num_sge = 1;
-
- ::ibv_recv_wr* badrwr = 0;
- CHECK_IBV(::ibv_post_recv(id->qp, &rwr, &badrwr));
- if (badrwr)
- throw std::logic_error("ibv_post_recv(): Bad rwr");
- }
-
- inline void QueuePair::postSend(Buffer* buf) {
- ::ibv_send_wr swr = {};
- ::ibv_sge sge;
-
- sge.addr = (uintptr_t) buf->bytes+buf->dataStart;
- sge.length = buf->dataCount;
- sge.lkey = buf->mr->lkey;
-
- swr.wr_id = reinterpret_cast<uint64_t>(buf);
- swr.opcode = IBV_WR_SEND;
- swr.send_flags = IBV_SEND_SIGNALED;
- swr.sg_list = &sge;
- swr.num_sge = 1;
-
- ::ibv_send_wr* badswr = 0;
- CHECK_IBV(::ibv_post_send(id->qp, &swr, &badswr));
- if (badswr)
- throw std::logic_error("ibv_post_send(): Bad swr");
- }
-
inline ConnectionEvent::ConnectionEvent(::rdma_cm_event* e) :
id((e->event != RDMA_CM_EVENT_CONNECT_REQUEST) ?
Connection::find(e->id) : new Connection(e->id)),
@@ -545,26 +491,6 @@
{}
}
-inline std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t) {
-# define CHECK_TYPE(t) case t: o << #t; break;
- switch(t) {
- CHECK_TYPE(RDMA_CM_EVENT_ADDR_RESOLVED)
- CHECK_TYPE(RDMA_CM_EVENT_ADDR_ERROR)
- CHECK_TYPE(RDMA_CM_EVENT_ROUTE_RESOLVED)
- CHECK_TYPE(RDMA_CM_EVENT_ROUTE_ERROR)
- CHECK_TYPE(RDMA_CM_EVENT_CONNECT_REQUEST)
- CHECK_TYPE(RDMA_CM_EVENT_CONNECT_RESPONSE)
- CHECK_TYPE(RDMA_CM_EVENT_CONNECT_ERROR)
- CHECK_TYPE(RDMA_CM_EVENT_UNREACHABLE)
- CHECK_TYPE(RDMA_CM_EVENT_REJECTED)
- CHECK_TYPE(RDMA_CM_EVENT_ESTABLISHED)
- CHECK_TYPE(RDMA_CM_EVENT_DISCONNECTED)
- CHECK_TYPE(RDMA_CM_EVENT_DEVICE_REMOVAL)
- CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_JOIN)
- CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_ERROR)
- }
-# undef CHECK_TYPE
- return o;
-}
+std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t);
#endif // RDMA_WRAP_H