You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2013/03/01 01:21:13 UTC

svn commit: r1451443 - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/broker/windows/ qpid/sys/ qpid/sys/windows/

Author: astitcher
Date: Fri Mar  1 00:21:12 2013
New Revision: 1451443

URL: http://svn.apache.org/r1451443
Log:
QPID-4610: Remove duplicated transport code from C++ broker

Added:
    qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.h
    qpid/trunk/qpid/cpp/src/qpid/sys/TransportFactory.h
      - copied, changed from r1451398, qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h
Removed:
    qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h
Modified:
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1451443&r1=1451442&r2=1451443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Fri Mar  1 00:21:12 2013
@@ -1224,6 +1224,7 @@ set (qpidbroker_SOURCES
      qpid/management/ManagementAgent.cpp
      qpid/management/ManagementDirectExchange.cpp
      qpid/management/ManagementTopicExchange.cpp
+     qpid/sys/SocketTransport.cpp
      qpid/sys/TCPIOPlugin.cpp
 )
 add_msvc_version (qpidbroker library dll)

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1451443&r1=1451442&r2=1451443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Fri Mar  1 00:21:12 2013
@@ -755,7 +755,10 @@ libqpidbroker_la_SOURCES = \
   qpid/management/ManagementDirectExchange.h \
   qpid/management/ManagementTopicExchange.cpp \
   qpid/management/ManagementTopicExchange.h \
-  qpid/sys/TCPIOPlugin.cpp
+  qpid/sys/TCPIOPlugin.cpp \
+  qpid/sys/SocketTransport.cpp \
+  qpid/sys/SocketTransport.h \
+  qpid/sys/TransportFactory.h
 
 QPIDBROKER_VERSION_INFO = 2:0:0
 libqpidbroker_la_LDFLAGS = -version-info $(QPIDBROKER_VERSION_INFO)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1451443&r1=1451442&r2=1451443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Mar  1 00:21:12 2013
@@ -68,7 +68,7 @@
 #include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/Uuid.h"
-#include "qpid/sys/ProtocolFactory.h"
+#include "qpid/sys/TransportFactory.h"
 #include "qpid/sys/Poller.h"
 #include "qpid/sys/Dispatcher.h"
 #include "qpid/sys/Thread.h"
@@ -89,7 +89,8 @@
 #include <iostream>
 #include <memory>
 
-using qpid::sys::ProtocolFactory;
+using qpid::sys::TransportAcceptor;
+using qpid::sys::TransportConnector;
 using qpid::sys::Poller;
 using qpid::sys::Dispatcher;
 using qpid::sys::Thread;
@@ -485,7 +486,7 @@ Manageable::status_t Broker::ManagementM
         string transport = hp.i_transport.empty() ? TCP_TRANSPORT : hp.i_transport;
         QPID_LOG (debug, "Broker::connect() " << hp.i_host << ":" << hp.i_port << "; transport=" << transport <<
                         "; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\"");
-        if (!getProtocolFactory(transport)) {
+        if (!getTransportInfo(transport).connectorFactory) {
             QPID_LOG(error, "Transport '" << transport << "' not supported");
             text = "transport type not supported";
             return  Manageable::STATUS_NOT_IMPLEMENTED;
@@ -795,7 +796,7 @@ void Broker::createObject(const std::str
             }
         }
 
-        if (!getProtocolFactory(transport)) {
+        if (!getTransportInfo(transport).connectorFactory) {
             QPID_LOG(error, "Transport '" << transport << "' not supported.");
             throw UnsupportedTransport(transport);
         }
@@ -1011,30 +1012,31 @@ bool Broker::getLogHiresTimestamp()
 }
 
 
-boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory(const std::string& name) const {
-    ProtocolFactoryMap::const_iterator i
-        = name.empty() ? protocolFactories.begin() : protocolFactories.find(name);
-    if (i == protocolFactories.end()) return boost::shared_ptr<ProtocolFactory>();
+const Broker::TransportInfo& Broker::getTransportInfo(const std::string& name) const {
+    static TransportInfo nullTransportInfo;
+    TransportMap::const_iterator i
+        = name.empty() ? transportMap.begin() : transportMap.find(name);
+    if (i == transportMap.end()) return nullTransportInfo;
     else return i->second;
 }
 
 uint16_t Broker::getPort(const std::string& name) const  {
-    boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(name);
-    if (factory) {
-        return factory->getPort();
+    if (int p = getTransportInfo(name).port) {
+        return p;
     } else {
         throw NoSuchTransportException(QPID_MSG("No such transport: '" << name << "'"));
     }
 }
 
-void Broker::registerProtocolFactory(const std::string& name, ProtocolFactory::shared_ptr protocolFactory) {
-    protocolFactories[name] = protocolFactory;
+void Broker::registerTransport(const std::string& name, boost::shared_ptr<TransportAcceptor> a, boost::shared_ptr<TransportConnector> c, uint16_t p) {
+    transportMap[name] = TransportInfo(a, c, p);
+    // Still register the transport name with Url, exactly as registerProtocolFactory() did
     Url::addProtocol(name);
 }
 
 void Broker::accept() {
-    for (ProtocolFactoryMap::const_iterator i = protocolFactories.begin(); i != protocolFactories.end(); i++) {
-        i->second->accept(poller, factory.get());
+    for (TransportMap::const_iterator i = transportMap.begin(); i != transportMap.end(); i++) {
+        if (i->second.acceptor) i->second.acceptor->accept(poller, factory.get());
     }
 }
 
@@ -1043,8 +1045,8 @@ void Broker::connect(
     const std::string& host, const std::string& port, const std::string& transport,
     boost::function2<void, int, std::string> failed)
 {
-    boost::shared_ptr<ProtocolFactory> pf = getProtocolFactory(transport);
-    if (pf) pf->connect(poller, name, host, port, factory.get(), failed);
+    boost::shared_ptr<TransportConnector> tcf = getTransportInfo(transport).connectorFactory;
+    if (tcf) tcf->connect(poller, name, host, port, factory.get(), failed);
     else throw NoSuchTransportException(QPID_MSG("Unsupported transport type: " << transport));
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1451443&r1=1451442&r2=1451443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Mar  1 00:21:12 2013
@@ -51,7 +51,8 @@
 namespace qpid {
 
 namespace sys {
-class ProtocolFactory;
+class TransportAcceptor;
+class TransportConnector;
 class Poller;
 class Timer;
 }
@@ -124,8 +125,23 @@ class Broker : public sys::Runnable, pub
     };
 
   private:
-    typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap;
+    struct TransportInfo { // What the broker stores per registered transport name (value type of TransportMap)
+        boost::shared_ptr<sys::TransportAcceptor> acceptor; // may be null; Broker::accept() skips null acceptors
+        boost::shared_ptr<sys::TransportConnector> connectorFactory; // null means Broker::connect() rejects this transport
+        uint16_t port; // Broker::getPort() treats 0 as "no such transport" and throws
 
+        TransportInfo() : // empty entry: null pointers, port 0 (returned by getTransportInfo() for unknown names)
+            port(0)
+        {}
+
+        TransportInfo(boost::shared_ptr<sys::TransportAcceptor> a, boost::shared_ptr<sys::TransportConnector> c, uint16_t p) :
+            acceptor(a),
+            connectorFactory(c),
+            port(p)
+        {}
+    };
+    typedef std::map<std::string, TransportInfo > TransportMap;
+    
     void declareStandardExchange(const std::string& name, const std::string& type);
     void setStore ();
     void setLogLevel(const std::string& level);
@@ -150,7 +166,7 @@ class Broker : public sys::Runnable, pub
     std::auto_ptr<sys::Timer> timer;
     Options config;
     std::auto_ptr<management::ManagementAgent> managementAgent;
-    ProtocolFactoryMap protocolFactories;
+    TransportMap transportMap;
     std::auto_ptr<MessageStore> store;
     AclModule* acl;
     DataDir dataDir;
@@ -228,9 +244,11 @@ class Broker : public sys::Runnable, pub
         uint32_t methodId, management::Args& args, std::string& text);
 
-    /** Add to the broker's protocolFactorys */
+    /** Register a named transport with its acceptor, connector and port */
-    QPID_BROKER_EXTERN void registerProtocolFactory(
-        const std::string& name, boost::shared_ptr<sys::ProtocolFactory>);
-
+    QPID_BROKER_EXTERN void registerTransport(
+        const std::string& name,
+        boost::shared_ptr<sys::TransportAcceptor>, boost::shared_ptr<sys::TransportConnector>,
+        uint16_t port);
+    
     /** Accept connections */
     QPID_BROKER_EXTERN void accept();
 
@@ -251,7 +269,7 @@ class Broker : public sys::Runnable, pub
         uint32_t  qty,
         const qpid::types::Variant::Map& filter);
 
-    QPID_BROKER_EXTERN boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory(
+    QPID_BROKER_EXTERN const TransportInfo& getTransportInfo(
         const std::string& name = TCP_TRANSPORT) const;
 
     /** Expose poller so plugins can register their descriptors. */

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp?rev=1451443&r1=1451442&r2=1451443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp Fri Mar  1 00:21:12 2013
@@ -1,379 +1,314 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/ProtocolFactory.h"
-
-#include "qpid/Plugin.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/log/Statement.h"
-#include "qpid/sys/AsynchIOHandler.h"
-#include "qpid/sys/ConnectionCodec.h"
-#include "qpid/sys/Socket.h"
-#include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/SystemInfo.h"
-#include "qpid/sys/windows/SslAsynchIO.h"
-
-#include <boost/bind.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
-#include <memory>
-
-// security.h needs to see this to distinguish from kernel use.
-#define SECURITY_WIN32
-#include <security.h>
-#include <Schnlsp.h>
-#undef SECURITY_WIN32
-
-
-namespace qpid {
-namespace sys {
-
-class Timer;
-
-namespace windows {
-
-struct SslServerOptions : qpid::Options
-{
-    std::string certStore;
-    std::string certStoreLocation;
-    std::string certName;
-    uint16_t port;
-    bool clientAuth;
-
-    SslServerOptions() : qpid::Options("SSL Options"),
-                         certStore("My"),
-                         certStoreLocation("CurrentUser"),
-                         certName("localhost"),
-                         port(5671),
-                         clientAuth(false)
-    {
-        qpid::Address me;
-        if (qpid::sys::SystemInfo::getLocalHostname(me))
-            certName = me.host;
-
-        addOptions()
-            ("ssl-cert-store", optValue(certStore, "NAME"), "Local store name from which to obtain certificate")
-            ("ssl-cert-store-location", optValue(certStoreLocation, "NAME"),
-             "Local store name location for certificates ( CurrentUser | LocalMachine | CurrentService )")
-            ("ssl-cert-name", optValue(certName, "NAME"), "Name of the certificate to use")
-            ("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections")
-            ("ssl-require-client-authentication", optValue(clientAuth), 
-             "Forces clients to authenticate in order to establish an SSL connection");
-    }
-};
-
-class SslProtocolFactory : public qpid::sys::ProtocolFactory {
-    boost::ptr_vector<Socket> listeners;
-    boost::ptr_vector<AsynchAcceptor> acceptors;
-    Timer& brokerTimer;
-    uint32_t maxNegotiateTime;
-    uint16_t listeningPort;
-    const bool tcpNoDelay;
-    std::string brokerHost;
-    const bool clientAuthSelected;
-    std::auto_ptr<qpid::sys::AsynchAcceptor> acceptor;
-    ConnectFailedCallback connectFailedCallback;
-    CredHandle credHandle;
-
-  public:
-    SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions&, Timer& timer);
-    ~SslProtocolFactory();
-    void accept(sys::Poller::shared_ptr, sys::ConnectionCodec::Factory*);
-    void connect(sys::Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port,
-                 sys::ConnectionCodec::Factory*,
-                 ConnectFailedCallback failed);
-
-    uint16_t getPort() const;
-
-  private:
-    void connectFailed(const qpid::sys::Socket&,
-                       int err,
-                       const std::string& msg);
-    void establishedIncoming(sys::Poller::shared_ptr, const qpid::sys::Socket&, sys::ConnectionCodec::Factory*);
-    void establishedOutgoing(sys::Poller::shared_ptr, const qpid::sys::Socket&, sys::ConnectionCodec::Factory*, std::string& );
-    void establishedCommon(sys::Poller::shared_ptr, sys::AsynchIOHandler*, sys::AsynchIO*, const qpid::sys::Socket&);
-};
-
-// Static instance to initialise plugin
-static struct SslPlugin : public Plugin {
-    SslServerOptions options;
-
-    Options* getOptions() { return &options; }
-
-    void earlyInitialize(Target&) {
-    }
-    
-    void initialize(Target& target) {
-        broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
-        // Only provide to a Broker
-        if (broker) {
-            try {
-                const broker::Broker::Options& opts = broker->getOptions();
-                ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(opts, options, broker->getTimer()));
-                QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort());
-                broker->registerProtocolFactory("ssl", protocol);
-            } catch (const std::exception& e) {
-                QPID_LOG(error, "Failed to initialise SSL listener: " << e.what());
-            }
-        }
-    }
-} sslPlugin;
-
-namespace {
-    // Expand list of Interfaces and addresses to a list of addresses
-    std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
-        std::vector<std::string> addresses;
-        // If there are no specific interfaces listed use a single "" to listen on every interface
-        if (interfaces.empty()) {
-            addresses.push_back("");
-            return addresses;
-        }
-        for (unsigned i = 0; i < interfaces.size(); ++i) {
-            const std::string& interface = interfaces[i];
-            if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) {
-                // We don't have an interface of that name -
-                // Check for IPv6 ('[' ']') brackets and remove them
-                // then pass to be looked up directly
-                if (interface[0]=='[' && interface[interface.size()-1]==']') {
-                    addresses.push_back(interface.substr(1, interface.size()-2));
-                } else {
-                    addresses.push_back(interface);
-                }
-            }
-        }
-        return addresses;
-    }
-}
-
-SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options, Timer& timer)
-    : brokerTimer(timer),
-      maxNegotiateTime(opts.maxNegotiateTime),
-      tcpNoDelay(opts.tcpNoDelay),
-      clientAuthSelected(options.clientAuth) {
-
-    // Make sure that certificate store is good before listening to sockets
-    // to avoid having open and listening sockets when there is no cert store
-    SecInvalidateHandle(&credHandle);
-
-    // Get the certificate for this server.
-    DWORD flags = 0;
-    std::string certStoreLocation = options.certStoreLocation;
-    std::transform(certStoreLocation.begin(), certStoreLocation.end(), certStoreLocation.begin(), ::tolower);
-    if (certStoreLocation == "currentuser") {
-        flags = CERT_SYSTEM_STORE_CURRENT_USER;
-    } else if (certStoreLocation == "localmachine") {
-        flags = CERT_SYSTEM_STORE_LOCAL_MACHINE;
-    } else if (certStoreLocation == "currentservice") {
-        flags = CERT_SYSTEM_STORE_CURRENT_SERVICE;
-    } else {
-        QPID_LOG(error, "Unrecognised SSL certificate store location: " << options.certStoreLocation
-            << " - Using default location");
-    }
-    HCERTSTORE certStoreHandle;
-    certStoreHandle = ::CertOpenStore(CERT_STORE_PROV_SYSTEM_A,
-                                      X509_ASN_ENCODING,
-                                      0,
-                                      flags |
-                                      CERT_STORE_READONLY_FLAG,
-                                      options.certStore.c_str());
-    if (!certStoreHandle)
-        throw qpid::Exception(QPID_MSG("Opening store " << options.certStore << " " << qpid::sys::strError(GetLastError())));
-
-    PCCERT_CONTEXT certContext;
-    certContext = ::CertFindCertificateInStore(certStoreHandle,
-                                               X509_ASN_ENCODING,
-                                               0,
-                                               CERT_FIND_SUBJECT_STR_A,
-                                               options.certName.c_str(),
-                                               NULL);
-    if (certContext == NULL) {
-        int err = ::GetLastError();
-        ::CertCloseStore(certStoreHandle, 0);
-        throw qpid::Exception(QPID_MSG("Locating certificate " << options.certName << " in store " << options.certStore << " " << qpid::sys::strError(GetLastError())));
-        throw QPID_WINDOWS_ERROR(err);
-    }
-
-    SCHANNEL_CRED cred;
-    memset(&cred, 0, sizeof(cred));
-    cred.dwVersion = SCHANNEL_CRED_VERSION;
-    cred.cCreds = 1;
-    cred.paCred = &certContext;
-    SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL,
-                                                        UNISP_NAME,
-                                                        SECPKG_CRED_INBOUND,
-                                                        NULL,
-                                                        &cred,
-                                                        NULL,
-                                                        NULL,
-                                                        &credHandle,
-                                                        NULL);
-    if (status != SEC_E_OK)
-        throw QPID_WINDOWS_ERROR(status);
-    ::CertFreeCertificateContext(certContext);
-    ::CertCloseStore(certStoreHandle, 0);
-
-    std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces);
-    if (addresses.empty()) {
-        // We specified some interfaces, but couldn't find addresses for them
-        QPID_LOG(warning, "TCP/TCP6: No specified network interfaces found: Not Listening");
-        listeningPort = 0;
-    }
-
-    for (unsigned i = 0; i<addresses.size(); ++i) {
-        QPID_LOG(debug, "Using interface: " << addresses[i]);
-        SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(options.port));
-
-
-        // We must have at least one resolved address
-        QPID_LOG(info, "SSL Listening to: " << sa.asString())
-        Socket* s = createSocket();
-        listeningPort = s->listen(sa, opts.connectionBacklog);
-        listeners.push_back(s);
-
-        // Try any other resolved addresses
-        while (sa.nextAddress()) {
-            QPID_LOG(info, "SSL Listening to: " << sa.asString())
-            Socket* s = createSocket();
-            s->listen(sa, opts.connectionBacklog);
-            listeners.push_back(s);
-        }
-    }
-}
-
-SslProtocolFactory::~SslProtocolFactory() {
-    ::FreeCredentialsHandle(&credHandle);
-}
-
-void SslProtocolFactory::connectFailed(const qpid::sys::Socket&,
-                                       int err,
-                                       const std::string& msg) {
-    if (connectFailedCallback)
-        connectFailedCallback(err, msg);
-}
-
-void SslProtocolFactory::establishedIncoming(sys::Poller::shared_ptr poller,
-                                             const qpid::sys::Socket& s,
-                                             sys::ConnectionCodec::Factory* f) {
-    sys::AsynchIOHandler* async = new sys::AsynchIOHandler(s.getFullAddress(), f, false, false);
-
-    sys::AsynchIO *aio = 
-        new qpid::sys::windows::ServerSslAsynchIO(
-            clientAuthSelected,
-            s,
-            credHandle,
-            boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
-            boost::bind(&AsynchIOHandler::eof, async, _1),
-            boost::bind(&AsynchIOHandler::disconnect, async, _1),
-            boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
-            boost::bind(&AsynchIOHandler::nobuffs, async, _1),
-            boost::bind(&AsynchIOHandler::idle, async, _1));
-
-	establishedCommon(poller, async, aio, s);
-}
-
-void SslProtocolFactory::establishedOutgoing(sys::Poller::shared_ptr poller,
-                                             const qpid::sys::Socket& s,
-                                             sys::ConnectionCodec::Factory* f,
-                                             std::string& name) {
-    sys::AsynchIOHandler* async = new sys::AsynchIOHandler(name, f, true, false);
-
-    sys::AsynchIO *aio = 
-        new qpid::sys::windows::ClientSslAsynchIO(
-            brokerHost,
-            s,
-            credHandle,
-            boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
-            boost::bind(&AsynchIOHandler::eof, async, _1),
-            boost::bind(&AsynchIOHandler::disconnect, async, _1),
-            boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
-            boost::bind(&AsynchIOHandler::nobuffs, async, _1),
-            boost::bind(&AsynchIOHandler::idle, async, _1));
-
-	establishedCommon(poller, async, aio, s);
-}
-
-void SslProtocolFactory::establishedCommon(sys::Poller::shared_ptr poller,
-                                           sys::AsynchIOHandler* async,
-                                           sys::AsynchIO* aio,
-                                           const qpid::sys::Socket& s) {
-    if (tcpNoDelay) {
-        s.setTcpNoDelay();
-        QPID_LOG(info,
-                 "Set TCP_NODELAY on connection to " << s.getPeerAddress());
-    }
-
-    async->init(aio, brokerTimer, maxNegotiateTime);
-    aio->start(poller);
-}
-
-uint16_t SslProtocolFactory::getPort() const {
-    return listeningPort; // Immutable no need for lock.
-}
-
-void SslProtocolFactory::accept(sys::Poller::shared_ptr poller,
-                                sys::ConnectionCodec::Factory* fact) {
-    for (unsigned i = 0; i<listeners.size(); ++i) {
-        acceptors.push_back(
-            AsynchAcceptor::create(listeners[i],
-                            boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact)));
-        acceptors[i].start(poller);
-    }
-}
-
-void SslProtocolFactory::connect(sys::Poller::shared_ptr poller,
-                                 const std::string& name,
-                                 const std::string& host,
-                                 const std::string& port,
-                                 sys::ConnectionCodec::Factory* fact,
-                                 ConnectFailedCallback failed)
-{
-    SCHANNEL_CRED cred;
-    memset(&cred, 0, sizeof(cred));
-    cred.dwVersion = SCHANNEL_CRED_VERSION;
-    SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL,
-                                                        UNISP_NAME,
-                                                        SECPKG_CRED_OUTBOUND,
-                                                        NULL,
-                                                        &cred,
-                                                        NULL,
-                                                        NULL,
-                                                        &credHandle,
-                                                        NULL);
-    if (status != SEC_E_OK)
-        throw QPID_WINDOWS_ERROR(status);
-
-    brokerHost = host;
-    // Note that the following logic does not cause a memory leak.
-    // The allocated Socket is freed either by the AsynchConnector
-    // upon connection failure or by the AsynchIO upon connection
-    // shutdown.  The allocated AsynchConnector frees itself when it
-    // is no longer needed.
-    qpid::sys::Socket* socket = createSocket();
-    connectFailedCallback = failed;
-    AsynchConnector::create(*socket,
-                            host,
-                            port,
-                            boost::bind(&SslProtocolFactory::establishedOutgoing,
-                                        this, poller, _1, fact, name),
-                            boost::bind(&SslProtocolFactory::connectFailed,
-                                        this, _1, _2, _3));
-}
-
-}}} // namespace qpid::sys::windows
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/TransportFactory.h"
+#include "qpid/sys/SocketTransport.h"
+
+#include "qpid/Plugin.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/AsynchIOHandler.h"
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
+#include "qpid/sys/SystemInfo.h"
+#include "qpid/sys/windows/SslAsynchIO.h"
+
+#include <boost/bind.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+#include <memory>
+
+// security.h needs to see this to distinguish from kernel use.
+#define SECURITY_WIN32
+#include <security.h>
+#include <Schnlsp.h>
+#undef SECURITY_WIN32
+
+
+namespace qpid {
+namespace sys {
+
+class Timer;
+
+namespace windows {
+
+struct SslServerOptions : qpid::Options // Option group "SSL Options" for the Windows SSL listener
+{
+    std::string certStore; // --ssl-cert-store, defaults to "My"
+    std::string certStoreLocation; // --ssl-cert-store-location, defaults to "CurrentUser"
+    std::string certName; // --ssl-cert-name, defaults to the local hostname, else "localhost"
+    uint16_t port; // --ssl-port, defaults to 5671
+    bool clientAuth; // --ssl-require-client-authentication, defaults to false
+
+    SslServerOptions() : qpid::Options("SSL Options"),
+                         certStore("My"),
+                         certStoreLocation("CurrentUser"),
+                         certName("localhost"),
+                         port(5671),
+                         clientAuth(false)
+    {
+        qpid::Address me;
+        if (qpid::sys::SystemInfo::getLocalHostname(me))
+            certName = me.host; // override the "localhost" default when the hostname lookup succeeds
+
+        addOptions()
+            ("ssl-cert-store", optValue(certStore, "NAME"), "Local store name from which to obtain certificate")
+            ("ssl-cert-store-location", optValue(certStoreLocation, "NAME"),
+             "Local store name location for certificates ( CurrentUser | LocalMachine | CurrentService )")
+            ("ssl-cert-name", optValue(certName, "NAME"), "Name of the certificate to use")
+            ("ssl-port", optValue(port, "PORT"), "Port on which to listen for SSL connections")
+            ("ssl-require-client-authentication", optValue(clientAuth),
+             "Forces clients to authenticate in order to establish an SSL connection");
+    }
+};
+
+class SslProtocolFactory : public qpid::sys::SocketAcceptor, public qpid::sys::TransportConnector { // SSL transport: acceptor base plus outgoing connect()
+    Timer& brokerTimer; // same timer that is handed to the SocketAcceptor base in the constructor
+    uint32_t maxNegotiateTime; // copy of opts.maxNegotiateTime (also passed to the SocketAcceptor base)
+    const bool tcpNoDelay; // copy of opts.tcpNoDelay (also passed to the SocketAcceptor base)
+    std::string brokerHost;
+    const bool clientAuthSelected; // set from the ssl-require-client-authentication option
+    std::auto_ptr<qpid::sys::AsynchAcceptor> acceptor; // NOTE(review): not referenced in the visible part of this file - confirm it is still needed
+    ConnectFailedCallback connectFailedCallback;
+    CredHandle credHandle; // invalidated with SecInvalidateHandle at the start of the constructor
+
+  public:
+    SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions&, Timer& timer);
+    ~SslProtocolFactory();
+
+    void connect(sys::Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port, // outgoing connection entry point used by Broker::connect()
+                 sys::ConnectionCodec::Factory*,
+                 ConnectFailedCallback failed);
+
+  private:
+    void connectFailed(const qpid::sys::Socket&,
+                       int err,
+                       const std::string& msg);
+    void establishedIncoming(sys::Poller::shared_ptr, const qpid::sys::Socket&, sys::ConnectionCodec::Factory*); // bound into the SocketAcceptor base by the constructor
+    void establishedOutgoing(sys::Poller::shared_ptr, const qpid::sys::Socket&, sys::ConnectionCodec::Factory*, std::string& );
+    void establishedCommon(sys::Poller::shared_ptr, sys::AsynchIOHandler*, sys::AsynchIO*, const qpid::sys::Socket&);
+};
+
+// Static instance to initialise plugin
+static struct SslPlugin : public Plugin { // Registers the Windows SSL transport with a broker
+    SslServerOptions options;
+
+    Options* getOptions() { return &options; }
+
+    void earlyInitialize(Target&) {
+    }
+
+    void initialize(Target& target) { // Listen on --ssl-port and register the result as transport "ssl"
+        broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
+        // Only provide to a Broker
+        if (broker) {
+            try {
+                const broker::Broker::Options& opts = broker->getOptions();
+                boost::shared_ptr<SslProtocolFactory> protocol(new SslProtocolFactory(opts, options, broker->getTimer()));
+                uint16_t port =
+                    protocol->listen(opts.listenInterfaces,
+                                     boost::lexical_cast<std::string>(options.port), // --ssl-port, not the broker's own port
+                                     opts.connectionBacklog, &createSocket);
+                QPID_LOG(notice, "Listening for SSL connections on TCP port " << port);
+                broker->registerTransport("ssl", protocol, protocol, port); // same object is both acceptor and connector
+            } catch (const std::exception& e) {
+                QPID_LOG(error, "Failed to initialise SSL listener: " << e.what());
+            }
+        }
+    }
+} sslPlugin;
+
+// Build the Windows SSL transport.
+//
+// Initialises the SocketAcceptor base so that accepted sockets are routed to
+// establishedIncoming(), then locates the server certificate named by
+// options.certName in the store options.certStore (at the location given by
+// options.certStoreLocation) and acquires inbound Schannel credentials for it
+// into credHandle.
+//
+// Throws qpid::Exception if the store cannot be opened or the certificate is
+// not found, and QPID_WINDOWS_ERROR if the credentials cannot be acquired.
+// The certificate context and store handle are released on every path.
+SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options, Timer& timer)
+    : SocketAcceptor(opts.tcpNoDelay, false, opts.maxNegotiateTime, timer,
+                     boost::bind(&SslProtocolFactory::establishedIncoming, this, _1, _2, _3)),
+      brokerTimer(timer),
+      maxNegotiateTime(opts.maxNegotiateTime),
+      tcpNoDelay(opts.tcpNoDelay),
+      clientAuthSelected(options.clientAuth) {
+
+    // Make sure that certificate store is good before listening to sockets
+    // to avoid having open and listening sockets when there is no cert store
+    SecInvalidateHandle(&credHandle);
+
+    // Get the certificate for this server.
+    // The store location is matched case-insensitively; an unrecognised
+    // value is logged and leaves the location flags at 0.
+    DWORD flags = 0;
+    std::string certStoreLocation = options.certStoreLocation;
+    std::transform(certStoreLocation.begin(), certStoreLocation.end(), certStoreLocation.begin(), ::tolower);
+    if (certStoreLocation == "currentuser") {
+        flags = CERT_SYSTEM_STORE_CURRENT_USER;
+    } else if (certStoreLocation == "localmachine") {
+        flags = CERT_SYSTEM_STORE_LOCAL_MACHINE;
+    } else if (certStoreLocation == "currentservice") {
+        flags = CERT_SYSTEM_STORE_CURRENT_SERVICE;
+    } else {
+        QPID_LOG(error, "Unrecognised SSL certificate store location: " << options.certStoreLocation
+            << " - Using default location");
+    }
+    HCERTSTORE certStoreHandle;
+    certStoreHandle = ::CertOpenStore(CERT_STORE_PROV_SYSTEM_A,
+                                      X509_ASN_ENCODING,
+                                      0,
+                                      flags |
+                                      CERT_STORE_READONLY_FLAG,
+                                      options.certStore.c_str());
+    if (!certStoreHandle)
+        throw qpid::Exception(QPID_MSG("Opening store " << options.certStore << " " << qpid::sys::strError(GetLastError())));
+
+    PCCERT_CONTEXT certContext;
+    certContext = ::CertFindCertificateInStore(certStoreHandle,
+                                               X509_ASN_ENCODING,
+                                               0,
+                                               CERT_FIND_SUBJECT_STR_A,
+                                               options.certName.c_str(),
+                                               NULL);
+    if (certContext == NULL) {
+        // Capture the lookup error first: closing the store may replace the
+        // thread's last-error value, so report the saved code rather than
+        // calling GetLastError() again afterwards.
+        int err = ::GetLastError();
+        ::CertCloseStore(certStoreHandle, 0);
+        throw qpid::Exception(QPID_MSG("Locating certificate " << options.certName << " in store " << options.certStore << " " << qpid::sys::strError(err)));
+    }
+
+    SCHANNEL_CRED cred;
+    memset(&cred, 0, sizeof(cred));
+    cred.dwVersion = SCHANNEL_CRED_VERSION;
+    cred.cCreds = 1;
+    cred.paCred = &certContext;
+    SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL,
+                                                        UNISP_NAME,
+                                                        SECPKG_CRED_INBOUND,
+                                                        NULL,
+                                                        &cred,
+                                                        NULL,
+                                                        NULL,
+                                                        &credHandle,
+                                                        NULL);
+    // The certificate context and store are no longer needed whether or not
+    // the credentials were acquired; release them before any throw so the
+    // failure path does not leak them.
+    ::CertFreeCertificateContext(certContext);
+    ::CertCloseStore(certStoreHandle, 0);
+    if (status != SEC_E_OK)
+        throw QPID_WINDOWS_ERROR(status);
+}
+
+// Release the Schannel credentials handle held in credHandle.
+SslProtocolFactory::~SslProtocolFactory() {
+    ::FreeCredentialsHandle(&credHandle);
+}
+
+// Failure hook for outgoing connections: passes the error code and message
+// on to the stored connectFailedCallback, if one has been set. The socket
+// argument is not used.
+void SslProtocolFactory::connectFailed(const qpid::sys::Socket& /*socket*/,
+                                       int err,
+                                       const std::string& msg) {
+    if (!connectFailedCallback)
+        return;
+    connectFailedCallback(err, msg);
+}
+
+// Called for each accepted socket: creates the connection handler and a
+// server-side SSL AsynchIO whose callbacks are bound to that handler, then
+// hands both to establishedCommon().
+void SslProtocolFactory::establishedIncoming(sys::Poller::shared_ptr poller,
+                                             const qpid::sys::Socket& s,
+                                             sys::ConnectionCodec::Factory* f) {
+    // Incoming handlers are identified by the socket's full address.
+    sys::AsynchIOHandler* handler =
+        new sys::AsynchIOHandler(s.getFullAddress(), f, false, false);
+
+    // Server side of the SSL exchange, using the factory's credentials.
+    sys::AsynchIO* sslIo = new qpid::sys::windows::ServerSslAsynchIO(
+        clientAuthSelected,
+        s,
+        credHandle,
+        boost::bind(&AsynchIOHandler::readbuff, handler, _1, _2),
+        boost::bind(&AsynchIOHandler::eof, handler, _1),
+        boost::bind(&AsynchIOHandler::disconnect, handler, _1),
+        boost::bind(&AsynchIOHandler::closedSocket, handler, _1, _2),
+        boost::bind(&AsynchIOHandler::nobuffs, handler, _1),
+        boost::bind(&AsynchIOHandler::idle, handler, _1));
+
+    establishedCommon(poller, handler, sslIo, s);
+}
+
+// Called once an outgoing socket has connected: creates the connection
+// handler under the caller-supplied name and a client-side SSL AsynchIO
+// targeting brokerHost, then hands both to establishedCommon().
+void SslProtocolFactory::establishedOutgoing(sys::Poller::shared_ptr poller,
+                                             const qpid::sys::Socket& s,
+                                             sys::ConnectionCodec::Factory* f,
+                                             std::string& name) {
+    sys::AsynchIOHandler* handler = new sys::AsynchIOHandler(name, f, true, false);
+
+    // Client side of the SSL exchange, using the factory's credentials.
+    sys::AsynchIO* sslIo = new qpid::sys::windows::ClientSslAsynchIO(
+        brokerHost,
+        s,
+        credHandle,
+        boost::bind(&AsynchIOHandler::readbuff, handler, _1, _2),
+        boost::bind(&AsynchIOHandler::eof, handler, _1),
+        boost::bind(&AsynchIOHandler::disconnect, handler, _1),
+        boost::bind(&AsynchIOHandler::closedSocket, handler, _1, _2),
+        boost::bind(&AsynchIOHandler::nobuffs, handler, _1),
+        boost::bind(&AsynchIOHandler::idle, handler, _1));
+
+    establishedCommon(poller, handler, sslIo, s);
+}
+
+// Shared tail of establishedIncoming()/establishedOutgoing(): optionally
+// enables TCP_NODELAY on the socket, then initialises the handler with its
+// IO object and starts that IO object on the poller.
+void SslProtocolFactory::establishedCommon(sys::Poller::shared_ptr poller,
+                                           sys::AsynchIOHandler* async,
+                                           sys::AsynchIO* aio,
+                                           const qpid::sys::Socket& s) {
+    if (tcpNoDelay) {
+        s.setTcpNoDelay();
+        QPID_LOG(info,
+                 "Set TCP_NODELAY on connection to " << s.getPeerAddress());
+    }
+
+    // The handler is initialised before the IO object is started.
+    async->init(aio, brokerTimer, maxNegotiateTime);
+    aio->start(poller);
+}
+
+// Begin an outgoing SSL connection to host:port.
+//
+// Acquires outbound Schannel credentials into credHandle, records the target
+// host and the failure callback in members, creates a socket and creates an
+// AsynchConnector whose success callback is establishedOutgoing() and whose
+// failure callback is connectFailed().
+//
+// Throws QPID_WINDOWS_ERROR if the credentials cannot be acquired.
+//
+// NOTE(review): credHandle is the same member that the constructor fills with
+// the inbound (server) credentials and that establishedIncoming() passes to
+// ServerSslAsynchIO. Overwriting it here, without freeing the previous
+// handle, looks unintended - confirm.
+// NOTE(review): brokerHost and connectFailedCallback are per-factory members,
+// so overlapping connect() calls would share them - confirm callers never
+// overlap.
+// NOTE(review): unlike SocketConnector::connect in SocketTransport.cpp, the
+// connector returned by create() is not start()ed here - confirm that the
+// Windows AsynchConnector does not need it.
+void SslProtocolFactory::connect(sys::Poller::shared_ptr poller,
+                                 const std::string& name,
+                                 const std::string& host,
+                                 const std::string& port,
+                                 sys::ConnectionCodec::Factory* fact,
+                                 ConnectFailedCallback failed)
+{
+    // An otherwise-zeroed SCHANNEL_CRED: only the version field is set for
+    // the outbound credentials.
+    SCHANNEL_CRED cred;
+    memset(&cred, 0, sizeof(cred));
+    cred.dwVersion = SCHANNEL_CRED_VERSION;
+    SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL,
+                                                        UNISP_NAME,
+                                                        SECPKG_CRED_OUTBOUND,
+                                                        NULL,
+                                                        &cred,
+                                                        NULL,
+                                                        NULL,
+                                                        &credHandle,
+                                                        NULL);
+    if (status != SEC_E_OK)
+        throw QPID_WINDOWS_ERROR(status);
+
+    // Remembered for establishedOutgoing(), which passes it to
+    // ClientSslAsynchIO.
+    brokerHost = host;
+    // Note that the following logic does not cause a memory leak.
+    // The allocated Socket is freed either by the AsynchConnector
+    // upon connection failure or by the AsynchIO upon connection
+    // shutdown.  The allocated AsynchConnector frees itself when it
+    // is no longer needed.
+    qpid::sys::Socket* socket = createSocket();
+    // Remembered for connectFailed(), which forwards errors to it.
+    connectFailedCallback = failed;
+    AsynchConnector::create(*socket,
+                            host,
+                            port,
+                            boost::bind(&SslProtocolFactory::establishedOutgoing,
+                                        this, poller, _1, fact, name),
+                            boost::bind(&SslProtocolFactory::connectFailed,
+                                        this, _1, _2, _3));
+}
+
+}}}

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=1451443&r1=1451442&r2=1451443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Fri Mar  1 00:21:12 2013
@@ -19,7 +19,7 @@
  *
  */
 
-#include "qpid/sys/ProtocolFactory.h"
+#include "qpid/sys/TransportFactory.h"
 
 #include "qpid/Plugin.h"
 #include "qpid/broker/Broker.h"
@@ -239,7 +239,7 @@ void RdmaIOHandler::initProtocolIn(Rdma:
     }
 }
 
-class RdmaIOProtocolFactory : public ProtocolFactory {
+class RdmaIOProtocolFactory : public TransportAcceptor, public TransportConnector {
     auto_ptr<Rdma::Listener> listener;
     const uint16_t listeningPort;
 
@@ -275,9 +275,10 @@ static class RdmaIOPlugin : public Plugi
         // Only provide to a Broker
         if (broker) {
             const broker::Broker::Options& opts = broker->getOptions();
-            ProtocolFactory::shared_ptr protocol(new RdmaIOProtocolFactory(opts.port, opts.connectionBacklog));
-            QPID_LOG(notice, "Rdma: Listening on RDMA port " << protocol->getPort());
-            broker->registerProtocolFactory("rdma", protocol);
+            boost::shared_ptr<RdmaIOProtocolFactory> protocol(new RdmaIOProtocolFactory(opts.port, opts.connectionBacklog));
+            uint16_t port = protocol->getPort();
+            QPID_LOG(notice, "Rdma: Listening on RDMA port " << port);
+            broker->registerTransport("rdma", protocol, protocol, port);
         }
     }
 } rdmaPlugin;

Added: qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.cpp?rev=1451443&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.cpp Fri Mar  1 00:21:12 2013
@@ -0,0 +1,209 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/SocketTransport.h"
+
+#include "qpid/broker/NameGenerator.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/AsynchIOHandler.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
+#include "qpid/sys/SystemInfo.h"
+
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace sys {
+
+namespace {
+    // Shared tail of establishedIncoming()/establishedOutgoing(): applies
+    // the TCP_NODELAY option if requested, creates an AsynchIO for the
+    // socket with all of its callbacks bound to the handler, initialises
+    // the handler with it and starts the IO on the poller.
+    void establishedCommon(
+        AsynchIOHandler* async,
+        boost::shared_ptr<Poller> poller, const SocketTransportOptions& opts, Timer* timer,
+        const Socket& s)
+    {
+        const bool wantNoDelay = opts.tcpNoDelay;
+        if (wantNoDelay) {
+            s.setTcpNoDelay();
+            QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
+        }
+
+        AsynchIO* const io = AsynchIO::create(
+            s,
+            boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+            boost::bind(&AsynchIOHandler::eof, async, _1),
+            boost::bind(&AsynchIOHandler::disconnect, async, _1),
+            boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+            boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+            boost::bind(&AsynchIOHandler::idle, async, _1));
+
+        // The handler is initialised before the IO object is started.
+        async->init(io, *timer, opts.maxNegotiateTime);
+        io->start(poller);
+    }
+
+    // Accepted-connection callback: the handler is named with
+    // QPID_NAME_PREFIX followed by the socket's full address.
+    void establishedIncoming(
+        boost::shared_ptr<Poller> poller, const SocketTransportOptions& opts, Timer* timer,
+        const Socket& s, ConnectionCodec::Factory* f)
+    {
+        const std::string connectionName = broker::QPID_NAME_PREFIX+s.getFullAddress();
+        AsynchIOHandler* handler = new AsynchIOHandler(connectionName, f, false, opts.nodict);
+        establishedCommon(handler, poller, opts, timer, s);
+    }
+
+    // Connected-socket callback for outgoing connections: the handler takes
+    // the caller-supplied name.
+    void establishedOutgoing(
+        boost::shared_ptr<Poller> poller, const SocketTransportOptions& opts, Timer* timer,
+        const Socket& s, ConnectionCodec::Factory* f, const std::string& name)
+    {
+        AsynchIOHandler* handler = new AsynchIOHandler(name, f, true, opts.nodict);
+        establishedCommon(handler, poller, opts, timer, s);
+    }
+
+    // Failure path for outgoing connections: reports the error to the
+    // caller's callback (when one was supplied), then closes and deletes
+    // the socket, which was allocated by SocketConnector::connect().
+    void connectFailed(
+        const Socket& s, int ec, const std::string& emsg,
+        SocketConnector::ConnectFailedCallback failedCb)
+    {
+        // Invoking an empty callback would throw before the socket is
+        // released, so only call it when it is set.
+        if (failedCb)
+            failedCb(ec, emsg);
+        s.close();
+        delete &s;
+    }
+
+    // Turn a list of interface names and/or literal addresses into a list
+    // of addresses to listen on. An empty input yields a single "" entry.
+    std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
+        std::vector<std::string> result;
+        // No interfaces given: a single "" means listen on every interface
+        if (interfaces.empty()) {
+            result.push_back("");
+            return result;
+        }
+        typedef std::vector<std::string>::const_iterator Iter;
+        for (Iter it = interfaces.begin(); it != interfaces.end(); ++it) {
+            const std::string& entry = *it;
+            // A known interface name has its addresses appended directly
+            if (SystemInfo::getInterfaceAddresses(entry, result))
+                continue;
+            // Otherwise treat the entry as an address to look up directly,
+            // stripping IPv6-style '[' ']' brackets when present
+            const bool bracketed = entry[0]=='[' && entry[entry.size()-1]==']';
+            result.push_back(bracketed ? entry.substr(1, entry.size()-2) : entry);
+        }
+        return result;
+    }
+}
+
+// Construct an acceptor that uses this file's establishedIncoming() as the
+// callback for accepted sockets. The callback is bound with the options and
+// a pointer to the caller's timer, which must therefore outlive any use of
+// the callback.
+SocketAcceptor::SocketAcceptor(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer0) :
+    timer(timer0),
+    options(tcpNoDelay, nodict, maxNegotiateTime),
+    // options and timer are declared before established in the class, so
+    // both are initialised by the time they are bound here.
+    established(boost::bind(&establishedIncoming, _1, options, &timer, _2, _3))
+{}
+
+// Construct an acceptor that calls a caller-supplied callback for accepted
+// sockets instead of this file's establishedIncoming().
+SocketAcceptor::SocketAcceptor(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer0, const EstablishedCallback& established0) :
+    timer(timer0),
+    options(tcpNoDelay, nodict, maxNegotiateTime),
+    established(established0)
+{}
+
+// Add a socket to the set of listeners used by accept().
+// listeners is a boost::ptr_vector, so the acceptor takes ownership of the
+// socket and deletes it when the acceptor is destroyed.
+void SocketAcceptor::addListener(Socket* socket)
+{
+    listeners.push_back(socket);
+}
+
+// Create a listening socket for every address resolved from the given
+// interfaces and add each one to this acceptor.
+//
+// interfaces: interface names and/or addresses; empty means all interfaces.
+// port:       port (as text) used to resolve each address.
+// backlog:    passed to each Socket::listen() call.
+// factory:    called once per listening socket to create it.
+//
+// Returns the port reported by the first socket of the last address
+// processed, or 0 if no addresses were found. Exceptions from resolving or
+// listening propagate to the caller; a socket that fails to listen is
+// deleted first.
+uint16_t SocketAcceptor::listen(const std::vector<std::string>& interfaces, const std::string& port, int backlog, const SocketFactory& factory)
+{
+    std::vector<std::string> addresses = expandInterfaces(interfaces);
+    if (addresses.empty()) {
+        // We specified some interfaces, but couldn't find addresses for them
+        QPID_LOG(warning, "TCP/TCP6: No specified network interfaces found: Not Listening");
+        return 0;
+    }
+
+    uint16_t listeningPort = 0;
+    for (unsigned i = 0; i<addresses.size(); ++i) {
+        QPID_LOG(debug, "Using interface: " << addresses[i]);
+        SocketAddress sa(addresses[i], port);
+
+        // We must have at least one resolved address, so the first pass
+        // always runs; later passes try any other resolved addresses.
+        bool firstAddress = true;
+        do {
+            if (!firstAddress) {
+                // Hack to ensure that all listening connections are on the same port
+                sa.setAddrInfoPort(listeningPort);
+            }
+            QPID_LOG(info, "Listening to: " << sa.asString());
+            Socket* s = factory();
+            uint16_t lport = 0;
+            try {
+                lport = s->listen(sa, backlog);
+            } catch (...) {
+                // Not yet owned by listeners: release it before rethrowing
+                delete s;
+                throw;
+            }
+            QPID_LOG(debug, "Listened to: " << lport);
+            addListener(s);
+
+            if (firstAddress) {
+                listeningPort = lport;
+                firstAddress = false;
+            }
+        } while (sa.nextAddress());
+    }
+    return listeningPort;
+}
+
+// Create and start an AsynchAcceptor on the poller for every listening
+// socket. Each accepted socket is passed to the established callback
+// together with the poller and the codec factory f.
+void SocketAcceptor::accept(boost::shared_ptr<Poller> poller, ConnectionCodec::Factory* f)
+{
+    for (unsigned i = 0; i<listeners.size(); ++i) {
+        acceptors.push_back(
+            AsynchAcceptor::create(listeners[i], boost::bind(established, poller, _1, f)));
+        // Start the acceptor just added; indexing acceptors by i would pick
+        // the wrong one if acceptors was not empty on entry.
+        acceptors.back().start(poller);
+    }
+}
+
+// Construct a connector. factory0 is copied and later called by connect()
+// to create the socket for each outgoing connection; timer0 is held by
+// reference.
+SocketConnector::SocketConnector(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer0, const SocketFactory& factory0) :
+    timer(timer0),
+    factory(factory0),
+    options(tcpNoDelay, nodict, maxNegotiateTime)
+{}
+
+// Begin an outgoing connection to host:port.
+//
+// Creates a socket with the factory, then creates and starts an
+// AsynchConnector whose success callback is establishedOutgoing() (given the
+// connection name) and whose failure callback is connectFailed() (given the
+// caller's `failed` callback).
+//
+// If creating or starting the connector throws a std::exception, the failure
+// callback is invoked with the socket's error code and the exception is then
+// rethrown, so the caller sees both.
+void SocketConnector::connect(
+    boost::shared_ptr<Poller> poller,
+    const std::string& name,
+    const std::string& host, const std::string& port,
+    ConnectionCodec::Factory* fact,
+    ConnectFailedCallback failed)
+{
+    // Note that the following logic does not cause a memory leak.
+    // On connection failure the allocated Socket is closed and deleted by
+    // connectFailed(), which is both the connector's failure callback and
+    // what the catch block below calls; otherwise it is freed by the
+    // AsynchIO upon connection shutdown.  The allocated AsynchConnector
+    // frees itself when it is no longer needed.
+    Socket* socket = factory();
+    try {
+        AsynchConnector* c = AsynchConnector::create(
+            *socket,
+            host,
+            port,
+            boost::bind(&establishedOutgoing, poller, options, &timer, _1, fact, name),
+            boost::bind(&connectFailed, _1, _2, _3, failed));
+        c->start(poller);
+    } catch (std::exception&) {
+        // TODO: Design question - should we do the error callback and also throw?
+        // connectFailed() deletes the socket, so read its error code first.
+        int errCode = socket->getError();
+        connectFailed(*socket, errCode, strError(errCode), failed);
+        throw;
+    }
+}
+
+}} // namespace qpid::sys

Added: qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.h?rev=1451443&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.h Fri Mar  1 00:21:12 2013
@@ -0,0 +1,91 @@
+#ifndef QPID_SYS_SOCKETTRANSPORT_H
+#define QPID_SYS_SOCKETTRANSPORT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/TransportFactory.h"
+
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/sys/ConnectionCodec.h"
+#include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace sys {
+
+class AsynchAcceptor;
+class Poller;
+class Timer;
+class Socket;
+typedef boost::function0<Socket*> SocketFactory;
+typedef boost::function3<void, boost::shared_ptr<Poller>, const Socket&, ConnectionCodec::Factory*> EstablishedCallback;
+
+// Per-connection settings shared by SocketAcceptor and SocketConnector.
+struct SocketTransportOptions {
+    // When true, TCP_NODELAY is set on each established connection.
+    bool tcpNoDelay;
+    // Passed through to each AsynchIOHandler as its last constructor argument.
+    bool nodict;
+    // Passed to AsynchIOHandler::init() for each connection.
+    uint32_t maxNegotiateTime;
+
+    // t, d and m initialise tcpNoDelay, nodict and maxNegotiateTime.
+    SocketTransportOptions(bool t, bool d, uint32_t m) :
+        tcpNoDelay(t),
+        nodict(d),
+        maxNegotiateTime(m)
+    {}
+};
+
+// TransportAcceptor built on sockets: holds a set of listening sockets and,
+// once accept() is called, an AsynchAcceptor for each of them.
+class SocketAcceptor : public TransportAcceptor {
+    // Listening sockets; owned by this object.
+    boost::ptr_vector<Socket> listeners;
+    // One acceptor per listener, created by accept(); owned by this object.
+    boost::ptr_vector<AsynchAcceptor> acceptors;
+    Timer& timer;
+    SocketTransportOptions options;
+    // Called for every accepted socket. Fixed at construction.
+    const EstablishedCallback established;
+
+public:
+    // Uses the default established callback.
+    SocketAcceptor(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer);
+    // Uses the supplied established callback.
+    SocketAcceptor(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer, const EstablishedCallback& established);
+
+    // Create sockets from list of interfaces and listen to them
+    // Returns the listening port, or 0 if no addresses were found.
+    uint16_t listen(const std::vector<std::string>& interfaces, const std::string& port, int backlog, const SocketFactory& factory);
+
+    // Import sockets that are already being listened to
+    // The acceptor takes ownership of the socket.
+    void addListener(Socket* socket);
+
+    // Start accepting connections on every listener using the given poller.
+    void accept(boost::shared_ptr<Poller> poller, ConnectionCodec::Factory* f);
+};
+
+// TransportConnector built on sockets: creates a socket with the supplied
+// factory for each outgoing connection.
+class SocketConnector : public TransportConnector {
+    Timer& timer;
+    // Creates the socket for each connect() call. Fixed at construction.
+    const SocketFactory factory;
+    SocketTransportOptions options;
+
+public:
+    SocketConnector(bool tcpNoDelay, bool nodict, uint32_t maxNegotiateTime, Timer& timer, const SocketFactory& factory);
+
+    // Begin connecting to host:port; `name` names the resulting connection
+    // and `failed` is called with an error code and message on failure.
+    void connect(boost::shared_ptr<Poller> poller,
+                 const std::string& name,
+                 const std::string& host, const std::string& port,
+                 ConnectionCodec::Factory* f,
+                 ConnectFailedCallback failed);
+};
+
+}}
+
+#endif

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp?rev=1451443&r1=1451442&r2=1451443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp Fri Mar  1 00:21:12 2013
@@ -19,22 +19,17 @@
  *
  */
 
-#include "qpid/sys/ProtocolFactory.h"
+#include "qpid/sys/TransportFactory.h"
 
 #include "qpid/Plugin.h"
 #include "qpid/broker/Broker.h"
-#include "qpid/broker/NameGenerator.h"
 #include "qpid/log/Statement.h"
-#include "qpid/sys/AsynchIOHandler.h"
 #include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/SocketTransport.h"
 #include "qpid/sys/ssl/util.h"
 #include "qpid/sys/ssl/SslSocket.h"
-#include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/SystemInfo.h"
-#include "qpid/sys/Poller.h"
 
 #include <boost/bind.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
 
 namespace qpid {
 namespace sys {
@@ -64,32 +59,20 @@ struct SslServerOptions : ssl::SslOption
     }
 };
 
-class SslProtocolFactory : public ProtocolFactory {
-    boost::ptr_vector<Socket> listeners;
-    boost::ptr_vector<AsynchAcceptor> acceptors;
-    Timer& brokerTimer;
-    uint32_t maxNegotiateTime;
-    uint16_t listeningPort;
-    const bool tcpNoDelay;
-    bool nodict;
+namespace {
+    Socket* createServerSSLSocket(const SslServerOptions& options) {
+        return new SslSocket(options.certName, options.clientAuth);
+    }
 
-  public:
-    SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options,
-                       Timer& timer);
-    void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
-    void connect(Poller::shared_ptr, const std::string& name, const std::string& host, const std::string& port,
-                 ConnectionCodec::Factory*,
-                 ConnectFailedCallback);
-
-    uint16_t getPort() const;
-
-  private:
-    void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
-    void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&);
-    void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&);
-    void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
-};
+    Socket* createServerSSLMuxSocket(const SslServerOptions& options) {
+        return new SslMuxSocket(options.certName, options.clientAuth);
+    }
+
+    Socket* createClientSSLSocket() {
+        return new SslSocket();
+    }
 
+}
 
 // Static instance to initialise plugin
 static struct SslPlugin : public Plugin {
@@ -104,7 +87,7 @@ static struct SslPlugin : public Plugin 
     void earlyInitialize(Target& target) {
         broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
         if (broker && !options.certDbPath.empty()) {
-            const broker::Broker::Options& opts = broker->getOptions();
+            broker::Broker::Options& opts = broker->getOptions();
 
             if (opts.port == options.port && // AMQP & AMQPS ports are the same
                 opts.port != 0) {
@@ -132,18 +115,25 @@ static struct SslPlugin : public Plugin 
                     nssInitialized = true;
 
                     const broker::Broker::Options& opts = broker->getOptions();
-
-                    ProtocolFactory::shared_ptr protocol(
-                        static_cast<ProtocolFactory*>(new SslProtocolFactory(opts, options, broker->getTimer())));
-
-                    if (protocol->getPort()!=0 ) {
+                    TransportAcceptor::shared_ptr ta;
+                    SocketAcceptor* sa =
+                        new SocketAcceptor(opts.tcpNoDelay, options.nodict, opts.maxNegotiateTime, broker->getTimer());
+                    uint16_t port = sa->listen(opts.listenInterfaces, boost::lexical_cast<std::string>(options.port), opts.connectionBacklog,
+                                               options.multiplex ?
+                                                 boost::bind(&createServerSSLMuxSocket, options) :
+                                                 boost::bind(&createServerSSLSocket, options));
+                    if ( port!=0 ) {
+                        ta.reset(sa);
                         QPID_LOG(notice, "Listening for " <<
                                         (options.multiplex ? "SSL or TCP" : "SSL") <<
                                         " connections on TCP/TCP6 port " <<
-                                        protocol->getPort());
+                                        port);
                     }
 
-                    broker->registerProtocolFactory("ssl", protocol);
+                    TransportConnector::shared_ptr tc(
+                        new SocketConnector(opts.tcpNoDelay, options.nodict, opts.maxNegotiateTime, broker->getTimer(),
+                                            &createClientSSLSocket));
+                    broker->registerTransport("ssl", ta, tc, port);
                 } catch (const std::exception& e) {
                     QPID_LOG(error, "Failed to initialise SSL plugin: " << e.what());
                 }
@@ -152,160 +142,4 @@ static struct SslPlugin : public Plugin 
     }
 } sslPlugin;
 
-namespace {
-    // Expand list of Interfaces and addresses to a list of addresses
-    std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
-        std::vector<std::string> addresses;
-        // If there are no specific interfaces listed use a single "" to listen on every interface
-        if (interfaces.empty()) {
-            addresses.push_back("");
-            return addresses;
-        }
-        for (unsigned i = 0; i < interfaces.size(); ++i) {
-            const std::string& interface = interfaces[i];
-            if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) {
-                // We don't have an interface of that name -
-                // Check for IPv6 ('[' ']') brackets and remove them
-                // then pass to be looked up directly
-                if (interface[0]=='[' && interface[interface.size()-1]==']') {
-                    addresses.push_back(interface.substr(1, interface.size()-2));
-                } else {
-                    addresses.push_back(interface);
-                }
-            }
-        }
-        return addresses;
-    }
-}
-
-SslProtocolFactory::SslProtocolFactory(const qpid::broker::Broker::Options& opts, const SslServerOptions& options,
-                                       Timer& timer) :
-    brokerTimer(timer),
-    maxNegotiateTime(opts.maxNegotiateTime),
-    tcpNoDelay(opts.tcpNoDelay),
-    nodict(options.nodict)
-{
-    std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces);
-    if (addresses.empty()) {
-        // We specified some interfaces, but couldn't find addresses for them
-        QPID_LOG(warning, "SSL: No specified network interfaces found: Not Listening");
-        listeningPort = 0;
-    }
-
-    for (unsigned i = 0; i<addresses.size(); ++i) {
-        QPID_LOG(debug, "Using interface: " << addresses[i]);
-        SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(options.port));
-
-        // We must have at least one resolved address
-        QPID_LOG(info, "Listening to: " << sa.asString())
-        Socket* s = options.multiplex ?
-            new SslMuxSocket(options.certName, options.clientAuth) :
-            new SslSocket(options.certName, options.clientAuth);
-        uint16_t lport = s->listen(sa, opts.connectionBacklog);
-        QPID_LOG(debug, "Listened to: " << lport);
-        listeners.push_back(s);
-
-        listeningPort = lport;
-
-        // Try any other resolved addresses
-        while (sa.nextAddress()) {
-            // Hack to ensure that all listening connections are on the same port
-            sa.setAddrInfoPort(listeningPort);
-            QPID_LOG(info, "Listening to: " << sa.asString())
-            Socket* s = options.multiplex ?
-                new SslMuxSocket(options.certName, options.clientAuth) :
-                new SslSocket(options.certName, options.clientAuth);
-            uint16_t lport = s->listen(sa, opts.connectionBacklog);
-            QPID_LOG(debug, "Listened to: " << lport);
-            listeners.push_back(s);
-        }
-    }
-}
-
-void SslProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s,
-                                          ConnectionCodec::Factory* f) {
-    AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
-    establishedCommon(async, poller, s);
-}
-
-void SslProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s,
-                                             ConnectionCodec::Factory* f, const std::string& name) {
-    AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
-    establishedCommon(async, poller, s);
-}
-
-void SslProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) {
-    if (tcpNoDelay) {
-        s.setTcpNoDelay();
-        QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
-    }
-
-    AsynchIO* aio = AsynchIO::create(
-        s,
-        boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
-        boost::bind(&AsynchIOHandler::eof, async, _1),
-        boost::bind(&AsynchIOHandler::disconnect, async, _1),
-        boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
-        boost::bind(&AsynchIOHandler::nobuffs, async, _1),
-        boost::bind(&AsynchIOHandler::idle, async, _1));
-
-    async->init(aio, brokerTimer, maxNegotiateTime);
-    aio->start(poller);
-}
-
-uint16_t SslProtocolFactory::getPort() const {
-    return listeningPort; // Immutable no need for lock.
-}
-
-void SslProtocolFactory::accept(Poller::shared_ptr poller,
-                                ConnectionCodec::Factory* fact) {
-    for (unsigned i = 0; i<listeners.size(); ++i) {
-        acceptors.push_back(
-            AsynchAcceptor::create(listeners[i],
-                            boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact)));
-        acceptors[i].start(poller);
-    }
-}
-
-void SslProtocolFactory::connectFailed(
-    const Socket& s, int ec, const std::string& emsg,
-    ConnectFailedCallback failedCb)
-{
-    failedCb(ec, emsg);
-    s.close();
-    delete &s;
-}
-
-void SslProtocolFactory::connect(
-    Poller::shared_ptr poller,
-    const std::string& name,
-    const std::string& host, const std::string& port,
-    ConnectionCodec::Factory* fact,
-    ConnectFailedCallback failed)
-{
-    // Note that the following logic does not cause a memory leak.
-    // The allocated Socket is freed either by the SslConnector
-    // upon connection failure or by the SslIoHandle upon connection
-    // shutdown.  The allocated SslConnector frees itself when it
-    // is no longer needed.
-
-    Socket* socket = new qpid::sys::ssl::SslSocket();
-    try {
-    AsynchConnector* c = AsynchConnector::create(
-        *socket,
-        host,
-        port,
-        boost::bind(&SslProtocolFactory::establishedOutgoing,
-                    this, poller, _1, fact, name),
-        boost::bind(&SslProtocolFactory::connectFailed,
-                    this, _1, _2, _3, failed));
-    c->start(poller);
-    } catch (std::exception&) {
-        // TODO: Design question - should we do the error callback and also throw?
-        int errCode = socket->getError();
-        connectFailed(*socket, errCode, strError(errCode), failed);
-        throw;
-    }
-}
-
 }} // namespace qpid::sys

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=1451443&r1=1451442&r2=1451443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp Fri Mar  1 00:21:12 2013
@@ -19,52 +19,18 @@
  *
  */
 
-#include "qpid/sys/ProtocolFactory.h"
+#include "qpid/sys/TransportFactory.h"
 
 #include "qpid/Plugin.h"
 #include "qpid/broker/Broker.h"
-#include "qpid/broker/NameGenerator.h"
 #include "qpid/log/Statement.h"
-#include "qpid/sys/AsynchIOHandler.h"
 #include "qpid/sys/AsynchIO.h"
 #include "qpid/sys/Socket.h"
-#include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/SystemInfo.h"
-#include "qpid/sys/Poller.h"
-
-#include <boost/bind.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
+#include "qpid/sys/SocketTransport.h"
 
 namespace qpid {
 namespace sys {
 
-class Timer;
-
-class AsynchIOProtocolFactory : public ProtocolFactory {
-    boost::ptr_vector<Socket> listeners;
-    boost::ptr_vector<AsynchAcceptor> acceptors;
-    Timer& brokerTimer;
-    uint32_t maxNegotiateTime;
-    uint16_t listeningPort;
-    const bool tcpNoDelay;
-
-  public:
-    AsynchIOProtocolFactory(const qpid::broker::Broker::Options& opts, Timer& timer, bool shouldListen);
-    void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
-    void connect(Poller::shared_ptr, const std::string& name,
-                 const std::string& host, const std::string& port,
-                 ConnectionCodec::Factory*,
-                 ConnectFailedCallback);
-
-    uint16_t getPort() const;
-
-  private:
-    void establishedIncoming(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
-    void establishedOutgoing(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, const std::string&);
-    void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const Socket&);
-    void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
-};
-
 static bool sslMultiplexEnabled(void)
 {
     Options o;
@@ -93,170 +59,22 @@ static class TCPIOPlugin : public Plugin
             // Check for SSL on the same port
             bool shouldListen = !sslMultiplexEnabled();
 
-            ProtocolFactory::shared_ptr protocolt(
-                new AsynchIOProtocolFactory(opts, broker->getTimer(),shouldListen));
-
-            if (shouldListen && protocolt->getPort()!=0 ) {
-                QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort());
-            }
-
-            broker->registerProtocolFactory("tcp", protocolt);
-        }
-    }
-} tcpPlugin;
-
-namespace {
-    // Expand list of Interfaces and addresses to a list of addresses
-    std::vector<std::string> expandInterfaces(const std::vector<std::string>& interfaces) {
-        std::vector<std::string> addresses;
-        // If there are no specific interfaces listed use a single "" to listen on every interface
-        if (interfaces.empty()) {
-            addresses.push_back("");
-            return addresses;
-        }
-        for (unsigned i = 0; i < interfaces.size(); ++i) {
-            const std::string& interface = interfaces[i];
-            if (!(SystemInfo::getInterfaceAddresses(interface, addresses))) {
-                // We don't have an interface of that name -
-                // Check for IPv6 ('[' ']') brackets and remove them
-                // then pass to be looked up directly
-                if (interface[0]=='[' && interface[interface.size()-1]==']') {
-                    addresses.push_back(interface.substr(1, interface.size()-2));
-                } else {
-                    addresses.push_back(interface);
+            uint16_t port = opts.port;
+            TransportAcceptor::shared_ptr ta;
+            if (shouldListen) {
+                SocketAcceptor* aa = new SocketAcceptor(opts.tcpNoDelay, false, opts.maxNegotiateTime, broker->getTimer());
+                ta.reset(aa);
+                port = aa->listen(opts.listenInterfaces, boost::lexical_cast<std::string>(opts.port), opts.connectionBacklog, &createSocket);
+                if ( port!=0 ) {
+                    QPID_LOG(notice, "Listening on TCP/TCP6 port " << port);
                 }
             }
-        }
-        return addresses;
-    }
-}
-
-AsynchIOProtocolFactory::AsynchIOProtocolFactory(const qpid::broker::Broker::Options& opts, Timer& timer, bool shouldListen) :
-    brokerTimer(timer),
-    maxNegotiateTime(opts.maxNegotiateTime),
-    tcpNoDelay(opts.tcpNoDelay)
-{
-    if (!shouldListen) {
-        listeningPort = boost::lexical_cast<uint16_t>(opts.port);
-        return;
-    }
 
-    std::vector<std::string> addresses = expandInterfaces(opts.listenInterfaces);
-    if (addresses.empty()) {
-        // We specified some interfaces, but couldn't find addresses for them
-        QPID_LOG(warning, "TCP/TCP6: No specified network interfaces found: Not Listening");
-        listeningPort = 0;
-    }
+            TransportConnector::shared_ptr tc(new SocketConnector(opts.tcpNoDelay, false, opts.maxNegotiateTime, broker->getTimer(), &createSocket));
 
-    for (unsigned i = 0; i<addresses.size(); ++i) {
-        QPID_LOG(debug, "Using interface: " << addresses[i]);
-        SocketAddress sa(addresses[i], boost::lexical_cast<std::string>(opts.port));
-
-        // We must have at least one resolved address
-        QPID_LOG(info, "Listening to: " << sa.asString())
-        Socket* s = createSocket();
-        uint16_t lport = s->listen(sa, opts.connectionBacklog);
-        QPID_LOG(debug, "Listened to: " << lport);
-        listeners.push_back(s);
-
-        listeningPort = lport;
-
-        // Try any other resolved addresses
-        while (sa.nextAddress()) {
-            // Hack to ensure that all listening connections are on the same port
-            sa.setAddrInfoPort(listeningPort);
-            QPID_LOG(info, "Listening to: " << sa.asString())
-            Socket* s = createSocket();
-            uint16_t lport = s->listen(sa, opts.connectionBacklog);
-            QPID_LOG(debug, "Listened to: " << lport);
-            listeners.push_back(s);
+            broker->registerTransport("tcp", ta, tc, port);
         }
     }
-}
-
-void AsynchIOProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const Socket& s,
-                                          ConnectionCodec::Factory* f) {
-    AsynchIOHandler* async = new AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
-    establishedCommon(async, poller, s);
-}
-
-void AsynchIOProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const Socket& s,
-                                              ConnectionCodec::Factory* f, const std::string& name) {
-    AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
-    establishedCommon(async, poller, s);
-}
-
-void AsynchIOProtocolFactory::establishedCommon(AsynchIOHandler* async, Poller::shared_ptr poller, const Socket& s) {
-    if (tcpNoDelay) {
-        s.setTcpNoDelay();
-        QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
-    }
-
-    AsynchIO* aio = AsynchIO::create
-      (s,
-       boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
-       boost::bind(&AsynchIOHandler::eof, async, _1),
-       boost::bind(&AsynchIOHandler::disconnect, async, _1),
-       boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
-       boost::bind(&AsynchIOHandler::nobuffs, async, _1),
-       boost::bind(&AsynchIOHandler::idle, async, _1));
-
-    async->init(aio, brokerTimer, maxNegotiateTime);
-    aio->start(poller);
-}
-
-uint16_t AsynchIOProtocolFactory::getPort() const {
-    return listeningPort; // Immutable no need for lock.
-}
-
-void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
-                                     ConnectionCodec::Factory* fact) {
-    for (unsigned i = 0; i<listeners.size(); ++i) {
-        acceptors.push_back(
-            AsynchAcceptor::create(listeners[i],
-                            boost::bind(&AsynchIOProtocolFactory::establishedIncoming, this, poller, _1, fact)));
-        acceptors[i].start(poller);
-    }
-}
-
-void AsynchIOProtocolFactory::connectFailed(
-    const Socket& s, int ec, const std::string& emsg,
-    ConnectFailedCallback failedCb)
-{
-    failedCb(ec, emsg);
-    s.close();
-    delete &s;
-}
-
-void AsynchIOProtocolFactory::connect(
-    Poller::shared_ptr poller,
-    const std::string& name,
-    const std::string& host, const std::string& port,
-    ConnectionCodec::Factory* fact,
-    ConnectFailedCallback failed)
-{
-    // Note that the following logic does not cause a memory leak.
-    // The allocated Socket is freed either by the AsynchConnector
-    // upon connection failure or by the AsynchIO upon connection
-    // shutdown.  The allocated AsynchConnector frees itself when it
-    // is no longer needed.
-    Socket* socket = createSocket();
-    try {
-    AsynchConnector* c = AsynchConnector::create(
-        *socket,
-        host,
-        port,
-        boost::bind(&AsynchIOProtocolFactory::establishedOutgoing,
-                    this, poller, _1, fact, name),
-        boost::bind(&AsynchIOProtocolFactory::connectFailed,
-                    this, _1, _2, _3, failed));
-    c->start(poller);
-    } catch (std::exception&) {
-        // TODO: Design question - should we do the error callback and also throw?
-        int errCode = socket->getError();
-        connectFailed(*socket, errCode, strError(errCode), failed);
-        throw;
-    }
-}
+} tcpPlugin;
 
 }} // namespace qpid::sys

Copied: qpid/trunk/qpid/cpp/src/qpid/sys/TransportFactory.h (from r1451398, qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/TransportFactory.h?p2=qpid/trunk/qpid/cpp/src/qpid/sys/TransportFactory.h&p1=qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h&r1=1451398&r2=1451443&rev=1451443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/TransportFactory.h Fri Mar  1 00:21:12 2013
@@ -1,5 +1,5 @@
-#ifndef _sys_ProtocolFactory_h
-#define _sys_ProtocolFactory_h
+#ifndef QPID_SYS_TRANSPORTFACTORY_H
+#define QPID_SYS_TRANSPORTFACTORY_H
 
 /*
  *
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,24 +22,34 @@
  *
  */
 
-#include "qpid/sys/IntegerTypes.h"
 #include "qpid/SharedObject.h"
 #include "qpid/sys/ConnectionCodec.h"
+#include <string>
 #include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
 
 namespace qpid {
 namespace sys {
 
+class AsynchAcceptor;
 class Poller;
+class Timer;
 
-class ProtocolFactory : public qpid::SharedObject<ProtocolFactory>
+class TransportAcceptor : public qpid::SharedObject<TransportAcceptor>
 {
   public:
+    virtual ~TransportAcceptor() = 0;
+    virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
+};
+
+inline TransportAcceptor::~TransportAcceptor() {}
+
+class TransportConnector : public qpid::SharedObject<TransportConnector>
+{
+public:
     typedef boost::function2<void, int, std::string> ConnectFailedCallback;
 
-    virtual ~ProtocolFactory() = 0;
-    virtual uint16_t getPort() const = 0;
-    virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
+    virtual ~TransportConnector() = 0;
     virtual void connect(
         boost::shared_ptr<Poller>,
         const std::string& name,
@@ -48,10 +58,8 @@ class ProtocolFactory : public qpid::Sha
         ConnectFailedCallback failed) = 0;
 };
 
-inline ProtocolFactory::~ProtocolFactory() {}
+inline TransportConnector::~TransportConnector() {}
 
 }}
 
-
-    
-#endif  //!_sys_ProtocolFactory_h
+#endif

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp?rev=1451443&r1=1451442&r2=1451443&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/WinSocket.cpp Fri Mar  1 00:21:12 2013
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org