You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2009/10/12 07:53:28 UTC

svn commit: r824237 - in /qpid/trunk/qpid/cpp/src/qpid/sys: Socket.h epoll/EpollPoller.cpp posix/Socket.cpp windows/Socket.cpp

Author: astitcher
Date: Mon Oct 12 05:53:27 2009
New Revision: 824237

URL: http://svn.apache.org/viewvc?rev=824237&view=rev
Log:
Refactored Socket to allow for IPv6 and Unix domain sockets

Modified:
    qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
    qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h?rev=824237&r1=824236&r2=824237&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h Mon Oct 12 05:53:27 2009
@@ -39,12 +39,9 @@
     /** Create a socket wrapper for descriptor. */
     QPID_COMMON_EXTERN Socket();
 
-    /** Create an initialized TCP socket */
-    void createTcp() const;
-    
     /** Set timeout for read and write */
     void setTimeout(const Duration& interval) const;
-    
+
     /** Set socket non blocking */
     void setNonblocking() const;
 
@@ -59,7 +56,7 @@
      *@return The bound port.
      */
     QPID_COMMON_EXTERN int listen(uint16_t port = 0, int backlog = 10) const;
-    
+
     /** Returns the "socket name" ie the address bound to 
      * the near end of the socket
      */
@@ -102,8 +99,12 @@
     QPID_COMMON_EXTERN void setTcpNoDelay(bool nodelay) const;
 
 private:
+    /** Create socket */
+    void createSocket(const SocketAddress&) const;
+
     Socket(IOHandlePrivate*);
     mutable std::string connectname;
+    mutable bool nonblocking;
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=824237&r1=824236&r2=824237&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Mon Oct 12 05:53:27 2009
@@ -60,19 +60,23 @@
         DELETED
     };
 
-    int fd;
     ::__uint32_t events;
+    const IOHandlePrivate* ioHandle;
     PollerHandle* pollerHandle;
     FDStat stat;
     Mutex lock;
 
-    PollerHandlePrivate(int f, PollerHandle* p) :
-      fd(f),
+    PollerHandlePrivate(const IOHandlePrivate* h, PollerHandle* p) :
       events(0),
+      ioHandle(h),
       pollerHandle(p),
       stat(ABSENT) {
     }
 
+    int fd() const {
+        return toFd(ioHandle);
+    }
+
     bool isActive() const {
         return stat == MONITORED || stat == MONITORED_HUNGUP;
     }
@@ -131,7 +135,7 @@
 };
 
 PollerHandle::PollerHandle(const IOHandle& h) :
-    impl(new PollerHandlePrivate(toFd(h.impl), this))
+    impl(new PollerHandlePrivate(h.impl, this))
 {}
 
 PollerHandle::~PollerHandle() {
@@ -303,7 +307,7 @@
     epe.data.u64 = 0; // Keep valgrind happy
     epe.data.ptr = &eh;
 
-    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd, &epe));
+    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd(), &epe));
 
     eh.setActive();
 }
@@ -313,7 +317,7 @@
     ScopedLock<Mutex> l(eh.lock);
     assert(!eh.isIdle());
 
-    int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0);
+    int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd(), 0);
     // Ignore EBADF since deleting a nonexistent fd has the overall required result!
     // And allows the case where a sloppy program closes the fd and then does the delFd()
     if (rc == -1 && errno != EBADF) {
@@ -344,7 +348,7 @@
         epe.data.u64 = 0; // Keep valgrind happy
         epe.data.ptr = &eh;
 
-        QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+        QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
 
         eh.setActive();
         return;
@@ -382,7 +386,7 @@
     epe.data.u64 = 0; // Keep valgrind happy
     epe.data.ptr = &eh;
 
-    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
 }
 
 void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {
@@ -408,7 +412,7 @@
     epe.data.u64 = 0; // Keep valgrind happy
     epe.data.ptr = &eh;
 
-    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
 }
 
 void Poller::shutdown() {
@@ -443,7 +447,7 @@
         epe.events = 0;
         epe.data.u64 = 0; // Keep valgrind happy
         epe.data.ptr = &eh;
-        QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+        QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
 
         if (eh.isInactive()) {
             eh.setInterrupted();

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp?rev=824237&r1=824236&r2=824237&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp Mon Oct 12 05:53:27 2009
@@ -97,22 +97,30 @@
 }
 
 Socket::Socket() :
-	IOHandle(new IOHandlePrivate)
-{
-	createTcp();
-}
+    IOHandle(new IOHandlePrivate),
+    nonblocking(false)
+{}
 
 Socket::Socket(IOHandlePrivate* h) :
-	IOHandle(h)
+    IOHandle(h),
+    nonblocking(false)
 {}
 
-void Socket::createTcp() const
+void Socket::createSocket(const SocketAddress& sa) const
 {
     int& socket = impl->fd;
     if (socket != -1) Socket::close();
-    int s = ::socket (AF_INET, SOCK_STREAM, 0);
+    int s = ::socket(getAddrInfo(sa).ai_family, getAddrInfo(sa).ai_socktype, 0);
     if (s < 0) throw QPID_POSIX_ERROR(errno);
     socket = s;
+
+    try {
+        if (nonblocking) setNonblocking();
+    } catch (std::exception&) {
+        ::close(s);
+        socket = -1;
+        throw;
+    }
 }
 
 void Socket::setTimeout(const Duration& interval) const
@@ -125,7 +133,9 @@
 }
 
 void Socket::setNonblocking() const {
-    QPID_POSIX_CHECK(::fcntl(impl->fd, F_SETFL, O_NONBLOCK));
+    int& socket = impl->fd;
+    if (socket != -1) QPID_POSIX_CHECK(::fcntl(socket, F_SETFL, O_NONBLOCK));
+    nonblocking = true;
 }
 
 void Socket::connect(const std::string& host, uint16_t port) const
@@ -138,8 +148,9 @@
 {
     connectname = addr.asString();
 
-    const int& socket = impl->fd;
+    createSocket(addr);
 
+    const int& socket = impl->fd;
     // TODO the correct thing to do here is loop on failure until you've used all the returned addresses
     if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) &&
         (errno != EINPROGRESS)) {
@@ -158,11 +169,14 @@
 
 int Socket::listen(uint16_t port, int backlog) const
 {
+    SocketAddress sa("", boost::lexical_cast<std::string>(port));
+
+    createSocket(sa);
+
     const int& socket = impl->fd;
     int yes=1;
     QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
 
-    SocketAddress sa("", boost::lexical_cast<std::string>(port));
     if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0)
         throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(errno)));
     if (::listen(socket, backlog) < 0)

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp?rev=824237&r1=824236&r2=824237&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp Mon Oct 12 05:53:27 2009
@@ -138,15 +138,6 @@
 Socket::Socket() :
 	IOHandle(new IOHandlePrivate)
 {
-	createTcp();
-}
-
-Socket::Socket(IOHandlePrivate* h) :
-	IOHandle(h)
-{}
-
-void Socket::createTcp() const
-{
     SOCKET& socket = impl->fd;
     if (socket != INVALID_SOCKET) Socket::close();
     SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0);
@@ -154,6 +145,10 @@
     socket = s;
 }
 
+Socket::Socket(IOHandlePrivate* h) :
+	IOHandle(h)
+{}
+
 void Socket::setTimeout(const Duration& interval) const
 {
     const SOCKET& socket = impl->fd;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org