You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC

svn commit: r1187375 [14/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/TCPIOPlugin.cpp Fri Oct 21 14:42:12 2011
@@ -25,31 +25,31 @@
 
 #include "qpid/Plugin.h"
 #include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
 #include "qpid/sys/Poller.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/log/Statement.h"
 
 #include <boost/bind.hpp>
-#include <memory>
+#include <boost/ptr_container/ptr_vector.hpp>
 
 namespace qpid {
 namespace sys {
 
 class AsynchIOProtocolFactory : public ProtocolFactory {
     const bool tcpNoDelay;
-    Socket listener;
-    const uint16_t listeningPort;
-    std::auto_ptr<AsynchAcceptor> acceptor;
+    boost::ptr_vector<Socket> listeners;
+    boost::ptr_vector<AsynchAcceptor> acceptors;
+    uint16_t listeningPort;
 
   public:
-    AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay);
+    AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen);
     void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
-    void connect(Poller::shared_ptr, const std::string& host, int16_t port,
+    void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
                  ConnectionCodec::Factory*,
                  ConnectFailedCallback);
 
     uint16_t getPort() const;
-    std::string getHost() const;
 
   private:
     void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
@@ -57,27 +57,78 @@ class AsynchIOProtocolFactory : public P
     void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
 };
 
+static bool sslMultiplexEnabled(void)
+{
+    Options o;
+    Plugin::addOptions(o);
+
+    if (o.find_nothrow("ssl-multiplex", false)) {
+        // This option is added by the SSL plugin when the SSL port
+        // is configured to be the same as the main port.
+        QPID_LOG(notice, "SSL multiplexing enabled");
+        return true;
+    }
+    return false;
+}
+
 // Static instance to initialise plugin
 static class TCPIOPlugin : public Plugin {
     void earlyInitialize(Target&) {
     }
-    
+
     void initialize(Target& target) {
         broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
         // Only provide to a Broker
         if (broker) {
             const broker::Broker::Options& opts = broker->getOptions();
-            ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog, 
-                                                                             opts.tcpNoDelay));
-            QPID_LOG(notice, "Listening on TCP port " << protocol->getPort());
-            broker->registerProtocolFactory("tcp", protocol);
+
+            // Check for SSL on the same port
+            bool shouldListen = !sslMultiplexEnabled();
+
+            ProtocolFactory::shared_ptr protocolt(
+                new AsynchIOProtocolFactory(
+                    "", boost::lexical_cast<std::string>(opts.port),
+                    opts.connectionBacklog,
+                    opts.tcpNoDelay,
+                    shouldListen));
+            if (shouldListen) {
+                QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort());
+            }
+            broker->registerProtocolFactory("tcp", protocolt);
         }
     }
 } tcpPlugin;
 
-AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay) :
-    tcpNoDelay(nodelay), listeningPort(listener.listen(port, backlog))
-{}
+AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen) :
+    tcpNoDelay(nodelay)
+{
+    if (!shouldListen) {
+        return;
+    }
+
+    SocketAddress sa(host, port);
+
+    // We must have at least one resolved address
+    QPID_LOG(info, "Listening to: " << sa.asString())
+    Socket* s = new Socket;
+    uint16_t lport = s->listen(sa, backlog);
+    QPID_LOG(debug, "Listened to: " << lport);
+    listeners.push_back(s);
+
+    listeningPort = lport;
+
+    // Try any other resolved addresses
+    while (sa.nextAddress()) {
+        // Hack to ensure that all listening connections are on the same port
+        sa.setAddrInfoPort(listeningPort);
+        QPID_LOG(info, "Listening to: " << sa.asString())
+        Socket* s = new Socket;
+        uint16_t lport = s->listen(sa, backlog);
+        QPID_LOG(debug, "Listened to: " << lport);
+        listeners.push_back(s);
+    }
+
+}
 
 void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
                                           ConnectionCodec::Factory* f, bool isClient) {
@@ -107,16 +158,14 @@ uint16_t AsynchIOProtocolFactory::getPor
     return listeningPort; // Immutable no need for lock.
 }
 
-std::string AsynchIOProtocolFactory::getHost() const {
-    return listener.getSockname();
-}
-
 void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
                                      ConnectionCodec::Factory* fact) {
-    acceptor.reset(
-        AsynchAcceptor::create(listener,
-                           boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
-    acceptor->start(poller);
+    for (unsigned i = 0; i<listeners.size(); ++i) {
+        acceptors.push_back(
+            AsynchAcceptor::create(listeners[i],
+                            boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
+        acceptors[i].start(poller);
+    }
 }
 
 void AsynchIOProtocolFactory::connectFailed(
@@ -130,7 +179,7 @@ void AsynchIOProtocolFactory::connectFai
 
 void AsynchIOProtocolFactory::connect(
     Poller::shared_ptr poller,
-    const std::string& host, int16_t port,
+    const std::string& host, const std::string& port,
     ConnectionCodec::Factory* fact,
     ConnectFailedCallback failed)
 {
@@ -139,8 +188,8 @@ void AsynchIOProtocolFactory::connect(
     // upon connection failure or by the AsynchIO upon connection
     // shutdown.  The allocated AsynchConnector frees itself when it
     // is no longer needed.
-
     Socket* socket = new Socket();
+    try {
     AsynchConnector* c = AsynchConnector::create(
         *socket,
         host,
@@ -150,6 +199,12 @@ void AsynchIOProtocolFactory::connect(
         boost::bind(&AsynchIOProtocolFactory::connectFailed,
                     this, _1, _2, _3, failed));
     c->start(poller);
+    } catch (std::exception&) {
+        // TODO: Design question - should we do the error callback and also throw?
+        int errCode = socket->getError();
+        connectFailed(*socket, errCode, strError(errCode), failed);
+        throw;
+    }
 }
 
 }} // namespace qpid::sys

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.cpp Fri Oct 21 14:42:12 2011
@@ -75,6 +75,12 @@ void TimerTask::cancel() {
     cancelled = true;
 }
 
+void TimerTask::setFired() {
+    // Set nextFireTime to just before now, making readyToFire() true.
+    nextFireTime = AbsTime(sys::now(), Duration(-1));
+}
+
+
 Timer::Timer() :
     active(false),
     late(50 * TIME_MSEC),
@@ -131,12 +137,14 @@ void Timer::run()
                 bool warningsEnabled;
                 QPID_LOG_TEST(warning, warningsEnabled);
                 if (warningsEnabled) {
-                    if (delay > late && overrun > overran)
-                        warn.lateAndOverran(t->name, delay, overrun, Duration(start, end));
+                    if (overrun > overran) {
+                        if (delay > overran) // if delay is significant to an overrun.
+                            warn.lateAndOverran(t->name, delay, overrun, Duration(start, end));
+                        else
+                            warn.overran(t->name, overrun, Duration(start, end));
+                    }
                     else if (delay > late)
                         warn.late(t->name, delay);
-                    else if (overrun > overran)
-                        warn.overran(t->name, overrun, Duration(start, end));
                 }
                 continue;
             } else {
@@ -183,7 +191,11 @@ void Timer::stop()
 
 // Allow subclasses to override behavior when firing a task.
 void Timer::fire(boost::intrusive_ptr<TimerTask> t) {
-    t->fireTask();
+    try {
+        t->fireTask();
+    } catch (const std::exception& e) {
+        QPID_LOG(error, "Exception thrown by timer task " << t->getName() << ": " << e.what());
+    }
 }
 
 // Provided for subclasses: called when a task is droped.

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.h Fri Oct 21 14:42:12 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -64,6 +64,10 @@ class TimerTask : public RefCounted {
 
     std::string getName() const { return name; }
 
+    // Move the nextFireTime so readyToFire is true.
+    // Used by the cluster, where tasks are fired on cluster events, not on local time.
+    QPID_COMMON_EXTERN void setFired();
+
   protected:
     // Must be overridden with callback
     virtual void fire() = 0;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/TimerWarnings.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/TimerWarnings.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/TimerWarnings.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/TimerWarnings.cpp Fri Oct 21 14:42:12 2011
@@ -56,20 +56,22 @@ void TimerWarnings::log() {
             std::string task = i->first;
             TaskStats& stats = i->second;
             if (stats.lateDelay.count)
-                QPID_LOG(warning, task << " task late "
+                QPID_LOG(info, task << " task late "
                          << stats.lateDelay.count << " times by "
                          << stats.lateDelay.average()/TIME_MSEC << "ms on average.");
+
             if (stats.overranOverrun.count)
-                QPID_LOG(warning, task << " task overran "
+                QPID_LOG(info, task << " task overran "
                          << stats.overranOverrun.count << " times by "
                          << stats.overranOverrun.average()/TIME_MSEC << "ms (taking "
                          << stats.overranTime.average() << "ns) on average.");
 
-            if (stats.lateAndOverranDelay.count)
-                QPID_LOG(warning, task << " task overran "
-                         << stats.overranOverrun.count << " times by "
-                         << stats.overranOverrun.average()/TIME_MSEC << "ms (taking "
-                         << stats.overranTime.average() << "ns) on average.");
+            if (stats.lateAndOverranOverrun.count)
+                QPID_LOG(info, task << " task late and overran "
+                         << stats.lateAndOverranOverrun.count << " times: late "
+                         << stats.lateAndOverranDelay.average()/TIME_MSEC << "ms, overran "
+                         << stats.lateAndOverranOverrun.average()/TIME_MSEC << "ms (taking "
+                         << stats.lateAndOverranTime.average() << "ns) on average.");
 
         }
         nextReport = AbsTime(now(), interval);

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/alloca.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/alloca.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/alloca.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/alloca.h Fri Oct 21 14:42:12 2011
@@ -21,19 +21,22 @@
  *
  */
 
-#if (defined(_WINDOWS) || defined (WIN32)) && defined(_MSC_VER)
-#include <malloc.h>
-#ifdef alloc
-#  undef alloc
-#endif
-#define alloc _alloc
-#ifdef alloca
-#  undef alloca
-#endif
-#define alloca _alloca
+#if (defined(_WINDOWS) || defined (WIN32))
+#  include <malloc.h>
+
+#  if defined(_MSC_VER)
+#    ifdef alloc
+#      undef alloc
+#    endif
+#    define alloc _alloc
+#    ifdef alloca
+#      undef alloca
+#    endif
+#    define alloca _alloca
+#  endif
 #endif
 #if !defined _WINDOWS && !defined WIN32
-#include <alloca.h>
+#  include <alloca.h>
 #endif
 
 #endif  /*!QPID_SYS_ALLOCA_H*/

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp Fri Oct 21 14:42:12 2011
@@ -57,6 +57,7 @@ size_t CyrusSecurityLayer::decode(const 
             copied += count;
             decodeBuffer.position += count;
             size_t decodedSize = codec->decode(decodeBuffer.data, decodeBuffer.position);
+            if (decodedSize == 0) break;
             if (decodedSize < decodeBuffer.position) {
                 ::memmove(decodeBuffer.data, decodeBuffer.data + decodedSize, decodeBuffer.position - decodedSize);
             }
@@ -106,7 +107,7 @@ size_t CyrusSecurityLayer::encode(const 
 
 bool CyrusSecurityLayer::canEncode()
 {
-    return encrypted || codec->canEncode();
+    return codec && (encrypted || codec->canEncode());
 }
 
 void CyrusSecurityLayer::init(qpid::sys::Codec* c)

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/epoll/EpollPoller.cpp Fri Oct 21 14:42:12 2011
@@ -384,7 +384,12 @@ void PollerPrivate::resetMode(PollerHand
         epe.data.u64 = 0; // Keep valgrind happy
         epe.data.ptr = &eh;
 
-        QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
+        int rc = ::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe);
+        // If something has closed the fd in the meantime try adding it back
+        if (rc ==-1 && errno == ENOENT) {
+            rc = ::epoll_ctl(epollFd, EPOLL_CTL_ADD, eh.fd(), &epe);
+        }
+        QPID_POSIX_CHECK(rc);
 
         eh.setActive();
         return;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/AsynchIO.cpp Fri Oct 21 14:42:12 2011
@@ -149,11 +149,12 @@ private:
     ConnectedCallback connCallback;
     FailedCallback failCallback;
     const Socket& socket;
+    SocketAddress sa;
 
 public:
     AsynchConnector(const Socket& socket,
-                    std::string hostname,
-                    uint16_t port,
+                    const std::string& hostname,
+                    const std::string& port,
                     ConnectedCallback connCb,
                     FailedCallback failCb);
     void start(Poller::shared_ptr poller);
@@ -161,8 +162,8 @@ public:
 };
 
 AsynchConnector::AsynchConnector(const Socket& s,
-                                 std::string hostname,
-                                 uint16_t port,
+                                 const std::string& hostname,
+                                 const std::string& port,
                                  ConnectedCallback connCb,
                                  FailedCallback failCb) :
     DispatchHandle(s,
@@ -171,11 +172,13 @@ AsynchConnector::AsynchConnector(const S
                    boost::bind(&AsynchConnector::connComplete, this, _1)),
     connCallback(connCb),
     failCallback(failCb),
-    socket(s)
+    socket(s),
+    sa(hostname, port)
 {
     socket.setNonblocking();
-    SocketAddress sa(hostname, boost::lexical_cast<std::string>(port));
+
     // Note, not catching any exceptions here, also has effect of destructing
+    QPID_LOG(info, "Connecting: " << sa.asString());
     socket.connect(sa);
 }
 
@@ -191,11 +194,26 @@ void AsynchConnector::stop()
 
 void AsynchConnector::connComplete(DispatchHandle& h)
 {
-    h.stopWatch();
     int errCode = socket.getError();
     if (errCode == 0) {
+        h.stopWatch();
         connCallback(socket);
     } else {
+        // Retry while we cause an immediate exception
+        // (asynch failure will be handled by re-entering here at the top)
+        while (sa.nextAddress()) {
+            try {
+                // Try next address without deleting ourselves
+                QPID_LOG(debug, "Ignored socket connect error: " << strError(errCode));
+                QPID_LOG(info, "Retrying connect: " << sa.asString());
+                socket.connect(sa);
+                return;
+            } catch (const std::exception& e) {
+                QPID_LOG(debug, "Ignored socket connect exception: " << e.what());
+            }
+            errCode = socket.getError();
+        }
+        h.stopWatch();
         failCallback(socket, errCode, strError(errCode));
     }
     DispatchHandle::doDelete();
@@ -589,8 +607,8 @@ AsynchAcceptor* AsynchAcceptor::create(c
 }
 
 AsynchConnector* AsynchConnector::create(const Socket& s,
-                                         std::string hostname,
-                                         uint16_t port,
+                                         const std::string& hostname,
+                                         const std::string& port,
                                          ConnectedCallback connCb,
                                          FailedCallback failCb)
 {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/LockFile.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/LockFile.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/LockFile.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/LockFile.cpp Fri Oct 21 14:42:12 2011
@@ -58,8 +58,7 @@ LockFile::~LockFile() {
     if (impl) {
         int f = impl->fd;
         if (f >= 0) {
-            int unused_ret;
-            unused_ret = ::lockf(f, F_ULOCK, 0); // Suppress warnings about ignoring return value.
+            (void) ::lockf(f, F_ULOCK, 0); // Suppress warnings about ignoring return value.
             ::close(f);
             impl->fd = -1;
         }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Socket.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Socket.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Socket.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Socket.cpp Fri Oct 21 14:42:12 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,65 +34,35 @@
 #include <netdb.h>
 #include <cstdlib>
 #include <string.h>
-#include <iostream>
-
-#include <boost/format.hpp>
-#include <boost/lexical_cast.hpp>
 
 namespace qpid {
 namespace sys {
 
 namespace {
-std::string getName(int fd, bool local, bool includeService = false)
+std::string getName(int fd, bool local)
 {
-    ::sockaddr_storage name; // big enough for any socket address    
-    ::socklen_t namelen = sizeof(name);
-    
-    int result = -1;
+    ::sockaddr_storage name_s; // big enough for any socket address
+    ::sockaddr* name = (::sockaddr*)&name_s;
+    ::socklen_t namelen = sizeof(name_s);
+
     if (local) {
-        result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
+        QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) );
     } else {
-        result = ::getpeername(fd, (::sockaddr*)&name, &namelen);
+        QPID_POSIX_CHECK( ::getpeername(fd, name, &namelen) );
     }
 
-    QPID_POSIX_CHECK(result);
-
-    char servName[NI_MAXSERV];
-    char dispName[NI_MAXHOST];
-    if (includeService) {
-        if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 
-                                 servName, sizeof(servName), 
-                                 NI_NUMERICHOST | NI_NUMERICSERV) != 0)
-            throw QPID_POSIX_ERROR(rc);
-        return std::string(dispName) + ":" + std::string(servName);
-
-    } else {
-        if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 0, 0, NI_NUMERICHOST) != 0)
-            throw QPID_POSIX_ERROR(rc);
-        return dispName;
-    }
+    return SocketAddress::asString(name, namelen);
 }
 
-std::string getService(int fd, bool local)
+uint16_t getLocalPort(int fd)
 {
-    ::sockaddr_storage name; // big enough for any socket address    
-    ::socklen_t namelen = sizeof(name);
-    
-    int result = -1;
-    if (local) {
-        result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
-    } else {
-        result = ::getpeername(fd, (::sockaddr*)&name, &namelen);
-    }
+    ::sockaddr_storage name_s; // big enough for any socket address
+    ::sockaddr* name = (::sockaddr*)&name_s;
+    ::socklen_t namelen = sizeof(name_s);
 
-    QPID_POSIX_CHECK(result);
+    QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) );
 
-    char servName[NI_MAXSERV];
-    if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0, 
-                                 servName, sizeof(servName), 
-                                 NI_NUMERICHOST | NI_NUMERICSERV) != 0)
-        throw QPID_POSIX_ERROR(rc);
-    return servName;
+    return SocketAddress::getPort(name);
 }
 }
 
@@ -119,6 +89,11 @@ void Socket::createSocket(const SocketAd
     try {
         if (nonblocking) setNonblocking();
         if (nodelay) setTcpNoDelay();
+        if (getAddrInfo(sa).ai_family == AF_INET6) {
+            int flag = 1;
+            int result = ::setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&flag, sizeof(flag));
+            QPID_POSIX_CHECK(result);
+        }
     } catch (std::exception&) {
         ::close(s);
         socket = -1;
@@ -126,13 +101,18 @@ void Socket::createSocket(const SocketAd
     }
 }
 
-void Socket::setTimeout(const Duration& interval) const
-{
-    const int& socket = impl->fd;
-    struct timeval tv;
-    toTimeval(tv, interval);
-    setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
-    setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
+Socket* Socket::createSameTypeSocket() const {
+    int& socket = impl->fd;
+    // Socket currently has no actual socket attached
+    if (socket == -1)
+        return new Socket;
+
+    ::sockaddr_storage sa;
+    ::socklen_t salen = sizeof(sa);
+    QPID_POSIX_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen));
+    int s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
+    if (s < 0) throw QPID_POSIX_ERROR(errno);
+    return new Socket(new IOHandlePrivate(s));
 }
 
 void Socket::setNonblocking() const {
@@ -149,20 +129,27 @@ void Socket::setTcpNoDelay() const
     nodelay = true;
     if (socket != -1) {
         int flag = 1;
-        int result = setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
+        int result = ::setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
         QPID_POSIX_CHECK(result);
     }
 }
 
-void Socket::connect(const std::string& host, uint16_t port) const
+void Socket::connect(const std::string& host, const std::string& port) const
 {
-    SocketAddress sa(host, boost::lexical_cast<std::string>(port));
+    SocketAddress sa(host, port);
     connect(sa);
 }
 
 void Socket::connect(const SocketAddress& addr) const
 {
-    connectname = addr.asString();
+    // The display name for an outbound connection needs to be the name that was specified
+    // for the address rather than a resolved IP address as we don't know which of
+    // the IP addresses is actually the one that will be connected to.
+    peername = addr.asString(false);
+
+    // However the string we compare with the local port must be numeric or it might not
+    // match when it should as getLocalAddress() will always be numeric
+    std::string connectname = addr.asString();
 
     createSocket(addr);
 
@@ -170,7 +157,24 @@ void Socket::connect(const SocketAddress
     // TODO the correct thing to do here is loop on failure until you've used all the returned addresses
     if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) &&
         (errno != EINPROGRESS)) {
-        throw Exception(QPID_MSG(strError(errno) << ": " << connectname));
+        throw Exception(QPID_MSG(strError(errno) << ": " << peername));
+    }
+    // When connecting to a port on the same host which no longer has
+    // a process associated with it, the OS occasionally chooses the
+    // remote port (which is unoccupied) as the port to bind the local
+    // end of the socket, resulting in a "circular" connection.
+    //
+    // This seems like something the OS should prevent but I have
+    // confirmed that sporadic hangs in
+    // cluster_tests.LongTests.test_failover on RHEL5 are caused by
+    // such a circular connection.
+    //
+    // Raise an error if we see such a connection, since we know there is
+    // no listener on the peer address.
+    //
+    if (getLocalAddress() == connectname) {
+        close();
+        throw Exception(QPID_MSG("Connection refused: " << peername));
     }
 }
 
@@ -183,9 +187,9 @@ Socket::close() const
     socket = -1;
 }
 
-int Socket::listen(uint16_t port, int backlog) const
+int Socket::listen(const std::string& host, const std::string& port, int backlog) const
 {
-    SocketAddress sa("", boost::lexical_cast<std::string>(port));
+    SocketAddress sa(host, port);
     return listen(sa, backlog);
 }
 
@@ -195,26 +199,24 @@ int Socket::listen(const SocketAddress& 
 
     const int& socket = impl->fd;
     int yes=1;
-    QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
+    QPID_POSIX_CHECK(::setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
 
     if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0)
         throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno)));
     if (::listen(socket, backlog) < 0)
         throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno)));
 
-    struct sockaddr_in name;
-    socklen_t namelen = sizeof(name);
-    if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)
-        throw QPID_POSIX_ERROR(errno);
-
-    return ntohs(name.sin_port);
+    return getLocalPort(socket);
 }
 
 Socket* Socket::accept() const
 {
     int afd = ::accept(impl->fd, 0, 0);
-    if ( afd >= 0)
-        return new Socket(new IOHandlePrivate(afd));
+    if ( afd >= 0) {
+        Socket* s = new Socket(new IOHandlePrivate(afd));
+        s->localname = localname;
+        return s;
+    }
     else if (errno == EAGAIN)
         return 0;
     else throw QPID_POSIX_ERROR(errno);
@@ -230,37 +232,20 @@ int Socket::write(const void *buf, size_
     return ::write(impl->fd, buf, count);
 }
 
-std::string Socket::getSockname() const
-{
-    return getName(impl->fd, true);
-}
-
-std::string Socket::getPeername() const
-{
-    return getName(impl->fd, false);
-}
-
 std::string Socket::getPeerAddress() const
 {
-    if (connectname.empty()) {
-        connectname = getName(impl->fd, false, true);
+    if (peername.empty()) {
+        peername = getName(impl->fd, false);
     }
-    return connectname;
+    return peername;
 }
 
 std::string Socket::getLocalAddress() const
 {
-    return getName(impl->fd, true, true);
-}
-
-uint16_t Socket::getLocalPort() const
-{
-    return std::atoi(getService(impl->fd, true).c_str());
-}
-
-uint16_t Socket::getRemotePort() const
-{
-    return std::atoi(getService(impl->fd, true).c_str());
+    if (localname.empty()) {
+        localname = getName(impl->fd, true);
+    }
+    return localname;
 }
 
 int Socket::getError() const

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/SocketAddress.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/SocketAddress.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/SocketAddress.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/SocketAddress.cpp Fri Oct 21 14:42:12 2011
@@ -21,11 +21,13 @@
 
 #include "qpid/sys/SocketAddress.h"
 
-#include "qpid/sys/posix/check.h"
+#include "qpid/Exception.h"
+#include "qpid/Msg.h"
 
 #include <sys/socket.h>
-#include <string.h>
+#include <netinet/in.h>
 #include <netdb.h>
+#include <string.h>
 
 namespace qpid {
 namespace sys {
@@ -46,15 +48,9 @@ SocketAddress::SocketAddress(const Socke
 
 SocketAddress& SocketAddress::operator=(const SocketAddress& sa)
 {
-    if (&sa != this) {
-        host = sa.host;
-        port = sa.port;
-
-        if (addrInfo) {
-            ::freeaddrinfo(addrInfo);
-            addrInfo = 0;
-        }
-    }
+    SocketAddress temp(sa);
+
+    std::swap(temp, *this);
     return *this;
 }
 
@@ -65,9 +61,61 @@ SocketAddress::~SocketAddress()
     }
 }
 
-std::string SocketAddress::asString() const
+std::string SocketAddress::asString(::sockaddr const * const addr, size_t addrlen)
+{
+    char servName[NI_MAXSERV];
+    char dispName[NI_MAXHOST];
+    if (int rc=::getnameinfo(addr, addrlen,
+        dispName, sizeof(dispName),
+                             servName, sizeof(servName),
+                             NI_NUMERICHOST | NI_NUMERICSERV) != 0)
+        throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
+    std::string s;
+    switch (addr->sa_family) {
+        case AF_INET: s += dispName; break;
+        case AF_INET6: s += "["; s += dispName; s+= "]"; break;
+        default: throw Exception(QPID_MSG("Unexpected socket type"));
+    }
+    s += ":";
+    s += servName;
+    return s;
+}
+
+uint16_t SocketAddress::getPort(::sockaddr const * const addr)
+{
+    switch (addr->sa_family) {
+        case AF_INET: return ntohs(((::sockaddr_in*)addr)->sin_port);
+        case AF_INET6: return ntohs(((::sockaddr_in6*)addr)->sin6_port);
+        default:throw Exception(QPID_MSG("Unexpected socket type"));
+    }
+}
+
+std::string SocketAddress::asString(bool numeric) const
 {
-    return host + ":" + port;
+    if (!numeric)
+        return host + ":" + port;
+    // Canonicalise into numeric id
+    const ::addrinfo& ai = getAddrInfo(*this);
+
+    return asString(ai.ai_addr, ai.ai_addrlen);
+}
+
+bool SocketAddress::nextAddress() {
+    bool r = currentAddrInfo->ai_next != 0;
+    if (r)
+        currentAddrInfo = currentAddrInfo->ai_next;
+    return r;
+}
+
+void SocketAddress::setAddrInfoPort(uint16_t port) {
+    if (!currentAddrInfo) return;
+
+    ::addrinfo& ai = *currentAddrInfo;
+    switch (ai.ai_family) {
+    case AF_INET: ((::sockaddr_in*)ai.ai_addr)->sin_port = htons(port); return;
+    case AF_INET6:((::sockaddr_in6*)ai.ai_addr)->sin6_port = htons(port); return;
+    default: throw Exception(QPID_MSG("Unexpected socket type"));
+    }
 }
 
 const ::addrinfo& getAddrInfo(const SocketAddress& sa)
@@ -75,7 +123,8 @@ const ::addrinfo& getAddrInfo(const Sock
     if (!sa.addrInfo) {
         ::addrinfo hints;
         ::memset(&hints, 0, sizeof(hints));
-        hints.ai_family = AF_INET; // Change this to support IPv6
+        hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for
+        hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6
         hints.ai_socktype = SOCK_STREAM;
 
         const char* node = 0;
@@ -88,10 +137,11 @@ const ::addrinfo& getAddrInfo(const Sock
 
         int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo);
         if (n != 0)
-            throw Exception(QPID_MSG("Cannot resolve " << sa.host << ": " << ::gai_strerror(n)));
+            throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n)));
+        sa.currentAddrInfo = sa.addrInfo;
     }
 
-    return *sa.addrInfo;
+    return *sa.currentAddrInfo;
 }
 
 }}

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Thread.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Thread.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Thread.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Thread.cpp Fri Oct 21 14:42:12 2011
@@ -37,7 +37,8 @@ void* runRunnable(void* p)
 }
 }
 
-struct ThreadPrivate {
+class ThreadPrivate {
+public:
     pthread_t thread;
 
     ThreadPrivate(Runnable* runnable) {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Time.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Time.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Time.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Time.cpp Fri Oct 21 14:42:12 2011
@@ -27,6 +27,7 @@
 #include <stdio.h>
 #include <sys/time.h>
 #include <unistd.h>
+#include <iomanip>
 
 namespace {
 int64_t max_abstime() { return std::numeric_limits<int64_t>::max(); }
@@ -103,6 +104,12 @@ void outputFormattedNow(std::ostream& o)
     o << " ";
 }
 
+void outputHiresNow(std::ostream& o) {
+    ::timespec time;
+    ::clock_gettime(CLOCK_REALTIME, &time);
+    o << time.tv_sec << "." << std::setw(9) << std::setfill('0') << time.tv_nsec << "s ";
+}
+
 void sleep(int secs) {
     ::sleep(secs);
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/RdmaIO.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/RdmaIO.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/RdmaIO.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/RdmaIO.cpp Fri Oct 21 14:42:12 2011
@@ -140,8 +140,8 @@ namespace Rdma {
         // Prepost recv buffers before we go any further
         qp->allocateRecvBuffers(recvBufferCount, bufferSize+FrameHeaderSize);
 
-        // Create xmit buffers
-        qp->createSendBuffers(xmitBufferCount, bufferSize+FrameHeaderSize);
+        // Create xmit buffers, reserve space for frame header.
+        qp->createSendBuffers(xmitBufferCount, bufferSize, FrameHeaderSize);
     }
 
     AsynchIO::~AsynchIO() {
@@ -210,12 +210,14 @@ namespace Rdma {
             }
             break;
         case 1:
-            Buffer* ob = buff ? buff : getSendBuffer();
+            if (!buff)
+                buff = getSendBuffer();
             // Add FrameHeader after frame data
             FrameHeader header(credit);
-            ::memcpy(ob->bytes()+ob->dataCount(), &header, FrameHeaderSize);
-            ob->dataCount(ob->dataCount()+FrameHeaderSize);
-            qp->postSend(ob);
+            assert(buff->dataCount() <= buff->byteCount());   // ensure app data doesn't impinge on reserved space.
+            ::memcpy(buff->bytes()+buff->dataCount(), &header, FrameHeaderSize);
+            buff->dataCount(buff->dataCount()+FrameHeaderSize);
+            qp->postSend(buff);
             break;
         }
     }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/rdma_wrap.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/rdma_wrap.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/rdma_wrap.cpp Fri Oct 21 14:42:12 2011
@@ -50,8 +50,9 @@ namespace Rdma {
         return count;
     }
 
-    Buffer::Buffer(uint32_t lkey, char* bytes, const int32_t byteCount) :
-        bufferSize(byteCount)
+    Buffer::Buffer(uint32_t lkey, char* bytes, const int32_t byteCount,
+                   const int32_t reserve) :
+        bufferSize(byteCount + reserve), reserved(reserve)
     {
         sge.addr = (uintptr_t) bytes;
         sge.length = 0;
@@ -163,21 +164,21 @@ namespace Rdma {
     }
 
     // Create buffers to use for writing
-    void QueuePair::createSendBuffers(int sendBufferCount, int bufferSize)
+    void QueuePair::createSendBuffers(int sendBufferCount, int bufferSize, int reserved)
     {
         assert(!smr);
 
         // Round up buffersize to cacheline (64 bytes)
-        bufferSize = (bufferSize+63) & (~63);
+        int dataLength = (bufferSize+reserved+63) & (~63);
 
         // Allocate memory block for all receive buffers
-        char* mem = new char [sendBufferCount * bufferSize];
-        smr = regMr(pd.get(), mem, sendBufferCount * bufferSize, ::IBV_ACCESS_LOCAL_WRITE);
+        char* mem = new char [sendBufferCount * dataLength];
+        smr = regMr(pd.get(), mem, sendBufferCount * dataLength, ::IBV_ACCESS_LOCAL_WRITE);
         sendBuffers.reserve(sendBufferCount);
         freeBuffers.reserve(sendBufferCount);
         for (int i = 0; i<sendBufferCount; ++i) {
             // Allocate xmit buffer
-            sendBuffers.push_back(Buffer(smr->lkey, &mem[i*bufferSize], bufferSize));
+            sendBuffers.push_back(Buffer(smr->lkey, &mem[i*dataLength], bufferSize, reserved));
             freeBuffers.push_back(i);
         }
     }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/rdma_wrap.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/rdma_wrap.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/rdma_wrap.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/rdma_wrap.h Fri Oct 21 14:42:12 2011
@@ -57,8 +57,9 @@ namespace Rdma {
         void dataCount(int32_t);
 
     private:
-        Buffer(uint32_t lkey, char* bytes, const int32_t byteCount);
+        Buffer(uint32_t lkey, char* bytes, const int32_t byteCount, const int32_t reserve=0);
         int32_t bufferSize;
+        int32_t reserved;   // for framing header
         ::ibv_sge sge;
     };
 
@@ -66,8 +67,9 @@ namespace Rdma {
       return (char*) sge.addr;
     }
 
+    /** return the number of bytes available for application data */
     inline int32_t Buffer::byteCount() const {
-        return bufferSize;
+        return bufferSize - reserved;
     }
 
     inline int32_t Buffer::dataCount() const {
@@ -75,6 +77,8 @@ namespace Rdma {
     }
 
     inline void Buffer::dataCount(int32_t s) {
+        // catch any attempt to overflow a buffer
+        assert(s <= bufferSize + reserved);
         sge.length = s;
     }
 
@@ -136,7 +140,7 @@ namespace Rdma {
         typedef boost::intrusive_ptr<QueuePair> intrusive_ptr;
 
         // Create a buffers to use for writing
-        void createSendBuffers(int sendBufferCount, int bufferSize);
+        void createSendBuffers(int sendBufferCount, int dataSize, int headerSize);
 
         // Get a send buffer
         Buffer* getSendBuffer();

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslHandler.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslHandler.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslHandler.h Fri Oct 21 14:42:12 2011
@@ -35,7 +35,7 @@ namespace sys {
 namespace ssl {
 
 class SslIO;
-class SslIOBufferBase;
+struct SslIOBufferBase;
 class SslSocket;
 
 class SslHandler : public OutputControl {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslIo.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslIo.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslIo.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslIo.cpp Fri Oct 21 14:42:12 2011
@@ -68,29 +68,33 @@ __thread int64_t threadMaxReadTimeNs = 2
  * Asynch Acceptor
  */
 
-SslAcceptor::SslAcceptor(const SslSocket& s, Callback callback) :
+template <class T>
+SslAcceptorTmpl<T>::SslAcceptorTmpl(const T& s, Callback callback) :
     acceptedCallback(callback),
-    handle(s, boost::bind(&SslAcceptor::readable, this, _1), 0, 0),
+    handle(s, boost::bind(&SslAcceptorTmpl<T>::readable, this, _1), 0, 0),
     socket(s) {
 
     s.setNonblocking();
     ignoreSigpipe();
 }
 
-SslAcceptor::~SslAcceptor() 
+template <class T>
+SslAcceptorTmpl<T>::~SslAcceptorTmpl()
 {
     handle.stopWatch();
 }
 
-void SslAcceptor::start(Poller::shared_ptr poller) {
+template <class T>
+void SslAcceptorTmpl<T>::start(Poller::shared_ptr poller) {
     handle.startWatch(poller);
 }
 
 /*
  * We keep on accepting as long as there is something to accept
  */
-void SslAcceptor::readable(DispatchHandle& h) {
-    SslSocket* s;
+template <class T>
+void SslAcceptorTmpl<T>::readable(DispatchHandle& h) {
+    Socket* s;
     do {
         errno = 0;
         // TODO: Currently we ignore the peers address, perhaps we should
@@ -110,6 +114,10 @@ void SslAcceptor::readable(DispatchHandl
     h.rewatch();
 }
 
+// Explicitly instantiate the templates we need
+template class SslAcceptorTmpl<SslSocket>;
+template class SslAcceptorTmpl<SslMuxSocket>;
+
 /*
  * Asynch Connector
  */
@@ -117,7 +125,7 @@ void SslAcceptor::readable(DispatchHandl
 SslConnector::SslConnector(const SslSocket& s,
                                  Poller::shared_ptr poller,
                                  std::string hostname,
-                                 uint16_t port,
+                                 std::string port,
                                  ConnectedCallback connCb,
                                  FailedCallback failCb) :
     DispatchHandle(s,

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslIo.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslIo.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslIo.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslIo.h Fri Oct 21 14:42:12 2011
@@ -29,26 +29,30 @@
 
 namespace qpid {
 namespace sys {
+
+class Socket;
+
 namespace ssl {
-    
+
 class SslSocket;
 
 /*
  * Asynchronous ssl acceptor: accepts connections then does a callback
  * with the accepted fd
  */
-class SslAcceptor {
+template <class T>
+class SslAcceptorTmpl {
 public:
-    typedef boost::function1<void, const SslSocket&> Callback;
+    typedef boost::function1<void, const Socket&> Callback;
 
 private:
     Callback acceptedCallback;
     qpid::sys::DispatchHandle handle;
-    const SslSocket& socket;
+    const T& socket;
 
 public:
-    SslAcceptor(const SslSocket& s, Callback callback);
-    ~SslAcceptor();
+    SslAcceptorTmpl(const T& s, Callback callback);
+    ~SslAcceptorTmpl();
     void start(qpid::sys::Poller::shared_ptr poller);
 
 private:
@@ -73,7 +77,7 @@ public:
     SslConnector(const SslSocket& socket,
                     Poller::shared_ptr poller,
                     std::string hostname,
-                    uint16_t port,
+                    std::string port,
                     ConnectedCallback connCb,
                     FailedCallback failCb = 0);
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslSocket.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslSocket.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslSocket.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslSocket.cpp Fri Oct 21 14:42:12 2011
@@ -25,11 +25,13 @@
 #include "qpid/Exception.h"
 #include "qpid/sys/posix/check.h"
 #include "qpid/sys/posix/PrivatePosix.h"
+#include "qpid/log/Statement.h"
 
 #include <fcntl.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <sys/errno.h>
+#include <poll.h>
 #include <netinet/in.h>
 #include <netinet/tcp.h>
 #include <netdb.h>
@@ -50,36 +52,6 @@ namespace sys {
 namespace ssl {
 
 namespace {
-std::string getName(int fd, bool local, bool includeService = false)
-{
-    ::sockaddr_storage name; // big enough for any socket address
-    ::socklen_t namelen = sizeof(name);
-
-    int result = -1;
-    if (local) {
-        result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
-    } else {
-        result = ::getpeername(fd, (::sockaddr*)&name, &namelen);
-    }
-
-    QPID_POSIX_CHECK(result);
-
-    char servName[NI_MAXSERV];
-    char dispName[NI_MAXHOST];
-    if (includeService) {
-        if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName),
-                                 servName, sizeof(servName),
-                                 NI_NUMERICHOST | NI_NUMERICSERV) != 0)
-            throw QPID_POSIX_ERROR(rc);
-        return std::string(dispName) + ":" + std::string(servName);
-
-    } else {
-        if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 0, 0, NI_NUMERICHOST) != 0)
-            throw QPID_POSIX_ERROR(rc);
-        return dispName;
-    }
-}
-
 std::string getService(int fd, bool local)
 {
     ::sockaddr_storage name; // big enough for any socket address
@@ -132,7 +104,7 @@ std::string getDomainFromSubject(std::st
 
 }
 
-SslSocket::SslSocket() : IOHandle(new IOHandlePrivate()), socket(0), prototype(0)
+SslSocket::SslSocket() : socket(0), prototype(0)
 {
     impl->fd = ::socket (PF_INET, SOCK_STREAM, 0);
     if (impl->fd < 0) throw QPID_POSIX_ERROR(errno);
@@ -144,7 +116,7 @@ SslSocket::SslSocket() : IOHandle(new IO
  * returned from accept. Because we use posix accept rather than
  * PR_Accept, we have to reset the handshake.
  */
-SslSocket::SslSocket(IOHandlePrivate* ioph, PRFileDesc* model) : IOHandle(ioph), socket(0), prototype(0)
+SslSocket::SslSocket(IOHandlePrivate* ioph, PRFileDesc* model) : Socket(ioph), socket(0), prototype(0)
 {
     socket = SSL_ImportFD(model, PR_ImportTCPSocket(impl->fd));
     NSS_CHECK(SSL_ResetHandshake(socket, true));
@@ -158,7 +130,7 @@ void SslSocket::setNonblocking() const
     PR_SetSocketOption(socket, &option);
 }
 
-void SslSocket::connect(const std::string& host, uint16_t port) const
+void SslSocket::connect(const std::string& host, const std::string& port) const
 {
     std::stringstream namestream;
     namestream << host << ":" << port;
@@ -180,7 +152,7 @@ void SslSocket::connect(const std::strin
     PRHostEnt hostEntry;
     PR_CHECK(PR_GetHostByName(host.data(), hostBuffer, PR_NETDB_BUF_SIZE, &hostEntry));
     PRNetAddr address;
-    int value = PR_EnumerateHostEnt(0, &hostEntry, port, &address);
+    int value = PR_EnumerateHostEnt(0, &hostEntry, boost::lexical_cast<PRUint16>(port), &address);
     if (value < 0) {
         throw Exception(QPID_MSG("Error getting address for host: " << ErrorString()));
     } else if (value == 0) {
@@ -238,6 +210,7 @@ int SslSocket::listen(uint16_t port, int
 
 SslSocket* SslSocket::accept() const
 {
+    QPID_LOG(trace, "Accepting SSL connection.");
     int afd = ::accept(impl->fd, 0, 0);
     if ( afd >= 0) {
         return new SslSocket(new IOHandlePrivate(afd), prototype);
@@ -248,36 +221,109 @@ SslSocket* SslSocket::accept() const
     }
 }
 
-int SslSocket::read(void *buf, size_t count) const
-{
-    return PR_Read(socket, buf, count);
-}
+#define SSL_STREAM_MAX_WAIT_ms 20
+#define SSL_STREAM_MAX_RETRIES 2
 
-int SslSocket::write(const void *buf, size_t count) const
-{
-    return PR_Write(socket, buf, count);
-}
+static bool isSslStream(int afd) {
+    int retries = SSL_STREAM_MAX_RETRIES;
+    unsigned char buf[5] = {};
 
-std::string SslSocket::getSockname() const
-{
-    return getName(impl->fd, true);
+    do {
+        struct pollfd fd = {afd, POLLIN, 0};
+
+        /*
+         * Note that this is blocking the accept thread, so connections that
+         * send no data can limit the rate at which we can accept new
+         * connections.
+         */
+        if (::poll(&fd, 1, SSL_STREAM_MAX_WAIT_ms) > 0) {
+            errno = 0;
+            int result = recv(afd, buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT);
+            if (result == sizeof(buf)) {
+                break;
+            }
+            if (errno && errno != EAGAIN) {
+                int err = errno;
+                ::close(afd);
+                throw QPID_POSIX_ERROR(err);
+            }
+        }
+    } while (retries-- > 0);
+
+    if (retries < 0) {
+        return false;
+    }
+
+    /*
+     * SSLv2 Client Hello format
+     * http://www.mozilla.org/projects/security/pki/nss/ssl/draft02.html
+     *
+     * Bytes 0-1: RECORD-LENGTH
+     * Byte    2: MSG-CLIENT-HELLO (1)
+     * Byte    3: CLIENT-VERSION-MSB
+     * Byte    4: CLIENT-VERSION-LSB
+     *
+     * Allowed versions:
+     * 2.0 - SSLv2
+     * 3.0 - SSLv3
+     * 3.1 - TLS 1.0
+     * 3.2 - TLS 1.1
+     * 3.3 - TLS 1.2
+     *
+     * The version sent in the Client-Hello is the latest version supported by
+     * the client. NSS may send version 3.x in an SSLv2 header for
+     * maximum compatibility.
+     */
+    bool isSSL2Handshake = buf[2] == 1 &&   // MSG-CLIENT-HELLO
+        ((buf[3] == 3 && buf[4] <= 3) ||    // SSL 3.0 & TLS 1.0-1.2 (v3.1-3.3)
+         (buf[3] == 2 && buf[4] == 0));     // SSL 2
+
+    /*
+     * SSLv3/TLS Client Hello format
+     * RFC 2246
+     *
+     * Byte    0: ContentType (handshake - 22)
+     * Bytes 1-2: ProtocolVersion {major, minor}
+     *
+     * Allowed versions:
+     * 3.0 - SSLv3
+     * 3.1 - TLS 1.0
+     * 3.2 - TLS 1.1
+     * 3.3 - TLS 1.2
+     */
+    bool isSSL3Handshake = buf[0] == 22 &&  // handshake
+        (buf[1] == 3 && buf[2] <= 3);       // SSL 3.0 & TLS 1.0-1.2 (v3.1-3.3)
+
+    return isSSL2Handshake || isSSL3Handshake;
 }
 
-std::string SslSocket::getPeername() const
+Socket* SslMuxSocket::accept() const
 {
-    return getName(impl->fd, false);
+    int afd = ::accept(impl->fd, 0, 0);
+    if (afd >= 0) {
+        QPID_LOG(trace, "Accepting connection with optional SSL wrapper.");
+        if (isSslStream(afd)) {
+            QPID_LOG(trace, "Accepted SSL connection.");
+            return new SslSocket(new IOHandlePrivate(afd), prototype);
+        } else {
+            QPID_LOG(trace, "Accepted Plaintext connection.");
+            return new Socket(new IOHandlePrivate(afd));
+        }
+    } else if (errno == EAGAIN) {
+        return 0;
+    } else {
+        throw QPID_POSIX_ERROR(errno);
+    }
 }
 
-std::string SslSocket::getPeerAddress() const
+int SslSocket::read(void *buf, size_t count) const
 {
-    if (!connectname.empty())
-        return connectname;
-    return getName(impl->fd, false, true);
+    return PR_Read(socket, buf, count);
 }
 
-std::string SslSocket::getLocalAddress() const
+int SslSocket::write(const void *buf, size_t count) const
 {
-    return getName(impl->fd, true, true);
+    return PR_Write(socket, buf, count);
 }
 
 uint16_t SslSocket::getLocalPort() const
@@ -290,17 +336,6 @@ uint16_t SslSocket::getRemotePort() cons
     return atoi(getService(impl->fd, true).c_str());
 }
 
-int SslSocket::getError() const
-{
-    int       result;
-    socklen_t rSize = sizeof (result);
-
-    if (::getsockopt(impl->fd, SOL_SOCKET, SO_ERROR, &result, &rSize) < 0)
-        throw QPID_POSIX_ERROR(errno);
-
-    return result;
-}
-
 void SslSocket::setTcpNoDelay(bool nodelay) const
 {
     if (nodelay) {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslSocket.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslSocket.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslSocket.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslSocket.h Fri Oct 21 14:42:12 2011
@@ -23,6 +23,7 @@
  */
 
 #include "qpid/sys/IOHandle.h"
+#include "qpid/sys/Socket.h"
 #include <nspr.h>
 
 #include <string>
@@ -36,7 +37,7 @@ class Duration;
 
 namespace ssl {
 
-class SslSocket : public qpid::sys::IOHandle
+class SslSocket : public qpid::sys::Socket
 {
 public:
     /** Create a socket wrapper for descriptor. */
@@ -53,7 +54,7 @@ public:
      * NSSInit().*/
     void setCertName(const std::string& certName);
 
-    void connect(const std::string& host, uint16_t port) const;
+    void connect(const std::string& host, const std::string& port) const;
 
     void close() const;
 
@@ -75,45 +76,13 @@ public:
     int read(void *buf, size_t count) const;
     int write(const void *buf, size_t count) const;
 
-    /** Returns the "socket name" ie the address bound to
-     * the near end of the socket
-     */
-    std::string getSockname() const;
-
-    /** Returns the "peer name" ie the address bound to
-     * the remote end of the socket
-     */
-    std::string getPeername() const;
-
-    /**
-     * Returns an address (host and port) for the remote end of the
-     * socket
-     */
-    std::string getPeerAddress() const;
-    /**
-     * Returns an address (host and port) for the local end of the
-     * socket
-     */
-    std::string getLocalAddress() const;
-
-    /**
-     * Returns the full address of the connection: local and remote host and port.
-     */
-    std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); }
-
     uint16_t getLocalPort() const;
     uint16_t getRemotePort() const;
 
-    /**
-     * Returns the error code stored in the socket.  This may be used
-     * to determine the result of a non-blocking connect.
-     */
-    int getError() const;
-
     int getKeyLen() const;
     std::string getClientAuthId() const;
 
-private:
+protected:
     mutable std::string connectname;
     mutable PRFileDesc* socket;
     std::string certname;
@@ -126,6 +95,13 @@ private:
     mutable PRFileDesc* prototype;
 
     SslSocket(IOHandlePrivate* ioph, PRFileDesc* model);
+    friend class SslMuxSocket;
+};
+
+class SslMuxSocket : public SslSocket
+{
+public:
+    Socket* accept() const;
 };
 
 }}}

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/AsynchIO.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/AsynchIO.cpp Fri Oct 21 14:42:12 2011
@@ -30,6 +30,7 @@
 #include "qpid/log/Statement.h"
 
 #include "qpid/sys/windows/check.h"
+#include "qpid/sys/windows/mingw32_compat.h"
 
 #include <boost/thread/once.hpp>
 
@@ -46,16 +47,13 @@ namespace {
 
 /*
  * The function pointers for AcceptEx and ConnectEx need to be looked up
- * at run time. Make sure this is done only once.
+ * at run time.
  */
-boost::once_flag lookUpAcceptExOnce = BOOST_ONCE_INIT;
-LPFN_ACCEPTEX fnAcceptEx = 0;
-typedef void (*lookUpFunc)(const qpid::sys::Socket &);
-
-void lookUpAcceptEx() {
-    SOCKET h = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) {
+    SOCKET h = toSocketHandle(s);
     GUID guidAcceptEx = WSAID_ACCEPTEX;
     DWORD dwBytes = 0;
+    LPFN_ACCEPTEX fnAcceptEx;
     WSAIoctl(h,
              SIO_GET_EXTENSION_FUNCTION_POINTER,
              &guidAcceptEx,
@@ -65,9 +63,9 @@ void lookUpAcceptEx() {
              &dwBytes,
              NULL,
              NULL);
-    closesocket(h);
     if (fnAcceptEx == 0)
         throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx"));
+    return fnAcceptEx;
 }
 
 }
@@ -94,18 +92,15 @@ private:
 
     AsynchAcceptor::Callback acceptedCallback;
     const Socket& socket;
+    const LPFN_ACCEPTEX fnAcceptEx;
 };
 
 AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback)
   : acceptedCallback(callback),
-    socket(s) {
+    socket(s),
+    fnAcceptEx(lookUpAcceptEx(s)) {
 
     s.setNonblocking();
-#if (BOOST_VERSION >= 103500)   /* boost 1.35 or later reversed the args */
-    boost::call_once(lookUpAcceptExOnce, lookUpAcceptEx);
-#else
-    boost::call_once(lookUpAcceptEx, lookUpAcceptExOnce);
-#endif
 }
 
 AsynchAcceptor::~AsynchAcceptor()
@@ -114,7 +109,8 @@ AsynchAcceptor::~AsynchAcceptor()
 }
 
 void AsynchAcceptor::start(Poller::shared_ptr poller) {
-    poller->monitorHandle(PollerHandle(socket), Poller::INPUT);
+    PollerHandle ph = PollerHandle(socket);
+    poller->monitorHandle(ph, Poller::INPUT);
     restart ();
 }
 
@@ -122,25 +118,26 @@ void AsynchAcceptor::restart(void) {
     DWORD bytesReceived = 0;  // Not used, needed for AcceptEx API
     AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback,
                                                         this,
-                                                        toSocketHandle(socket));
+                                                        socket);
     BOOL status;
-    status = ::fnAcceptEx(toSocketHandle(socket),
-                          toSocketHandle(*result->newSocket),
-                          result->addressBuffer,
-                          0,
-                          AsynchAcceptResult::SOCKADDRMAXLEN,
-                          AsynchAcceptResult::SOCKADDRMAXLEN,
-                          &bytesReceived,
-                          result->overlapped());
+    status = fnAcceptEx(toSocketHandle(socket),
+                        toSocketHandle(*result->newSocket),
+                        result->addressBuffer,
+                        0,
+                        AsynchAcceptResult::SOCKADDRMAXLEN,
+                        AsynchAcceptResult::SOCKADDRMAXLEN,
+                        &bytesReceived,
+                        result->overlapped());
     QPID_WINDOWS_CHECK_ASYNC_START(status);
 }
 
 
 AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb,
                                        AsynchAcceptor *acceptor,
-                                       SOCKET listener)
-  : callback(cb), acceptor(acceptor), listener(listener) {
-    newSocket.reset (new Socket());
+                                       const Socket& listener)
+  : callback(cb), acceptor(acceptor),
+    listener(toSocketHandle(listener)),
+    newSocket(listener.createSameTypeSocket()) {
 }
 
 void AsynchAcceptResult::success(size_t /*bytesTransferred*/) {
@@ -154,7 +151,7 @@ void AsynchAcceptResult::success(size_t 
     delete this;
 }
 
-void AsynchAcceptResult::failure(int status) {
+void AsynchAcceptResult::failure(int /*status*/) {
     //if (status != WSA_OPERATION_ABORTED)
     // Can there be anything else?  ;
     delete this;
@@ -173,20 +170,20 @@ private:
     FailedCallback failCallback;
     const Socket& socket;
     const std::string hostname;
-    const uint16_t port;
+    const std::string port;
 
 public:
     AsynchConnector(const Socket& socket,
-                    std::string hostname,
-                    uint16_t port,
+                    const std::string& hostname,
+                    const std::string& port,
                     ConnectedCallback connCb,
                     FailedCallback failCb = 0);
     void start(Poller::shared_ptr poller);
 };
 
 AsynchConnector::AsynchConnector(const Socket& sock,
-                                 std::string hname,
-                                 uint16_t p,
+                                 const std::string& hname,
+                                 const std::string& p,
                                  ConnectedCallback connCb,
                                  FailedCallback failCb) :
     connCallback(connCb), failCallback(failCb), socket(sock),
@@ -216,8 +213,8 @@ AsynchAcceptor* AsynchAcceptor::create(c
 }
 
 AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s,
-                                                    std::string hostname,
-                                                    uint16_t port,
+                                                    const std::string& hostname,
+                                                    const std::string& port,
                                                     ConnectedCallback connCb,
                                                     FailedCallback failCb)
 {
@@ -410,8 +407,9 @@ void AsynchIO::queueForDeletion() {
 }
 
 void AsynchIO::start(Poller::shared_ptr poller0) {
+    PollerHandle ph = PollerHandle(socket);
     poller = poller0;
-    poller->monitorHandle(PollerHandle(socket), Poller::INPUT);
+    poller->monitorHandle(ph, Poller::INPUT);
     if (writeQueue.size() > 0)  // Already have data queued for write
         notifyPendingWrite();
     startReading();
@@ -584,7 +582,6 @@ void AsynchIO::notifyIdle(void) {
 void AsynchIO::startWrite(AsynchIO::BufferBase* buff) {
     writeInProgress = true;
     InterlockedIncrement(&opsInProgress);
-    int writeCount = buff->byteCount-buff->dataCount;
     AsynchWriteResult *result =
         new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1),
                               buff,

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/AsynchIoResult.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/AsynchIoResult.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/AsynchIoResult.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/AsynchIoResult.h Fri Oct 21 14:42:12 2011
@@ -83,22 +83,22 @@ class AsynchAcceptResult : public Asynch
 public:
     AsynchAcceptResult(qpid::sys::AsynchAcceptor::Callback cb,
                        AsynchAcceptor *acceptor,
-                       SOCKET listener);
+                       const qpid::sys::Socket& listener);
     virtual void success (size_t bytesTransferred);
     virtual void failure (int error);
 
 private:
     virtual void complete(void) {}  // No-op for this class.
 
-    std::auto_ptr<qpid::sys::Socket> newSocket;
     qpid::sys::AsynchAcceptor::Callback callback;
     AsynchAcceptor *acceptor;
     SOCKET listener;
+    std::auto_ptr<qpid::sys::Socket> newSocket;
 
     // AcceptEx needs a place to write the local and remote addresses
     // when accepting the connection. Place those here; get enough for
     // IPv6 addresses, even if the socket is IPv4.
-    enum { SOCKADDRMAXLEN = sizeof sockaddr_in6 + 16,
+    enum { SOCKADDRMAXLEN = sizeof(sockaddr_in6) + 16,
            SOCKADDRBUFLEN = 2 * SOCKADDRMAXLEN };
     char addressBuffer[SOCKADDRBUFLEN];
 };

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/IocpPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/IocpPoller.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/IocpPoller.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/IocpPoller.cpp Fri Oct 21 14:42:12 2011
@@ -152,9 +152,9 @@ void Poller::monitorHandle(PollerHandle&
 }
 
 // All no-ops...
-void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {}
-void Poller::registerHandle(PollerHandle& handle) {}
-void Poller::unregisterHandle(PollerHandle& handle) {}
+void Poller::unmonitorHandle(PollerHandle& /*handle*/, Direction /*dir*/) {}
+void Poller::registerHandle(PollerHandle& /*handle*/) {}
+void Poller::unregisterHandle(PollerHandle& /*handle*/) {}
 
 Poller::Event Poller::wait(Duration timeout) {
     DWORD timeoutMs = 0;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Shlib.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Shlib.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Shlib.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Shlib.cpp Fri Oct 21 14:42:12 2011
@@ -44,7 +44,8 @@ void  Shlib::unload() {
 }
 
 void*  Shlib::getSymbol(const char* name) {
-    void* sym = GetProcAddress(static_cast<HMODULE>(handle), name);
+    // Double cast avoids warning about casting function pointer to object
+    void *sym = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(GetProcAddress(static_cast<HMODULE>(handle), name)));
     if (sym == NULL)
         throw QPID_WINDOWS_ERROR(GetLastError());
     return sym;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Socket.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Socket.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Socket.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Socket.cpp Fri Oct 21 14:42:12 2011
@@ -20,19 +20,18 @@
  */
 
 #include "qpid/sys/Socket.h"
+
 #include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/windows/IoHandlePrivate.h"
 #include "qpid/sys/windows/check.h"
-#include "qpid/sys/Time.h"
+#include "qpid/sys/windows/IoHandlePrivate.h"
 
-#include <cstdlib>
-#include <string.h>
+// Ensure we get all of winsock2.h
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
 
 #include <winsock2.h>
 
-#include <boost/format.hpp>
-#include <boost/lexical_cast.hpp>
-
 // Need to initialize WinSock. Ideally, this would be a singleton or embedded
 // in some one-time initialization function. I tried boost singleton and could
 // not get it to compile (and others located in google had the same problem).
@@ -84,53 +83,30 @@ namespace sys {
 
 namespace {
 
-std::string getName(SOCKET fd, bool local, bool includeService = false)
+std::string getName(SOCKET fd, bool local)
 {
-    sockaddr_in name; // big enough for any socket address    
-    socklen_t namelen = sizeof(name);
+    ::sockaddr_storage name_s; // big enough for any socket address
+    ::sockaddr* name = (::sockaddr*)&name_s;
+    ::socklen_t namelen = sizeof(name_s);
+
     if (local) {
-        QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen));
+        QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen));
     } else {
-        QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen));
+        QPID_WINSOCK_CHECK(::getpeername(fd, name, &namelen));
     }
 
-    char servName[NI_MAXSERV];
-    char dispName[NI_MAXHOST];
-    if (includeService) {
-        if (int rc = ::getnameinfo((sockaddr*)&name, namelen,
-                                   dispName, sizeof(dispName), 
-                                   servName, sizeof(servName), 
-                                   NI_NUMERICHOST | NI_NUMERICSERV) != 0)
-            throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
-        return std::string(dispName) + ":" + std::string(servName);
-    } else {
-        if (int rc = ::getnameinfo((sockaddr*)&name, namelen,
-                                   dispName, sizeof(dispName),
-                                   0, 0,
-                                   NI_NUMERICHOST) != 0)
-            throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
-        return dispName;
-    }
+    return SocketAddress::asString(name, namelen);
 }
 
-std::string getService(SOCKET fd, bool local)
+uint16_t getLocalPort(int fd)
 {
-    sockaddr_in name; // big enough for any socket address    
-    socklen_t namelen = sizeof(name);
-    
-    if (local) {
-        QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen));
-    } else {
-        QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen));
-    }
+    ::sockaddr_storage name_s; // big enough for any socket address
+    ::sockaddr* name = (::sockaddr*)&name_s;
+    ::socklen_t namelen = sizeof(name_s);
+
+    QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen));
 
-    char servName[NI_MAXSERV];
-    if (int rc = ::getnameinfo((sockaddr*)&name, namelen,
-                               0, 0, 
-                               servName, sizeof(servName), 
-                               NI_NUMERICHOST | NI_NUMERICSERV) != 0)
-        throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
-    return servName;
+    return SocketAddress::getPort(name);
 }
 }  // namespace
 
@@ -138,13 +114,7 @@ Socket::Socket() :
     IOHandle(new IOHandlePrivate),
     nonblocking(false),
     nodelay(false)
-{
-    SOCKET& socket = impl->fd;
-    if (socket != INVALID_SOCKET) Socket::close();
-    SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0);
-    if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
-    socket = s;
-}
+{}
 
 Socket::Socket(IOHandlePrivate* h) :
     IOHandle(h),
@@ -152,8 +122,7 @@ Socket::Socket(IOHandlePrivate* h) :
     nodelay(false)
 {}
 
-void
-Socket::createSocket(const SocketAddress& sa) const
+void Socket::createSocket(const SocketAddress& sa) const
 {
     SOCKET& socket = impl->fd;
     if (socket != INVALID_SOCKET) Socket::close();
@@ -168,24 +137,24 @@ Socket::createSocket(const SocketAddress
         if (nonblocking) setNonblocking();
         if (nodelay) setTcpNoDelay();
     } catch (std::exception&) {
-        closesocket(s);
+        ::closesocket(s);
         socket = INVALID_SOCKET;
         throw;
     }
 }
 
-void Socket::setTimeout(const Duration& interval) const
-{
-    const SOCKET& socket = impl->fd;
-    int64_t nanosecs = interval;
-    nanosecs /= (1000 * 1000); // nsecs -> usec -> msec
-    int msec = 0;
-    if (nanosecs > std::numeric_limits<int>::max())
-        msec = std::numeric_limits<int>::max();
-    else
-        msec = static_cast<int>(nanosecs);
-    setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&msec, sizeof(msec));
-    setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&msec, sizeof(msec));
+Socket* Socket::createSameTypeSocket() const {
+    SOCKET& socket = impl->fd;
+    // Socket currently has no actual socket attached
+    if (socket == INVALID_SOCKET)
+        return new Socket;
+
+    ::sockaddr_storage sa;
+    ::socklen_t salen = sizeof(sa);
+    QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen));
+    SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
+    if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
+    return new Socket(new IOHandlePrivate(s));
 }
 
 void Socket::setNonblocking() const {
@@ -193,30 +162,25 @@ void Socket::setNonblocking() const {
     QPID_WINSOCK_CHECK(ioctlsocket(impl->fd, FIONBIO, &nonblock));
 }
 
-void Socket::connect(const std::string& host, uint16_t port) const
+void Socket::connect(const std::string& host, const std::string& port) const
 {
-    SocketAddress sa(host, boost::lexical_cast<std::string>(port));
+    SocketAddress sa(host, port);
     connect(sa);
 }
 
 void
 Socket::connect(const SocketAddress& addr) const
 {
+    peername = addr.asString(false);
+
+    createSocket(addr);
+
     const SOCKET& socket = impl->fd;
-    const addrinfo *addrs = &(getAddrInfo(addr));
-    int error = 0;
+    int err;
     WSASetLastError(0);
-    while (addrs != 0) {
-        if ((::connect(socket, addrs->ai_addr, addrs->ai_addrlen) == 0) ||
-            (WSAGetLastError() == WSAEWOULDBLOCK))
-            break;
-        // Error... save this error code and see if there are other address
-        // to try before throwing the exception.
-        error = WSAGetLastError();
-        addrs = addrs->ai_next;
-    }
-    if (error)
-        throw qpid::Exception(QPID_MSG(strError(error) << ": " << connectname));
+    if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) != 0) &&
+        ((err = ::WSAGetLastError()) != WSAEWOULDBLOCK))
+        throw qpid::Exception(QPID_MSG(strError(err) << ": " << peername));
 }
 
 void
@@ -247,24 +211,26 @@ int Socket::read(void *buf, size_t count
     return received;
 }
 
-int Socket::listen(uint16_t port, int backlog) const
+int Socket::listen(const std::string& host, const std::string& port, int backlog) const
+{
+    SocketAddress sa(host, port);
+    return listen(sa, backlog);
+}
+
+int Socket::listen(const SocketAddress& addr, int backlog) const
 {
+    createSocket(addr);
+
     const SOCKET& socket = impl->fd;
     BOOL yes=1;
     QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes)));
-    struct sockaddr_in name;
-    memset(&name, 0, sizeof(name));
-    name.sin_family = AF_INET;
-    name.sin_port = htons(port);
-    name.sin_addr.s_addr = 0;
-    if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) == SOCKET_ERROR)
-        throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(WSAGetLastError())));
+
+    if (::bind(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) == SOCKET_ERROR)
+        throw Exception(QPID_MSG("Can't bind to " << addr.asString() << ": " << strError(WSAGetLastError())));
     if (::listen(socket, backlog) == SOCKET_ERROR)
-        throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(WSAGetLastError())));
-    
-    socklen_t namelen = sizeof(name);
-    QPID_WINSOCK_CHECK(::getsockname(socket, (struct sockaddr*)&name, &namelen));
-    return ntohs(name.sin_port);
+        throw Exception(QPID_MSG("Can't listen on " <<addr.asString() << ": " << strError(WSAGetLastError())));
+
+    return getLocalPort(socket);
 }
 
 Socket* Socket::accept() const
@@ -277,36 +243,20 @@ Socket* Socket::accept() const
     else throw QPID_WINDOWS_ERROR(WSAGetLastError());
 }
 
-std::string Socket::getSockname() const
-{
-    return getName(impl->fd, true);
-}
-
-std::string Socket::getPeername() const
-{
-    return getName(impl->fd, false);
-}
-
 std::string Socket::getPeerAddress() const
 {
-    if (!connectname.empty())
-        return std::string (connectname);
-    return getName(impl->fd, false, true);
+    if (peername.empty()) {
+        peername = getName(impl->fd, false);
+    }
+    return peername;
 }
 
 std::string Socket::getLocalAddress() const
 {
-    return getName(impl->fd, true, true);
-}
-
-uint16_t Socket::getLocalPort() const
-{
-    return atoi(getService(impl->fd, true).c_str());
-}
-
-uint16_t Socket::getRemotePort() const
-{
-    return atoi(getService(impl->fd, true).c_str());
+    if (localname.empty()) {
+        localname = getName(impl->fd, true);
+    }
+    return localname;
 }
 
 int Socket::getError() const

Propchange: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Socket.cpp
            ('svn:executable' removed)

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/SocketAddress.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/SocketAddress.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/SocketAddress.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/SocketAddress.cpp Fri Oct 21 14:42:12 2011
@@ -21,7 +21,13 @@
 
 #include "qpid/sys/SocketAddress.h"
 
-#include "qpid/sys/windows/check.h"
+#include "qpid/Exception.h"
+#include "qpid/Msg.h"
+
+// Ensure we get all of winsock2.h
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
 
 #include <winsock2.h>
 #include <ws2tcpip.h>
@@ -35,37 +41,111 @@ SocketAddress::SocketAddress(const std::
     port(port0),
     addrInfo(0)
 {
-    ::addrinfo hints;
-    ::memset(&hints, 0, sizeof(hints));
-    hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well
-    hints.ai_socktype = SOCK_STREAM;
-
-    const char* node = 0;
-    if (host.empty()) {
-        hints.ai_flags |= AI_PASSIVE;
-    } else {
-        node = host.c_str();
-    }
-    const char* service = port.empty() ? "0" : port.c_str();
+}
+
+SocketAddress::SocketAddress(const SocketAddress& sa) :
+    host(sa.host),
+    port(sa.port),
+    addrInfo(0)
+{
+}
 
-    int n = ::getaddrinfo(node, service, &hints, &addrInfo);
-    if (n != 0)
-        throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
+SocketAddress& SocketAddress::operator=(const SocketAddress& sa)
+{
+    SocketAddress temp(sa);
+
+    std::swap(temp, *this);
+    return *this;
 }
 
 SocketAddress::~SocketAddress()
 {
-    ::freeaddrinfo(addrInfo);
+    if (addrInfo) {
+        ::freeaddrinfo(addrInfo);
+    }
 }
 
-std::string SocketAddress::asString() const
+std::string SocketAddress::asString(::sockaddr const * const addr, size_t addrlen)
 {
-    return host + ":" + port;
+    char servName[NI_MAXSERV];
+    char dispName[NI_MAXHOST];
+    if (int rc=::getnameinfo(addr, addrlen,
+        dispName, sizeof(dispName),
+                             servName, sizeof(servName),
+                             NI_NUMERICHOST | NI_NUMERICSERV) != 0)
+        throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
+    std::string s;
+    switch (addr->sa_family) {
+        case AF_INET: s += dispName; break;
+        case AF_INET6: s += "["; s += dispName; s+= "]"; break;
+        default: throw Exception(QPID_MSG("Unexpected socket type"));
+    }
+    s += ":";
+    s += servName;
+    return s;
+}
+
+uint16_t SocketAddress::getPort(::sockaddr const * const addr)
+{
+    switch (addr->sa_family) {
+        case AF_INET: return ntohs(((::sockaddr_in*)addr)->sin_port);
+        case AF_INET6: return ntohs(((::sockaddr_in6*)addr)->sin6_port);
+        default:throw Exception(QPID_MSG("Unexpected socket type"));
+    }
+}
+
+std::string SocketAddress::asString(bool numeric) const
+{
+    if (!numeric)
+        return host + ":" + port;
+    // Canonicalise into numeric id
+    const ::addrinfo& ai = getAddrInfo(*this);
+
+    return asString(ai.ai_addr, ai.ai_addrlen);
+}
+
+bool SocketAddress::nextAddress() {
+    bool r = currentAddrInfo->ai_next != 0;
+    if (r)
+        currentAddrInfo = currentAddrInfo->ai_next;
+    return r;
+}
+
+void SocketAddress::setAddrInfoPort(uint16_t port) {
+    if (!currentAddrInfo) return;
+
+    ::addrinfo& ai = *currentAddrInfo;
+    switch (ai.ai_family) {
+    case AF_INET: ((::sockaddr_in*)ai.ai_addr)->sin_port = htons(port); return;
+    case AF_INET6:((::sockaddr_in6*)ai.ai_addr)->sin6_port = htons(port); return;
+    default: throw Exception(QPID_MSG("Unexpected socket type"));
+    }
 }
 
 const ::addrinfo& getAddrInfo(const SocketAddress& sa)
 {
-    return *sa.addrInfo;
+    if (!sa.addrInfo) {
+        ::addrinfo hints;
+        ::memset(&hints, 0, sizeof(hints));
+        hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for
+        hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6
+        hints.ai_socktype = SOCK_STREAM;
+
+        const char* node = 0;
+        if (sa.host.empty()) {
+            hints.ai_flags |= AI_PASSIVE;
+        } else {
+            node = sa.host.c_str();
+        }
+        const char* service = sa.port.empty() ? "0" : sa.port.c_str();
+
+        int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo);
+        if (n != 0)
+            throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n)));
+        sa.currentAddrInfo = sa.addrInfo;
+    }
+
+    return *sa.currentAddrInfo;
 }
 
 }}

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/SslAsynchIO.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/SslAsynchIO.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/SslAsynchIO.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/SslAsynchIO.h Fri Oct 21 14:42:12 2011
@@ -39,9 +39,6 @@ namespace qpid {
 namespace sys {
 namespace windows {
     
-class Socket;
-class Poller;
-
 /*
  * SSL/Schannel shim between the frame-handling and AsynchIO layers.
  * SslAsynchIO creates a regular AsynchIO object to handle I/O and this class

Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/StrError.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/StrError.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/StrError.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/StrError.cpp Fri Oct 21 14:42:12 2011
@@ -30,6 +30,7 @@ namespace sys {
 std::string strError(int err) {
     const size_t bufsize = 512;
     char buf[bufsize];
+    buf[0] = 0;
     if (0 == FormatMessage (FORMAT_MESSAGE_MAX_WIDTH_MASK
                             | FORMAT_MESSAGE_FROM_SYSTEM,
                             0,
@@ -39,7 +40,11 @@ std::string strError(int err) {
                             bufsize,
                             0))
     {
-        strerror_s (buf, bufsize, err);
+#ifdef _MSC_VER
+        strerror_s(buf, bufsize, err);
+#else
+        return std::string(strerror(err));
+#endif
     }
     return std::string(buf);
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org