You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2009/10/12 07:53:28 UTC
svn commit: r824237 - in /qpid/trunk/qpid/cpp/src/qpid/sys: Socket.h
epoll/EpollPoller.cpp posix/Socket.cpp windows/Socket.cpp
Author: astitcher
Date: Mon Oct 12 05:53:27 2009
New Revision: 824237
URL: http://svn.apache.org/viewvc?rev=824237&view=rev
Log:
Refactored Socket to allow for IPv6 and Unix domain sockets
Modified:
qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h?rev=824237&r1=824236&r2=824237&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h Mon Oct 12 05:53:27 2009
@@ -39,12 +39,9 @@
/** Create a socket wrapper for descriptor. */
QPID_COMMON_EXTERN Socket();
- /** Create an initialized TCP socket */
- void createTcp() const;
-
/** Set timeout for read and write */
void setTimeout(const Duration& interval) const;
-
+
/** Set socket non blocking */
void setNonblocking() const;
@@ -59,7 +56,7 @@
*@return The bound port.
*/
QPID_COMMON_EXTERN int listen(uint16_t port = 0, int backlog = 10) const;
-
+
/** Returns the "socket name" ie the address bound to
* the near end of the socket
*/
@@ -102,8 +99,12 @@
QPID_COMMON_EXTERN void setTcpNoDelay(bool nodelay) const;
private:
+ /** Create socket */
+ void createSocket(const SocketAddress&) const;
+
Socket(IOHandlePrivate*);
mutable std::string connectname;
+ mutable bool nonblocking;
};
}}
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=824237&r1=824236&r2=824237&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Mon Oct 12 05:53:27 2009
@@ -60,19 +60,23 @@
DELETED
};
- int fd;
::__uint32_t events;
+ const IOHandlePrivate* ioHandle;
PollerHandle* pollerHandle;
FDStat stat;
Mutex lock;
- PollerHandlePrivate(int f, PollerHandle* p) :
- fd(f),
+ PollerHandlePrivate(const IOHandlePrivate* h, PollerHandle* p) :
events(0),
+ ioHandle(h),
pollerHandle(p),
stat(ABSENT) {
}
+ int fd() const {
+ return toFd(ioHandle);
+ }
+
bool isActive() const {
return stat == MONITORED || stat == MONITORED_HUNGUP;
}
@@ -131,7 +135,7 @@
};
PollerHandle::PollerHandle(const IOHandle& h) :
- impl(new PollerHandlePrivate(toFd(h.impl), this))
+ impl(new PollerHandlePrivate(h.impl, this))
{}
PollerHandle::~PollerHandle() {
@@ -303,7 +307,7 @@
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd(), &epe));
eh.setActive();
}
@@ -313,7 +317,7 @@
ScopedLock<Mutex> l(eh.lock);
assert(!eh.isIdle());
- int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0);
+ int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd(), 0);
// Ignore EBADF since deleting a nonexistent fd has the overall required result!
// And allows the case where a sloppy program closes the fd and then does the delFd()
if (rc == -1 && errno != EBADF) {
@@ -344,7 +348,7 @@
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
eh.setActive();
return;
@@ -382,7 +386,7 @@
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
}
void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {
@@ -408,7 +412,7 @@
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
}
void Poller::shutdown() {
@@ -443,7 +447,7 @@
epe.events = 0;
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
if (eh.isInactive()) {
eh.setInterrupted();
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp?rev=824237&r1=824236&r2=824237&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp Mon Oct 12 05:53:27 2009
@@ -97,22 +97,30 @@
}
Socket::Socket() :
- IOHandle(new IOHandlePrivate)
-{
- createTcp();
-}
+ IOHandle(new IOHandlePrivate),
+ nonblocking(false)
+{}
Socket::Socket(IOHandlePrivate* h) :
- IOHandle(h)
+ IOHandle(h),
+ nonblocking(false)
{}
-void Socket::createTcp() const
+void Socket::createSocket(const SocketAddress& sa) const
{
int& socket = impl->fd;
if (socket != -1) Socket::close();
- int s = ::socket (AF_INET, SOCK_STREAM, 0);
+ int s = ::socket(getAddrInfo(sa).ai_family, getAddrInfo(sa).ai_socktype, 0);
if (s < 0) throw QPID_POSIX_ERROR(errno);
socket = s;
+
+ try {
+ if (nonblocking) setNonblocking();
+ } catch (std::exception&) {
+ ::close(s);
+ socket = -1;
+ throw;
+ }
}
void Socket::setTimeout(const Duration& interval) const
@@ -125,7 +133,9 @@
}
void Socket::setNonblocking() const {
- QPID_POSIX_CHECK(::fcntl(impl->fd, F_SETFL, O_NONBLOCK));
+ int& socket = impl->fd;
+ if (socket != -1) QPID_POSIX_CHECK(::fcntl(socket, F_SETFL, O_NONBLOCK));
+ nonblocking = true;
}
void Socket::connect(const std::string& host, uint16_t port) const
@@ -138,8 +148,9 @@
{
connectname = addr.asString();
- const int& socket = impl->fd;
+ createSocket(addr);
+ const int& socket = impl->fd;
// TODO the correct thing to do here is loop on failure until you've used all the returned addresses
if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) &&
(errno != EINPROGRESS)) {
@@ -158,11 +169,14 @@
int Socket::listen(uint16_t port, int backlog) const
{
+ SocketAddress sa("", boost::lexical_cast<std::string>(port));
+
+ createSocket(sa);
+
const int& socket = impl->fd;
int yes=1;
QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
- SocketAddress sa("", boost::lexical_cast<std::string>(port));
if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0)
throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(errno)));
if (::listen(socket, backlog) < 0)
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp?rev=824237&r1=824236&r2=824237&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp Mon Oct 12 05:53:27 2009
@@ -138,15 +138,6 @@
Socket::Socket() :
IOHandle(new IOHandlePrivate)
{
- createTcp();
-}
-
-Socket::Socket(IOHandlePrivate* h) :
- IOHandle(h)
-{}
-
-void Socket::createTcp() const
-{
SOCKET& socket = impl->fd;
if (socket != INVALID_SOCKET) Socket::close();
SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0);
@@ -154,6 +145,10 @@
socket = s;
}
+Socket::Socket(IOHandlePrivate* h) :
+ IOHandle(h)
+{}
+
void Socket::setTimeout(const Duration& interval) const
{
const SOCKET& socket = impl->fd;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org