You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2008/09/11 08:16:21 UTC

svn commit: r694143 - in /incubator/qpid/trunk/qpid/cpp: ./ src/ src/qpid/client/ src/qpid/sys/ src/qpid/sys/rdma/

Author: astitcher
Date: Wed Sep 10 23:16:19 2008
New Revision: 694143

URL: http://svn.apache.org/viewvc?rev=694143&view=rev
Log:
Implementation of AMQP over RDMA protocols (Infiniband)

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
Modified:
    incubator/qpid/trunk/qpid/cpp/configure.ac
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/acl.mk
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h

Modified: incubator/qpid/trunk/qpid/cpp/configure.ac
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/configure.ac?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/configure.ac (original)
+++ incubator/qpid/trunk/qpid/cpp/configure.ac Wed Sep 10 23:16:19 2008
@@ -278,7 +278,7 @@
      ;;
    esac],
   [
-   with_RDMA=no
+   with_RDMA=yes
    AC_CHECK_LIB([ibverbs],[ibv_create_qp],,[with_RDMA=no])
    AC_CHECK_LIB([rdmacm],[rdma_create_id],,[with_RDMA=no])
    AC_CHECK_HEADERS([infiniband/verbs.h],,[with_RDMA=no])

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Wed Sep 10 23:16:19 2008
@@ -127,6 +127,7 @@
   qpid/sys/rdma/rdma_factories.cpp \
   qpid/sys/rdma/RdmaIO.cpp \
   qpid/sys/rdma/RdmaIO.h \
+  qpid/sys/rdma/rdma_wrap.cpp \
   qpid/sys/rdma/rdma_wrap.h
 librdmawrap_la_LIBADD = \
   libqpidcommon.la \
@@ -139,11 +140,31 @@
 librdmawrap_la_LDFLAGS = \
   -no-undefined
 
+rdma_la_SOURCES = \
+  qpid/sys/RdmaIOPlugin.cpp
+rdma_la_LIBADD = \
+  libqpidbroker.la \
+  librdmawrap.la
+rdma_la_LDFLAGS = $(PLUGINLDFLAGS)
+rdma_la_CXXFLAGS = \
+  $(AM_CXXFLAGS) -Wno-missing-field-initializers
+dmodule_LTLIBRARIES += \
+  rdma.la
+
+rdmaconnector_la_SOURCES = \
+  qpid/client/RdmaConnector.cpp
+rdmaconnector_la_LIBADD = \
+  libqpidclient.la \
+  librdmawrap.la
+rdmaconnector_la_LDFLAGS = $(PLUGINLDFLAGS)
+rdmaconnector_la_CXXFLAGS = \
+  $(AM_CXXFLAGS) -Wno-missing-field-initializers
+cmodule_LTLIBRARIES += \
+  rdmaconnector.la
+
 # RDMA test/sample programs
 noinst_PROGRAMS += RdmaServer RdmaClient
 RdmaServer_SOURCES = qpid/sys/rdma/RdmaServer.cpp
-RdmaServer_CXXFLAGS = \
-  $(AM_CXXFLAGS) -Wno-missing-field-initializers
 RdmaServer_LDADD = \
   librdmawrap.la libqpidcommon.la
 RdmaClient_SOURCES = qpid/sys/rdma/RdmaClient.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/acl.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/acl.mk?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/acl.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/acl.mk Wed Sep 10 23:16:19 2008
@@ -12,5 +12,5 @@
   qpid/acl/AclReader.cpp \
   qpid/acl/AclReader.h
 
-acl_la_LIBADD= libqpidbroker.la
+acl_la_LIBADD = libqpidbroker.la
 acl_la_LDFLAGS = $(PLUGINLDFLAGS)

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp?rev=694143&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp Wed Sep 10 23:16:19 2008
@@ -0,0 +1,427 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Connector.h"
+
+#include "Bounds.h"
+#include "ConnectionImpl.h"
+#include "ConnectionSettings.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Time.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/sys/rdma/RdmaIO.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/Msg.h"
+
+#include <iostream>
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
+
+// This stuff needs to be abstracted out of here to a platform-specific file
+#include <netdb.h>
+
+namespace qpid {
+namespace client {
+
+using namespace qpid::sys;
+using namespace qpid::framing;
+using boost::format;
+using boost::str;
+
+/**
+ * Client-side Connector that carries AMQP frames over an RDMA connection
+ * (registered for the "rdma" and "ib" protocol names).
+ *
+ * connect() creates a Poller and starts a receiver thread that runs run();
+ * the Rdma::Connector / Rdma::AsynchIO callbacks declared below are
+ * dispatched from that poller.
+ */
+class RdmaConnector : public Connector, private sys::Runnable
+{
+    struct Buff;
+
+    /** Batch up frames for writing to aio. */
+    class Writer : public framing::FrameHandler {
+        typedef Rdma::Buffer BufferBase;
+        typedef std::deque<framing::AMQFrame> Frames;
+
+        const uint16_t maxFrameSize;
+        sys::Mutex lock;        // taken by init(), handle() and write()
+        Rdma::AsynchIO* aio;    // set by init(); the Writer never deletes it
+        BufferBase* buffer;     // buffer currently being encoded into
+        Frames frames;          // frames queued by handle(), not yet encoded
+        size_t lastEof; // Position after last EOF in frames
+        framing::Buffer encode; // encoder over *buffer (reset by newBuffer())
+        size_t framesEncoded;   // frames encoded into the current buffer
+        std::string identifier; // used in log messages
+        // If non-null, reduced by the number of bytes each write() encodes.
+        Bounds* bounds;        
+        
+        void writeOne();
+        void newBuffer();
+
+      public:
+        
+        Writer(uint16_t maxFrameSize, Bounds*);
+        ~Writer();
+        void init(std::string id, Rdma::AsynchIO*);
+        void handle(framing::AMQFrame&);
+        void write(Rdma::AsynchIO&);
+    };
+    
+    const uint16_t maxFrameSize;
+    framing::ProtocolVersion version;
+    bool initiated;             // set once the first received buffer has been read
+
+    // Guards polling and joined.
+    sys::Mutex pollingLock;    
+    bool polling;               // true while the poller has not been shut down
+    bool joined;                // true when there is no receiver thread left to join
+
+    sys::ShutdownHandler* shutdownHandler;  // notified by handleClosed()
+    framing::InputHandler* input;           // receives frames decoded in readbuff()
+    // initialiser and output are not referenced elsewhere in this file.
+    framing::InitiationHandler* initialiser;
+    framing::OutputHandler* output;
+
+    Writer writer;
+    
+    sys::Thread receiver;       // runs run(); started by connect()
+
+    Rdma::AsynchIO* aio;        // null until connected() creates it
+    sys::Poller::shared_ptr poller;
+
+    ~RdmaConnector();
+
+    void run();
+    void handleClosed();
+    bool closeInternal();
+
+    // Callbacks
+    // Rdma::Connector callbacks (bound in connect())
+    void connected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&);
+    void connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType);
+    void disconnected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&);
+    void rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&);
+
+    // readbuff, writebuff and eof are Rdma::AsynchIO callbacks (bound in connected())
+    void readbuff(Rdma::AsynchIO&, Rdma::Buffer*);
+    void writebuff(Rdma::AsynchIO&);
+    void writeDataBlock(const framing::AMQDataBlock& data);
+    void eof(Rdma::AsynchIO&);
+
+    std::string identifier;     // "[local peer]" names, set in connected()
+
+    ConnectionImpl* impl;
+    
+    void connect(const std::string& host, int port);
+    void close();
+    void send(framing::AMQFrame& frame);
+
+    void setInputHandler(framing::InputHandler* handler);
+    void setShutdownHandler(sys::ShutdownHandler* handler);
+    sys::ShutdownHandler* getShutdownHandler() const;
+    framing::OutputHandler* getOutputHandler();
+    const std::string& getIdentifier() const;
+
+public:
+    RdmaConnector(framing::ProtocolVersion pVersion,
+              const ConnectionSettings&, 
+              ConnectionImpl*);
+};
+
+// Register this connector with the Connector factory at static-initialisation
+// time, under both the "rdma" and "ib" protocol names.
+namespace {
+    /** Factory function handed to Connector::registerFactory. */
+    Connector* create(framing::ProtocolVersion version,
+                      const ConnectionSettings& settings,
+                      ConnectionImpl* connection)
+    {
+        return new RdmaConnector(version, settings, connection);
+    }
+
+    struct StaticInit {
+        StaticInit() {
+            const char* const names[] = { "rdma", "ib" };
+            for (size_t i = 0; i < sizeof(names) / sizeof(names[0]); ++i)
+                Connector::registerFactory(names[i], &create);
+        }
+    } init;
+}
+
+
+/**
+ * @param ver      AMQP protocol version announced in the protocol initiation
+ * @param settings connection settings; maxFrameSize sizes the RDMA buffers
+ * @param cimpl    owning connection; also passed to the Writer as its Bounds
+ *
+ * No I/O happens here; connect() starts the connection.
+ */
+RdmaConnector::RdmaConnector(ProtocolVersion ver,
+                     const ConnectionSettings& settings,
+                     ConnectionImpl* cimpl)
+    : maxFrameSize(settings.maxFrameSize),
+      version(ver), 
+      initiated(false),
+      polling(false),
+      joined(true),
+      shutdownHandler(0),
+      // Null until set, rather than indeterminate.
+      input(0),
+      initialiser(0),
+      output(0),
+      // maxFrameSize is declared (and so initialised) before writer.
+      writer(maxFrameSize, cimpl),
+      aio(0),
+      impl(cimpl)
+{
+    QPID_LOG(debug, "RdmaConnector created for " << version);
+}
+
+/** Destruction closes the connector: stops the poller and joins the receiver thread if needed. */
+RdmaConnector::~RdmaConnector()
+{
+    close();
+}
+
+/**
+ * Resolve host:port and start an asynchronous RDMA connection attempt.
+ * A new Poller is created, and a receiver thread running run() drives it,
+ * so the connected/connectionError/disconnected/rejected callbacks run there.
+ *
+ * @throws qpid::Exception if the host cannot be resolved.
+ */
+void RdmaConnector::connect(const std::string& host, int port){
+    Mutex::ScopedLock l(pollingLock);
+    assert(!polling);
+    assert(joined);
+    poller = Poller::shared_ptr(new Poller);
+
+    // This stuff needs to be abstracted out of here to a platform specific file
+    // getaddrinfo requires the unused hints fields (ai_addrlen, ai_addr,
+    // ai_canonname, ai_next) to be zero/null, so value-initialise the struct.
+    ::addrinfo hints = {};
+    hints.ai_family = AF_INET;
+    hints.ai_socktype = SOCK_STREAM;
+    ::addrinfo* res = 0;
+    int n = ::getaddrinfo(host.c_str(), boost::lexical_cast<std::string>(port).c_str(), &hints, &res);
+    // Failure is any non-zero EAI_* code; POSIX does not promise they are negative.
+    if (n != 0) {
+        throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
+    }
+
+    // Free the resolver result when this function exits, including via an exception.
+    // NOTE(review): assumes Rdma::Connector copies the sockaddr it is given
+    // (taken by const reference) rather than keeping a reference to it.
+    struct AddrInfoGuard {
+        ::addrinfo* info;
+        explicit AddrInfoGuard(::addrinfo* i) : info(i) {}
+        ~AddrInfoGuard() { ::freeaddrinfo(info); }
+    } resGuard(res);
+
+    // NOTE(review): the Connector is never deleted here; ownership is not visible in this file.
+    Rdma::Connector* c = new Rdma::Connector(
+        *res->ai_addr,
+        Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES),
+        boost::bind(&RdmaConnector::connected, this, poller, _1, _2),
+        boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2),
+        boost::bind(&RdmaConnector::disconnected, this, poller, _1),
+        boost::bind(&RdmaConnector::rejected, this, poller, _1, _2));
+    c->start(poller);
+
+    polling = true;
+    joined = false;
+    receiver = Thread(this);
+}
+
+// The following only gets run when connected
+/**
+ * Rdma::Connector callback for an established connection. It creates the
+ * AsynchIO for the connection, starts it on the poller, attaches the Writer,
+ * and sends the AMQP protocol initiation header.
+ */
+void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) {
+    aio = new Rdma::AsynchIO(ci->getQueuePair(),
+        cp.maxRecvBufferSize, cp.initialXmitCredit , Rdma::DEFAULT_WR_ENTRIES,
+        boost::bind(&RdmaConnector::readbuff, this, _1, _2),
+        boost::bind(&RdmaConnector::writebuff, this, _1),
+        0, // write buffers full
+        boost::bind(&RdmaConnector::eof, this, _1)); // data error - just close connection
+    aio->start(poller);
+
+    identifier = str(format("[%1% %2%]") % ci->getLocalName() % ci->getPeerName());
+    writer.init(identifier, aio);
+    // Named protocolInit so it does not shadow the file-level static `init` object.
+    ProtocolInitiation protocolInit(version);
+    writeDataBlock(protocolInit);
+}
+
+// Rdma::Connector failure callbacks: each one tears the connection down via
+// handleClosed(). aio is only assigned in connected(), so on a rejected or
+// failed connection attempt it is still null. These callbacks must therefore
+// not dereference it (eof() ignored its AsynchIO argument anyway).
+void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType) {
+    QPID_LOG(trace, "Connection Error " << identifier);
+    handleClosed();
+}
+
+void RdmaConnector::disconnected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&) {
+    handleClosed();
+}
+
+void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams& cp) {
+    QPID_LOG(trace, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize);
+    handleClosed();
+}
+
+/**
+ * Stop polling and, unless called from the receiver thread itself or already
+ * joined, join the receiver thread.
+ * @return the value of polling on entry: true if this call shut the poller
+ *         down, false if it had already been stopped.
+ */
+bool RdmaConnector::closeInternal() {
+    bool ret;
+    {
+    Mutex::ScopedLock l(pollingLock);
+    ret = polling;
+    if (polling) {
+        polling = false;
+        poller->shutdown();
+    }
+    // Nothing to join if already joined, and the receiver thread must not join itself.
+    if (joined || receiver.id() == Thread::current().id()) {
+        return ret;
+    }
+    joined = true;
+    }
+
+    // Join outside the lock: the receiver thread may need pollingLock
+    // (run()'s exception handler takes it) before it can exit.
+    receiver.join();
+    return ret;
+}
+        
+/** Stop the poller and join the receiver thread; closeInternal()'s result is not needed here. */
+void RdmaConnector::close()
+{
+    (void) closeInternal();
+}
+
+/** Set the handler that frames decoded in readbuff() are delivered to. */
+void RdmaConnector::setInputHandler(InputHandler* handler)
+{
+    input = handler;
+}
+
+/** Set the handler that handleClosed() notifies. */
+void RdmaConnector::setShutdownHandler(ShutdownHandler* handler)
+{
+    shutdownHandler = handler;
+}
+
+/** Outgoing frames are handled by the connector itself (see send()). */
+OutputHandler* RdmaConnector::getOutputHandler()
+{
+    return this;
+}
+
+sys::ShutdownHandler* RdmaConnector::getShutdownHandler() const
+{
+    return shutdownHandler;
+}
+
+/** "[local peer]" names of the connection; empty until connected() has run. */
+const std::string& RdmaConnector::getIdentifier() const
+{
+    return identifier;
+}
+
+/** Queue a frame for sending via the Writer. */
+void RdmaConnector::send(AMQFrame& frame)
+{
+    writer.handle(frame);
+}
+
+/** Close, and notify the shutdown handler only if this call actually stopped the poller. */
+void RdmaConnector::handleClosed() {
+    const bool stoppedPolling = closeInternal();
+    if (stoppedPolling && shutdownHandler != 0) {
+        shutdownHandler->shutdown();
+    }
+}
+
+/**
+ * @param s maximum frame size
+ * @param b optional Bounds, reduced by the number of bytes written (may be null)
+ * The Writer is unusable until init() attaches an AsynchIO.
+ */
+RdmaConnector::Writer::Writer(uint16_t s, Bounds* b) :
+    maxFrameSize(s),
+    aio(0),
+    buffer(0), 
+    lastEof(0), 
+    framesEncoded(0),   // previously left indeterminate until newBuffer()
+    bounds(b)
+{
+}
+
+/** Hand the in-progress buffer back to the AsynchIO, if one was ever attached. */
+RdmaConnector::Writer::~Writer() {
+    if (aio != 0) {
+        aio->returnBuffer(buffer);
+    }
+}
+
+/** Attach to the connection's AsynchIO and grab the first buffer to encode into. */
+void RdmaConnector::Writer::init(std::string id, Rdma::AsynchIO* a) {
+    Mutex::ScopedLock guard(lock);
+    aio = a;
+    identifier = id;
+    newBuffer();
+}
+/**
+ * Queue a frame for sending. A write is requested from the AsynchIO only
+ * when the frame carries the EOF flag, so lastEof marks the end of the last
+ * complete frameset.
+ */
+void RdmaConnector::Writer::handle(framing::AMQFrame& frame) { 
+    Mutex::ScopedLock guard(lock);
+    frames.push_back(frame);
+    // Don't bother to send anything unless we're at the end of a frameset (assembly in 0-10 terminology)
+    const bool endOfFrameset = frame.getEof();
+    if (endOfFrameset) {
+        lastEof = frames.size();
+        QPID_LOG(debug, "Requesting write: lastEof=" << lastEof);
+        aio->notifyPendingWrite();
+    }
+    QPID_LOG(trace, "SENT " << identifier << ": " << frame);
+}
+
+/** Queue the current buffer (everything encoded so far) and start a fresh one. Called with lock held. */
+void RdmaConnector::Writer::writeOne() {
+    assert(buffer);
+    QPID_LOG(trace, "Write buffer " << encode.getPosition()
+             << " bytes " << framesEncoded << " frames ");    
+    // The payload is the encoded region, starting at offset 0.
+    buffer->dataStart = 0;
+    buffer->dataCount = encode.getPosition();
+    framesEncoded = 0;
+    aio->queueWrite(buffer);
+    newBuffer();
+}
+
+/** Fetch a buffer from the AsynchIO and point the encoder at it. Called with lock held. */
+void RdmaConnector::Writer::newBuffer() {
+    buffer = aio->getBuffer();
+    framesEncoded = 0;
+    encode = framing::Buffer(buffer->bytes, buffer->byteCount);
+}
+
+// Called in IO thread. (write idle routine)
+// This is NOT only called in response to previously calling notifyPendingWrite
+/**
+ * Encode queued frames into RDMA buffers and queue them for sending, while
+ * the AsynchIO is writable. Only the first lastEof frames (complete framesets)
+ * are sent; frames of a frameset still being built stay queued. This also
+ * keeps lastEof from being decremented below zero.
+ */
+void RdmaConnector::Writer::write(Rdma::AsynchIO&) {
+    Mutex::ScopedLock l(lock);
+    assert(buffer);
+    size_t bytesWritten = 0;
+    while (lastEof > 0 && aio->writable()) {
+        // Pack as many complete-frameset frames as fit into the current buffer.
+        // Invariant: frames.size() >= lastEof, so front() is valid here.
+        while (framesEncoded < lastEof) {
+            const AMQFrame& frame = frames.front();
+            const uint32_t size = frame.size();
+            if (size > encode.available())
+                break;
+            frame.encode(encode);
+            frames.pop_front();
+            ++framesEncoded;
+            bytesWritten += size;
+        }
+        if (framesEncoded == 0) {
+            // The next frame does not fit even into an empty buffer. Queuing
+            // empty buffers would make no progress, so stop and report it.
+            QPID_LOG(error, "Frame too large for RDMA buffer " << identifier);
+            break;
+        }
+        lastEof -= framesEncoded;
+        writeOne();
+    }
+    if (bounds) bounds->reduce(bytesWritten);
+}
+
+/**
+ * AsynchIO read callback. The first buffer is checked for a protocol
+ * initiation, then every AMQP frame in the buffer goes to the input handler.
+ */
+void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
+    framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+
+    if (!initiated) {
+        framing::ProtocolInitiation protocolInit;
+        const bool gotInit = protocolInit.decode(in);
+        if (gotInit) {
+            //TODO: check the version is correct
+            QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")");
+        }
+        initiated = true;
+    }
+
+    for (AMQFrame frame; frame.decode(in); ) {
+        QPID_LOG(trace, "RECV " << identifier << ": " << frame);
+        input->received(frame);
+    }
+}
+
+/** AsynchIO idle callback: let the Writer send any complete framesets. */
+void RdmaConnector::writebuff(Rdma::AsynchIO& io) {
+    writer.write(io);
+}
+
+/** Encode one data block (e.g. the protocol initiation) into a buffer of its own and queue it. */
+void RdmaConnector::writeDataBlock(const AMQDataBlock& data) {
+    Rdma::Buffer* buff = aio->getBuffer();
+    framing::Buffer out(buff->bytes, buff->byteCount);
+    data.encode(out);
+    // The block is encoded at the start of the buffer. State that explicitly,
+    // as Writer::writeOne() does, instead of relying on whatever dataStart the
+    // buffer had when handed out.
+    buff->dataStart = 0;
+    buff->dataCount = data.size();
+    aio->queueWrite(buff);
+}
+
+/** AsynchIO data-error callback: treat it as the connection closing. */
+void RdmaConnector::eof(Rdma::AsynchIO&)
+{
+    handleClosed();
+}
+
+// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
+// will never be called
+// Receiver thread body (started by connect()): runs a Dispatcher over the
+// poller; presumably d.run() returns once closeInternal() shuts the poller down.
+void RdmaConnector::run(){
+    // Keep the connection impl in memory until run() completes.
+    //GRS: currently the ConnectionImpls destructor is where the Io thread is joined
+    //boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this();
+    //assert(protect);
+    try {
+        Dispatcher d(poller);
+	
+        //aio->start(poller);
+        d.run();
+        //aio->queueForDeletion();
+    } catch (const std::exception& e) {
+        {
+        // We're no longer polling
+        Mutex::ScopedLock l(pollingLock);
+        polling = false;
+        }
+        QPID_LOG(error, e.what());
+        // NOTE(review): polling was cleared above, so closeInternal() returns
+        // false here and handleClosed() will NOT call shutdownHandler->shutdown();
+        // confirm that is intended.
+        handleClosed();
+    }
+}
+
+
+}} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Wed Sep 10 23:16:19 2008
@@ -137,8 +137,8 @@
 void SessionImpl::detach() //call with lock held 
 {
     if (state == ATTACHED) {
-        proxy.detach(id.getName());
+        // Enter DETACHING before the detach command is sent. Presumably this is
+        // so that any response to the detach is handled with the state already
+        // DETACHING; TODO confirm.
         setState(DETACHING);
+        proxy.detach(id.getName());
     }
 }
 
@@ -630,7 +630,8 @@
 inline void SessionImpl::waitFor(State s) //call with lock held
 {
     // We can be DETACHED at any time
-    state.waitFor(States(s, DETACHED));
+    // Waiting for DETACHED itself uses the single-state overload instead of
+    // building States(DETACHED, DETACHED).
+    // NOTE(review): the reason is not visible here; presumably States rejects
+    // or mishandles duplicate entries.
+    if (s == DETACHED) state.waitFor(DETACHED);
+    else state.waitFor(States(s, DETACHED));
     check();
 }
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=694143&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Wed Sep 10 23:16:19 2008
@@ -0,0 +1,327 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ProtocolFactory.h"
+
+#include "qpid/Plugin.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/framing/AMQP_HighestVersion.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/rdma/RdmaIO.h"
+#include "qpid/sys/OutputControl.h"
+
+#include <boost/bind.hpp>
+#include <memory>
+
+#include <netdb.h>
+
+using std::auto_ptr;
+using std::string;
+using std::stringstream;
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Broker-side handler for one RDMA connection. It owns the ConnectionCodec
+ * (deleted in the destructor) and moves data between that codec and the
+ * connection's Rdma::AsynchIO.
+ */
+class RdmaIOHandler : public OutputControl {
+    Rdma::Connection::intrusive_ptr connection;
+    std::string identifier;             // peer name, used in log messages
+    Rdma::AsynchIO* aio;                // set by init(); deferDelete()d in the destructor
+    ConnectionCodec::Factory* factory;
+    // Created on receipt of the protocol initiation (server side) or in
+    // idle() (client side).
+    ConnectionCodec* codec;
+    bool readError;                     // once set, readbuff() ignores all input
+    bool isClient;                      // set by setClient() for outgoing connections
+
+    void write(const framing::ProtocolInitiation&);
+
+  public:
+    RdmaIOHandler(Rdma::Connection::intrusive_ptr& c, ConnectionCodec::Factory* f);
+    ~RdmaIOHandler();
+    void init(Rdma::AsynchIO* a);
+    void start(Poller::shared_ptr poller) {aio->start(poller);}
+
+    void setClient() { isClient = true; }
+
+    // Output side
+    void close();
+    void activateOutput();
+
+    // Input side
+    void readbuff(Rdma::AsynchIO& aio, Rdma::Buffer* buff);
+    
+    // Notifications
+    void full(Rdma::AsynchIO& aio);
+    void idle(Rdma::AsynchIO& aio);
+    void error(Rdma::AsynchIO& aio);    
+};
+
+/**
+ * @param c connection served by this handler; its peer name is the log identifier
+ * @param f factory used to create the codec once the protocol version is known
+ * The AsynchIO is supplied separately through init().
+ */
+RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr& c, qpid::sys::ConnectionCodec::Factory* f) :
+    connection(c),
+    identifier(c->getPeerName()),
+    aio(0),         // null until init(), rather than indeterminate
+    factory(f),
+    codec(0),
+    readError(false),
+    isClient(false)
+{
+}
+
+/** Attach the AsynchIO created for this connection by RdmaIOProtocolFactory::request(). */
+void RdmaIOHandler::init(Rdma::AsynchIO* io)
+{
+    aio = io;
+}
+
+/** Tell the codec (if any) the connection closed, free it, and hand the AsynchIO to deferred deletion. */
+RdmaIOHandler::~RdmaIOHandler() {
+    if (codec != 0) {
+        codec->closed();
+        delete codec;
+    }
+
+    aio->deferDelete();
+}
+
+/** Encode a protocol initiation header into a buffer of its own and queue it. */
+void RdmaIOHandler::write(const framing::ProtocolInitiation& data)
+{
+    QPID_LOG(debug, "SENT [" << identifier << "] INIT(" << data << ")");
+    Rdma::Buffer* buff = aio->getBuffer();
+    framing::Buffer out(buff->bytes, buff->byteCount);
+    data.encode(out);
+    // Encoded from the start of the buffer; set dataStart explicitly instead of
+    // relying on the value the buffer was handed out with.
+    buff->dataStart = 0;
+    buff->dataCount = data.size();
+    aio->queueWrite(buff);
+}
+
+/** OutputControl::close: delegates to AsynchIO::queueWriteClose(). */
+void RdmaIOHandler::close()
+{
+    aio->queueWriteClose();
+}
+
+/** OutputControl::activateOutput: ask the AsynchIO to call idle() so the codec can write. */
+void RdmaIOHandler::activateOutput()
+{
+    aio->notifyPendingWrite();
+}
+
+/**
+ * AsynchIO idle callback: produce output while the connection is writable.
+ * On a client connection the codec is created here and the protocol
+ * initiation sent first. After that, any data the codec can encode is queued,
+ * and the connection is closed once the codec reports it is closed.
+ */
+void RdmaIOHandler::idle(Rdma::AsynchIO&) {
+    if (!aio->writable()) {
+        return;
+    }
+    if (isClient && codec == 0) {
+        codec = factory->create(*this, identifier);
+        write(framing::ProtocolInitiation(codec->getVersion()));
+        return;
+    }
+    if (codec == 0) return;
+    if (codec->canEncode()) {
+        // Try and get a queued buffer if not then construct new one
+        Rdma::Buffer* buff = aio->getBuffer();
+        size_t encoded=codec->encode(buff->bytes, buff->byteCount);
+        // Encoding starts at offset 0; set dataStart explicitly rather than
+        // relying on the value a reused buffer carries.
+        buff->dataStart = 0;
+        buff->dataCount = encoded;
+        aio->queueWrite(buff);
+    }
+    if (codec->isClosed())
+        aio->queueWriteClose();
+}
+
+/** AsynchIO error callback: close the connection. */
+void RdmaIOHandler::error(Rdma::AsynchIO&)
+{
+    close();
+}
+
+/** "Write buffers full" notification: only logged (not currently bound in request()). */
+void RdmaIOHandler::full(Rdma::AsynchIO&)
+{
+    QPID_LOG(debug, "buffer full [" << identifier << "]");
+}
+
+// The logic here is subtly different from TCP as RDMA is message oriented
+// so we define that an RDMA message is a frame - in this case there is no putting back
+// of any message remainder - there shouldn't be any. And what we read here can't be
+// smaller than a frame
+//
+// Until a codec exists, the buffer is decoded as a protocol initiation, whose
+// version selects the codec via the factory. Afterwards buffers go to the codec.
+// A decode or codec-creation failure sets readError (later input is ignored)
+// and closes the connection.
+// NOTE(review): `decoded` is computed but never used.
+void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
+    if (readError) {
+        return;
+    }
+    size_t decoded = 0;
+    if (codec) {                // Already initiated
+        try {
+            decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
+        }catch(const std::exception& e){
+            QPID_LOG(error, e.what());
+            readError = true;
+            aio->queueWriteClose();
+        }
+    }else{
+        // NOTE(review): a buffer that does not decode as a protocol initiation is silently ignored.
+        framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+        framing::ProtocolInitiation protocolInit;
+        if (protocolInit.decode(in)) {
+            decoded = in.getPosition();
+            QPID_LOG(debug, "RECV [" << identifier << "] INIT(" << protocolInit << ")");
+            try {
+                codec = factory->create(protocolInit.getVersion(), *this, identifier);
+                if (!codec) {
+                    //TODO: may still want to revise this...
+                    //send valid version header & close connection.
+                    write(framing::ProtocolInitiation(framing::highestProtocolVersion));
+                    readError = true;
+                    aio->queueWriteClose();                
+                }
+            } catch (const std::exception& e) {
+                QPID_LOG(error, e.what());
+                readError = true;
+                aio->queueWriteClose();
+            }
+        }
+    }
+}
+
+/**
+ * ProtocolFactory that accepts AMQP connections over RDMA (accept()) and
+ * makes outgoing ones (connect()). Each connection gets an RdmaIOHandler,
+ * which is stored as the Rdma::Connection's context.
+ */
+class RdmaIOProtocolFactory : public ProtocolFactory {
+    auto_ptr<Rdma::Listener> listener;  // created by accept()
+    const uint16_t listeningPort;
+
+  public:
+    RdmaIOProtocolFactory(int16_t port, int backlog);
+    void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
+    void connect(Poller::shared_ptr, const string& host, int16_t port, ConnectionCodec::Factory*, ConnectFailedCallback);
+
+    uint16_t getPort() const;
+    string getHost() const;
+
+  private:
+    // Rdma::Listener / Rdma::Connector callbacks (bound in accept() and connect())
+    bool request(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&, ConnectionCodec::Factory*);
+    void established(Poller::shared_ptr, Rdma::Connection::intrusive_ptr&);
+    void connected(Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&, ConnectionCodec::Factory*);
+    void connectionError(Rdma::Connection::intrusive_ptr&, Rdma::ErrorType);
+    void disconnected(Rdma::Connection::intrusive_ptr&);
+    void rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&);
+};
+
+// Static instance to initialise plugin: registers an RdmaIOProtocolFactory
+// listening on the broker's configured port.
+static class RdmaIOPlugin : public Plugin {
+    void earlyInitialize(Target&) {
+    }
+    
+    void initialize(Target& target) {
+        // Only provide to a Broker
+        broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
+        if (!broker)
+            return;
+
+        const broker::Broker::Options& opts = broker->getOptions();
+        ProtocolFactory::shared_ptr protocol(new RdmaIOProtocolFactory(opts.port, opts.connectionBacklog));
+        QPID_LOG(info, "Listening on RDMA port " << protocol->getPort());
+        broker->registerProtocolFactory(protocol);
+    }
+} rdmaPlugin;
+
+/** The backlog argument is accepted but not used by the RDMA listener. */
+RdmaIOProtocolFactory::RdmaIOProtocolFactory(int16_t port, int /*backlog*/)
+    : listeningPort(port)
+{
+}
+
+/** Connection is established: start I/O on the handler attached to it in request(). */
+void RdmaIOProtocolFactory::established(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) {
+    ci->getContext<RdmaIOHandler>()->start(poller);
+}
+
+/**
+ * Connection request callback: create the handler and its AsynchIO, and attach
+ * the handler to the connection as its context. Always accepts the request.
+ */
+bool RdmaIOProtocolFactory::request(Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp,
+        ConnectionCodec::Factory* f) {
+    RdmaIOHandler* handler = new RdmaIOHandler(ci, f);
+    Rdma::AsynchIO* io = new Rdma::AsynchIO(
+        ci->getQueuePair(),
+        cp.maxRecvBufferSize,
+        cp.initialXmitCredit,
+        Rdma::DEFAULT_WR_ENTRIES,
+        boost::bind(&RdmaIOHandler::readbuff, handler, _1, _2),
+        boost::bind(&RdmaIOHandler::idle, handler, _1),
+        0, // boost::bind(&RdmaIOHandler::full, handler, _1),
+        boost::bind(&RdmaIOHandler::error, handler, _1));
+    handler->init(io);
+
+    // Record the handler so later callbacks can get it back from the connection
+    ci->addContext(handler);
+    return true;
+}
+
+// Connection errors are currently ignored here.
+// NOTE(review): the RdmaIOHandler attached in request() is only deleted in
+// disconnected(). Confirm that callback always follows an error; otherwise the
+// handler and its AsynchIO leak.
+void RdmaIOProtocolFactory::connectionError(Rdma::Connection::intrusive_ptr&, Rdma::ErrorType) {
+}
+
+/** Tear down the handler (if any) attached to a connection that has gone away. */
+void RdmaIOProtocolFactory::disconnected(Rdma::Connection::intrusive_ptr& ci) {
+    RdmaIOHandler* handler = ci->getContext<RdmaIOHandler>();
+    if (handler == 0)
+        return;     // no connection was set up: nothing to tear down
+    handler->close();
+    delete handler;
+}
+
+/** @return the port given at construction (immutable, so no locking needed). */
+uint16_t RdmaIOProtocolFactory::getPort() const
+{
+    return listeningPort;
+}
+
+/** Not implemented for RDMA: always returns an empty string. */
+string RdmaIOProtocolFactory::getHost() const
+{
+    //return listener.getSockname();
+    return string();
+}
+
+/**
+ * Start listening for incoming RDMA connections on listeningPort on all local
+ * IPv4 addresses. New connections are set up via request() and established(),
+ * and torn down via disconnected().
+ */
+void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
+    // Value-initialise so sin_zero (and any platform-specific fields) are zero.
+    ::sockaddr_in sin = {};
+    sin.sin_family = AF_INET;
+    sin.sin_port = htons(listeningPort);
+    sin.sin_addr.s_addr = htonl(INADDR_ANY);
+
+    listener.reset(
+        new Rdma::Listener(reinterpret_cast<const ::sockaddr&>(sin),
+            Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES),
+            boost::bind(&RdmaIOProtocolFactory::established, this, poller, _1),
+            boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2),
+            boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1),
+            boost::bind(&RdmaIOProtocolFactory::request, this, _1, _2, fact)));
+
+    listener->start(poller);
+}
+
+// Only used for outgoing connections (in federation)
+// A rejected outgoing connection is currently ignored: nothing is logged, and
+// connect() discards its ConnectFailedCallback, so no failure is reported.
+void RdmaIOProtocolFactory::rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&) {
+}
+
+// Do the same as connection request and established but mark a client too
+void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp,
+        ConnectionCodec::Factory* f) {
+    (void) request(ci, cp, f);
+    // request() attached the handler as the connection context.
+    ci->getContext<RdmaIOHandler>()->setClient();
+    established(poller, ci);
+}
+ 
+/**
+ * Start an outgoing (federation) RDMA connection to host:p. Once established,
+ * connected() sets up a client-mode RdmaIOHandler.
+ *
+ * @throws qpid::Exception if the host cannot be resolved.
+ * NOTE(review): the ConnectFailedCallback is not used; failures are not reported to it.
+ */
+void RdmaIOProtocolFactory::connect(
+    Poller::shared_ptr poller,
+    const std::string& host, int16_t p,
+    ConnectionCodec::Factory* f,
+    ConnectFailedCallback)
+{
+    ::addrinfo* res = 0;
+    ::addrinfo hints = {};
+    hints.ai_family = AF_INET;
+    hints.ai_socktype = SOCK_STREAM;
+    // Ports above 32767 arrive as negative int16_t values; print them unsigned.
+    stringstream ss; ss << static_cast<uint16_t>(p);
+    string port = ss.str();
+    int n = ::getaddrinfo(host.c_str(), port.c_str(), &hints, &res);
+    // Failure is any non-zero EAI_* code; POSIX does not promise they are negative.
+    if (n != 0) {
+        throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
+    }
+
+    // Free the resolver result when this function exits, including via an exception.
+    // NOTE(review): assumes Rdma::Connector copies the sockaddr it is given
+    // (taken by const reference) rather than keeping a reference to it.
+    struct AddrInfoGuard {
+        ::addrinfo* info;
+        explicit AddrInfoGuard(::addrinfo* i) : info(i) {}
+        ~AddrInfoGuard() { ::freeaddrinfo(info); }
+    } resGuard(res);
+
+    // The Connector must outlive this call and must be started, otherwise no
+    // connection attempt is ever made. A stack-local, never-started Connector
+    // would simply be destroyed on return.
+    // NOTE(review): as in RdmaConnector::connect(), the Connector is never deleted here.
+    Rdma::Connector* c = new Rdma::Connector(
+            *res->ai_addr,
+            Rdma::ConnectionParams(8000, Rdma::DEFAULT_WR_ENTRIES),
+            boost::bind(&RdmaIOProtocolFactory::connected, this, poller, _1, _2, f),
+            boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2),
+            boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1),
+            boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2));
+    c->start(poller);
+}
+
+}} // namespace qpid::sys

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp Wed Sep 10 23:16:19 2008
@@ -50,8 +50,6 @@
 int64_t sbytes = 0;
 int64_t rmsgs = 0;
 int64_t rbytes = 0;
-int64_t cmsgs = 0;
-int writable = true;
 
+// target: number of messages to send (and receive) in the test;
+// msgsize: payload bytes per message (used as each buffer's dataCount).
 int target = 1000000;
 int msgsize = 200;
@@ -62,17 +60,15 @@
 vector<char> testString;
 
 void write(Rdma::AsynchIO& aio) {
-    if ((cmsgs - rmsgs) < Rdma::DEFAULT_WR_ENTRIES/2) {
-        while (writable) {
-            if (smsgs >= target)
-                return;
-            Rdma::Buffer* b = aio.getBuffer();
-            std::copy(testString.begin(), testString.end(), b->bytes);
-            b->dataCount = msgsize;
-            aio.queueWrite(b);
-            ++smsgs;
-            sbytes += b->byteCount;
-        }
+    // Fill and queue test messages while the AsynchIO reports it is writable,
+    // stopping once `target` messages have been sent.
+    while (aio.writable()) {
+        if (smsgs >= target)
+            return;
+        Rdma::Buffer* b = aio.getBuffer();
+        std::copy(testString.begin(), testString.end(), b->bytes);
+        b->dataCount = msgsize;
+        aio.queueWrite(b);
+        ++smsgs;
+        sbytes += msgsize;
     }
 }
 
@@ -82,39 +78,46 @@
 
 void data(Poller::shared_ptr p, Rdma::AsynchIO& aio, Rdma::Buffer* b) {
     ++rmsgs;
-    rbytes += b->byteCount;
+    rbytes += b->dataCount; // Count received payload bytes, not the buffer's capacity
 
     // When all messages have been recvd stop
     if (rmsgs < target) {
         write(aio);
     } else {
         fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now()));
-        if (cmsgs >= target)
+        if (aio.incompletedWrites() == 0) // Only stop once every queued write has completed
             p->shutdown();
     }
 }
 
-void full(Rdma::AsynchIO&) {
-    writable = false;
+void full(Rdma::AsynchIO& a, Rdma::Buffer* b) {
+    // Warn as we shouldn't get here anymore (write() only queues while aio.writable())
+    cerr << "!";
+
+    // Don't need to keep the buffer: just undo the counts write() added for it
+    --smsgs;
+    sbytes -= b->dataCount;
+
+    // Give buffer back
+    a.returnBuffer(b);
 }
 
 void idle(Poller::shared_ptr p, Rdma::AsynchIO& aio) {
-    writable = true;
-    ++cmsgs;
     if (smsgs < target) {
         write(aio);
     } else {
         sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now()));
-        if (rmsgs >= target && cmsgs >= target)
+        if (rmsgs >= target && aio.incompletedWrites() == 0) // Everything received and all our writes completed
             p->shutdown();
     }
 }
 
-void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) {
+void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) {
     cout << "Connected\n";
     Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair();
 
-    Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(), msgsize,
+    Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(),
+        cp.maxRecvBufferSize, cp.initialXmitCredit , Rdma::DEFAULT_WR_ENTRIES,
         boost::bind(&data, poller, _1, _2),
         boost::bind(&idle, poller, _1),
         &full,
@@ -131,12 +134,12 @@
     p->shutdown();
 }
 
-void connectionError(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) {
+void connectionError(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&, const Rdma::ErrorType) { // Any error type ends the test
     cout << "Connection error\n";
     p->shutdown();
 }
 
-void rejected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) {
+void rejected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&) { // Peer's params are ignored; rejection ends the test
     cout << "Connection rejected\n";
     p->shutdown();
 }
@@ -164,7 +167,7 @@
     // Make a random message of that size
     testString.resize(msgsize);
     for (int i = 0; i < msgsize; ++i) {
-        testString[i] = 32 + rand() & 0x3f;
+        testString[i] = 32 + (rand() & 0x3f);
     }
 
     try {
@@ -173,10 +176,11 @@
 
         Rdma::Connector c(
             *res->ai_addr,
-            boost::bind(&connected, p, _1),
-            boost::bind(&connectionError, p, _1),
+            Rdma::ConnectionParams(msgsize, Rdma::DEFAULT_WR_ENTRIES),
+            boost::bind(&connected, p, _1, _2),
+            boost::bind(&connectionError, p, _1, _2),
             boost::bind(&disconnected, p, _1),
-            boost::bind(&rejected, p, _1));
+            boost::bind(&rejected, p, _1, _2));
 
         c.start(p);
         d.run();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp Wed Sep 10 23:16:19 2008
@@ -20,13 +20,21 @@
  */
 #include "RdmaIO.h"
 
+#include "qpid/log/Statement.h"
+
+
 #include <iostream>
 #include <boost/bind.hpp>
 
+using qpid::sys::DispatchHandle;
+using qpid::sys::Poller;
+
 namespace Rdma {
     AsynchIO::AsynchIO(
             QueuePair::intrusive_ptr q,
-            int s,
+            int size,
+            int xCredit,
+            int rCount,
             ReadCallback rc,
             IdleCallback ic,
             FullCallback fc,
@@ -34,10 +42,15 @@
     ) :
         qp(q),
         dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this, _1), 0, 0),
-        bufferSize(s),
-        recvBufferCount(DEFAULT_WR_ENTRIES),
-        xmitBufferCount(DEFAULT_WR_ENTRIES),
+        bufferSize(size),
+        recvCredit(0),
+        xmitCredit(xCredit),
+        recvBufferCount(rCount),
+        xmitBufferCount(xCredit),
         outstandingWrites(0),
+        closed(false),
+        deleting(false),
+        state(IDLE),
         readCallback(rc),
         idleCallback(ic),
         fullCallback(fc),
@@ -57,72 +70,232 @@
     }
 
     AsynchIO::~AsynchIO() {
+        // Warn if we are deleting whilst there are still unreclaimed write buffers
+        if ( outstandingWrites>0 )
+            QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished");
+
+        // Turn off data callbacks before the buffers (and this object) go away
+        dataHandle.stopWatch();
+        
         // The buffers ptr_deque automatically deletes all the buffers we've allocated
+        // TODO: It might turn out to be more efficient in high connection loads to reuse the
+        // buffers rather than having to reregister them all the time (this would be straightforward if all 
+        // connections have the same buffer size and harder otherwise)
     }
 
     void AsynchIO::start(Poller::shared_ptr poller) {
         dataHandle.startWatch(poller);
     }
 
-    // TODO: Currently we don't prevent write buffer overrun we just advise
-    // when to stop writing.
-    void AsynchIO::queueWrite(Buffer* buff) {
-        qp->postSend(buff);
-        ++outstandingWrites;
-        if (outstandingWrites >= xmitBufferCount) {
-            fullCallback(*this);
+    // Delete this object now if it is idle with no outstanding writes; otherwise just mark it so the data/notify processing deletes it once that holds
+    void AsynchIO::deferDelete() {
+        {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+        if (outstandingWrites > 0 || state != IDLE) {
+            deleting = true;
+            return;
         }
+        state = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor
+        }
+        delete this;
     }
 
-    void AsynchIO::notifyPendingWrite() {
-        // Just perform the idle callback (if possible)
-        if (outstandingWrites < xmitBufferCount) {
-            idleCallback(*this);
+    void AsynchIO::queueWrite(Buffer* buff) {
+        // Make sure we don't overrun our available buffers
+        // either at our end or the known available at the peer's end
+        if (writable()) {
+            // Piggyback any receive credit we owe the peer in the imm data. TODO: We might want to batch up sending credit
+            if (recvCredit > 0) {
+                int creditSent = recvCredit & ~FlagsMask;
+                qp->postSend(creditSent, buff);
+                recvCredit -= creditSent;
+            } else {
+                qp->postSend(buff);
+            }
+            ++outstandingWrites;
+            --xmitCredit;
+        } else {
+            if (fullCallback) { // Not writable: the full callback takes ownership of buff
+                fullCallback(*this, buff);
+            } else {
+                QPID_LOG(error, "RDMA: qp=" << qp << ": Write queue full, but no callback, throwing buffer away");
+                returnBuffer(buff);
+            }
         }
     }
 
+    // Mark as closed: writable() then returns false, so no more writes are accepted and doWriteCallback() stops calling the idle callback
     void AsynchIO::queueWriteClose() {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+        closed = true;
     }
 
-    Buffer* AsynchIO::getBuffer() {
-        qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
-        if (bufferQueue.empty()) {
-            Buffer* b = qp->createBuffer(bufferSize);
-            buffers.push_front(b);
-            b->dataCount = 0;
-            return b;
-        } else {
-            Buffer* b = bufferQueue.front();
-            bufferQueue.pop_front();
-            b->dataCount = 0;
-            b->dataStart = 0;
-            return b;
+    // Ask the application (via the idle callback) for more data to write.
+    // As notifyPendingWrite can be called on an arbitrary thread it must check whether we are processing or not.
+    // If we are then we just record the request and return, as we know that we will eventually do the idle callback anyway.
+    void AsynchIO::notifyPendingWrite() {
+        {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+        // We can get here in any state (as the caller could be in any thread)
+        switch (state) {
+        case NOTIFY_WRITE:
+        case PENDING_NOTIFY:
+            // We only need to note a pending notify if we're already doing a notify as data processing
+            // is always followed by write notification processing
+            state = PENDING_NOTIFY;
+            return;
+        case PENDING_DATA:
+            return;
+        case DATA:
+            // Only need to return here as data processing will do the idleCallback itself anyway
+            return;
+        case IDLE:
+            state = NOTIFY_WRITE;
+            break;
+        case DELETED:
+            assert(state!=DELETED);
+            // Never run callbacks on an object that is being deleted, even if asserts are compiled out
+            return;
         }
+        }
 
+        // Start with the write notification we were asked for, then keep going for as long as
+        // more work was queued for us whilst we were busy.
+        // Using NOTIFY_WRITE for both kinds of work is a bit strange, but it makes sure we get the
+        // correct result if we reenter notifyPendingWrite(), in which case we want to end up in
+        // PENDING_NOTIFY (entering dataEvent doesn't matter as it only checks not IDLE)
+        enum {COMPLETION, NOTIFY} action = NOTIFY;
+        // Set (under stateLock) when we have to delete ourselves on the way out
+        bool doDelete = false;
+        do {
+            switch (action) {
+            case COMPLETION:
+                processCompletions();
+                // Fall through: processing completions is always followed by write notification processing
+            case NOTIFY:
+                doWriteCallback();
+                break;
+            }
+            {
+            qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+            switch (state) {
+            case NOTIFY_WRITE:
+                // Nothing else pending. If deferDelete() was called whilst we were busy and all writes
+                // have now completed we are the last user and must delete ourselves. Decide that here,
+                // under the lock, so that no other thread can start using us in between.
+                if (deleting && outstandingWrites == 0) {
+                    state = DELETED;
+                    doDelete = true;
+                } else {
+                    state = IDLE;
+                }
+                goto exit;
+            case PENDING_DATA:
+                action = COMPLETION;
+                break;
+            case PENDING_NOTIFY:
+                action = NOTIFY;
+                break;
+            default:
+                assert(state!=IDLE && state!=DATA && state!=DELETED);
+                return;
+            }
+            state = NOTIFY_WRITE;
+            }
+        } while (true);
+    exit:
+        // Delete only after stateLock (one of our members) has been released
+        if (doDelete) {
+            delete this;
+        }
     }
 
-    void AsynchIO::dataEvent(DispatchHandle&) {
+    // Poller callback for the completion channel: process the completions, then give the
+    // application a chance to write (repeating while write notifications are pending).
+    void AsynchIO::dataEvent(qpid::sys::DispatchHandle&) {
+        // Keep track of writable notifications
+        {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+        switch (state) {
+        case IDLE:
+            break;
+        case DELETED:
+            // Being deleted: don't overwrite that state or touch anything else
+            return;
+        default:
+            // We're already processing a notification: it will pick up the data for us
+            state = PENDING_DATA;
+            return;
+        }
+        // Can't get here in DATA state as that would violate the serialisation rules
+        assert( state==IDLE );
+        state  = DATA;
+        }
+
+        processCompletions();
+
+        {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+        assert( state==DATA );
+        state = NOTIFY_WRITE;
+        }
+
+        // Set (under stateLock) when we have to delete ourselves on the way out
+        bool doDelete = false;
+        do {
+            doWriteCallback();
+
+            {
+            qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+            if ( state==NOTIFY_WRITE ) {
+                // Nothing else pending. If deferDelete() was called whilst we were busy and all writes
+                // have now completed we are the last user and must delete ourselves. Decide that here,
+                // under the lock, so that no other thread can start using us in between.
+                if (deleting && outstandingWrites == 0) {
+                    state = DELETED;
+                    doDelete = true;
+                } else {
+                    state = IDLE;
+                }
+                break;
+            }
+            // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered
+            assert( state==PENDING_NOTIFY );
+            state = NOTIFY_WRITE;
+            }
+        } while (true);
+
+        // Delete only after stateLock (one of our members) has been released
+        if (doDelete) {
+            delete this;
+        }
+    }
+
+    void AsynchIO::processCompletions() { // Drain every pending send/recv work completion for our queue pair
         QueuePair::intrusive_ptr q = qp->getNextChannelEvent();
 
+        // Re-enable notification for queue:
+        // This needs to happen before we could do anything that could generate more work completion
+        // events (ie the callbacks etc. in the following).
+        // This can't make us reenter this code as the handle attached to the completion queue will still be
+        // disabled by the poller until we leave this code
+        qp->notifyRecv();
+        qp->notifySend();
+            
+        int recvEvents = 0;
+        int sendEvents = 0;
+
         // If no event do nothing
         if (!q)
             return;
 
         assert(q == qp);
 
-        // Re-enable notification for queue
-        qp->notifySend();
-        qp->notifyRecv();
-
         // Repeat until no more events
         do {
             QueuePairEvent e(qp->getNextEvent());
             if (!e)
-                return;
+                break;
 
             ::ibv_wc_status status = e.getEventStatus();
             if (status != IBV_WC_SUCCESS) {
                 errorCallback(*this);
+                // TODO: Probably need to flush queues at this point
                 return;
             }
 
@@ -131,46 +304,144 @@
             Buffer* b = e.getBuffer();
             QueueDirection dir = e.getDirection();
             if (dir == RECV) {
-                readCallback(*this, b);
+                ++recvEvents;
+
+                // The peer sends us xmitCredit in the low bits of the imm data; the IgnoreData flag marks a credit-only message
+                bool dataPresent = true;
+                if (e.immPresent() ) {
+                    xmitCredit += (e.getImm() & ~FlagsMask);
+                    dataPresent = ((e.getImm() & IgnoreData) == 0);
+                }
+                
+                // if there was no data sent then the message was only to update our credit
+                if ( dataPresent ) {
+                    readCallback(*this, b);
+                }
+
                 // At this point the buffer has been consumed so put it back on the recv queue
+                b->dataStart = 0;
+                b->dataCount = 0;
                 qp->postRecv(b);
+
+                // Received another message: re-posting its buffer gives the peer one more credit, which we owe it until we tell it
+                ++recvCredit;
+                
+                // Send recvCredit if it is large enough (it will have got this large because we've not sent anything recently)
+                if (recvCredit > recvBufferCount/2) {
+                    // TODO: This should use RDMA write with imm as there might not ever be a buffer to receive this message
+                    // but this is a little unlikely, as to get in this state we have to have received messages without sending any
+                    // for a while so it's likely we've received a credit update from the far side.
+                    if (writable()) {
+                        Buffer* ob = getBuffer();
+                        // Have to send something as adapters hate it when you try to transfer 0 bytes
+                        *reinterpret_cast< uint32_t* >(ob->bytes) = htonl(recvCredit);
+                        ob->dataCount = sizeof(uint32_t);
+                        
+                        int creditSent = recvCredit & ~FlagsMask;
+                        qp->postSend(creditSent | IgnoreData, ob);
+                        recvCredit -= creditSent;
+                        ++outstandingWrites;
+                        --xmitCredit;
+                    } else {
+                        QPID_LOG(warning, "RDMA: qp=" << qp << ":  Unable to send unsolicited credit");
+                    }
+                }
             } else {
+                ++sendEvents; // A write completed: recycle its buffer
                 {
                 qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
                 bufferQueue.push_front(b);
                 }
                 --outstandingWrites;
-                // TODO: maybe don't call idle unless we're low on write buffers
-                idleCallback(*this);
             }
         } while (true);
+
+        // Not sure whether this is expected, so only log it at debug level
+        if (recvEvents == 0 && sendEvents == 0) {
+            QPID_LOG(debug, "RDMA: qp=" << qp << ":  Got channel event with no recv/send completions");
+        }
+    }
+
+    void AsynchIO::doWriteCallback() {
+        // TODO: maybe don't call idle unless we're low on write buffers
+        // Keep on calling the idle routine as long as we are writable and we got something to write last call
+        while (writable()) {
+            int xc = xmitCredit; // Credit before asking the application for data
+            idleCallback(*this);
+            // Every successful queueWrite() consumes one xmitCredit, so unchanged credit means nothing was written
+            if (xmitCredit == xc) {
+                QPID_LOG(debug, "RDMA: qp=" << qp << ": Called for data, but got none: xmitCredit=" << xmitCredit);
+                return;
+            }
+        }
     }
 
+    // Get an empty buffer to write into: reuse a returned/completed one if there is one,
+    // otherwise create a new one of bufferSize bytes.
+    Buffer* AsynchIO::getBuffer() {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
+        Buffer* b;
+        if (bufferQueue.empty()) {
+            b = qp->createBuffer(bufferSize);
+            // The buffers ptr_deque owns (and eventually deletes) every buffer we create
+            buffers.push_front(b);
+        } else {
+            b = bufferQueue.front();
+            bufferQueue.pop_front();
+        }
+        // Hand out an empty buffer whether it is new or recycled
+        b->dataCount = 0;
+        b->dataStart = 0;
+        return b;
+    }
+    
+    // Give back a buffer obtained from getBuffer() that will not be written (e.g. from a full callback)
+    void AsynchIO::returnBuffer(Buffer* b) {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
+        bufferQueue.push_front(b);
+        b->dataCount = 0;
+        b->dataStart = 0;
+    }
+
+    ConnectionManager::ConnectionManager(
+        ErrorCallback errc,
+        DisconnectedCallback dc
+    ) :
+        ci(Connection::make()),
+        handle(*ci, boost::bind(&ConnectionManager::event, this, _1), 0, 0), // Connection events are dispatched to event()
+        errorCallback(errc),
+        disconnectedCallback(dc)
+    {
+        ci->nonblocking();
+    }
+    
+    void ConnectionManager::start(Poller::shared_ptr poller) {
+        startConnection(ci); // Subclass hook: bind+listen (Listener) or resolve the address (Connector)
+        handle.startWatch(poller);
+    }
+
+    void ConnectionManager::event(DispatchHandle&) {
+        connectionEvent(ci); // Subclass hook handles the individual rdma_cm event
+    }
+    
     Listener::Listener(
         const sockaddr& src,
-        ConnectedCallback cc,
+        const ConnectionParams& cp, // Our limits: each request is checked against them, and our initialXmitCredit is sent back on accept
+        EstablishedCallback ec,
         ErrorCallback errc,
         DisconnectedCallback dc,
         ConnectionRequestCallback crc
     ) :
+        ConnectionManager(errc, dc),
         src_addr(src),
-        ci(Connection::make()),
-        handle(*ci, boost::bind(&Listener::connectionEvent, this, _1), 0, 0),
-        connectedCallback(cc),
-        errorCallback(errc),
-        disconnectedCallback(dc),
-        connectionRequestCallback(crc)
+        checkConnectionParams(cp),
+        connectionRequestCallback(crc),
+        establishedCallback(ec)
     {
-        ci->nonblocking();
     }
 
-    void Listener::start(Poller::shared_ptr poller) {
+    void Listener::startConnection(Connection::intrusive_ptr ci) { // Called from ConnectionManager::start()
         ci->bind(src_addr);
         ci->listen();
-        handle.startWatch(poller);
     }
 
-    void Listener::connectionEvent(DispatchHandle&) {
+    void Listener::connectionEvent(Connection::intrusive_ptr ci) {
         ConnectionEvent e(ci->getNextEvent());
 
         // If (for whatever reason) there was no event do nothing
@@ -181,65 +452,75 @@
         // you get from CONNECT_REQUEST has the same context info
         // as its parent listening rdma_cm_id
         ::rdma_cm_event_type eventType = e.getEventType();
+        ::rdma_conn_param conn_param = e.getConnectionParam(); // On CONNECT_REQUEST the private data should carry the peer's ConnectionParams
         Rdma::Connection::intrusive_ptr id = e.getConnection();
 
         switch (eventType) {
         case RDMA_CM_EVENT_CONNECT_REQUEST: {
-            bool accept = true;
-            // Extract connection parameters and private data from event
-            ::rdma_conn_param conn_param = e.getConnectionParam();
+            // Make sure the peer has sent params we can use; if not, reject without sending ours
+            if (!conn_param.private_data || conn_param.private_data_len < sizeof(ConnectionParams)) {
+                id->reject();
+                break;
+            } 
+            ConnectionParams cp = *static_cast<const ConnectionParams*>(conn_param.private_data); // NOTE(review): raw struct copy, presumably requires peers with the same int size/byte order — confirm
+
+            // Reject if requested msg size is bigger than we allow (telling the peer our limits)
+            if (cp.maxRecvBufferSize > checkConnectionParams.maxRecvBufferSize) {
+                id->reject(&checkConnectionParams);
+                break;
+            }
 
+            bool accept = true;
             if (connectionRequestCallback)
-                //TODO: pass private data to callback (and accept new private data for accept somehow)
-                accept = connectionRequestCallback(id);
+                accept = connectionRequestCallback(id, cp);
+
             if (accept) {
                 // Accept connection
-                id->accept(conn_param);
+                cp.initialXmitCredit = checkConnectionParams.initialXmitCredit; // Reply with the peer's buffer size and our own initial credit
+                id->accept(conn_param, &cp);
             } else {
-                //Reject connection
+                // Reject connection
                 id->reject();
             }
-
             break;
         }
         case RDMA_CM_EVENT_ESTABLISHED:
-            connectedCallback(id);
+            establishedCallback(id);
             break;
         case RDMA_CM_EVENT_DISCONNECTED:
             disconnectedCallback(id);
             break;
         case RDMA_CM_EVENT_CONNECT_ERROR:
-            errorCallback(id);
+            errorCallback(id, CONNECT_ERROR);
             break;
         default:
-            std::cerr << "Warning: unexpected response to listen - " << eventType << "\n";
+            // Unexpected response: report it as an UNKNOWN error
+            errorCallback(id, UNKNOWN);
+            //std::cerr << "Warning: unexpected response to listen - " << eventType << "\n";
         }
     }
 
     Connector::Connector(
         const sockaddr& dst,
+        const ConnectionParams& cp, // Sent to the peer as private data when connecting
         ConnectedCallback cc,
         ErrorCallback errc,
         DisconnectedCallback dc,
         RejectedCallback rc
     ) :
+        ConnectionManager(errc, dc),
         dst_addr(dst),
-        ci(Connection::make()),
-        handle(*ci, boost::bind(&Connector::connectionEvent, this, _1), 0, 0),
-        connectedCallback(cc),
-        errorCallback(errc),
-        disconnectedCallback(dc),
-        rejectedCallback(rc)
+        connectionParams(cp),
+        rejectedCallback(rc),
+        connectedCallback(cc)
     {
-        ci->nonblocking();
     }
 
-    void Connector::start(Poller::shared_ptr poller) {
+    void Connector::startConnection(Connection::intrusive_ptr ci) { // First step only: connectionEvent() drives route resolution and connect
         ci->resolve_addr(dst_addr);
-        handle.startWatch(poller);
     }
 
-    void Connector::connectionEvent(DispatchHandle&) {
+    void Connector::connectionEvent(Connection::intrusive_ptr ci) {
         ConnectionEvent e(ci->getNextEvent());
 
         // If (for whatever reason) there was no event do nothing
@@ -247,6 +528,8 @@
             return;
 
         ::rdma_cm_event_type eventType = e.getEventType();
+        ::rdma_conn_param conn_param = e.getConnectionParam();
+        Rdma::Connection::intrusive_ptr id = e.getConnection();
         switch (eventType) {
         case RDMA_CM_EVENT_ADDR_RESOLVED:
             // RESOLVE_ADDR
@@ -254,38 +537,53 @@
             break;
         case RDMA_CM_EVENT_ADDR_ERROR:
             // RESOLVE_ADDR
-            errorCallback(ci);
+            errorCallback(ci, ADDR_ERROR);
             break;
         case RDMA_CM_EVENT_ROUTE_RESOLVED:
             // RESOLVE_ROUTE:
-            ci->connect();
+            ci->connect(&connectionParams);
             break;
         case RDMA_CM_EVENT_ROUTE_ERROR:
             // RESOLVE_ROUTE:
-            errorCallback(ci);
+            errorCallback(ci, ROUTE_ERROR);
             break;
         case RDMA_CM_EVENT_CONNECT_ERROR:
             // CONNECTING
-            errorCallback(ci);
+            errorCallback(ci, CONNECT_ERROR);
             break;
         case RDMA_CM_EVENT_UNREACHABLE:
             // CONNECTING
-            errorCallback(ci);
+            errorCallback(ci, UNREACHABLE);
             break;
-        case RDMA_CM_EVENT_REJECTED:
+        case RDMA_CM_EVENT_REJECTED: {
             // CONNECTING
-            rejectedCallback(ci);
+            // Extract private data from event. The peer may reject without sending any parameters
+            // (Listener::connectionEvent() uses a bare reject() for malformed or refused requests),
+            // so it is optional here: ConnectionParams(0, 0) means "none were sent".
+            ConnectionParams cp(0, 0);
+            if (conn_param.private_data && conn_param.private_data_len >= sizeof(ConnectionParams)) {
+                cp = *static_cast<const ConnectionParams*>(conn_param.private_data);
+            }
+            // rejectedCallback is optional (it defaults to 0) and an empty boost::function throws when called
+            if (rejectedCallback) {
+                rejectedCallback(ci, cp);
+            }
             break;
-        case RDMA_CM_EVENT_ESTABLISHED:
+        }
+        case RDMA_CM_EVENT_ESTABLISHED: {
             // CONNECTING
-            connectedCallback(ci);
+            // Extract private data from event (Listener always accepts with its parameters)
+            assert(conn_param.private_data && conn_param.private_data_len >= sizeof(ConnectionParams));
+            ConnectionParams cp = *static_cast<const ConnectionParams*>(conn_param.private_data);
+            connectedCallback(ci, cp);
             break;
+        }
         case RDMA_CM_EVENT_DISCONNECTED:
             // ESTABLISHED
             disconnectedCallback(ci);
             break;
         default:
-            std::cerr << "Warning: unexpected event in connect: " << eventType << "\n";
+            QPID_LOG(warning, "RDMA: Unexpected event in connect: " << eventType);
         }
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h Wed Sep 10 23:16:19 2008
@@ -32,32 +32,29 @@
 #include <boost/ptr_container/ptr_deque.hpp>
 #include <deque>
 
-using qpid::sys::DispatchHandle;
-using qpid::sys::Poller;
-
 namespace Rdma {
 
     class Connection;
 
-    typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ConnectedCallback;
-    typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ErrorCallback;
-    typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> DisconnectedCallback;
-    typedef boost::function1<bool, Rdma::Connection::intrusive_ptr&> ConnectionRequestCallback;
-    typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> RejectedCallback;
-
     class AsynchIO
     {
         typedef boost::function1<void, AsynchIO&> ErrorCallback;
         typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback;
         typedef boost::function1<void, AsynchIO&>  IdleCallback;
-        typedef boost::function1<void, AsynchIO&>  FullCallback;
+        typedef boost::function2<void, AsynchIO&, Buffer*>  FullCallback; // Receives the buffer queueWrite() could not send
 
         QueuePair::intrusive_ptr qp;
-        DispatchHandle dataHandle;
+        qpid::sys::DispatchHandleRef dataHandle;
         int bufferSize;
+        int recvCredit; // Receive buffers re-posted since we last told the peer (credit we owe it)
+        int xmitCredit; // Sends the peer has told us it can currently receive
         int recvBufferCount;
         int xmitBufferCount;
         int outstandingWrites;
+        bool closed; // TODO: Perhaps (probably) this state can be merged with the following...
+        bool deleting; // TODO: Perhaps (probably) this state can be merged with the following...
+        enum { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DELETED } state;
+        qpid::sys::Mutex stateLock;
         std::deque<Buffer*> bufferQueue;
         qpid::sys::Mutex bufferQueueLock;
         boost::ptr_deque<Buffer> buffers;
@@ -70,70 +67,145 @@
     public:
         AsynchIO(
             QueuePair::intrusive_ptr q,
-            int s,
+            int size, // Size of each buffer we create
+            int xCredit, // Initial credit from the peer; also our limit on writes in flight
+            int rCount, // recvBufferCount: unsolicited credit is sent once more than half of it is owed
             ReadCallback rc,
             IdleCallback ic,
             FullCallback fc,
             ErrorCallback ec
         );
-        ~AsynchIO();
 
-        void start(Poller::shared_ptr poller);
+        void start(qpid::sys::Poller::shared_ptr poller);
+        bool writable() const; // False once closed, with too many writes in flight, or without peer credit
         void queueWrite(Buffer* buff);
         void notifyPendingWrite();
         void queueWriteClose();
+        void deferDelete(); // The only way to destroy an AsynchIO (the destructor is private)
+        int incompletedWrites() const; // Writes posted but not yet completed
         Buffer* getBuffer();
+        void returnBuffer(Buffer*); // Give back a buffer from getBuffer() without writing it
 
     private:
-        void dataEvent(DispatchHandle& handle);
+        // Don't let anyone else delete us (use deferDelete()) to make sure there can't be outstanding callbacks
+        ~AsynchIO();
+
+        // Constants for the peer-peer command messages
+        // These are sent in the high bits of the imm data of an rdma message
+        // The low bits are used to send the credit
+        const static int FlagsMask = 0x10000000; // Mask for all flag bits - be sure to update this if you add more command bits
+        const static int IgnoreData = 0x10000000; // Message contains no application data
+
+        void dataEvent(qpid::sys::DispatchHandle& handle);
+        void processCompletions();
+        void doWriteCallback();
     };
 
-    class Listener
-    {
-        sockaddr src_addr;
+    inline bool AsynchIO::writable() const { // Not closed, below the in-flight write limit, and holding peer credit
+        return (!closed && outstandingWrites < xmitBufferCount && xmitCredit > 0);
+    }
+    
+    inline int AsynchIO::incompletedWrites() const {
+        return outstandingWrites;
+    }
+
+    // These are the parameters necessary to start the conversation
+    // * Each peer HAS to allocate buffers of the size of the maximum receive from its peer
+    // * Each peer HAS to know the initial "credit" it has for transmitting to its peer 
+    struct ConnectionParams { // NOTE(review): copied raw into/out of RDMA CM private data, so peers presumably need the same int size and byte order — confirm
+        int maxRecvBufferSize;
+        int initialXmitCredit ;
+        
+        ConnectionParams(int s, int c) :
+            maxRecvBufferSize(s),
+            initialXmitCredit(c)
+        {}
+    };
+
+    enum ErrorType {
+        ADDR_ERROR,
+        ROUTE_ERROR,
+        CONNECT_ERROR,
+        UNREACHABLE,
+        UNKNOWN
+    };
+
+    typedef boost::function2<void, Rdma::Connection::intrusive_ptr&, ErrorType> ErrorCallback;
+    typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> DisconnectedCallback;
+    
+    class ConnectionManager { // Owns an rdma_cm connection id and forwards its events to the subclass hooks below
         Connection::intrusive_ptr ci;
-        DispatchHandle handle;
-        ConnectedCallback connectedCallback;
+        qpid::sys::DispatchHandle handle;
+
+    protected:
         ErrorCallback errorCallback;
         DisconnectedCallback disconnectedCallback;
+   
+   public:
+        ConnectionManager(
+            ErrorCallback errc,
+            DisconnectedCallback dc
+        );
+        
+        virtual ~ConnectionManager() {} // NOTE(review): does not stop watching the handle; confirm callers stop the poller before destroying a started manager
+
+        void start(qpid::sys::Poller::shared_ptr poller); // startConnection(), then start watching for connection events
+
+    private:
+        void event(qpid::sys::DispatchHandle& handle);
+
+        virtual void startConnection(Connection::intrusive_ptr ci) = 0;
+        virtual void connectionEvent(Connection::intrusive_ptr ci) = 0;
+    };
+    
+    typedef boost::function2<bool, Rdma::Connection::intrusive_ptr&, const ConnectionParams&> ConnectionRequestCallback;
+    typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> EstablishedCallback;
+
+    class Listener : public ConnectionManager
+    {
+        sockaddr src_addr;
+        ConnectionParams checkConnectionParams;
         ConnectionRequestCallback connectionRequestCallback;
+        EstablishedCallback establishedCallback;
 
     public:
         Listener(
             const sockaddr& src,
-            ConnectedCallback cc,
+            const ConnectionParams& cp, // Limits each incoming connection request is checked against
+            EstablishedCallback ec,
             ErrorCallback errc,
             DisconnectedCallback dc,
             ConnectionRequestCallback crc = 0
         );
-        void start(Poller::shared_ptr poller);
 
     private:
-        void connectionEvent(DispatchHandle& handle);
+        void startConnection(Connection::intrusive_ptr ci);
+        void connectionEvent(Connection::intrusive_ptr ci);
     };
 
-    class Connector
+    typedef boost::function2<void, Rdma::Connection::intrusive_ptr&, const ConnectionParams&> RejectedCallback;
+    typedef boost::function2<void, Rdma::Connection::intrusive_ptr&, const ConnectionParams&> ConnectedCallback;
+
+    class Connector : public ConnectionManager
     {
         sockaddr dst_addr;
-        Connection::intrusive_ptr ci;
-        DispatchHandle handle;
-        ConnectedCallback connectedCallback;
-        ErrorCallback errorCallback;
-        DisconnectedCallback disconnectedCallback;
+        ConnectionParams connectionParams;
         RejectedCallback rejectedCallback;
+        ConnectedCallback connectedCallback;
 
     public:
         Connector(
             const sockaddr& dst,
+            const ConnectionParams& cp, // Sent to the peer as the connection's private data
             ConnectedCallback cc,
             ErrorCallback errc,
             DisconnectedCallback dc,
             RejectedCallback rc = 0
         );
-        void start(Poller::shared_ptr poller);
 
     private:
-        void connectionEvent(DispatchHandle& handle);
+        void startConnection(Connection::intrusive_ptr ci);
+        void connectionEvent(Connection::intrusive_ptr ci);
     };
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp Wed Sep 10 23:16:19 2008
@@ -42,12 +42,10 @@
 struct ConRec {
     Rdma::Connection::intrusive_ptr connection;
     Rdma::AsynchIO* data;
-    bool writable;
     queue<Rdma::Buffer*> queuedWrites;
 
     ConRec(Rdma::Connection::intrusive_ptr c) :
-        connection(c),
-        writable(true)
+        connection(c), data(0)  // data is attached later, after construction
     {}
 };
 
@@ -55,50 +53,53 @@
     cout << "Data error:\n";
 }
 
+void idle(ConRec* cr, Rdma::AsynchIO& a) {
+    // Need to make sure full is not called as it would reorder messages
+    while (!cr->queuedWrites.empty() && a.writable()) {
+        Rdma::Buffer* buf = cr->queuedWrites.front();
+        cr->queuedWrites.pop();
+        a.queueWrite(buf);
+    }
+}
+
 void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) {
     // Echo data back
     Rdma::Buffer* buf = a.getBuffer();
     std::copy(b->bytes+b->dataStart, b->bytes+b->dataStart+b->dataCount, buf->bytes);
     buf->dataCount = b->dataCount;
-    if (cr->queuedWrites.empty() && cr->writable) {
+    if (cr->queuedWrites.empty()) {
+        // If can't write then full will be called and push buffer on back of queue
         a.queueWrite(buf);
     } else {
         cr->queuedWrites.push(buf);
+        // Try to empty queue
+        idle(cr, a);
     }
 }
 
-void full(ConRec* cr, Rdma::AsynchIO&) {
-    cr->writable = false;
-}
-
-void idle(ConRec* cr, Rdma::AsynchIO& a) {
-    cr->writable = true;
-    while (!cr->queuedWrites.empty() && cr->writable) {
-        Rdma::Buffer* buf = cr->queuedWrites.front();
-        cr->queuedWrites.pop();
-        a.queueWrite(buf);
-    }
+void full(ConRec* cr, Rdma::AsynchIO&, Rdma::Buffer* buf) {
+    cr->queuedWrites.push(buf);
 }
 
 void disconnected(Rdma::Connection::intrusive_ptr& ci) {
     ConRec* cr = ci->getContext<ConRec>();
-    cr->connection->disconnect();
-    delete cr->data;
-    delete cr;
-    cout << "Disconnected: " << cr << "\n";
+    cout << "Disconnected: " << cr << "\n";  // log before cr is freed
+    if (!cr) return;  // no ConRec attached to this connection
+    cr->connection->disconnect();
+    cr->data->queueWriteClose();
+    delete cr;
 }

-void connectionError(Rdma::Connection::intrusive_ptr& ci) {
+void connectionError(Rdma::Connection::intrusive_ptr& ci, Rdma::ErrorType) {
     ConRec* cr = ci->getContext<ConRec>();
-    cr->connection->disconnect();
-    if (cr) {
-        delete cr->data;
-        delete cr;
-    }
-    cout << "Connection error: " << cr << "\n";
+    cout << "Connection error: " << cr << "\n";  // log before cr is freed
+    if (!cr) return;  // check before any dereference of cr
+    cr->connection->disconnect();
+    cr->data->queueWriteClose();
+    delete cr;
 }
 
-bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) {
+bool connectionRequest(Rdma::Connection::intrusive_ptr& ci,  const Rdma::ConnectionParams& cp) {
     cout << "Incoming connection: ";
 
     // For fun reject alternate connection attempts
@@ -109,10 +110,11 @@
     if (x) {
         ConRec* cr = new ConRec(ci);
         Rdma::AsynchIO* aio =
-            new Rdma::AsynchIO(ci->getQueuePair(), 8000,
+            new Rdma::AsynchIO(ci->getQueuePair(),
+                cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES,
                 boost::bind(data, cr, _1, _2),
                 boost::bind(idle, cr, _1),
-                boost::bind(full, cr, _1),
+                boost::bind(full, cr, _1, _2),
                 dataError);
         ci->addContext(cr);
         cr->data = aio;
@@ -149,6 +151,7 @@
         Dispatcher d(p);
 
         Rdma::Listener a((const sockaddr&)(sin),
+            Rdma::ConnectionParams(16384, Rdma::DEFAULT_WR_ENTRIES),
             boost::bind(connected, p, _1),
             connectionError,
             disconnected,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp Wed Sep 10 23:16:19 2008
@@ -56,4 +56,9 @@
             (void) ::ibv_destroy_cq(cq);
     }
 
+    void destroyQp(::ibv_qp* qp) throw () {
+        if (qp)
+            (void) ::ibv_destroy_qp(qp);
+    }
+
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h Wed Sep 10 23:16:19 2008
@@ -35,6 +35,7 @@
     void deallocPd(::ibv_pd* p) throw ();
     void destroyCChannel(::ibv_comp_channel* c) throw ();
     void destroyCq(::ibv_cq* cq) throw ();
+    void destroyQp(::ibv_qp* qp) throw ();
 
     inline boost::shared_ptr< ::rdma_event_channel > mkEChannel() {
         return

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp?rev=694143&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp Wed Sep 10 23:16:19 2008
@@ -0,0 +1,163 @@
+#include "rdma_wrap.h"
+
+namespace Rdma {
+    const ::rdma_conn_param DEFAULT_CONNECT_PARAM = {
+        0,    // .private_data
+        0,    // .private_data_len
+        4,    // .responder_resources
+        4,    // .initiator_depth
+        0,    // .flow_control
+        5,    // .retry_count
+        7     // .rnr_retry_count
+    };
+
+    ::rdma_conn_param ConnectionEvent::getConnectionParam() const {
+        // It's badly documented, but it seems from the librdmacm source code that all the following
+        // event types have a valid param.conn
+        switch (event->event) {
+        case RDMA_CM_EVENT_CONNECT_REQUEST:
+        case RDMA_CM_EVENT_ESTABLISHED:
+        case RDMA_CM_EVENT_REJECTED:
+        case RDMA_CM_EVENT_DISCONNECTED:
+        case RDMA_CM_EVENT_CONNECT_ERROR:
+            return event->param.conn;
+        default:
+            ::rdma_conn_param p = {};
+            return p;
+        }
+    }
+
+    QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) :
+        qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+        pd(allocPd(i->verbs)),
+        cchannel(mkCChannel(i->verbs)),
+        scq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())),
+        rcq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())),
+        outstandingSendEvents(0),
+        outstandingRecvEvents(0)
+    {
+        impl->fd = cchannel->fd;
+
+        // Set cq context to this QueuePair object so we can find
+        // ourselves again
+        scq->cq_context = this;
+        rcq->cq_context = this;
+
+        ::ibv_qp_init_attr qp_attr = {};
+
+        // TODO: make a default struct for this
+        qp_attr.cap.max_send_wr  = DEFAULT_WR_ENTRIES;
+        qp_attr.cap.max_send_sge = 4;
+        qp_attr.cap.max_recv_wr  = DEFAULT_WR_ENTRIES;
+        qp_attr.cap.max_recv_sge = 4;
+
+        qp_attr.send_cq      = scq.get();
+        qp_attr.recv_cq      = rcq.get();
+        qp_attr.qp_type      = IBV_QPT_RC;
+
+        CHECK(::rdma_create_qp(i.get(), pd.get(), &qp_attr));
+        // Take ownership of the QP rdma_create_qp() stored in i->qp. destroyQp()
+        // frees it with ibv_destroy_qp(), which cannot clear i->qp. NOTE(review):
+        // confirm nothing reaches the QP through the cm_id after we are gone.
+        qp = boost::shared_ptr< ::ibv_qp >(i->qp, destroyQp);
+
+        // Set the qp context to this so we can find ourselves again
+        qp->qp_context = this;
+    }
+
+    QueuePair::~QueuePair() {
+        if (outstandingSendEvents > 0)
+            ::ibv_ack_cq_events(scq.get(), outstandingSendEvents);
+        if (outstandingRecvEvents > 0)
+            ::ibv_ack_cq_events(rcq.get(), outstandingRecvEvents);
+
+        // Reset back pointer in case someone else has the qp
+        qp->qp_context = 0;
+    }
+
+    void QueuePair::postRecv(Buffer* buf) {
+        ::ibv_recv_wr rwr = {};
+        ::ibv_sge sge;
+
+        sge.addr = (uintptr_t) buf->bytes+buf->dataStart;
+        sge.length = buf->dataCount;
+        sge.lkey = buf->mr->lkey;
+
+        rwr.wr_id = reinterpret_cast<uint64_t>(buf);
+        rwr.sg_list = &sge;
+        rwr.num_sge = 1;
+
+        ::ibv_recv_wr* badrwr = 0;
+        CHECK_IBV(::ibv_post_recv(qp.get(), &rwr, &badrwr));
+        if (badrwr)
+            throw std::logic_error("ibv_post_recv(): Bad rwr");
+    }
+
+    void QueuePair::postSend(Buffer* buf) {
+        ::ibv_send_wr swr = {};
+        ::ibv_sge sge;
+
+        sge.addr = (uintptr_t) buf->bytes+buf->dataStart;
+        sge.length = buf->dataCount;
+        sge.lkey = buf->mr->lkey;
+
+        swr.wr_id = reinterpret_cast<uint64_t>(buf);
+        swr.opcode = IBV_WR_SEND;
+        swr.send_flags = IBV_SEND_SIGNALED;
+        swr.sg_list = &sge;
+        swr.num_sge = 1;
+
+        ::ibv_send_wr* badswr = 0;
+        CHECK_IBV(::ibv_post_send(qp.get(), &swr, &badswr));
+        if (badswr)
+            throw std::logic_error("ibv_post_send(): Bad swr");
+    }
+
+    void QueuePair::postSend(uint32_t imm, Buffer* buf) {
+        ::ibv_send_wr swr = {};
+        ::ibv_sge sge;
+
+        sge.addr = (uintptr_t) buf->bytes+buf->dataStart;
+        sge.length = buf->dataCount;
+        sge.lkey = buf->mr->lkey;
+
+        swr.wr_id = reinterpret_cast<uint64_t>(buf);
+        swr.imm_data = htonl(imm);  // network byte order; getImm() applies ntohl()
+        swr.opcode = IBV_WR_SEND_WITH_IMM;
+        swr.send_flags = IBV_SEND_SIGNALED;
+        swr.sg_list = &sge;
+        swr.num_sge = 1;
+
+        ::ibv_send_wr* badswr = 0;
+        CHECK_IBV(::ibv_post_send(qp.get(), &swr, &badswr));
+        if (badswr)
+            throw std::logic_error("ibv_post_send(): Bad swr");
+    }
+}
+
+std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t) {
+#   define CHECK_TYPE(t) case t: o << #t; break;
+    switch(t) {
+        CHECK_TYPE(RDMA_CM_EVENT_ADDR_RESOLVED)
+        CHECK_TYPE(RDMA_CM_EVENT_ADDR_ERROR)
+        CHECK_TYPE(RDMA_CM_EVENT_ROUTE_RESOLVED)
+        CHECK_TYPE(RDMA_CM_EVENT_ROUTE_ERROR)
+        CHECK_TYPE(RDMA_CM_EVENT_CONNECT_REQUEST)
+        CHECK_TYPE(RDMA_CM_EVENT_CONNECT_RESPONSE)
+        CHECK_TYPE(RDMA_CM_EVENT_CONNECT_ERROR)
+        CHECK_TYPE(RDMA_CM_EVENT_UNREACHABLE)
+        CHECK_TYPE(RDMA_CM_EVENT_REJECTED)
+        CHECK_TYPE(RDMA_CM_EVENT_ESTABLISHED)
+        CHECK_TYPE(RDMA_CM_EVENT_DISCONNECTED)
+        CHECK_TYPE(RDMA_CM_EVENT_DEVICE_REMOVAL)
+        CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_JOIN)
+        CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_ERROR)
+        // Event types not listed above (e.g. ones added by a newer
+        // librdmacm) are printed numerically rather than silently dropped
+        default:
+            o << "UNKNOWN_RDMA_CM_EVENT(" << static_cast<int>(t) << ")";
+            break;
+    }
+#   undef CHECK_TYPE
+    return o;
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h?rev=694143&r1=694142&r2=694143&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h Wed Sep 10 23:16:19 2008
@@ -31,6 +31,8 @@
 
 #include <fcntl.h>
 
+#include <netdb.h>
+
 #include <vector>
 #include <algorithm>
 #include <iostream>
@@ -43,15 +45,7 @@
     const int DEFAULT_BACKLOG = 100;
     const int DEFAULT_CQ_ENTRIES = 256;
     const int DEFAULT_WR_ENTRIES = 64;
-    const ::rdma_conn_param DEFAULT_CONNECT_PARAM = {
-        0,    // .private_data
-        0,    // .private_data_len
-        4,    // .responder_resources
-        4,    // .initiator_depth
-        0,    // .flow_control
-        5,    // .retry_count
-        7     // .rnr_retry_count
-    };
+    extern const ::rdma_conn_param DEFAULT_CONNECT_PARAM;
 
     struct Buffer {
         friend class QueuePair;
@@ -115,6 +109,14 @@
             return dir != NONE;
         }
 
+        bool immPresent() const {
+            return wc.wc_flags & IBV_WC_WITH_IMM;
+        }
+        
+        uint32_t getImm() const {
+            return ntohl(wc.imm_data);
+        }
+
         QueueDirection getDirection() const {
             return dir;
         }
@@ -137,15 +139,12 @@
     // Wrapper for a queue pair - this has the functionality for
     // putting buffers on the receive queue and for sending buffers
     // to the other end of the connection.
-    //
-    // Currently QueuePairs are contained inside Connections and have no
-    // separate lifetime
     class QueuePair : public qpid::sys::IOHandle, public qpid::RefCounted {
         boost::shared_ptr< ::ibv_pd > pd;
         boost::shared_ptr< ::ibv_comp_channel > cchannel;
         boost::shared_ptr< ::ibv_cq > scq;
         boost::shared_ptr< ::ibv_cq > rcq;
-        boost::shared_ptr< ::rdma_cm_id > id;
+        boost::shared_ptr< ::ibv_qp > qp;
         int outstandingSendEvents;
         int outstandingRecvEvents;
 
@@ -207,6 +206,7 @@
 
         void postRecv(Buffer* buf);
         void postSend(Buffer* buf);
+        void postSend(uint32_t imm, Buffer* buf);
         void notifyRecv();
         void notifySend();
     };
@@ -233,14 +233,7 @@
             return event->event;
         }
 
-        ::rdma_conn_param getConnectionParam() const {
-            if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST) {
-                return event->param.conn;
-            } else {
-                ::rdma_conn_param p = {};
-                return p;
-            }
-        }
+        ::rdma_conn_param getConnectionParam() const;
 
         boost::intrusive_ptr<Connection> getConnection () const {
             return id;
@@ -291,6 +284,11 @@
             impl->fd = channel->fd;
 	}
 
-        // Default destructor fine
+        // A user-defined destructor is needed now: the rdma_cm_id's context
+        // must be cleared when this Connection goes away.
+        ~Connection() {
+            // Reset the id context in case someone else has it
+            id->context = 0;
+        }
 
         void ensureQueuePair() {
@@ -445,52 +443,38 @@
 
             return qp;
         }
+        
+        std::string getLocalName() const {
+            ::sockaddr* addr = ::rdma_get_local_addr(id.get());
+            char hostName[NI_MAXHOST];
+            char portName[NI_MAXSERV];
+            CHECK_IBV(::getnameinfo(
+                addr, sizeof(::sockaddr_storage),
+                hostName, sizeof(hostName),
+                portName, sizeof(portName),
+                NI_NUMERICHOST | NI_NUMERICSERV));
+            std::string r(hostName);
+            r += ":";
+            r += portName;
+            return r;
+        }
+        
+        std::string getPeerName() const {
+            ::sockaddr* addr = ::rdma_get_peer_addr(id.get());
+            char hostName[NI_MAXHOST];
+            char portName[NI_MAXSERV];
+            CHECK_IBV(::getnameinfo(
+                addr, sizeof(::sockaddr_storage),
+                hostName, sizeof(hostName),
+                portName, sizeof(portName),
+                NI_NUMERICHOST | NI_NUMERICSERV));
+            std::string r(hostName);
+            r += ":";
+            r += portName;
+            return r;
+        }
     };
 
-    inline QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) :
-        qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
-        pd(allocPd(i->verbs)),
-        cchannel(mkCChannel(i->verbs)),
-        scq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())),
-        rcq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())),
-        id(i),
-        outstandingSendEvents(0),
-        outstandingRecvEvents(0)
-    {
-        impl->fd = cchannel->fd;
-
-        // Set cq context to this QueuePair object so we can find
-        // ourselves again
-        scq->cq_context = this;
-        rcq->cq_context = this;
-
-        ::ibv_qp_init_attr qp_attr = {};
-
-        // TODO: make a default struct for this
-        qp_attr.cap.max_send_wr  = DEFAULT_WR_ENTRIES;
-        qp_attr.cap.max_send_sge = 4;
-        qp_attr.cap.max_recv_wr  = DEFAULT_WR_ENTRIES;
-        qp_attr.cap.max_recv_sge = 4;
-
-        qp_attr.send_cq      = scq.get();
-        qp_attr.recv_cq      = rcq.get();
-        qp_attr.qp_type      = IBV_QPT_RC;
-
-        CHECK(::rdma_create_qp(id.get(), pd.get(), &qp_attr));
-
-        // Set the qp context to this so we can find ourselves again
-        id->qp->qp_context = this;
-    }
-
-    inline QueuePair::~QueuePair() {
-        if (outstandingSendEvents > 0)
-            ::ibv_ack_cq_events(scq.get(), outstandingSendEvents);
-        if (outstandingRecvEvents > 0)
-            ::ibv_ack_cq_events(rcq.get(), outstandingRecvEvents);
-
-        ::rdma_destroy_qp(id.get());
-    }
-
     inline void QueuePair::notifyRecv() {
         CHECK_IBV(ibv_req_notify_cq(rcq.get(), 0));
     }
@@ -499,44 +483,6 @@
         CHECK_IBV(ibv_req_notify_cq(scq.get(), 0));
     }
 
-    inline void QueuePair::postRecv(Buffer* buf) {
-        ::ibv_recv_wr rwr = {};
-        ::ibv_sge sge;
-
-        sge.addr = (uintptr_t) buf->bytes+buf->dataStart;
-        sge.length = buf->dataCount;
-        sge.lkey = buf->mr->lkey;
-
-        rwr.wr_id = reinterpret_cast<uint64_t>(buf);
-        rwr.sg_list = &sge;
-        rwr.num_sge = 1;
-
-        ::ibv_recv_wr* badrwr = 0;
-        CHECK_IBV(::ibv_post_recv(id->qp, &rwr, &badrwr));
-        if (badrwr)
-            throw std::logic_error("ibv_post_recv(): Bad rwr");
-    }
-
-    inline void QueuePair::postSend(Buffer* buf) {
-        ::ibv_send_wr swr = {};
-        ::ibv_sge sge;
-
-        sge.addr = (uintptr_t) buf->bytes+buf->dataStart;
-        sge.length = buf->dataCount;
-        sge.lkey = buf->mr->lkey;
-
-        swr.wr_id = reinterpret_cast<uint64_t>(buf);
-        swr.opcode = IBV_WR_SEND;
-        swr.send_flags = IBV_SEND_SIGNALED;
-        swr.sg_list = &sge;
-        swr.num_sge = 1;
-
-        ::ibv_send_wr* badswr = 0;
-        CHECK_IBV(::ibv_post_send(id->qp, &swr, &badswr));
-        if (badswr)
-            throw std::logic_error("ibv_post_send(): Bad swr");
-    }
-
     inline ConnectionEvent::ConnectionEvent(::rdma_cm_event* e) :
         id((e->event != RDMA_CM_EVENT_CONNECT_REQUEST) ?
                 Connection::find(e->id) : new Connection(e->id)),
@@ -545,26 +491,6 @@
     {}
 }
 
-inline std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t) {
-#   define CHECK_TYPE(t) case t: o << #t; break;
-    switch(t) {
-        CHECK_TYPE(RDMA_CM_EVENT_ADDR_RESOLVED)
-        CHECK_TYPE(RDMA_CM_EVENT_ADDR_ERROR)
-        CHECK_TYPE(RDMA_CM_EVENT_ROUTE_RESOLVED)
-        CHECK_TYPE(RDMA_CM_EVENT_ROUTE_ERROR)
-        CHECK_TYPE(RDMA_CM_EVENT_CONNECT_REQUEST)
-        CHECK_TYPE(RDMA_CM_EVENT_CONNECT_RESPONSE)
-        CHECK_TYPE(RDMA_CM_EVENT_CONNECT_ERROR)
-        CHECK_TYPE(RDMA_CM_EVENT_UNREACHABLE)
-        CHECK_TYPE(RDMA_CM_EVENT_REJECTED)
-        CHECK_TYPE(RDMA_CM_EVENT_ESTABLISHED)
-        CHECK_TYPE(RDMA_CM_EVENT_DISCONNECTED)
-        CHECK_TYPE(RDMA_CM_EVENT_DEVICE_REMOVAL)
-        CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_JOIN)
-        CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_ERROR)
-    }
-#   undef CHECK_TYPE
-    return o;
-}
+std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t);
 
 #endif // RDMA_WRAP_H