You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/11/15 15:42:14 UTC
svn commit: r1409812 - in /qpid/trunk/qpid/cpp/src:
qpid/messaging/amqp/DriverImpl.cpp qpid/messaging/amqp/SslTransport.cpp
qpid/messaging/amqp/SslTransport.h qpid/messaging/amqp/TcpTransport.cpp
qpid/messaging/amqp/TcpTransport.h ssl.cmake ssl.mk
Author: gsim
Date: Thu Nov 15 14:42:13 2012
New Revision: 1409812
URL: http://svn.apache.org/viewvc?rev=1409812&view=rev
Log:
QPID-4368: Fix 1.0 related client IO in line with changes for QPID-4272
Modified:
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h
qpid/trunk/qpid/cpp/src/ssl.cmake
qpid/trunk/qpid/cpp/src/ssl.mk
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp?rev=1409812&r1=1409811&r2=1409812&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp Thu Nov 15 14:42:13 2012
@@ -53,7 +53,7 @@ void DriverImpl::stop()
boost::shared_ptr<Transport> DriverImpl::getTransport(const std::string& protocol, TransportContext& connection)
{
boost::shared_ptr<Transport> t(Transport::create(protocol, connection, poller));
- if (!t) throw new qpid::messaging::ConnectionError("No such transport: " + protocol);
+ if (!t) throw qpid::messaging::ConnectionError("No such transport: " + protocol);
return t;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp?rev=1409812&r1=1409811&r2=1409812&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.cpp Thu Nov 15 14:42:13 2012
@@ -20,7 +20,8 @@
*/
#include "SslTransport.h"
#include "TransportContext.h"
-#include "qpid/sys/ssl/SslIo.h"
+#include "qpid/sys/ssl/SslSocket.h"
+#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/ConnectionCodec.h"
#include "qpid/sys/Poller.h"
#include "qpid/log/Statement.h"
@@ -51,18 +52,19 @@ struct StaticInit
}
-SslTransport::SslTransport(TransportContext& c, boost::shared_ptr<Poller> p) : context(c), aio(0), poller(p) {}
+SslTransport::SslTransport(TransportContext& c, boost::shared_ptr<Poller> p) : context(c), connector(0), aio(0), poller(p) {}
void SslTransport::connect(const std::string& host, const std::string& port)
{
+ assert(!connector);
assert(!aio);
- try {
- socket.connect(host, port);
- connected(socket);
- } catch (const std::exception& e) {
- failed(e.what());
- }
+ connector = AsynchConnector::create(
+ socket,
+ host, port,
+ boost::bind(&SslTransport::connected, this, _1),
+ boost::bind(&SslTransport::failed, this, _3));
+ connector->start(poller);
}
void SslTransport::failed(const std::string& msg)
@@ -72,22 +74,22 @@ void SslTransport::failed(const std::str
context.closed();
}
-void SslTransport::connected(const SslSocket&)
+void SslTransport::connected(const Socket&)
{
context.opened();
- aio = new SslIO(socket,
- boost::bind(&SslTransport::read, this, _1, _2),
- boost::bind(&SslTransport::eof, this, _1),
- boost::bind(&SslTransport::disconnected, this, _1),
- boost::bind(&SslTransport::socketClosed, this, _1, _2),
- 0, // nobuffs
- boost::bind(&SslTransport::write, this, _1));
+ aio = AsynchIO::create(socket,
+ boost::bind(&SslTransport::read, this, _1, _2),
+ boost::bind(&SslTransport::eof, this, _1),
+ boost::bind(&SslTransport::disconnected, this, _1),
+ boost::bind(&SslTransport::socketClosed, this, _1, _2),
+ 0, // nobuffs
+ boost::bind(&SslTransport::write, this, _1));
aio->createBuffers(std::numeric_limits<uint16_t>::max());//note: AMQP 1.0 _can_ handle large frame sizes
id = boost::str(boost::format("[%1%]") % socket.getFullAddress());
aio->start(poller);
}
-void SslTransport::read(SslIO&, SslIO::BufferBase* buffer)
+void SslTransport::read(AsynchIO&, AsynchIO::BufferBase* buffer)
{
int32_t decoded = context.getCodec().decode(buffer->bytes+buffer->dataStart, buffer->dataCount);
if (decoded < buffer->dataCount) {
@@ -101,10 +103,10 @@ void SslTransport::read(SslIO&, SslIO::B
}
}
-void SslTransport::write(SslIO&)
+void SslTransport::write(AsynchIO&)
{
if (context.getCodec().canEncode()) {
- SslIO::BufferBase* buffer = aio->getQueuedBuffer();
+ AsynchIO::BufferBase* buffer = aio->getQueuedBuffer();
if (buffer) {
size_t encoded = context.getCodec().encode(buffer->bytes, buffer->byteCount);
@@ -123,18 +125,18 @@ void SslTransport::close()
aio->queueWriteClose();
}
-void SslTransport::eof(SslIO&)
+void SslTransport::eof(AsynchIO&)
{
close();
}
-void SslTransport::disconnected(SslIO&)
+void SslTransport::disconnected(AsynchIO&)
{
close();
socketClosed(*aio, socket);
}
-void SslTransport::socketClosed(SslIO&, const SslSocket&)
+void SslTransport::socketClosed(AsynchIO&, const Socket&)
{
if (aio)
aio->queueForDeletion();
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h?rev=1409812&r1=1409811&r2=1409812&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h Thu Nov 15 14:42:13 2012
@@ -30,10 +30,9 @@ namespace qpid {
namespace sys {
class ConnectionCodec;
class Poller;
-namespace ssl {
-class SslIO;
-class SslIOBufferBase;
-}
+class AsynchConnector;
+class AsynchIO;
+class AsynchIOBufferBase;
}
namespace messaging {
@@ -54,18 +53,19 @@ class SslTransport : public Transport
private:
qpid::sys::ssl::SslSocket socket;
TransportContext& context;
- qpid::sys::ssl::SslIO* aio;
+ qpid::sys::AsynchConnector* connector;
+ qpid::sys::AsynchIO* aio;
boost::shared_ptr<qpid::sys::Poller> poller;
bool closed;
std::string id;
- void connected(const qpid::sys::ssl::SslSocket&);
+ void connected(const qpid::sys::Socket&);
void failed(const std::string& msg);
- void read(qpid::sys::ssl::SslIO&, qpid::sys::ssl::SslIOBufferBase*);
- void write(qpid::sys::ssl::SslIO&);
- void eof(qpid::sys::ssl::SslIO&);
- void disconnected(qpid::sys::ssl::SslIO&);
- void socketClosed(qpid::sys::ssl::SslIO&, const qpid::sys::ssl::SslSocket&);
+ void read(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
+ void write(qpid::sys::AsynchIO&);
+ void eof(qpid::sys::AsynchIO&);
+ void disconnected(qpid::sys::AsynchIO&);
+ void socketClosed(qpid::sys::AsynchIO&, const qpid::sys::Socket&);
friend class DriverImpl;
};
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp?rev=1409812&r1=1409811&r2=1409812&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp Thu Nov 15 14:42:13 2012
@@ -48,14 +48,14 @@ struct StaticInit
} init;
}
-TcpTransport::TcpTransport(TransportContext& c, boost::shared_ptr<Poller> p) : context(c), connector(0), aio(0), poller(p) {}
+TcpTransport::TcpTransport(TransportContext& c, boost::shared_ptr<Poller> p) : socket(createSocket()), context(c), connector(0), aio(0), poller(p) {}
void TcpTransport::connect(const std::string& host, const std::string& port)
{
assert(!connector);
assert(!aio);
connector = AsynchConnector::create(
- socket,
+ *socket,
host, port,
boost::bind(&TcpTransport::connected, this, _1),
boost::bind(&TcpTransport::failed, this, _3));
@@ -67,7 +67,7 @@ void TcpTransport::failed(const std::str
{
QPID_LOG(debug, "Failed to connect: " << msg);
connector = 0;
- socket.close();
+ socket->close();
context.closed();
}
@@ -75,7 +75,7 @@ void TcpTransport::connected(const Socke
{
context.opened();
connector = 0;
- aio = AsynchIO::create(socket,
+ aio = AsynchIO::create(*socket,
boost::bind(&TcpTransport::read, this, _1, _2),
boost::bind(&TcpTransport::eof, this, _1),
boost::bind(&TcpTransport::disconnected, this, _1),
@@ -83,7 +83,7 @@ void TcpTransport::connected(const Socke
0, // nobuffs
boost::bind(&TcpTransport::write, this, _1));
aio->createBuffers(std::numeric_limits<uint16_t>::max());//note: AMQP 1.0 _can_ handle large frame sizes
- id = boost::str(boost::format("[%1%]") % socket.getFullAddress());
+ id = boost::str(boost::format("[%1%]") % socket->getFullAddress());
aio->start(poller);
}
@@ -131,7 +131,7 @@ void TcpTransport::eof(AsynchIO&)
void TcpTransport::disconnected(AsynchIO&)
{
close();
- socketClosed(*aio, socket);
+ socketClosed(*aio, *socket);
}
void TcpTransport::socketClosed(AsynchIO&, const Socket&)
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h?rev=1409812&r1=1409811&r2=1409812&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h Thu Nov 15 14:42:13 2012
@@ -24,6 +24,7 @@
#include "qpid/messaging/amqp/Transport.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Socket.h"
+#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
namespace qpid {
@@ -51,7 +52,7 @@ class TcpTransport : public Transport
void giveReadCredit(int32_t) {}
private:
- qpid::sys::Socket socket;
+ boost::scoped_ptr<qpid::sys::Socket> socket;
TransportContext& context;
qpid::sys::AsynchConnector* connector;
qpid::sys::AsynchIO* aio;
Modified: qpid/trunk/qpid/cpp/src/ssl.cmake
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/ssl.cmake?rev=1409812&r1=1409811&r2=1409812&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/ssl.cmake (original)
+++ qpid/trunk/qpid/cpp/src/ssl.cmake Thu Nov 15 14:42:13 2012
@@ -100,7 +100,7 @@ if (BUILD_SSL)
DESTINATION ${QPIDD_MODULE_DIR}
COMPONENT ${QPID_COMPONENT_BROKER})
- add_library (sslconnector MODULE qpid/client/SslConnector.cpp)
+ add_library (sslconnector MODULE qpid/client/SslConnector.cpp qpid/messaging/amqp/SslTransport.cpp)
target_link_libraries (sslconnector qpidclient sslcommon)
set_target_properties (sslconnector PROPERTIES
PREFIX ""
Modified: qpid/trunk/qpid/cpp/src/ssl.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/ssl.mk?rev=1409812&r1=1409811&r2=1409812&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/ssl.mk (original)
+++ qpid/trunk/qpid/cpp/src/ssl.mk Thu Nov 15 14:42:13 2012
@@ -48,6 +48,12 @@ dmoduleexec_LTLIBRARIES += ssl.la
sslconnector_la_SOURCES = \
qpid/client/SslConnector.cpp
+if HAVE_PROTON
+sslconnector_la_SOURCES += \
+ qpid/messaging/amqp/SslTransport.cpp
+endif #HAVE_PROTON
+
+
sslconnector_la_LIBADD = \
libqpidclient.la \
libsslcommon.la
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org