You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2011/08/13 00:32:05 UTC

svn commit: r1157272 - in /qpid/trunk/qpid/cpp/src: qpid/sys/ qpid/sys/posix/ qpid/sys/windows/ tests/

Author: astitcher
Date: Fri Aug 12 22:32:05 2011
New Revision: 1157272

URL: http://svn.apache.org/viewvc?rev=1157272&view=rev
Log:
QPID-3405: IPv6 support for Unix C++ ports:
- On the Listen side we create separate listening sockets for IPv4 and IPv6
  making sure to not allow the IPv6 socket to run dual stack. This makes the
  reported IPv4 addresses look "normal" and would allow us to control
  IPv4/IPv6 listening separately.
- On the connect side we make sure to try all the addresses returned by
  getaddrinfo() in order until we either find one that connects or have
  tried all of them.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/sys/SocketAddress.h
    qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp   (contents, props changed)
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/SocketAddress.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/SocketAddress.h?rev=1157272&r1=1157271&r2=1157272&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/SocketAddress.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/SocketAddress.h Fri Aug 12 22:32:05 2011
@@ -41,12 +41,15 @@ public:
     QPID_COMMON_EXTERN SocketAddress& operator=(const SocketAddress&);
     QPID_COMMON_EXTERN ~SocketAddress();
 
-    std::string asString(bool numeric=true) const;
+    QPID_COMMON_EXTERN bool nextAddress();
+    QPID_COMMON_EXTERN std::string asString(bool numeric=true) const;
+    QPID_COMMON_EXTERN void setAddrInfoPort(uint16_t port);
 
 private:
     std::string host;
     std::string port;
     mutable ::addrinfo* addrInfo;
+    mutable ::addrinfo* currentAddrInfo;
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=1157272&r1=1157271&r2=1157272&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp Fri Aug 12 22:32:05 2011
@@ -25,21 +25,22 @@
 
 #include "qpid/Plugin.h"
 #include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
 #include "qpid/sys/Poller.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/log/Statement.h"
 
 #include <boost/bind.hpp>
-#include <memory>
+#include <boost/ptr_container/ptr_vector.hpp>
 
 namespace qpid {
 namespace sys {
 
 class AsynchIOProtocolFactory : public ProtocolFactory {
     const bool tcpNoDelay;
-    Socket listener;
-    const uint16_t listeningPort;
-    std::auto_ptr<AsynchAcceptor> acceptor;
+    boost::ptr_vector<Socket> listeners;
+    boost::ptr_vector<AsynchAcceptor> acceptors;
+    uint16_t listeningPort;
 
   public:
     AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay);
@@ -71,15 +72,38 @@ static class TCPIOPlugin : public Plugin
                     "", boost::lexical_cast<std::string>(opts.port),
                     opts.connectionBacklog,
                     opts.tcpNoDelay));
-            QPID_LOG(notice, "Listening on TCP port " << protocolt->getPort());
+            QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort());
             broker->registerProtocolFactory("tcp", protocolt);
         }
     }
 } tcpPlugin;
 
 AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay) :
-    tcpNoDelay(nodelay), listeningPort(listener.listen(host, port, backlog))
-{}
+    tcpNoDelay(nodelay)
+{
+    SocketAddress sa(host, port);
+
+    // We must have at least one resolved address
+    QPID_LOG(info, "Listening to: " << sa.asString())
+    Socket* s = new Socket;
+    uint16_t lport = s->listen(sa, backlog);
+    QPID_LOG(debug, "Listened to: " << lport);
+    listeners.push_back(s);
+
+    listeningPort = lport;
+
+    // Try any other resolved addresses
+    while (sa.nextAddress()) {
+        // Hack to ensure that all listening connections are on the same port
+        sa.setAddrInfoPort(listeningPort);
+        QPID_LOG(info, "Listening to: " << sa.asString())
+        Socket* s = new Socket;
+        uint16_t lport = s->listen(sa, backlog);
+        QPID_LOG(debug, "Listened to: " << lport);
+        listeners.push_back(s);
+    }
+
+}
 
 void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
                                           ConnectionCodec::Factory* f, bool isClient) {
@@ -111,10 +135,12 @@ uint16_t AsynchIOProtocolFactory::getPor
 
 void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
                                      ConnectionCodec::Factory* fact) {
-    acceptor.reset(
-        AsynchAcceptor::create(listener,
-                           boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
-    acceptor->start(poller);
+    for (unsigned i = 0; i<listeners.size(); ++i) {
+        acceptors.push_back(
+            AsynchAcceptor::create(listeners[i],
+                            boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
+        acceptors[i].start(poller);
+    }
 }
 
 void AsynchIOProtocolFactory::connectFailed(

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=1157272&r1=1157271&r2=1157272&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Fri Aug 12 22:32:05 2011
@@ -149,6 +149,7 @@ private:
     ConnectedCallback connCallback;
     FailedCallback failCallback;
     const Socket& socket;
+    SocketAddress sa;
 
 public:
     AsynchConnector(const Socket& socket,
@@ -171,11 +172,13 @@ AsynchConnector::AsynchConnector(const S
                    boost::bind(&AsynchConnector::connComplete, this, _1)),
     connCallback(connCb),
     failCallback(failCb),
-    socket(s)
+    socket(s),
+    sa(hostname, port)
 {
     socket.setNonblocking();
-    SocketAddress sa(hostname, port);
+
     // Note, not catching any exceptions here, also has effect of destructing
+    QPID_LOG(info, "Connecting: " << sa.asString());
     socket.connect(sa);
 }
 
@@ -191,11 +194,26 @@ void AsynchConnector::stop()
 
 void AsynchConnector::connComplete(DispatchHandle& h)
 {
-    h.stopWatch();
     int errCode = socket.getError();
     if (errCode == 0) {
+        h.stopWatch();
         connCallback(socket);
     } else {
+        // Retry while we cause an immediate exception
+        // (asynch failure will be handled by re-entering here at the top)
+        while (sa.nextAddress()) {
+            try {
+                // Try next address without deleting ourselves
+                QPID_LOG(debug, "Ignored socket connect error: " << strError(errCode));
+                QPID_LOG(info, "Retrying connect: " << sa.asString());
+                socket.connect(sa);
+                return;
+            } catch (const std::exception& e) {
+                QPID_LOG(debug, "Ignored socket connect exception: " << e.what());
+            }
+            errCode = socket.getError();
+        }
+        h.stopWatch();
         failCallback(socket, errCode, strError(errCode));
     }
     DispatchHandle::doDelete();

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp?rev=1157272&r1=1157271&r2=1157272&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp Fri Aug 12 22:32:05 2011
@@ -63,6 +63,20 @@ std::string getName(int fd, bool local)
         throw QPID_POSIX_ERROR(rc);
     return std::string(dispName) + ":" + std::string(servName);
 }
+
+uint16_t getLocalPort(int fd)
+{
+    ::sockaddr_storage name;
+    ::socklen_t namelen = sizeof(name);
+    if (::getsockname(fd, (::sockaddr*)&name, &namelen) < 0)
+        throw QPID_POSIX_ERROR(errno);
+
+    switch (name.ss_family) {
+    case AF_INET: return ntohs(((::sockaddr_in&)name).sin_port);
+    case AF_INET6: return ntohs(((::sockaddr_in6&)name).sin6_port);
+    default:throw Exception(QPID_MSG("Unexpected socket type"));
+    }
+}
 }
 
 Socket::Socket() :
@@ -88,6 +102,11 @@ void Socket::createSocket(const SocketAd
     try {
         if (nonblocking) setNonblocking();
         if (nodelay) setTcpNoDelay();
+        if (getAddrInfo(sa).ai_family == AF_INET6) {
+            int flag = 1;
+            int result = ::setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&flag, sizeof(flag));
+            QPID_POSIX_CHECK(result);
+        }
     } catch (std::exception&) {
         ::close(s);
         socket = -1;
@@ -109,7 +128,7 @@ void Socket::setTcpNoDelay() const
     nodelay = true;
     if (socket != -1) {
         int flag = 1;
-        int result = setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
+        int result = ::setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
         QPID_POSIX_CHECK(result);
     }
 }
@@ -179,19 +198,14 @@ int Socket::listen(const SocketAddress& 
 
     const int& socket = impl->fd;
     int yes=1;
-    QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
+    QPID_POSIX_CHECK(::setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
 
     if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0)
         throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno)));
     if (::listen(socket, backlog) < 0)
         throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno)));
 
-    struct sockaddr_in name;
-    socklen_t namelen = sizeof(name);
-    if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)
-        throw QPID_POSIX_ERROR(errno);
-
-    return ntohs(name.sin_port);
+    return getLocalPort(socket);
 }
 
 Socket* Socket::accept() const

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp?rev=1157272&r1=1157271&r2=1157272&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/SocketAddress.cpp Fri Aug 12 22:32:05 2011
@@ -27,8 +27,6 @@
 #include <string.h>
 #include <netdb.h>
 
-#include <algorithm>
-
 namespace qpid {
 namespace sys {
 
@@ -73,19 +71,38 @@ std::string SocketAddress::asString(bool
                             dispName, sizeof(dispName),
                             servName, sizeof(servName),
                             NI_NUMERICHOST | NI_NUMERICSERV) != 0)
-        throw QPID_POSIX_ERROR(rc);
+        throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
     std::string s(dispName);
     s += ":";
     s += servName;
     return s;
 }
 
+bool SocketAddress::nextAddress() {
+    bool r = currentAddrInfo->ai_next != 0;
+    if (r)
+        currentAddrInfo = currentAddrInfo->ai_next;
+    return r;
+}
+
+void SocketAddress::setAddrInfoPort(uint16_t port) {
+    if (!currentAddrInfo) return;
+
+    ::addrinfo& ai = *currentAddrInfo;
+    switch (ai.ai_family) {
+    case AF_INET: ((::sockaddr_in*)ai.ai_addr)->sin_port = htons(port); return;
+    case AF_INET6:((::sockaddr_in6*)ai.ai_addr)->sin6_port = htons(port); return;
+    default: throw Exception(QPID_MSG("Unexpected socket type"));
+    }
+}
+
 const ::addrinfo& getAddrInfo(const SocketAddress& sa)
 {
     if (!sa.addrInfo) {
         ::addrinfo hints;
         ::memset(&hints, 0, sizeof(hints));
-        hints.ai_family = AF_INET; // Change this to support IPv6
+        hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for
+        hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6
         hints.ai_socktype = SOCK_STREAM;
 
         const char* node = 0;
@@ -99,9 +116,10 @@ const ::addrinfo& getAddrInfo(const Sock
         int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo);
         if (n != 0)
             throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n)));
+        sa.currentAddrInfo = sa.addrInfo;
     }
 
-    return *sa.addrInfo;
+    return *sa.currentAddrInfo;
 }
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp?rev=1157272&r1=1157271&r2=1157272&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp Fri Aug 12 22:32:05 2011
@@ -211,21 +211,24 @@ int Socket::read(void *buf, size_t count
     return received;
 }
 
-int Socket::listen(const std::string&, const std::string& port, int backlog) const
+int Socket::listen(const std::string& host, const std::string& port, int backlog) const
+{
+    SocketAddress sa(host, port);
+    return listen(sa, backlog);
+}
+
+int Socket::listen(const SocketAddress& addr, int backlog) const
 {
     const SOCKET& socket = impl->fd;
     BOOL yes=1;
     QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes)));
-    struct sockaddr_in name;
-    memset(&name, 0, sizeof(name));
-    name.sin_family = AF_INET;
-    name.sin_port = htons(boost::lexical_cast<uint16_t>(port));
-    name.sin_addr.s_addr = 0;
-    if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) == SOCKET_ERROR)
-        throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(WSAGetLastError())));
+
+    if (::bind(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) == SOCKET_ERROR)
+        throw Exception(QPID_MSG("Can't bind to " << addr.asString() << ": " << strError(WSAGetLastError())));
     if (::listen(socket, backlog) == SOCKET_ERROR)
-        throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(WSAGetLastError())));
+        throw Exception(QPID_MSG("Can't listen on " <<addr.asString() << ": " << strError(WSAGetLastError())));
     
+    struct sockaddr_in name;
     socklen_t namelen = sizeof(name);
     QPID_WINSOCK_CHECK(::getsockname(socket, (struct sockaddr*)&name, &namelen));
     return ntohs(name.sin_port);

Propchange: qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp
            ('svn:executable' removed)

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp?rev=1157272&r1=1157271&r2=1157272&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp Fri Aug 12 22:32:05 2011
@@ -68,6 +68,13 @@ std::string SocketAddress::asString(bool
     return host + ":" + port;
 }
 
+bool SocketAddress::nextAddress() {
+    bool r = currentAddrInfo->ai_next != 0;
+    if (r)
+        currentAddrInfo = currentAddrInfo->ai_next;
+    return r;
+}
+
 const ::addrinfo& getAddrInfo(const SocketAddress& sa)
 {
     return *sa.addrInfo;

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py?rev=1157272&r1=1157271&r2=1157272&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py Fri Aug 12 22:32:05 2011
@@ -53,6 +53,7 @@ def filter_log(log):
         'stall for update|unstall, ignore update|cancelled offer .* unstall',
         'caught up',
         'active for links|Passivating links|Activating links',
+        'info Connecting: .*', # UpdateClient connection
         'info Connection.* connected to', # UpdateClient connection
         'warning Connection \\[[-0-9.: ]+\\] closed', # UpdateClient connection
         'warning Broker closed connection: 200, OK',



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org