You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2008/10/28 23:02:53 UTC
svn commit: r708696 - /incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
Author: astitcher
Date: Tue Oct 28 15:02:53 2008
New Revision: 708696
URL: http://svn.apache.org/viewvc?rev=708696&view=rev
Log:
Make federation work over Rdma links
- Some refactoring of Rdma code for simplicity
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=708696&r1=708695&r2=708696&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Tue Oct 28 15:02:53 2008
@@ -47,7 +47,6 @@
ConnectionCodec::Factory* factory;
ConnectionCodec* codec;
bool readError;
- bool isClient;
void write(const framing::ProtocolInitiation&);
@@ -57,14 +56,14 @@
void init(Rdma::AsynchIO* a);
void start(Poller::shared_ptr poller) {aio->start(poller);}
- void setClient() { isClient = true; }
-
// Output side
void close();
void activateOutput();
+ void initProtocolOut();
// Input side
void readbuff(Rdma::AsynchIO& aio, Rdma::Buffer* buff);
+ void initProtocolIn(Rdma::Buffer* buff);
// Notifications
void full(Rdma::AsynchIO& aio);
@@ -77,8 +76,7 @@
identifier(c->getPeerName()),
factory(f),
codec(0),
- readError(false),
- isClient(false)
+ readError(false)
{
}
@@ -118,11 +116,6 @@
if ( !(aio->writable() && aio->bufferAvailable()) ) {
return;
}
- if (isClient && codec == 0) {
- codec = factory->create(*this, identifier);
- write(framing::ProtocolInitiation(codec->getVersion()));
- return;
- }
if (codec == 0) return;
if (codec->canEncode()) {
Rdma::Buffer* buff = aio->getBuffer();
@@ -134,6 +127,15 @@
aio->queueWriteClose();
}
+void RdmaIOHandler::initProtocolOut() {
+ // We mustn't have already started the conversation
+ // but we must be able to send
+ assert( codec == 0 );
+ assert( aio->writable() && aio->bufferAvailable() );
+ codec = factory->create(*this, identifier);
+ write(framing::ProtocolInitiation(codec->getVersion()));
+}
+
void RdmaIOHandler::error(Rdma::AsynchIO&) {
close();
}
@@ -151,34 +153,36 @@
return;
}
size_t decoded = 0;
- if (codec) { // Already initiated
- try {
+ try {
+ if (codec) {
decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
- }catch(const std::exception& e){
- QPID_LOG(error, e.what());
- readError = true;
- aio->queueWriteClose();
+ }else{
+ // Need to start protocol processing
+ initProtocolIn(buff);
}
- }else{
- framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
- framing::ProtocolInitiation protocolInit;
- if (protocolInit.decode(in)) {
- decoded = in.getPosition();
- QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")");
- try {
- codec = factory->create(protocolInit.getVersion(), *this, identifier);
- if (!codec) {
- //TODO: may still want to revise this...
- //send valid version header & close connection.
- write(framing::ProtocolInitiation(framing::highestProtocolVersion));
- readError = true;
- aio->queueWriteClose();
- }
- } catch (const std::exception& e) {
- QPID_LOG(error, e.what());
- readError = true;
- aio->queueWriteClose();
- }
+ }catch(const std::exception& e){
+ QPID_LOG(error, e.what());
+ readError = true;
+ aio->queueWriteClose();
+ }
+}
+
+void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) {
+ framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+ framing::ProtocolInitiation protocolInit;
+ size_t decoded = 0;
+ if (protocolInit.decode(in)) {
+ decoded = in.getPosition();
+ QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")");
+
+ codec = factory->create(protocolInit.getVersion(), *this, identifier);
+
+ // If we failed to create the codec then we don't understand the offered protocol version
+ if (!codec) {
+ // send valid version header & close connection.
+ write(framing::ProtocolInitiation(framing::highestProtocolVersion));
+ readError = true;
+ aio->queueWriteClose();
}
}
}
@@ -201,7 +205,7 @@
void connected(Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&, ConnectionCodec::Factory*);
void connectionError(Rdma::Connection::intrusive_ptr&, Rdma::ErrorType);
void disconnected(Rdma::Connection::intrusive_ptr&);
- void rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&);
+ void rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&, ConnectFailedCallback);
};
// Static instance to initialise plugin
@@ -247,7 +251,7 @@
ci->addContext(async);
return true;
} catch (const Rdma::Exception& e) {
- QPID_LOG(error, "Rdma: Cannot accept new connection (Rdma excepion): " << e.what());
+ QPID_LOG(error, "Rdma: Cannot accept new connection (Rdma exception): " << e.what());
} catch (const std::exception& e) {
QPID_LOG(error, "Rdma: Cannot accept new connection (unknown exception): " << e.what());
}
@@ -291,28 +295,29 @@
boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2),
boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1),
boost::bind(&RdmaIOProtocolFactory::request, this, _1, _2, fact)));
-
+
listener->start(poller);
}
// Only used for outgoing connections (in federation)
-void RdmaIOProtocolFactory::rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&) {
+void RdmaIOProtocolFactory::rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&, ConnectFailedCallback failed) {
+ failed(-1, "Connection rejected");
}
// Do the same as connection request and established but mark a client too
void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp,
ConnectionCodec::Factory* f) {
(void) request(ci, cp, f);
- RdmaIOHandler* async = ci->getContext<RdmaIOHandler>();
- async->setClient();
established(poller, ci);
+ RdmaIOHandler* async = ci->getContext<RdmaIOHandler>();
+ async->initProtocolOut();
}
void RdmaIOProtocolFactory::connect(
Poller::shared_ptr poller,
const std::string& host, int16_t p,
ConnectionCodec::Factory* f,
- ConnectFailedCallback)
+ ConnectFailedCallback failed)
{
::addrinfo *res;
::addrinfo hints = {};
@@ -325,13 +330,16 @@
throw Exception(QPID_MSG("Rdma: Cannot resolve " << host << ": " << ::gai_strerror(n)));
}
- Rdma::Connector c(
+ Rdma::Connector* c =
+ new Rdma::Connector(
*res->ai_addr,
Rdma::ConnectionParams(8000, Rdma::DEFAULT_WR_ENTRIES),
boost::bind(&RdmaIOProtocolFactory::connected, this, poller, _1, _2, f),
boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2),
boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1),
- boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2));
+ boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2, failed));
+
+ c->start(poller);
}
}} // namespace qpid::sys