You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/11/15 15:42:14 UTC

svn commit: r1409812 - in /qpid/trunk/qpid/cpp/src: qpid/messaging/amqp/DriverImpl.cpp qpid/messaging/amqp/SslTransport.cpp qpid/messaging/amqp/SslTransport.h qpid/messaging/amqp/TcpTransport.cpp qpid/messaging/amqp/TcpTransport.h ssl.cmake ssl.mk

Author: gsim
Date: Thu Nov 15 14:42:13 2012
New Revision: 1409812

URL: http://svn.apache.org/viewvc?rev=1409812&view=rev
Log:
QPID-4368: Fix 1.0-related client IO to be in line with changes for QPID-4272

Modified:
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h
    qpid/trunk/qpid/cpp/src/ssl.cmake
    qpid/trunk/qpid/cpp/src/ssl.mk

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp?rev=1409812&r1=1409811&r2=1409812&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp Thu Nov 15 14:42:13 2012
@@ -53,7 +53,7 @@ void DriverImpl::stop()
 boost::shared_ptr<Transport> DriverImpl::getTransport(const std::string& protocol, TransportContext& connection)
 {
     boost::shared_ptr<Transport> t(Transport::create(protocol, connection, poller));
-    if (!t) throw new qpid::messaging::ConnectionError("No such transport: " + protocol);
+    if (!t) throw qpid::messaging::ConnectionError("No such transport: " + protocol);
     return t;
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp?rev=1409812&r1=1409811&r2=1409812&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp Thu Nov 15 14:42:13 2012
@@ -20,7 +20,8 @@
  */
 #include "SslTransport.h"
 #include "TransportContext.h"
-#include "qpid/sys/ssl/SslIo.h"
+#include "qpid/sys/ssl/SslSocket.h"
+#include "qpid/sys/AsynchIO.h"
 #include "qpid/sys/ConnectionCodec.h"
 #include "qpid/sys/Poller.h"
 #include "qpid/log/Statement.h"
@@ -51,18 +52,19 @@ struct StaticInit
 }
 
 
-SslTransport::SslTransport(TransportContext& c, boost::shared_ptr<Poller> p) : context(c), aio(0), poller(p) {}
+SslTransport::SslTransport(TransportContext& c, boost::shared_ptr<Poller> p) : context(c), connector(0), aio(0), poller(p) {}
 
 void SslTransport::connect(const std::string& host, const std::string& port)
 {
+    assert(!connector);
     assert(!aio);
-    try {
-        socket.connect(host, port);
-        connected(socket);
-    } catch (const std::exception& e) {
-        failed(e.what());
-    }
+    connector = AsynchConnector::create(
+        socket,
+        host, port,
+        boost::bind(&SslTransport::connected, this, _1),
+        boost::bind(&SslTransport::failed, this, _3));
 
+    connector->start(poller);
 }
 
 void SslTransport::failed(const std::string& msg)
@@ -72,22 +74,22 @@ void SslTransport::failed(const std::str
     context.closed();
 }
 
-void SslTransport::connected(const SslSocket&)
+void SslTransport::connected(const Socket&)
 {
     context.opened();
-    aio = new SslIO(socket,
-                        boost::bind(&SslTransport::read, this, _1, _2),
-                        boost::bind(&SslTransport::eof, this, _1),
-                        boost::bind(&SslTransport::disconnected, this, _1),
-                        boost::bind(&SslTransport::socketClosed, this, _1, _2),
-                        0, // nobuffs
-                        boost::bind(&SslTransport::write, this, _1));
+    aio = AsynchIO::create(socket,
+                           boost::bind(&SslTransport::read, this, _1, _2),
+                           boost::bind(&SslTransport::eof, this, _1),
+                           boost::bind(&SslTransport::disconnected, this, _1),
+                           boost::bind(&SslTransport::socketClosed, this, _1, _2),
+                           0, // nobuffs
+                           boost::bind(&SslTransport::write, this, _1));
     aio->createBuffers(std::numeric_limits<uint16_t>::max());//note: AMQP 1.0 _can_ handle large frame sizes
     id = boost::str(boost::format("[%1%]") % socket.getFullAddress());
     aio->start(poller);
 }
 
-void SslTransport::read(SslIO&, SslIO::BufferBase* buffer)
+void SslTransport::read(AsynchIO&, AsynchIO::BufferBase* buffer)
 {
     int32_t decoded = context.getCodec().decode(buffer->bytes+buffer->dataStart, buffer->dataCount);
     if (decoded < buffer->dataCount) {
@@ -101,10 +103,10 @@ void SslTransport::read(SslIO&, SslIO::B
     }
 }
 
-void SslTransport::write(SslIO&)
+void SslTransport::write(AsynchIO&)
 {
     if (context.getCodec().canEncode()) {
-        SslIO::BufferBase* buffer = aio->getQueuedBuffer();
+        AsynchIO::BufferBase* buffer = aio->getQueuedBuffer();
         if (buffer) {
             size_t encoded = context.getCodec().encode(buffer->bytes, buffer->byteCount);
 
@@ -123,18 +125,18 @@ void SslTransport::close()
         aio->queueWriteClose();
 }
 
-void SslTransport::eof(SslIO&)
+void SslTransport::eof(AsynchIO&)
 {
     close();
 }
 
-void SslTransport::disconnected(SslIO&)
+void SslTransport::disconnected(AsynchIO&)
 {
     close();
     socketClosed(*aio, socket);
 }
 
-void SslTransport::socketClosed(SslIO&, const SslSocket&)
+void SslTransport::socketClosed(AsynchIO&, const Socket&)
 {
     if (aio)
         aio->queueForDeletion();

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h?rev=1409812&r1=1409811&r2=1409812&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h Thu Nov 15 14:42:13 2012
@@ -30,10 +30,9 @@ namespace qpid {
 namespace sys {
 class ConnectionCodec;
 class Poller;
-namespace ssl {
-class SslIO;
-class SslIOBufferBase;
-}
+class AsynchConnector;
+class AsynchIO;
+class AsynchIOBufferBase;
 }
 
 namespace messaging {
@@ -54,18 +53,19 @@ class SslTransport : public Transport
   private:
     qpid::sys::ssl::SslSocket socket;
     TransportContext& context;
-    qpid::sys::ssl::SslIO* aio;
+    qpid::sys::AsynchConnector* connector;
+    qpid::sys::AsynchIO* aio;
     boost::shared_ptr<qpid::sys::Poller> poller;
     bool closed;
     std::string id;
 
-    void connected(const qpid::sys::ssl::SslSocket&);
+    void connected(const qpid::sys::Socket&);
     void failed(const std::string& msg);
-    void read(qpid::sys::ssl::SslIO&, qpid::sys::ssl::SslIOBufferBase*);
-    void write(qpid::sys::ssl::SslIO&);
-    void eof(qpid::sys::ssl::SslIO&);
-    void disconnected(qpid::sys::ssl::SslIO&);
-    void socketClosed(qpid::sys::ssl::SslIO&, const qpid::sys::ssl::SslSocket&);
+    void read(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
+    void write(qpid::sys::AsynchIO&);
+    void eof(qpid::sys::AsynchIO&);
+    void disconnected(qpid::sys::AsynchIO&);
+    void socketClosed(qpid::sys::AsynchIO&, const qpid::sys::Socket&);
 
   friend class DriverImpl;
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp?rev=1409812&r1=1409811&r2=1409812&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp Thu Nov 15 14:42:13 2012
@@ -48,14 +48,14 @@ struct StaticInit
 } init;
 }
 
-TcpTransport::TcpTransport(TransportContext& c, boost::shared_ptr<Poller> p) : context(c), connector(0), aio(0), poller(p) {}
+TcpTransport::TcpTransport(TransportContext& c, boost::shared_ptr<Poller> p) : socket(createSocket()), context(c), connector(0), aio(0), poller(p) {}
 
 void TcpTransport::connect(const std::string& host, const std::string& port)
 {
     assert(!connector);
     assert(!aio);
     connector = AsynchConnector::create(
-        socket,
+        *socket,
         host, port,
         boost::bind(&TcpTransport::connected, this, _1),
         boost::bind(&TcpTransport::failed, this, _3));
@@ -67,7 +67,7 @@ void TcpTransport::failed(const std::str
 {
     QPID_LOG(debug, "Failed to connect: " << msg);
     connector = 0;
-    socket.close();
+    socket->close();
     context.closed();
 }
 
@@ -75,7 +75,7 @@ void TcpTransport::connected(const Socke
 {
     context.opened();
     connector = 0;
-    aio = AsynchIO::create(socket,
+    aio = AsynchIO::create(*socket,
                            boost::bind(&TcpTransport::read, this, _1, _2),
                            boost::bind(&TcpTransport::eof, this, _1),
                            boost::bind(&TcpTransport::disconnected, this, _1),
@@ -83,7 +83,7 @@ void TcpTransport::connected(const Socke
                            0, // nobuffs
                            boost::bind(&TcpTransport::write, this, _1));
     aio->createBuffers(std::numeric_limits<uint16_t>::max());//note: AMQP 1.0 _can_ handle large frame sizes
-    id = boost::str(boost::format("[%1%]") % socket.getFullAddress());
+    id = boost::str(boost::format("[%1%]") % socket->getFullAddress());
     aio->start(poller);
 }
 
@@ -131,7 +131,7 @@ void TcpTransport::eof(AsynchIO&)
 void TcpTransport::disconnected(AsynchIO&)
 {
     close();
-    socketClosed(*aio, socket);
+    socketClosed(*aio, *socket);
 }
 
 void TcpTransport::socketClosed(AsynchIO&, const Socket&)

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h?rev=1409812&r1=1409811&r2=1409812&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h Thu Nov 15 14:42:13 2012
@@ -24,6 +24,7 @@
 #include "qpid/messaging/amqp/Transport.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/Socket.h"
+#include <boost/scoped_ptr.hpp>
 #include <boost/shared_ptr.hpp>
 
 namespace qpid {
@@ -51,7 +52,7 @@ class TcpTransport : public Transport
     void giveReadCredit(int32_t) {}
 
   private:
-    qpid::sys::Socket socket;
+    boost::scoped_ptr<qpid::sys::Socket> socket;
     TransportContext& context;
     qpid::sys::AsynchConnector* connector;
     qpid::sys::AsynchIO* aio;

Modified: qpid/trunk/qpid/cpp/src/ssl.cmake
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/ssl.cmake?rev=1409812&r1=1409811&r2=1409812&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/ssl.cmake (original)
+++ qpid/trunk/qpid/cpp/src/ssl.cmake Thu Nov 15 14:42:13 2012
@@ -100,7 +100,7 @@ if (BUILD_SSL)
              DESTINATION ${QPIDD_MODULE_DIR}
              COMPONENT ${QPID_COMPONENT_BROKER})
 
-    add_library (sslconnector MODULE qpid/client/SslConnector.cpp)
+    add_library (sslconnector MODULE qpid/client/SslConnector.cpp qpid/messaging/amqp/SslTransport.cpp)
     target_link_libraries (sslconnector qpidclient sslcommon)
     set_target_properties (sslconnector PROPERTIES
                            PREFIX ""

Modified: qpid/trunk/qpid/cpp/src/ssl.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/ssl.mk?rev=1409812&r1=1409811&r2=1409812&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/ssl.mk (original)
+++ qpid/trunk/qpid/cpp/src/ssl.mk Thu Nov 15 14:42:13 2012
@@ -48,6 +48,12 @@ dmoduleexec_LTLIBRARIES += ssl.la
 sslconnector_la_SOURCES = \
   qpid/client/SslConnector.cpp
 
+if HAVE_PROTON
+sslconnector_la_SOURCES += \
+  qpid/messaging/amqp/SslTransport.cpp
+endif #HAVE_PROTON
+
+
 sslconnector_la_LIBADD = \
   libqpidclient.la \
   libsslcommon.la



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org