You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2008/10/28 23:02:53 UTC

svn commit: r708696 - /incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp

Author: astitcher
Date: Tue Oct 28 15:02:53 2008
New Revision: 708696

URL: http://svn.apache.org/viewvc?rev=708696&view=rev
Log:
Make federation work over Rdma links
- Some refactoring of Rdma code for simplicity

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=708696&r1=708695&r2=708696&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Tue Oct 28 15:02:53 2008
@@ -47,7 +47,6 @@
     ConnectionCodec::Factory* factory;
     ConnectionCodec* codec;
     bool readError;
-    bool isClient;
 
     void write(const framing::ProtocolInitiation&);
 
@@ -57,14 +56,14 @@
     void init(Rdma::AsynchIO* a);
     void start(Poller::shared_ptr poller) {aio->start(poller);}
 
-    void setClient() { isClient = true; }
-
     // Output side
     void close();
     void activateOutput();
+    void initProtocolOut();
 
     // Input side
     void readbuff(Rdma::AsynchIO& aio, Rdma::Buffer* buff);
+    void initProtocolIn(Rdma::Buffer* buff);
     
     // Notifications
     void full(Rdma::AsynchIO& aio);
@@ -77,8 +76,7 @@
     identifier(c->getPeerName()),
     factory(f),
     codec(0),
-    readError(false),
-    isClient(false)
+    readError(false)
 {
 }
 
@@ -118,11 +116,6 @@
     if ( !(aio->writable() && aio->bufferAvailable()) ) {
         return;
     }
-    if (isClient && codec == 0) {
-        codec = factory->create(*this, identifier);
-        write(framing::ProtocolInitiation(codec->getVersion()));
-        return;
-    }
     if (codec == 0) return;
     if (codec->canEncode()) {
         Rdma::Buffer* buff = aio->getBuffer();
@@ -134,6 +127,15 @@
         aio->queueWriteClose();
 }
 
+void RdmaIOHandler::initProtocolOut() {
+    // We mustn't have already started the conversation
+    // but we must be able to send
+    assert( codec == 0 );
+    assert( aio->writable() && aio->bufferAvailable() );
+    codec = factory->create(*this, identifier);
+    write(framing::ProtocolInitiation(codec->getVersion()));
+}
+
 void RdmaIOHandler::error(Rdma::AsynchIO&) {
     close();
 }
@@ -151,34 +153,36 @@
         return;
     }
     size_t decoded = 0;
-    if (codec) {                // Already initiated
-        try {
+    try {
+        if (codec) {
             decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
-        }catch(const std::exception& e){
-            QPID_LOG(error, e.what());
-            readError = true;
-            aio->queueWriteClose();
+        }else{
+            // Need to start protocol processing 
+            initProtocolIn(buff);
         }
-    }else{
-        framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
-        framing::ProtocolInitiation protocolInit;
-        if (protocolInit.decode(in)) {
-            decoded = in.getPosition();
-            QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")");
-            try {
-                codec = factory->create(protocolInit.getVersion(), *this, identifier);
-                if (!codec) {
-                    //TODO: may still want to revise this...
-                    //send valid version header & close connection.
-                    write(framing::ProtocolInitiation(framing::highestProtocolVersion));
-                    readError = true;
-                    aio->queueWriteClose();                
-                }
-            } catch (const std::exception& e) {
-                QPID_LOG(error, e.what());
-                readError = true;
-                aio->queueWriteClose();
-            }
+    }catch(const std::exception& e){
+        QPID_LOG(error, e.what());
+        readError = true;
+        aio->queueWriteClose();
+    }
+}
+
+void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) {
+    framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+    framing::ProtocolInitiation protocolInit;
+    size_t decoded = 0;
+    if (protocolInit.decode(in)) {
+        decoded = in.getPosition();
+        QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")");
+
+        codec = factory->create(protocolInit.getVersion(), *this, identifier);
+        
+        // If we failed to create the codec then we don't understand the offered protocol version
+        if (!codec) {
+            // send valid version header & close connection.
+            write(framing::ProtocolInitiation(framing::highestProtocolVersion));
+            readError = true;
+            aio->queueWriteClose();                
         }
     }
 }
@@ -201,7 +205,7 @@
     void connected(Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&, ConnectionCodec::Factory*);
     void connectionError(Rdma::Connection::intrusive_ptr&, Rdma::ErrorType);
     void disconnected(Rdma::Connection::intrusive_ptr&);
-    void rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&);
+    void rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&, ConnectFailedCallback);
 };
 
 // Static instance to initialise plugin
@@ -247,7 +251,7 @@
         ci->addContext(async);
         return true;
     } catch (const Rdma::Exception& e) {
-        QPID_LOG(error, "Rdma: Cannot accept new connection (Rdma excepion): " << e.what());
+        QPID_LOG(error, "Rdma: Cannot accept new connection (Rdma exception): " << e.what());
     } catch (const std::exception& e) {
         QPID_LOG(error, "Rdma: Cannot accept new connection (unknown exception): " << e.what());
     }
@@ -291,28 +295,29 @@
             boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2),
             boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1),
             boost::bind(&RdmaIOProtocolFactory::request, this, _1, _2, fact)));
-                           
+
     listener->start(poller);
 }
 
 // Only used for outgoing connections (in federation)
-void RdmaIOProtocolFactory::rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&) {
+void RdmaIOProtocolFactory::rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&, ConnectFailedCallback failed) {
+    failed(-1, "Connection rejected");
 }
 
 // Do the same as connection request and established but mark a client too
 void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp,
         ConnectionCodec::Factory* f) {
     (void) request(ci, cp, f);
-    RdmaIOHandler* async =  ci->getContext<RdmaIOHandler>();
-    async->setClient();
     established(poller, ci);
+    RdmaIOHandler* async =  ci->getContext<RdmaIOHandler>();
+    async->initProtocolOut();
 }
  
 void RdmaIOProtocolFactory::connect(
     Poller::shared_ptr poller,
     const std::string& host, int16_t p,
     ConnectionCodec::Factory* f,
-    ConnectFailedCallback)
+    ConnectFailedCallback failed)
 {
     ::addrinfo *res;
     ::addrinfo hints = {};
@@ -325,13 +330,16 @@
         throw Exception(QPID_MSG("Rdma: Cannot resolve " << host << ": " << ::gai_strerror(n)));
     }
 
-    Rdma::Connector c(
+    Rdma::Connector* c =
+        new Rdma::Connector(
             *res->ai_addr,
             Rdma::ConnectionParams(8000, Rdma::DEFAULT_WR_ENTRIES),
             boost::bind(&RdmaIOProtocolFactory::connected, this, poller, _1, _2, f),
             boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2),
             boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1),
-            boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2));
+            boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2, failed));
+
+    c->start(poller);
 }
 
 }} // namespace qpid::sys