You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2013/03/01 01:21:13 UTC
svn commit: r1451443 - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/broker/windows/ qpid/sys/ qpid/sys/windows/
Author: astitcher
Date: Fri Mar 1 00:21:12 2013
New Revision: 1451443
URL: http://svn.apache.org/r1451443
Log:
QPID-4610: Remove duplicated transport code from C++ broker
Added:
qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.h
qpid/trunk/qpid/cpp/src/qpid/sys/TransportFactory.h
- copied, changed from r1451398, qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h
Removed:
qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h
Modified:
qpid/trunk/qpid/cpp/src/CMakeLists.txt
qpid/trunk/qpid/cpp/src/Makefile.am
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
qpid/trunk/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp
Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1451443&r1=1451442&r2=1451443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Fri Mar 1 00:21:12 2013
@@ -1224,6 +1224,7 @@ set (qpidbroker_SOURCES
qpid/management/ManagementAgent.cpp
qpid/management/ManagementDirectExchange.cpp
qpid/management/ManagementTopicExchange.cpp
+ qpid/sys/SocketTransport.cpp
qpid/sys/TCPIOPlugin.cpp
)
add_msvc_version (qpidbroker library dll)
Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1451443&r1=1451442&r2=1451443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Fri Mar 1 00:21:12 2013
@@ -755,7 +755,10 @@ libqpidbroker_la_SOURCES = \
qpid/management/ManagementDirectExchange.h \
qpid/management/ManagementTopicExchange.cpp \
qpid/management/ManagementTopicExchange.h \
- qpid/sys/TCPIOPlugin.cpp
+ qpid/sys/TCPIOPlugin.cpp \
+ qpid/sys/SocketTransport.cpp \
+ qpid/sys/SocketTransport.h \
+ qpid/sys/TransportFactory.h
QPIDBROKER_VERSION_INFO = 2:0:0
libqpidbroker_la_LDFLAGS = -version-info $(QPIDBROKER_VERSION_INFO)
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1451443&r1=1451442&r2=1451443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Mar 1 00:21:12 2013
@@ -68,7 +68,7 @@
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/Uuid.h"
-#include "qpid/sys/ProtocolFactory.h"
+#include "qpid/sys/TransportFactory.h"
#include "qpid/sys/Poller.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Thread.h"
@@ -89,7 +89,8 @@
#include <iostream>
#include <memory>
-using qpid::sys::ProtocolFactory;
+using qpid::sys::TransportAcceptor;
+using qpid::sys::TransportConnector;
using qpid::sys::Poller;
using qpid::sys::Dispatcher;
using qpid::sys::Thread;
@@ -485,7 +486,7 @@ Manageable::status_t Broker::ManagementM
string transport = hp.i_transport.empty() ? TCP_TRANSPORT : hp.i_transport;
QPID_LOG (debug, "Broker::connect() " << hp.i_host << ":" << hp.i_port << "; transport=" << transport <<
"; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\"");
- if (!getProtocolFactory(transport)) {
+ if (!getTransportInfo(transport).connectorFactory) {
QPID_LOG(error, "Transport '" << transport << "' not supported");
text = "transport type not supported";
return Manageable::STATUS_NOT_IMPLEMENTED;
@@ -795,7 +796,7 @@ void Broker::createObject(const std::str
}
}
- if (!getProtocolFactory(transport)) {
+ if (!getTransportInfo(transport).connectorFactory) {
QPID_LOG(error, "Transport '" << transport << "' not supported.");
throw UnsupportedTransport(transport);
}
@@ -1011,30 +1012,29 @@ bool Broker::getLogHiresTimestamp()
}
-boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory(const std::string& name) const {
- ProtocolFactoryMap::const_iterator i
- = name.empty() ? protocolFactories.begin() : protocolFactories.find(name);
- if (i == protocolFactories.end()) return boost::shared_ptr<ProtocolFactory>();
+const Broker::TransportInfo& Broker::getTransportInfo(const std::string& name) const {
+ static TransportInfo nullTransportInfo;
+ TransportMap::const_iterator i
+ = name.empty() ? transportMap.begin() : transportMap.find(name);
+ if (i == transportMap.end()) return nullTransportInfo;
else return i->second;
}
uint16_t Broker::getPort(const std::string& name) const {
- boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(name);
- if (factory) {
- return factory->getPort();
+ if (int p = getTransportInfo(name).port) {
+ return p;
} else {
throw NoSuchTransportException(QPID_MSG("No such transport: '" << name << "'"));
}
}
-void Broker::registerProtocolFactory(const std::string& name, ProtocolFactory::shared_ptr protocolFactory) {
- protocolFactories[name] = protocolFactory;
- Url::addProtocol(name);
+void Broker::registerTransport(const std::string& name, boost::shared_ptr<TransportAcceptor> a, boost::shared_ptr<TransportConnector> c, uint16_t p) {
+ transportMap[name] = TransportInfo(a, c, p);
}
void Broker::accept() {
- for (ProtocolFactoryMap::const_iterator i = protocolFactories.begin(); i != protocolFactories.end(); i++) {
- i->second->accept(poller, factory.get());
+ for (TransportMap::const_iterator i = transportMap.begin(); i != transportMap.end(); i++) {
+ if (i->second.acceptor) i->second.acceptor->accept(poller, factory.get());
}
}
@@ -1043,8 +1043,8 @@ void Broker::connect(
const std::string& host, const std::string& port, const std::string& transport,
boost::function2<void, int, std::string> failed)
{
- boost::shared_ptr<ProtocolFactory> pf = getProtocolFactory(transport);
- if (pf) pf->connect(poller, name, host, port, factory.get(), failed);
+ boost::shared_ptr<TransportConnector> tcf = getTransportInfo(transport).connectorFactory;
+ if (tcf) tcf->connect(poller, name, host, port, factory.get(), failed);
else throw NoSuchTransportException(QPID_MSG("Unsupported transport type: " << transport));
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1451443&r1=1451442&r2=1451443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Mar 1 00:21:12 2013
@@ -51,7 +51,8 @@
namespace qpid {
namespace sys {
-class ProtocolFactory;
+class TransportAcceptor;
+class TransportConnector;
class Poller;
class Timer;
}
@@ -124,8 +125,23 @@ class Broker : public sys::Runnable, pub
};
private:
- typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap;
+ struct TransportInfo {
+ boost::shared_ptr<sys::TransportAcceptor> acceptor;
+ boost::shared_ptr<sys::TransportConnector> connectorFactory;
+ uint16_t port;
+ TransportInfo() :
+ port(0)
+ {}
+
+ TransportInfo(boost::shared_ptr<sys::TransportAcceptor> a, boost::shared_ptr<sys::TransportConnector> c, uint16_t p) :
+ acceptor(a),
+ connectorFactory(c),
+ port(p)
+ {}
+ };
+ typedef std::map<std::string, TransportInfo > TransportMap;
+
void declareStandardExchange(const std::string& name, const std::string& type);
void setStore ();
void setLogLevel(const std::string& level);
@@ -150,7 +166,7 @@ class Broker : public sys::Runnable, pub
std::auto_ptr<sys::Timer> timer;
Options config;
std::auto_ptr<management::ManagementAgent> managementAgent;
- ProtocolFactoryMap protocolFactories;
+ TransportMap transportMap;
std::auto_ptr<MessageStore> store;
AclModule* acl;
DataDir dataDir;
@@ -228,9 +244,11 @@ class Broker : public sys::Runnable, pub
uint32_t methodId, management::Args& args, std::string& text);
/** Add to the broker's protocolFactorys */
- QPID_BROKER_EXTERN void registerProtocolFactory(
- const std::string& name, boost::shared_ptr<sys::ProtocolFactory>);
-
+ QPID_BROKER_EXTERN void registerTransport(
+ const std::string& name,
+ boost::shared_ptr<sys::TransportAcceptor>, boost::shared_ptr<sys::TransportConnector>,
+ uint16_t port);
+
/** Accept connections */
QPID_BROKER_EXTERN void accept();
@@ -251,7 +269,7 @@ class Broker : public sys::Runnable, pub
uint32_t qty,
const qpid::types::Variant::Map& filter);
- QPID_BROKER_EXTERN boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory(
+ QPID_BROKER_EXTERN const TransportInfo& getTransportInfo(
const std::string& name = TCP_TRANSPORT) const;
/** Expose poller so plugins can register their descriptors. */
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp?rev=1451443&r1=1451442&r2=1451443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp Fri Mar 1 00:21:12 2013
@@ -1,379 +1,314 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/ProtocolFactory.h"
-
-#include "qpid/Plugin.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/log/Statement.h"
-#include "qpid/sys/AsynchIOHandler.h"
-#include "qpid/sys/ConnectionCodec.h"
-#include "qpid/sys/Socket.h"
-#include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/SystemInfo.h"
-#include "qpid/sys/windows/SslAsynchIO.h"
-
-#include <boost/bind.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
-#include <memory>
-
-// security.h needs to see this to distinguish from kernel use.
-#define SECURITY_WIN32
-#include <security.h>
-#include <Schnlsp.h>
-#undef SECURITY_WIN32
-
-
-namespace qpid {
-namespace sys {
-
-class Timer;
-
-namespace windows {
-
-struct SslServerOptions : qpid::Options
-{
- std::string certStore;
- std::string certStoreLocation;
- std::string certName;
- uint16_t port;
- bool clientAuth;
-
- SslServerOptions() : qpid::Options("SSL Options"),
- certStore("My"),
- certStoreLocation("CurrentUser"),
- certName("localhost"),
- port(5671),
- clientAuth(false)
- {
- qpid::Address me;
- if (qpid::sys::SystemInfo::getLocalHostname(me))
- certName = me.host;
-
- addOptions()
- ("ssl-cert-store", optValue(certStore, "NAME"), "Local store name from which to obtain certificate")
- ("ssl-cert-store-location", optValue(certStoreLocation, "NAME"),
- "Local store name location for certificates ( CurrentUser | LocalMachine | CurrentService )")
- ("ssl-cert-name", optValue(certName, "NAME"), "Name of the certificate to use")
- ("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections")
- ("ssl-require-client-authentication", optValue(clientAuth),
- "Forces clients to authenticate in order to establish an SSL connection");
- }
-};
-
-class SslProtocolFactory : public qpid::sys::ProtocolFactory {
- boost::ptr_vector<Socket> listeners;
- boost::ptr_vector<AsynchAcceptor> acceptors;
- Timer& brokerTimer;
- uint32_t maxNegotiateTime;
- uint16_t listeningPort;
- const bool tcpNoDelay;
- std::string brokerHost;
- const bool clientAuthSelected;
- std::auto_ptr<qpid::sys::AsynchAcceptor> acceptor;
- ConnectFailedCallback connectFailedCallback;
- CredHandle credHandle;
-
- public:
- SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions&, Timer& timer);
- ~SslProtocolFactory();
- void accept(sys::Poller::shared_ptr, sys::ConnectionCodec::Factory*);
- void connect(sys::Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port,
- sys::ConnectionCodec::Factory*,
- ConnectFailedCallback failed);
-
- uint16_t getPort() const;
-
- private:
- void connectFailed(const qpid::sys::Socket&,
- int err,
- const std::string& msg);
- void establishedIncoming(sys::Poller::shared_ptr, const qpid::sys::Socket&, sys::ConnectionCodec::Factory*);
- void establishedOutgoing(sys::Poller::shared_ptr, const qpid::sys::Socket&, sys::ConnectionCodec::Factory*, std::string& );
- void establishedCommon(sys::Poller::shared_ptr, sys::AsynchIOHandler*, sys::AsynchIO*, const qpid::sys::Socket&);
-};
-
-// Static instance to initialise plugin
-static struct SslPlugin : public Plugin {
- SslServerOptions options;
-
- Options* getOptions() { return &options; }
-
- void earlyInitialize(Target&) {
- }
-
- void initialize(Target& target) {
- broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
- // Only provide to a Broker
- if (broker) {
- try {
- const broker::Broker::Options& opts = broker->getOptions();
- ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(opts, options, broker->getTimer()));
- QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort());
- broker->registerProtocolFactory("ssl", protocol);
- } catch (const std::exception& e) {
- QPID_LOG(error, "Failed to initialise SSL listener: " << e.what());
- }
- }
- }
-} sslPlugin;
-
-namespace {
- // Expand list of Interfaces and addresses to a list of addresses
- std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
- std::vector<std::string> addresses;
- // If there are no specific interfaces listed use a single "" to listen on every interface
- if (interfaces.empty()) {
- addresses.push_back("");
- return addresses;
- }
- for (unsigned i = 0; i < interfaces.size(); ++i) {
- const std::string& interface = interfaces[i];
- if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) {
- // We don't have an interface of that name -
- // Check for IPv6 ('[' ']') brackets and remove them
- // then pass to be looked up directly
- if (interface[0]=='[' && interface[interface.size()-1]==']') {
- addresses.push_back(interface.substr(1, interface.size()-2));
- } else {
- addresses.push_back(interface);
- }
- }
- }
- return addresses;
- }
-}
-
-SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options, Timer& timer)
- : brokerTimer(timer),
- maxNegotiateTime(opts.maxNegotiateTime),
- tcpNoDelay(opts.tcpNoDelay),
- clientAuthSelected(options.clientAuth) {
-
- // Make sure that certificate store is good before listening to sockets
- // to avoid having open and listening sockets when there is no cert store
- SecInvalidateHandle(&credHandle);
-
- // Get the certificate for this server.
- DWORD flags = 0;
- std::string certStoreLocation = options.certStoreLocation;
- std::transform(certStoreLocation.begin(), certStoreLocation.end(), certStoreLocation.begin(), ::tolower);
- if (certStoreLocation == "currentuser") {
- flags = CERT_SYSTEM_STORE_CURRENT_USER;
- } else if (certStoreLocation == "localmachine") {
- flags = CERT_SYSTEM_STORE_LOCAL_MACHINE;
- } else if (certStoreLocation == "currentservice") {
- flags = CERT_SYSTEM_STORE_CURRENT_SERVICE;
- } else {
- QPID_LOG(error, "Unrecognised SSL certificate store location: " << options.certStoreLocation
- << " - Using default location");
- }
- HCERTSTORE certStoreHandle;
- certStoreHandle = ::CertOpenStore(CERT_STORE_PROV_SYSTEM_A,
- X509_ASN_ENCODING,
- 0,
- flags |
- CERT_STORE_READONLY_FLAG,
- options.certStore.c_str());
- if (!certStoreHandle)
- throw qpid::Exception(QPID_MSG("Opening store " << options.certStore << " " << qpid::sys::strError(GetLastError())));
-
- PCCERT_CONTEXT certContext;
- certContext = ::CertFindCertificateInStore(certStoreHandle,
- X509_ASN_ENCODING,
- 0,
- CERT_FIND_SUBJECT_STR_A,
- options.certName.c_str(),
- NULL);
- if (certContext == NULL) {
- int err = ::GetLastError();
- ::CertCloseStore(certStoreHandle, 0);
- throw qpid::Exception(QPID_MSG("Locating certificate " << options.certName << " in store " << options.certStore << " " << qpid::sys::strError(GetLastError())));
- throw QPID_WINDOWS_ERROR(err);
- }
-
- SCHANNEL_CRED cred;
- memset(&cred, 0, sizeof(cred));
- cred.dwVersion = SCHANNEL_CRED_VERSION;
- cred.cCreds = 1;
- cred.paCred = &certContext;
- SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL,
- UNISP_NAME,
- SECPKG_CRED_INBOUND,
- NULL,
- &cred,
- NULL,
- NULL,
- &credHandle,
- NULL);
- if (status != SEC_E_OK)
- throw QPID_WINDOWS_ERROR(status);
- ::CertFreeCertificateContext(certContext);
- ::CertCloseStore(certStoreHandle, 0);
-
- std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces);
- if (addresses.empty()) {
- // We specified some interfaces, but couldn't find addresses for them
- QPID_LOG(warning, "TCP/TCP6: No specified network interfaces found: Not Listening");
- listeningPort = 0;
- }
-
- for (unsigned i = 0; i<addresses.size(); ++i) {
- QPID_LOG(debug, "Using interface: " << addresses[i]);
- SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(options.port));
-
-
- // We must have at least one resolved address
- QPID_LOG(info, "SSL Listening to: " << sa.asString())
- Socket* s = createSocket();
- listeningPort = s->listen(sa, opts.connectionBacklog);
- listeners.push_back(s);
-
- // Try any other resolved addresses
- while (sa.nextAddress()) {
- QPID_LOG(info, "SSL Listening to: " << sa.asString())
- Socket* s = createSocket();
- s->listen(sa, opts.connectionBacklog);
- listeners.push_back(s);
- }
- }
-}
-
-SslProtocolFactory::~SslProtocolFactory() {
- ::FreeCredentialsHandle(&credHandle);
-}
-
-void SslProtocolFactory::connectFailed(const qpid::sys::Socket&,
- int err,
- const std::string& msg) {
- if (connectFailedCallback)
- connectFailedCallback(err, msg);
-}
-
-void SslProtocolFactory::establishedIncoming(sys::Poller::shared_ptr poller,
- const qpid::sys::Socket& s,
- sys::ConnectionCodec::Factory* f) {
- sys::AsynchIOHandler* async = new sys::AsynchIOHandler(s.getFullAddress(), f, false, false);
-
- sys::AsynchIO *aio =
- new qpid::sys::windows::ServerSslAsynchIO(
- clientAuthSelected,
- s,
- credHandle,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
-
- establishedCommon(poller, async, aio, s);
-}
-
-void SslProtocolFactory::establishedOutgoing(sys::Poller::shared_ptr poller,
- const qpid::sys::Socket& s,
- sys::ConnectionCodec::Factory* f,
- std::string& name) {
- sys::AsynchIOHandler* async = new sys::AsynchIOHandler(name, f, true, false);
-
- sys::AsynchIO *aio =
- new qpid::sys::windows::ClientSslAsynchIO(
- brokerHost,
- s,
- credHandle,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
-
- establishedCommon(poller, async, aio, s);
-}
-
-void SslProtocolFactory::establishedCommon(sys::Poller::shared_ptr poller,
- sys::AsynchIOHandler* async,
- sys::AsynchIO* aio,
- const qpid::sys::Socket& s) {
- if (tcpNoDelay) {
- s.setTcpNoDelay();
- QPID_LOG(info,
- "Set TCP_NODELAY on connection to " << s.getPeerAddress());
- }
-
- async->init(aio, brokerTimer, maxNegotiateTime);
- aio->start(poller);
-}
-
-uint16_t SslProtocolFactory::getPort() const {
- return listeningPort; // Immutable no need for lock.
-}
-
-void SslProtocolFactory::accept(sys::Poller::shared_ptr poller,
- sys::ConnectionCodec::Factory* fact) {
- for (unsigned i = 0; i<listeners.size(); ++i) {
- acceptors.push_back(
- AsynchAcceptor::create(listeners[i],
- boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact)));
- acceptors[i].start(poller);
- }
-}
-
-void SslProtocolFactory::connect(sys::Poller::shared_ptr poller,
- const std::string& name,
- const std::string& host,
- const std::string& port,
- sys::ConnectionCodec::Factory* fact,
- ConnectFailedCallback failed)
-{
- SCHANNEL_CRED cred;
- memset(&cred, 0, sizeof(cred));
- cred.dwVersion = SCHANNEL_CRED_VERSION;
- SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL,
- UNISP_NAME,
- SECPKG_CRED_OUTBOUND,
- NULL,
- &cred,
- NULL,
- NULL,
- &credHandle,
- NULL);
- if (status != SEC_E_OK)
- throw QPID_WINDOWS_ERROR(status);
-
- brokerHost = host;
- // Note that the following logic does not cause a memory leak.
- // The allocated Socket is freed either by the AsynchConnector
- // upon connection failure or by the AsynchIO upon connection
- // shutdown. The allocated AsynchConnector frees itself when it
- // is no longer needed.
- qpid::sys::Socket* socket = createSocket();
- connectFailedCallback = failed;
- AsynchConnector::create(*socket,
- host,
- port,
- boost::bind(&SslProtocolFactory::establishedOutgoing,
- this, poller, _1, fact, name),
- boost::bind(&SslProtocolFactory::connectFailed,
- this, _1, _2, _3));
-}
-
-}}} // namespace qpid::sys::windows
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/TransportFactory.h"
+#include "qpid/sys/SocketTransport.h"
+
+#include "qpid/Plugin.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/AsynchIOHandler.h"
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
+#include "qpid/sys/SystemInfo.h"
+#include "qpid/sys/windows/SslAsynchIO.h"
+
+#include <boost/bind.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+#include <memory>
+
+// security.h needs to see this to distinguish from kernel use.
+#define SECURITY_WIN32
+#include <security.h>
+#include <Schnlsp.h>
+#undef SECURITY_WIN32
+
+
+namespace qpid {
+namespace sys {
+
+class Timer;
+
+namespace windows {
+
+struct SslServerOptions : qpid::Options
+{
+ std::string certStore;
+ std::string certStoreLocation;
+ std::string certName;
+ uint16_t port;
+ bool clientAuth;
+
+ SslServerOptions() : qpid::Options("SSL Options"),
+ certStore("My"),
+ certStoreLocation("CurrentUser"),
+ certName("localhost"),
+ port(5671),
+ clientAuth(false)
+ {
+ qpid::Address me;
+ if (qpid::sys::SystemInfo::getLocalHostname(me))
+ certName = me.host;
+
+ addOptions()
+ ("ssl-cert-store", optValue(certStore, "NAME"), "Local store name from which to obtain certificate")
+ ("ssl-cert-store-location", optValue(certStoreLocation, "NAME"),
+ "Local store name location for certificates ( CurrentUser | LocalMachine | CurrentService )")
+ ("ssl-cert-name", optValue(certName, "NAME"), "Name of the certificate to use")
+ ("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections")
+ ("ssl-require-client-authentication", optValue(clientAuth),
+ "Forces clients to authenticate in order to establish an SSL connection");
+ }
+};
+
+class SslProtocolFactory : public qpid::sys::SocketAcceptor, public qpid::sys::TransportConnector {
+ Timer& brokerTimer;
+ uint32_t maxNegotiateTime;
+ const bool tcpNoDelay;
+ std::string brokerHost;
+ const bool clientAuthSelected;
+ std::auto_ptr<qpid::sys::AsynchAcceptor> acceptor;
+ ConnectFailedCallback connectFailedCallback;
+ CredHandle credHandle;
+
+ public:
+ SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions&, Timer& timer);
+ ~SslProtocolFactory();
+
+ void connect(sys::Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port,
+ sys::ConnectionCodec::Factory*,
+ ConnectFailedCallback failed);
+
+ private:
+ void connectFailed(const qpid::sys::Socket&,
+ int err,
+ const std::string& msg);
+ void establishedIncoming(sys::Poller::shared_ptr, const qpid::sys::Socket&, sys::ConnectionCodec::Factory*);
+ void establishedOutgoing(sys::Poller::shared_ptr, const qpid::sys::Socket&, sys::ConnectionCodec::Factory*, std::string& );
+ void establishedCommon(sys::Poller::shared_ptr, sys::AsynchIOHandler*, sys::AsynchIO*, const qpid::sys::Socket&);
+};
+
+// Static instance to initialise plugin
+static struct SslPlugin : public Plugin {
+ SslServerOptions options;
+
+ Options* getOptions() { return &options; }
+
+ void earlyInitialize(Target&) {
+ }
+
+ void initialize(Target& target) {
+ broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
+ // Only provide to a Broker
+ if (broker) {
+ try {
+ const broker::Broker::Options& opts = broker->getOptions();
+ boost::shared_ptr<SslProtocolFactory> protocol(new SslProtocolFactory(opts, options, broker->getTimer()));
+ uint16_t port =
+ protocol->listen(opts.listenInterfaces,
+ boost::lexical_cast<std::string>(opts.port), opts.connectionBacklog,
+ &createSocket);
+ QPID_LOG(notice, "Listening for SSL connections on TCP port " << port);
+ broker->registerTransport("ssl", protocol, protocol, port);
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Failed to initialise SSL listener: " << e.what());
+ }
+ }
+ }
+} sslPlugin;
+
+SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options, Timer& timer)
+ : SocketAcceptor(opts.tcpNoDelay, false, opts.maxNegotiateTime, timer,
+ boost::bind(&SslProtocolFactory::establishedIncoming, this, _1, _2, _3)),
+ brokerTimer(timer),
+ maxNegotiateTime(opts.maxNegotiateTime),
+ tcpNoDelay(opts.tcpNoDelay),
+ clientAuthSelected(options.clientAuth) {
+
+ // Make sure that certificate store is good before listening to sockets
+ // to avoid having open and listening sockets when there is no cert store
+ SecInvalidateHandle(&credHandle);
+
+ // Get the certificate for this server.
+ DWORD flags = 0;
+ std::string certStoreLocation = options.certStoreLocation;
+ std::transform(certStoreLocation.begin(), certStoreLocation.end(), certStoreLocation.begin(), ::tolower);
+ if (certStoreLocation == "currentuser") {
+ flags = CERT_SYSTEM_STORE_CURRENT_USER;
+ } else if (certStoreLocation == "localmachine") {
+ flags = CERT_SYSTEM_STORE_LOCAL_MACHINE;
+ } else if (certStoreLocation == "currentservice") {
+ flags = CERT_SYSTEM_STORE_CURRENT_SERVICE;
+ } else {
+ QPID_LOG(error, "Unrecognised SSL certificate store location: " << options.certStoreLocation
+ << " - Using default location");
+ }
+ HCERTSTORE certStoreHandle;
+ certStoreHandle = ::CertOpenStore(CERT_STORE_PROV_SYSTEM_A,
+ X509_ASN_ENCODING,
+ 0,
+ flags |
+ CERT_STORE_READONLY_FLAG,
+ options.certStore.c_str());
+ if (!certStoreHandle)
+ throw qpid::Exception(QPID_MSG("Opening store " << options.certStore << " " << qpid::sys::strError(GetLastError())));
+
+ PCCERT_CONTEXT certContext;
+ certContext = ::CertFindCertificateInStore(certStoreHandle,
+ X509_ASN_ENCODING,
+ 0,
+ CERT_FIND_SUBJECT_STR_A,
+ options.certName.c_str(),
+ NULL);
+ if (certContext == NULL) {
+ int err = ::GetLastError();
+ ::CertCloseStore(certStoreHandle, 0);
+ throw qpid::Exception(QPID_MSG("Locating certificate " << options.certName << " in store " << options.certStore << " " << qpid::sys::strError(GetLastError())));
+ throw QPID_WINDOWS_ERROR(err);
+ }
+
+ SCHANNEL_CRED cred;
+ memset(&cred, 0, sizeof(cred));
+ cred.dwVersion = SCHANNEL_CRED_VERSION;
+ cred.cCreds = 1;
+ cred.paCred = &certContext;
+ SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL,
+ UNISP_NAME,
+ SECPKG_CRED_INBOUND,
+ NULL,
+ &cred,
+ NULL,
+ NULL,
+ &credHandle,
+ NULL);
+ if (status != SEC_E_OK)
+ throw QPID_WINDOWS_ERROR(status);
+ ::CertFreeCertificateContext(certContext);
+ ::CertCloseStore(certStoreHandle, 0);
+}
+
+SslProtocolFactory::~SslProtocolFactory() {
+ ::FreeCredentialsHandle(&credHandle);
+}
+
+void SslProtocolFactory::connectFailed(const qpid::sys::Socket&,
+ int err,
+ const std::string& msg) {
+ if (connectFailedCallback)
+ connectFailedCallback(err, msg);
+}
+
+void SslProtocolFactory::establishedIncoming(sys::Poller::shared_ptr poller,
+ const qpid::sys::Socket& s,
+ sys::ConnectionCodec::Factory* f) {
+ sys::AsynchIOHandler* async = new sys::AsynchIOHandler(s.getFullAddress(), f, false, false);
+
+ sys::AsynchIO *aio =
+ new qpid::sys::windows::ServerSslAsynchIO(
+ clientAuthSelected,
+ s,
+ credHandle,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+
+ establishedCommon(poller, async, aio, s);
+}
+
+void SslProtocolFactory::establishedOutgoing(sys::Poller::shared_ptr poller,
+ const qpid::sys::Socket& s,
+ sys::ConnectionCodec::Factory* f,
+ std::string& name) {
+ sys::AsynchIOHandler* async = new sys::AsynchIOHandler(name, f, true, false);
+
+ sys::AsynchIO *aio =
+ new qpid::sys::windows::ClientSslAsynchIO(
+ brokerHost,
+ s,
+ credHandle,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+
+ establishedCommon(poller, async, aio, s);
+}
+
+void SslProtocolFactory::establishedCommon(sys::Poller::shared_ptr poller,
+ sys::AsynchIOHandler* async,
+ sys::AsynchIO* aio,
+ const qpid::sys::Socket& s) {
+ if (tcpNoDelay) {
+ s.setTcpNoDelay();
+ QPID_LOG(info,
+ "Set TCP_NODELAY on connection to " << s.getPeerAddress());
+ }
+
+ async->init(aio, brokerTimer, maxNegotiateTime);
+ aio->start(poller);
+}
+
+void SslProtocolFactory::connect(sys::Poller::shared_ptr poller,
+ const std::string& name,
+ const std::string& host,
+ const std::string& port,
+ sys::ConnectionCodec::Factory* fact,
+ ConnectFailedCallback failed)
+{
+ SCHANNEL_CRED cred;
+ memset(&cred, 0, sizeof(cred));
+ cred.dwVersion = SCHANNEL_CRED_VERSION;
+ SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL,
+ UNISP_NAME,
+ SECPKG_CRED_OUTBOUND,
+ NULL,
+ &cred,
+ NULL,
+ NULL,
+ &credHandle,
+ NULL);
+ if (status != SEC_E_OK)
+ throw QPID_WINDOWS_ERROR(status);
+
+ brokerHost = host;
+ // Note that the following logic does not cause a memory leak.
+ // The allocated Socket is freed either by the AsynchConnector
+ // upon connection failure or by the AsynchIO upon connection
+ // shutdown. The allocated AsynchConnector frees itself when it
+ // is no longer needed.
+ qpid::sys::Socket* socket = createSocket();
+ connectFailedCallback = failed;
+ AsynchConnector::create(*socket,
+ host,
+ port,
+ boost::bind(&SslProtocolFactory::establishedOutgoing,
+ this, poller, _1, fact, name),
+ boost::bind(&SslProtocolFactory::connectFailed,
+ this, _1, _2, _3));
+}
+
+}}}
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=1451443&r1=1451442&r2=1451443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Fri Mar 1 00:21:12 2013
@@ -19,7 +19,7 @@
*
*/
-#include "qpid/sys/ProtocolFactory.h"
+#include "qpid/sys/TransportFactory.h"
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
@@ -239,7 +239,7 @@ void RdmaIOHandler::initProtocolIn(Rdma:
}
}
-class RdmaIOProtocolFactory : public ProtocolFactory {
+class RdmaIOProtocolFactory : public TransportAcceptor, public TransportConnector {
auto_ptr<Rdma::Listener> listener;
const uint16_t listeningPort;
@@ -275,9 +275,10 @@ static class RdmaIOPlugin : public Plugi
// Only provide to a Broker
if (broker) {
const broker::Broker::Options& opts = broker->getOptions();
- ProtocolFactory::shared_ptr protocol(new RdmaIOProtocolFactory(opts.port, opts.connectionBacklog));
- QPID_LOG(notice, "Rdma: Listening on RDMA port " << protocol->getPort());
- broker->registerProtocolFactory("rdma", protocol);
+ boost::shared_ptr<RdmaIOProtocolFactory> protocol(new RdmaIOProtocolFactory(opts.port, opts.connectionBacklog));
+ uint16_t port = protocol->getPort();
+ QPID_LOG(notice, "Rdma: Listening on RDMA port " << port);
+ broker->registerTransport("rdma", protocol, protocol, port);
}
}
} rdmaPlugin;
Added: qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.cpp?rev=1451443&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.cpp Fri Mar 1 00:21:12 2013
@@ -0,0 +1,209 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/SocketTransport.h"
+
+#include "qpid/broker/NameGenerator.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/AsynchIOHandler.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
+#include "qpid/sys/SystemInfo.h"
+
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace sys {
+
+namespace {
+    // Common tail of connection establishment: apply the TCP_NODELAY option,
+    // create the AsynchIO object for the socket with all of its callbacks
+    // routed to the handler, then initialise the handler and start the IO.
+    void establishedCommon(
+        AsynchIOHandler* async,
+        boost::shared_ptr<Poller> poller, const SocketTransportOptions& opts, Timer* timer,
+        const Socket& s)
+    {
+        if (opts.tcpNoDelay) {
+            s.setTcpNoDelay();
+            QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
+        }
+
+        // Every IO event is forwarded to the matching AsynchIOHandler member
+        AsynchIO* io = AsynchIO::create(
+            s,
+            boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+            boost::bind(&AsynchIOHandler::eof, async, _1),
+            boost::bind(&AsynchIOHandler::disconnect, async, _1),
+            boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+            boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+            boost::bind(&AsynchIOHandler::idle, async, _1));
+
+        // The handler is initialised before the IO object is started
+        async->init(io, *timer, opts.maxNegotiateTime);
+        io->start(poller);
+    }
+
+    // Called for an accepted connection: build a handler named after the
+    // socket's full address (with the broker name prefix) and complete the
+    // common set up.
+    void establishedIncoming(
+        boost::shared_ptr<Poller> poller, const SocketTransportOptions& opts, Timer* timer,
+        const Socket& s, ConnectionCodec::Factory* f)
+    {
+        const std::string handlerName = broker::QPID_NAME_PREFIX+s.getFullAddress();
+        AsynchIOHandler* handler = new AsynchIOHandler(handlerName, f, false, opts.nodict);
+        establishedCommon(handler, poller, opts, timer, s);
+    }
+
+    // Called once an outgoing connection is made: build a handler with the
+    // caller supplied name and complete the common set up.
+    void establishedOutgoing(
+        boost::shared_ptr<Poller> poller, const SocketTransportOptions& opts, Timer* timer,
+        const Socket& s, ConnectionCodec::Factory* f, const std::string& name)
+    {
+        AsynchIOHandler* handler = new AsynchIOHandler(name, f, true, opts.nodict);
+        establishedCommon(handler, poller, opts, timer, s);
+    }
+
+    // Report a failed outgoing connection to the caller's callback, then
+    // close and free the socket that was allocated for the attempt.
+    void connectFailed(
+        const Socket& s, int ec, const std::string& emsg,
+        SocketConnector::ConnectFailedCallback failedCb)
+    {
+        // The callback runs while the socket still exists
+        failedCb(ec, emsg);
+
+        s.close();
+        // The socket was heap allocated by the connector's socket factory
+        delete &s;
+    }
+
+    // Turn a list of interface names and/or addresses into a list of
+    // addresses to listen on.
+    //
+    // An empty input yields a single "" entry, which means listen on every
+    // interface. Each entry that SystemInfo recognises as an interface name
+    // is replaced by that interface's addresses; any other entry is passed
+    // through for direct lookup, minus any enclosing IPv6 '[' ']' brackets.
+    std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
+        std::vector<std::string> result;
+        if (interfaces.empty()) {
+            result.push_back("");
+            return result;
+        }
+
+        typedef std::vector<std::string>::const_iterator Iter;
+        for (Iter it = interfaces.begin(); it != interfaces.end(); ++it) {
+            const std::string& entry = *it;
+
+            // A known interface name has its addresses appended for us
+            if (SystemInfo::getInterfaceAddresses(entry, result)) continue;
+
+            // We don't have an interface of that name -
+            // strip IPv6 brackets if present and look the entry up directly
+            const bool bracketed = entry[0]=='[' && entry[entry.size()-1]==']';
+            result.push_back(bracketed ? entry.substr(1, entry.size()-2) : entry);
+        }
+        return result;
+    }
+}
+
+/**
+ * Construct an acceptor that uses the default "established" callback,
+ * establishedIncoming().
+ *
+ * The bound callback carries the options and the address of the timer, so
+ * both members must be initialised before 'established'.
+ */
+SocketAcceptor::SocketAcceptor(bool tcpNoDelay,
+                               bool nodict,
+                               uint32_t maxNegotiateTime,
+                               Timer& brokerTimer) :
+    timer(brokerTimer),
+    options(tcpNoDelay, nodict, maxNegotiateTime),
+    established(boost::bind(&establishedIncoming, _1, options, &timer, _2, _3))
+{
+}
+
+/**
+ * Construct an acceptor that invokes a caller supplied callback for every
+ * connection it accepts.
+ */
+SocketAcceptor::SocketAcceptor(bool tcpNoDelay,
+                               bool nodict,
+                               uint32_t maxNegotiateTime,
+                               Timer& brokerTimer,
+                               const EstablishedCallback& callback) :
+    timer(brokerTimer),
+    options(tcpNoDelay, nodict, maxNegotiateTime),
+    established(callback)
+{
+}
+
+/**
+ * Add a socket to the set that accept() will create acceptors for.
+ *
+ * The socket is stored in a boost::ptr_vector, which takes ownership of it.
+ */
+void SocketAcceptor::addListener(Socket* socket)
+{
+    listeners.push_back(socket);
+}
+
+/**
+ * Create a listening socket for every address that the given interfaces
+ * resolve to and hand each one over to this acceptor.
+ *
+ * @param interfaces interface names and/or literal addresses; an empty list
+ *                   means listen on every interface
+ * @param port       port to listen on, as a string
+ * @param backlog    backlog passed to each Socket::listen() call
+ * @param factory    creates each new socket
+ * @return the port reported for the first resolved address of the last
+ *         entry processed, or 0 if there was nothing to listen on
+ *
+ * Any exception from Socket::listen() is propagated after the socket that
+ * failed has been freed.
+ */
+uint16_t SocketAcceptor::listen(const std::vector<std::string>& interfaces, const std::string& port, int backlog, const SocketFactory& factory)
+{
+    std::vector<std::string> addresses = expandInterfaces(interfaces);
+    if (addresses.empty()) {
+        // We specified some interfaces, but couldn't find addresses for them
+        QPID_LOG(warning, "TCP/TCP6: No specified network interfaces found: Not Listening");
+        return 0;
+    }
+
+    uint16_t listeningPort = 0;
+    for (unsigned i = 0; i<addresses.size(); ++i) {
+        QPID_LOG(debug, "Using interface: " << addresses[i]);
+        SocketAddress sa(addresses[i], port);
+
+        // We must have at least one resolved address; any other resolved
+        // addresses are tried on the following passes round the loop.
+        bool firstResolved = true;
+        do {
+            if (!firstResolved) {
+                // Hack to ensure that all listening connections are on the same port
+                sa.setAddrInfoPort(listeningPort);
+            }
+            QPID_LOG(info, "Listening to: " << sa.asString());
+
+            Socket* s = factory();
+            uint16_t lport = 0;
+            try {
+                lport = s->listen(sa, backlog);
+            } catch (...) {
+                // Nothing owns the socket until addListener(), so free it
+                // here rather than leak it.
+                delete s;
+                throw;
+            }
+            QPID_LOG(debug, "Listened to: " << lport);
+            addListener(s);
+
+            if (firstResolved) {
+                listeningPort = lport;
+                firstResolved = false;
+            }
+        } while (sa.nextAddress());
+    }
+    return listeningPort;
+}
+
+/**
+ * Create and start an AsynchAcceptor on the poller for every listening
+ * socket held by this object.
+ *
+ * Each accepted connection invokes the 'established' callback with the
+ * poller, the new socket and the codec factory f.
+ */
+void SocketAcceptor::accept(boost::shared_ptr<Poller> poller, ConnectionCodec::Factory* f)
+{
+    for (unsigned i = 0; i<listeners.size(); ++i) {
+        acceptors.push_back(
+            AsynchAcceptor::create(listeners[i], boost::bind(established, poller, _1, f)));
+        // Start the acceptor that was just added. Indexing acceptors by i
+        // is only right if the container was empty on entry, so use back().
+        acceptors.back().start(poller);
+    }
+}
+
+/**
+ * Construct a connector that allocates its sockets with the given factory
+ * and applies the given options to every connection it establishes.
+ */
+SocketConnector::SocketConnector(bool tcpNoDelay,
+                                 bool nodict,
+                                 uint32_t maxNegotiateTime,
+                                 Timer& brokerTimer,
+                                 const SocketFactory& socketFactory) :
+    timer(brokerTimer),
+    factory(socketFactory),
+    options(tcpNoDelay, nodict, maxNegotiateTime)
+{
+}
+
+/**
+ * Start an asynchronous outgoing connection to host:port.
+ *
+ * On success establishedOutgoing() is called with the connected socket; on
+ * failure connectFailed() passes the error to 'failed'. If setting up or
+ * starting the connector throws, 'failed' is invoked with the socket's
+ * error and the exception is then rethrown.
+ */
+void SocketConnector::connect(
+    boost::shared_ptr<Poller> poller,
+    const std::string& name,
+    const std::string& host, const std::string& port,
+    ConnectionCodec::Factory* fact,
+    ConnectFailedCallback failed)
+{
+    // Note that the following logic does not cause a memory leak.
+    // The allocated Socket is freed either by the AsynchConnector
+    // upon connection failure or by the AsynchIO upon connection
+    // shutdown. The allocated AsynchConnector frees itself when it
+    // is no longer needed.
+    Socket* const socket = factory();
+    try {
+        AsynchConnector* connector = AsynchConnector::create(
+            *socket,
+            host,
+            port,
+            boost::bind(&establishedOutgoing, poller, options, &timer, _1, fact, name),
+            boost::bind(&connectFailed, _1, _2, _3, failed));
+        connector->start(poller);
+    } catch (std::exception&) {
+        // TODO: Design question - should we do the error callback and also throw?
+        const int errCode = socket->getError();
+        // connectFailed() also closes and frees the socket
+        connectFailed(*socket, errCode, strError(errCode), failed);
+        throw;
+    }
+}
+
+}} // namespace qpid::sys
Added: qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.h?rev=1451443&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.h Fri Mar 1 00:21:12 2013
@@ -0,0 +1,91 @@
+#ifndef QPID_SYS_SOCKETTRANSPORT_H
+#define QPID_SYS_SOCKETTRANSPORT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/TransportFactory.h"
+
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/sys/ConnectionCodec.h"
+#include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace sys {
+
+class AsynchAcceptor;
+class Poller;
+class Timer;
+class Socket;
+typedef boost::function0<Socket*> SocketFactory;
+typedef boost::function3<void, boost::shared_ptr<Poller>, const Socket&, ConnectionCodec::Factory*> EstablishedCallback;
+
+// Settings that SocketAcceptor and SocketConnector each hold and apply to
+// the connections they establish.
+struct SocketTransportOptions {
+    bool tcpNoDelay;            // request TCP_NODELAY on each connection
+    bool nodict;                // handed on to each connection's handler
+    uint32_t maxNegotiateTime;  // handed on to each connection's handler
+
+    SocketTransportOptions(bool noDelay, bool noDict, uint32_t negotiateTime) :
+        tcpNoDelay(noDelay),
+        nodict(noDict),
+        maxNegotiateTime(negotiateTime)
+    {
+    }
+};
+
+// TransportAcceptor that listens on a set of sockets and calls an
+// "established" callback for each connection accepted on them.
+class SocketAcceptor : public TransportAcceptor {
+public:
+    // Uses the default established callback
+    SocketAcceptor(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer);
+    // Uses the supplied established callback
+    SocketAcceptor(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer, const EstablishedCallback& established);
+
+    // Create sockets from list of interfaces and listen to them
+    uint16_t listen(const std::vector<std::string>& interfaces, const std::string& port, int backlog, const SocketFactory& factory);
+
+    // Import sockets that are already being listened to
+    void addListener(Socket* socket);
+
+    // Start accepting connections on every listening socket
+    void accept(boost::shared_ptr<Poller> poller, ConnectionCodec::Factory* f);
+
+private:
+    // Declaration order matters: 'established' may be built from 'options'
+    // and 'timer' in the constructor, so it must stay last.
+    boost::ptr_vector<Socket> listeners;
+    boost::ptr_vector<AsynchAcceptor> acceptors;
+    Timer& timer;
+    SocketTransportOptions options;
+    const EstablishedCallback established;
+};
+
+// TransportConnector that makes outgoing connections over sockets obtained
+// from a SocketFactory.
+class SocketConnector : public TransportConnector {
+public:
+    SocketConnector(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer, const SocketFactory& factory);
+
+    // Start an asynchronous connection to host:port; 'failed' is called if
+    // the connection attempt fails.
+    void connect(boost::shared_ptr<Poller> poller,
+                 const std::string& name,
+                 const std::string& host, const std::string& port,
+                 ConnectionCodec::Factory* f,
+                 ConnectFailedCallback failed);
+
+private:
+    Timer& timer;
+    const SocketFactory factory;
+    SocketTransportOptions options;
+};
+
+}}
+
+#endif
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp?rev=1451443&r1=1451442&r2=1451443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp Fri Mar 1 00:21:12 2013
@@ -19,22 +19,17 @@
*
*/
-#include "qpid/sys/ProtocolFactory.h"
+#include "qpid/sys/TransportFactory.h"
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/NameGenerator.h"
#include "qpid/log/Statement.h"
-#include "qpid/sys/AsynchIOHandler.h"
#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/SocketTransport.h"
#include "qpid/sys/ssl/util.h"
#include "qpid/sys/ssl/SslSocket.h"
-#include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/SystemInfo.h"
-#include "qpid/sys/Poller.h"
#include <boost/bind.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
namespace qpid {
namespace sys {
@@ -64,32 +59,20 @@ struct SslServerOptions : ssl::SslOption
}
};
-class SslProtocolFactory : public ProtocolFactory {
- boost::ptr_vector<Socket> listeners;
- boost::ptr_vector<AsynchAcceptor> acceptors;
- Timer& brokerTimer;
- uint32_t maxNegotiateTime;
- uint16_t listeningPort;
- const bool tcpNoDelay;
- bool nodict;
+namespace {
+ Socket* createServerSSLSocket(const SslServerOptions& options) {
+ return new SslSocket(options.certName, options.clientAuth);
+ }
- public:
- SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options,
- Timer& timer);
- void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port,
- ConnectionCodec::Factory*,
- ConnectFailedCallback);
-
- uint16_t getPort() const;
-
- private:
- void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
- void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&);
- void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&);
- void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
-};
+ Socket* createServerSSLMuxSocket(const SslServerOptions& options) {
+ return new SslMuxSocket(options.certName, options.clientAuth);
+ }
+
+ Socket* createClientSSLSocket() {
+ return new SslSocket();
+ }
+}
// Static instance to initialise plugin
static struct SslPlugin : public Plugin {
@@ -104,7 +87,7 @@ static struct SslPlugin : public Plugin
void earlyInitialize(Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
if (broker && !options.certDbPath.empty()) {
- const broker::Broker::Options& opts = broker->getOptions();
+ broker::Broker::Options& opts = broker->getOptions();
if (opts.port == options.port && // AMQP & AMQPS ports are the same
opts.port != 0) {
@@ -132,18 +115,25 @@ static struct SslPlugin : public Plugin
nssInitialized = true;
const broker::Broker::Options& opts = broker->getOptions();
-
- ProtocolFactory::shared_ptr protocol(
- static_cast<ProtocolFactory*>(new SslProtocolFactory(opts, options, broker->getTimer())));
-
- if (protocol->getPort()!=0 ) {
+ TransportAcceptor::shared_ptr ta;
+ SocketAcceptor* sa =
+ new SocketAcceptor(opts.tcpNoDelay, options.nodict, opts.maxNegotiateTime, broker->getTimer());
+ uint16_t port = sa->listen(opts.listenInterfaces, boost::lexical_cast<std::string>(options.port), opts.connectionBacklog,
+ options.multiplex ?
+ boost::bind(&createServerSSLMuxSocket, options) :
+ boost::bind(&createServerSSLSocket, options));
+ if ( port!=0 ) {
+ ta.reset(sa);
QPID_LOG(notice, "Listening for " <<
(options.multiplex ? "SSL or TCP" : "SSL") <<
" connections on TCP/TCP6 port " <<
- protocol->getPort());
+ port);
}
- broker->registerProtocolFactory("ssl", protocol);
+ TransportConnector::shared_ptr tc(
+ new SocketConnector(opts.tcpNoDelay, options.nodict, opts.maxNegotiateTime, broker->getTimer(),
+ &createClientSSLSocket));
+ broker->registerTransport("ssl", ta, tc, port);
} catch (const std::exception& e) {
QPID_LOG(error, "Failed to initialise SSL plugin: " << e.what());
}
@@ -152,160 +142,4 @@ static struct SslPlugin : public Plugin
}
} sslPlugin;
-namespace {
- // Expand list of Interfaces and addresses to a list of addresses
- std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
- std::vector<std::string> addresses;
- // If there are no specific interfaces listed use a single "" to listen on every interface
- if (interfaces.empty()) {
- addresses.push_back("");
- return addresses;
- }
- for (unsigned i = 0; i < interfaces.size(); ++i) {
- const std::string& interface = interfaces[i];
- if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) {
- // We don't have an interface of that name -
- // Check for IPv6 ('[' ']') brackets and remove them
- // then pass to be looked up directly
- if (interface[0]=='[' && interface[interface.size()-1]==']') {
- addresses.push_back(interface.substr(1, interface.size()-2));
- } else {
- addresses.push_back(interface);
- }
- }
- }
- return addresses;
- }
-}
-
-SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options,
- Timer& timer) :
- brokerTimer(timer),
- maxNegotiateTime(opts.maxNegotiateTime),
- tcpNoDelay(opts.tcpNoDelay),
- nodict(options.nodict)
-{
- std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces);
- if (addresses.empty()) {
- // We specified some interfaces, but couldn't find addresses for them
- QPID_LOG(warning, "SSL: No specified network interfaces found: Not Listening");
- listeningPort = 0;
- }
-
- for (unsigned i = 0; i<addresses.size(); ++i) {
- QPID_LOG(debug, "Using interface: " << addresses[i]);
- SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(options.port));
-
- // We must have at least one resolved address
- QPID_LOG(info, "Listening to: " << sa.asString())
- Socket* s = options.multiplex ?
- new SslMuxSocket(options.certName, options.clientAuth) :
- new SslSocket(options.certName, options.clientAuth);
- uint16_t lport = s->listen(sa, opts.connectionBacklog);
- QPID_LOG(debug, "Listened to: " << lport);
- listeners.push_back(s);
-
- listeningPort = lport;
-
- // Try any other resolved addresses
- while (sa.nextAddress()) {
- // Hack to ensure that all listening connections are on the same port
- sa.setAddrInfoPort(listeningPort);
- QPID_LOG(info, "Listening to: " << sa.asString())
- Socket* s = options.multiplex ?
- new SslMuxSocket(options.certName, options.clientAuth) :
- new SslSocket(options.certName, options.clientAuth);
- uint16_t lport = s->listen(sa, opts.connectionBacklog);
- QPID_LOG(debug, "Listened to: " << lport);
- listeners.push_back(s);
- }
- }
-}
-
-void SslProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f) {
- AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
- establishedCommon(async, poller, s);
-}
-
-void SslProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, const std::string& name) {
- AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
- establishedCommon(async, poller, s);
-}
-
-void SslProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) {
- if (tcpNoDelay) {
- s.setTcpNoDelay();
- QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
- }
-
- AsynchIO* aio = AsynchIO::create(
- s,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
-
- async->init(aio, brokerTimer, maxNegotiateTime);
- aio->start(poller);
-}
-
-uint16_t SslProtocolFactory::getPort() const {
- return listeningPort; // Immutable no need for lock.
-}
-
-void SslProtocolFactory::accept(Poller::shared_ptr poller,
- ConnectionCodec::Factory* fact) {
- for (unsigned i = 0; i<listeners.size(); ++i) {
- acceptors.push_back(
- AsynchAcceptor::create(listeners[i],
- boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact)));
- acceptors[i].start(poller);
- }
-}
-
-void SslProtocolFactory::connectFailed(
- const Socket& s, int ec, const std::string& emsg,
- ConnectFailedCallback failedCb)
-{
- failedCb(ec, emsg);
- s.close();
- delete &s;
-}
-
-void SslProtocolFactory::connect(
- Poller::shared_ptr poller,
- const std::string& name,
- const std::string& host, const std::string& port,
- ConnectionCodec::Factory* fact,
- ConnectFailedCallback failed)
-{
- // Note that the following logic does not cause a memory leak.
- // The allocated Socket is freed either by the SslConnector
- // upon connection failure or by the SslIoHandle upon connection
- // shutdown. The allocated SslConnector frees itself when it
- // is no longer needed.
-
- Socket* socket = new qpid::sys::ssl::SslSocket();
- try {
- AsynchConnector* c = AsynchConnector::create(
- *socket,
- host,
- port,
- boost::bind(&SslProtocolFactory::establishedOutgoing,
- this, poller, _1, fact, name),
- boost::bind(&SslProtocolFactory::connectFailed,
- this, _1, _2, _3, failed));
- c->start(poller);
- } catch (std::exception&) {
- // TODO: Design question - should we do the error callback and also throw?
- int errCode = socket->getError();
- connectFailed(*socket, errCode, strError(errCode), failed);
- throw;
- }
-}
-
}} // namespace qpid::sys
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=1451443&r1=1451442&r2=1451443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp Fri Mar 1 00:21:12 2013
@@ -19,52 +19,18 @@
*
*/
-#include "qpid/sys/ProtocolFactory.h"
+#include "qpid/sys/TransportFactory.h"
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/NameGenerator.h"
#include "qpid/log/Statement.h"
-#include "qpid/sys/AsynchIOHandler.h"
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Socket.h"
-#include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/SystemInfo.h"
-#include "qpid/sys/Poller.h"
-
-#include <boost/bind.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
+#include "qpid/sys/SocketTransport.h"
namespace qpid {
namespace sys {
-class Timer;
-
-class AsynchIOProtocolFactory : public ProtocolFactory {
- boost::ptr_vector<Socket> listeners;
- boost::ptr_vector<AsynchAcceptor> acceptors;
- Timer& brokerTimer;
- uint32_t maxNegotiateTime;
- uint16_t listeningPort;
- const bool tcpNoDelay;
-
- public:
- AsynchIOProtocolFactory(const qpid::broker::Broker::Options& opts, Timer& timer, bool shouldListen);
- void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& name,
- const std::string& host, const std::string& port,
- ConnectionCodec::Factory*,
- ConnectFailedCallback);
-
- uint16_t getPort() const;
-
- private:
- void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
- void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&);
- void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&);
- void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
-};
-
static bool sslMultiplexEnabled(void)
{
Options o;
@@ -93,170 +59,22 @@ static class TCPIOPlugin : public Plugin
// Check for SSL on the same port
bool shouldListen = !sslMultiplexEnabled();
- ProtocolFactory::shared_ptr protocolt(
- new AsynchIOProtocolFactory(opts, broker->getTimer(),shouldListen));
-
- if (shouldListen && protocolt->getPort()!=0 ) {
- QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort());
- }
-
- broker->registerProtocolFactory("tcp", protocolt);
- }
- }
-} tcpPlugin;
-
-namespace {
- // Expand list of Interfaces and addresses to a list of addresses
- std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
- std::vector<std::string> addresses;
- // If there are no specific interfaces listed use a single "" to listen on every interface
- if (interfaces.empty()) {
- addresses.push_back("");
- return addresses;
- }
- for (unsigned i = 0; i < interfaces.size(); ++i) {
- const std::string& interface = interfaces[i];
- if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) {
- // We don't have an interface of that name -
- // Check for IPv6 ('[' ']') brackets and remove them
- // then pass to be looked up directly
- if (interface[0]=='[' && interface[interface.size()-1]==']') {
- addresses.push_back(interface.substr(1, interface.size()-2));
- } else {
- addresses.push_back(interface);
+ uint16_t port = opts.port;
+ TransportAcceptor::shared_ptr ta;
+ if (shouldListen) {
+ SocketAcceptor* aa = new SocketAcceptor(opts.tcpNoDelay, false, opts.maxNegotiateTime, broker->getTimer());
+ ta.reset(aa);
+ port = aa->listen(opts.listenInterfaces, boost::lexical_cast<std::string>(opts.port), opts.connectionBacklog, &createSocket);
+ if ( port!=0 ) {
+ QPID_LOG(notice, "Listening on TCP/TCP6 port " << port);
}
}
- }
- return addresses;
- }
-}
-
-AsynchIOProtocolFactory::AsynchIOProtocolFactory(const qpid::broker::Broker::Options& opts, Timer& timer, bool shouldListen) :
- brokerTimer(timer),
- maxNegotiateTime(opts.maxNegotiateTime),
- tcpNoDelay(opts.tcpNoDelay)
-{
- if (!shouldListen) {
- listeningPort = boost::lexical_cast<uint16_t>(opts.port);
- return;
- }
- std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces);
- if (addresses.empty()) {
- // We specified some interfaces, but couldn't find addresses for them
- QPID_LOG(warning, "TCP/TCP6: No specified network interfaces found: Not Listening");
- listeningPort = 0;
- }
+ TransportConnector::shared_ptr tc(new SocketConnector(opts.tcpNoDelay, false, opts.maxNegotiateTime, broker->getTimer(), &createSocket));
- for (unsigned i = 0; i<addresses.size(); ++i) {
- QPID_LOG(debug, "Using interface: " << addresses[i]);
- SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(opts.port));
-
- // We must have at least one resolved address
- QPID_LOG(info, "Listening to: " << sa.asString())
- Socket* s = createSocket();
- uint16_t lport = s->listen(sa, opts.connectionBacklog);
- QPID_LOG(debug, "Listened to: " << lport);
- listeners.push_back(s);
-
- listeningPort = lport;
-
- // Try any other resolved addresses
- while (sa.nextAddress()) {
- // Hack to ensure that all listening connections are on the same port
- sa.setAddrInfoPort(listeningPort);
- QPID_LOG(info, "Listening to: " << sa.asString())
- Socket* s = createSocket();
- uint16_t lport = s->listen(sa, opts.connectionBacklog);
- QPID_LOG(debug, "Listened to: " << lport);
- listeners.push_back(s);
+ broker->registerTransport("tcp", ta, tc, port);
}
}
-}
-
-void AsynchIOProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f) {
- AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
- establishedCommon(async, poller, s);
-}
-
-void AsynchIOProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, const std::string& name) {
- AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
- establishedCommon(async, poller, s);
-}
-
-void AsynchIOProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) {
- if (tcpNoDelay) {
- s.setTcpNoDelay();
- QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
- }
-
- AsynchIO* aio = AsynchIO::create
- (s,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
-
- async->init(aio, brokerTimer, maxNegotiateTime);
- aio->start(poller);
-}
-
-uint16_t AsynchIOProtocolFactory::getPort() const {
- return listeningPort; // Immutable no need for lock.
-}
-
-void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
- ConnectionCodec::Factory* fact) {
- for (unsigned i = 0; i<listeners.size(); ++i) {
- acceptors.push_back(
- AsynchAcceptor::create(listeners[i],
- boost::bind(&AsynchIOProtocolFactory::establishedIncoming, this, poller, _1, fact)));
- acceptors[i].start(poller);
- }
-}
-
-void AsynchIOProtocolFactory::connectFailed(
- const Socket& s, int ec, const std::string& emsg,
- ConnectFailedCallback failedCb)
-{
- failedCb(ec, emsg);
- s.close();
- delete &s;
-}
-
-void AsynchIOProtocolFactory::connect(
- Poller::shared_ptr poller,
- const std::string& name,
- const std::string& host, const std::string& port,
- ConnectionCodec::Factory* fact,
- ConnectFailedCallback failed)
-{
- // Note that the following logic does not cause a memory leak.
- // The allocated Socket is freed either by the AsynchConnector
- // upon connection failure or by the AsynchIO upon connection
- // shutdown. The allocated AsynchConnector frees itself when it
- // is no longer needed.
- Socket* socket = createSocket();
- try {
- AsynchConnector* c = AsynchConnector::create(
- *socket,
- host,
- port,
- boost::bind(&AsynchIOProtocolFactory::establishedOutgoing,
- this, poller, _1, fact, name),
- boost::bind(&AsynchIOProtocolFactory::connectFailed,
- this, _1, _2, _3, failed));
- c->start(poller);
- } catch (std::exception&) {
- // TODO: Design question - should we do the error callback and also throw?
- int errCode = socket->getError();
- connectFailed(*socket, errCode, strError(errCode), failed);
- throw;
- }
-}
+} tcpPlugin;
}} // namespace qpid::sys
Copied: qpid/trunk/qpid/cpp/src/qpid/sys/TransportFactory.h (from r1451398, qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/TransportFactory.h?p2=qpid/trunk/qpid/cpp/src/qpid/sys/TransportFactory.h&p1=qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h&r1=1451398&r2=1451443&rev=1451443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/TransportFactory.h Fri Mar 1 00:21:12 2013
@@ -1,5 +1,5 @@
-#ifndef _sys_ProtocolFactory_h
-#define _sys_ProtocolFactory_h
+#ifndef QPID_SYS_TRANSPORTFACTORY_H
+#define QPID_SYS_TRANSPORTFACTORY_H
/*
*
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,24 +22,34 @@
*
*/
-#include "qpid/sys/IntegerTypes.h"
#include "qpid/SharedObject.h"
#include "qpid/sys/ConnectionCodec.h"
+#include <string>
#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace sys {
+class AsynchAcceptor;
class Poller;
+class Timer;
-class ProtocolFactory : public qpid::SharedObject<ProtocolFactory>
+class TransportAcceptor : public qpid::SharedObject<TransportAcceptor>
{
public:
+ virtual ~TransportAcceptor() = 0;
+ virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
+};
+
+inline TransportAcceptor::~TransportAcceptor() {}
+
+class TransportConnector : public qpid::SharedObject<TransportConnector>
+{
+public:
typedef boost::function2<void, int, std::string> ConnectFailedCallback;
- virtual ~ProtocolFactory() = 0;
- virtual uint16_t getPort() const = 0;
- virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
+ virtual ~TransportConnector() = 0;
virtual void connect(
boost::shared_ptr<Poller>,
const std::string& name,
@@ -48,10 +58,8 @@ class ProtocolFactory : public qpid::Sha
ConnectFailedCallback failed) = 0;
};
-inline ProtocolFactory::~ProtocolFactory() {}
+inline TransportConnector::~TransportConnector() {}
}}
-
-
-#endif //!_sys_ProtocolFactory_h
+#endif
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp?rev=1451443&r1=1451442&r2=1451443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp Fri Mar 1 00:21:12 2013
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org