You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC
svn commit: r1187375 [14/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/TCPIOPlugin.cpp Fri Oct 21 14:42:12 2011
@@ -25,31 +25,31 @@
#include "qpid/Plugin.h"
#include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/Poller.h"
#include "qpid/broker/Broker.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
-#include <memory>
+#include <boost/ptr_container/ptr_vector.hpp>
namespace qpid {
namespace sys {
class AsynchIOProtocolFactory : public ProtocolFactory {
const bool tcpNoDelay;
- Socket listener;
- const uint16_t listeningPort;
- std::auto_ptr<AsynchAcceptor> acceptor;
+ boost::ptr_vector<Socket> listeners;
+ boost::ptr_vector<AsynchAcceptor> acceptors;
+ uint16_t listeningPort;
public:
- AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay);
+ AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, int16_t port,
+ void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
ConnectFailedCallback);
uint16_t getPort() const;
- std::string getHost() const;
private:
void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
@@ -57,27 +57,78 @@ class AsynchIOProtocolFactory : public P
void connectFailed(const Socket&, int, const std::string&, ConnectFailedCallback);
};
+static bool sslMultiplexEnabled(void)
+{
+ Options o;
+ Plugin::addOptions(o);
+
+ if (o.find_nothrow("ssl-multiplex", false)) {
+ // This option is added by the SSL plugin when the SSL port
+ // is configured to be the same as the main port.
+ QPID_LOG(notice, "SSL multiplexing enabled");
+ return true;
+ }
+ return false;
+}
+
// Static instance to initialise plugin
static class TCPIOPlugin : public Plugin {
void earlyInitialize(Target&) {
}
-
+
void initialize(Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
// Only provide to a Broker
if (broker) {
const broker::Broker::Options& opts = broker->getOptions();
- ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog,
- opts.tcpNoDelay));
- QPID_LOG(notice, "Listening on TCP port " << protocol->getPort());
- broker->registerProtocolFactory("tcp", protocol);
+
+ // Check for SSL on the same port
+ bool shouldListen = !sslMultiplexEnabled();
+
+ ProtocolFactory::shared_ptr protocolt(
+ new AsynchIOProtocolFactory(
+ "", boost::lexical_cast<std::string>(opts.port),
+ opts.connectionBacklog,
+ opts.tcpNoDelay,
+ shouldListen));
+ if (shouldListen) {
+ QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort());
+ }
+ broker->registerProtocolFactory("tcp", protocolt);
}
}
} tcpPlugin;
-AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay) :
- tcpNoDelay(nodelay), listeningPort(listener.listen(port, backlog))
-{}
+AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen) :
+ tcpNoDelay(nodelay)
+{
+ if (!shouldListen) {
+ return;
+ }
+
+ SocketAddress sa(host, port);
+
+ // We must have at least one resolved address
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = new Socket;
+ uint16_t lport = s->listen(sa, backlog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ listeners.push_back(s);
+
+ listeningPort = lport;
+
+ // Try any other resolved addresses
+ while (sa.nextAddress()) {
+ // Hack to ensure that all listening connections are on the same port
+ sa.setAddrInfoPort(listeningPort);
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = new Socket;
+ uint16_t lport = s->listen(sa, backlog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ listeners.push_back(s);
+ }
+
+}
void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
ConnectionCodec::Factory* f, bool isClient) {
@@ -107,16 +158,14 @@ uint16_t AsynchIOProtocolFactory::getPor
return listeningPort; // Immutable no need for lock.
}
-std::string AsynchIOProtocolFactory::getHost() const {
- return listener.getSockname();
-}
-
void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
ConnectionCodec::Factory* fact) {
- acceptor.reset(
- AsynchAcceptor::create(listener,
- boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
- acceptor->start(poller);
+ for (unsigned i = 0; i<listeners.size(); ++i) {
+ acceptors.push_back(
+ AsynchAcceptor::create(listeners[i],
+ boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
+ acceptors[i].start(poller);
+ }
}
void AsynchIOProtocolFactory::connectFailed(
@@ -130,7 +179,7 @@ void AsynchIOProtocolFactory::connectFai
void AsynchIOProtocolFactory::connect(
Poller::shared_ptr poller,
- const std::string& host, int16_t port,
+ const std::string& host, const std::string& port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
{
@@ -139,8 +188,8 @@ void AsynchIOProtocolFactory::connect(
// upon connection failure or by the AsynchIO upon connection
// shutdown. The allocated AsynchConnector frees itself when it
// is no longer needed.
-
Socket* socket = new Socket();
+ try {
AsynchConnector* c = AsynchConnector::create(
*socket,
host,
@@ -150,6 +199,12 @@ void AsynchIOProtocolFactory::connect(
boost::bind(&AsynchIOProtocolFactory::connectFailed,
this, _1, _2, _3, failed));
c->start(poller);
+ } catch (std::exception&) {
+ // TODO: Design question - should we do the error callback and also throw?
+ int errCode = socket->getError();
+ connectFailed(*socket, errCode, strError(errCode), failed);
+ throw;
+ }
}
}} // namespace qpid::sys
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.cpp Fri Oct 21 14:42:12 2011
@@ -75,6 +75,12 @@ void TimerTask::cancel() {
cancelled = true;
}
+void TimerTask::setFired() {
+ // Set nextFireTime to just before now, making readyToFire() true.
+ nextFireTime = AbsTime(sys::now(), Duration(-1));
+}
+
+
Timer::Timer() :
active(false),
late(50 * TIME_MSEC),
@@ -131,12 +137,14 @@ void Timer::run()
bool warningsEnabled;
QPID_LOG_TEST(warning, warningsEnabled);
if (warningsEnabled) {
- if (delay > late && overrun > overran)
- warn.lateAndOverran(t->name, delay, overrun, Duration(start, end));
+ if (overrun > overran) {
+ if (delay > overran) // if delay is significant to an overrun.
+ warn.lateAndOverran(t->name, delay, overrun, Duration(start, end));
+ else
+ warn.overran(t->name, overrun, Duration(start, end));
+ }
else if (delay > late)
warn.late(t->name, delay);
- else if (overrun > overran)
- warn.overran(t->name, overrun, Duration(start, end));
}
continue;
} else {
@@ -183,7 +191,11 @@ void Timer::stop()
// Allow subclasses to override behavior when firing a task.
void Timer::fire(boost::intrusive_ptr<TimerTask> t) {
- t->fireTask();
+ try {
+ t->fireTask();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, "Exception thrown by timer task " << t->getName() << ": " << e.what());
+ }
}
// Provided for subclasses: called when a task is droped.
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/Timer.h Fri Oct 21 14:42:12 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -64,6 +64,10 @@ class TimerTask : public RefCounted {
std::string getName() const { return name; }
+ // Move the nextFireTime so readyToFire is true.
+ // Used by the cluster, where tasks are fired on cluster events, not on local time.
+ QPID_COMMON_EXTERN void setFired();
+
protected:
// Must be overridden with callback
virtual void fire() = 0;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/TimerWarnings.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/TimerWarnings.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/TimerWarnings.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/TimerWarnings.cpp Fri Oct 21 14:42:12 2011
@@ -56,20 +56,22 @@ void TimerWarnings::log() {
std::string task = i->first;
TaskStats& stats = i->second;
if (stats.lateDelay.count)
- QPID_LOG(warning, task << " task late "
+ QPID_LOG(info, task << " task late "
<< stats.lateDelay.count << " times by "
<< stats.lateDelay.average()/TIME_MSEC << "ms on average.");
+
if (stats.overranOverrun.count)
- QPID_LOG(warning, task << " task overran "
+ QPID_LOG(info, task << " task overran "
<< stats.overranOverrun.count << " times by "
<< stats.overranOverrun.average()/TIME_MSEC << "ms (taking "
<< stats.overranTime.average() << "ns) on average.");
- if (stats.lateAndOverranDelay.count)
- QPID_LOG(warning, task << " task overran "
- << stats.overranOverrun.count << " times by "
- << stats.overranOverrun.average()/TIME_MSEC << "ms (taking "
- << stats.overranTime.average() << "ns) on average.");
+ if (stats.lateAndOverranOverrun.count)
+ QPID_LOG(info, task << " task late and overran "
+ << stats.lateAndOverranOverrun.count << " times: late "
+ << stats.lateAndOverranDelay.average()/TIME_MSEC << "ms, overran "
+ << stats.lateAndOverranOverrun.average()/TIME_MSEC << "ms (taking "
+ << stats.lateAndOverranTime.average() << "ns) on average.");
}
nextReport = AbsTime(now(), interval);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/alloca.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/alloca.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/alloca.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/alloca.h Fri Oct 21 14:42:12 2011
@@ -21,19 +21,22 @@
*
*/
-#if (defined(_WINDOWS) || defined (WIN32)) && defined(_MSC_VER)
-#include <malloc.h>
-#ifdef alloc
-# undef alloc
-#endif
-#define alloc _alloc
-#ifdef alloca
-# undef alloca
-#endif
-#define alloca _alloca
+#if (defined(_WINDOWS) || defined (WIN32))
+# include <malloc.h>
+
+# if defined(_MSC_VER)
+# ifdef alloc
+# undef alloc
+# endif
+# define alloc _alloc
+# ifdef alloca
+# undef alloca
+# endif
+# define alloca _alloca
+# endif
#endif
#if !defined _WINDOWS && !defined WIN32
-#include <alloca.h>
+# include <alloca.h>
#endif
#endif /*!QPID_SYS_ALLOCA_H*/
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp Fri Oct 21 14:42:12 2011
@@ -57,6 +57,7 @@ size_t CyrusSecurityLayer::decode(const
copied += count;
decodeBuffer.position += count;
size_t decodedSize = codec->decode(decodeBuffer.data, decodeBuffer.position);
+ if (decodedSize == 0) break;
if (decodedSize < decodeBuffer.position) {
::memmove(decodeBuffer.data, decodeBuffer.data + decodedSize, decodeBuffer.position - decodedSize);
}
@@ -106,7 +107,7 @@ size_t CyrusSecurityLayer::encode(const
bool CyrusSecurityLayer::canEncode()
{
- return encrypted || codec->canEncode();
+ return codec && (encrypted || codec->canEncode());
}
void CyrusSecurityLayer::init(qpid::sys::Codec* c)
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/epoll/EpollPoller.cpp Fri Oct 21 14:42:12 2011
@@ -384,7 +384,12 @@ void PollerPrivate::resetMode(PollerHand
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
+ int rc = ::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe);
+ // If something has closed the fd in the meantime try adding it back
+ if (rc ==-1 && errno == ENOENT) {
+ rc = ::epoll_ctl(epollFd, EPOLL_CTL_ADD, eh.fd(), &epe);
+ }
+ QPID_POSIX_CHECK(rc);
eh.setActive();
return;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/AsynchIO.cpp Fri Oct 21 14:42:12 2011
@@ -149,11 +149,12 @@ private:
ConnectedCallback connCallback;
FailedCallback failCallback;
const Socket& socket;
+ SocketAddress sa;
public:
AsynchConnector(const Socket& socket,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb);
void start(Poller::shared_ptr poller);
@@ -161,8 +162,8 @@ public:
};
AsynchConnector::AsynchConnector(const Socket& s,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb) :
DispatchHandle(s,
@@ -171,11 +172,13 @@ AsynchConnector::AsynchConnector(const S
boost::bind(&AsynchConnector::connComplete, this, _1)),
connCallback(connCb),
failCallback(failCb),
- socket(s)
+ socket(s),
+ sa(hostname, port)
{
socket.setNonblocking();
- SocketAddress sa(hostname, boost::lexical_cast<std::string>(port));
+
// Note, not catching any exceptions here, also has effect of destructing
+ QPID_LOG(info, "Connecting: " << sa.asString());
socket.connect(sa);
}
@@ -191,11 +194,26 @@ void AsynchConnector::stop()
void AsynchConnector::connComplete(DispatchHandle& h)
{
- h.stopWatch();
int errCode = socket.getError();
if (errCode == 0) {
+ h.stopWatch();
connCallback(socket);
} else {
+ // Retry while we cause an immediate exception
+ // (asynch failure will be handled by re-entering here at the top)
+ while (sa.nextAddress()) {
+ try {
+ // Try next address without deleting ourselves
+ QPID_LOG(debug, "Ignored socket connect error: " << strError(errCode));
+ QPID_LOG(info, "Retrying connect: " << sa.asString());
+ socket.connect(sa);
+ return;
+ } catch (const std::exception& e) {
+ QPID_LOG(debug, "Ignored socket connect exception: " << e.what());
+ }
+ errCode = socket.getError();
+ }
+ h.stopWatch();
failCallback(socket, errCode, strError(errCode));
}
DispatchHandle::doDelete();
@@ -589,8 +607,8 @@ AsynchAcceptor* AsynchAcceptor::create(c
}
AsynchConnector* AsynchConnector::create(const Socket& s,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb)
{
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/LockFile.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/LockFile.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/LockFile.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/LockFile.cpp Fri Oct 21 14:42:12 2011
@@ -58,8 +58,7 @@ LockFile::~LockFile() {
if (impl) {
int f = impl->fd;
if (f >= 0) {
- int unused_ret;
- unused_ret = ::lockf(f, F_ULOCK, 0); // Suppress warnings about ignoring return value.
+ (void) ::lockf(f, F_ULOCK, 0); // Suppress warnings about ignoring return value.
::close(f);
impl->fd = -1;
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Socket.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Socket.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Socket.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Socket.cpp Fri Oct 21 14:42:12 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,65 +34,35 @@
#include <netdb.h>
#include <cstdlib>
#include <string.h>
-#include <iostream>
-
-#include <boost/format.hpp>
-#include <boost/lexical_cast.hpp>
namespace qpid {
namespace sys {
namespace {
-std::string getName(int fd, bool local, bool includeService = false)
+std::string getName(int fd, bool local)
{
- ::sockaddr_storage name; // big enough for any socket address
- ::socklen_t namelen = sizeof(name);
-
- int result = -1;
+ ::sockaddr_storage name_s; // big enough for any socket address
+ ::sockaddr* name = (::sockaddr*)&name_s;
+ ::socklen_t namelen = sizeof(name_s);
+
if (local) {
- result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
+ QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) );
} else {
- result = ::getpeername(fd, (::sockaddr*)&name, &namelen);
+ QPID_POSIX_CHECK( ::getpeername(fd, name, &namelen) );
}
- QPID_POSIX_CHECK(result);
-
- char servName[NI_MAXSERV];
- char dispName[NI_MAXHOST];
- if (includeService) {
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName),
- servName, sizeof(servName),
- NI_NUMERICHOST | NI_NUMERICSERV) != 0)
- throw QPID_POSIX_ERROR(rc);
- return std::string(dispName) + ":" + std::string(servName);
-
- } else {
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 0, 0, NI_NUMERICHOST) != 0)
- throw QPID_POSIX_ERROR(rc);
- return dispName;
- }
+ return SocketAddress::asString(name, namelen);
}
-std::string getService(int fd, bool local)
+uint16_t getLocalPort(int fd)
{
- ::sockaddr_storage name; // big enough for any socket address
- ::socklen_t namelen = sizeof(name);
-
- int result = -1;
- if (local) {
- result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
- } else {
- result = ::getpeername(fd, (::sockaddr*)&name, &namelen);
- }
+ ::sockaddr_storage name_s; // big enough for any socket address
+ ::sockaddr* name = (::sockaddr*)&name_s;
+ ::socklen_t namelen = sizeof(name_s);
- QPID_POSIX_CHECK(result);
+ QPID_POSIX_CHECK( ::getsockname(fd, name, &namelen) );
- char servName[NI_MAXSERV];
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0,
- servName, sizeof(servName),
- NI_NUMERICHOST | NI_NUMERICSERV) != 0)
- throw QPID_POSIX_ERROR(rc);
- return servName;
+ return SocketAddress::getPort(name);
}
}
@@ -119,6 +89,11 @@ void Socket::createSocket(const SocketAd
try {
if (nonblocking) setNonblocking();
if (nodelay) setTcpNoDelay();
+ if (getAddrInfo(sa).ai_family == AF_INET6) {
+ int flag = 1;
+ int result = ::setsockopt(socket, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&flag, sizeof(flag));
+ QPID_POSIX_CHECK(result);
+ }
} catch (std::exception&) {
::close(s);
socket = -1;
@@ -126,13 +101,18 @@ void Socket::createSocket(const SocketAd
}
}
-void Socket::setTimeout(const Duration& interval) const
-{
- const int& socket = impl->fd;
- struct timeval tv;
- toTimeval(tv, interval);
- setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
- setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
+Socket* Socket::createSameTypeSocket() const {
+ int& socket = impl->fd;
+ // Socket currently has no actual socket attached
+ if (socket == -1)
+ return new Socket;
+
+ ::sockaddr_storage sa;
+ ::socklen_t salen = sizeof(sa);
+ QPID_POSIX_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen));
+ int s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
+ if (s < 0) throw QPID_POSIX_ERROR(errno);
+ return new Socket(new IOHandlePrivate(s));
}
void Socket::setNonblocking() const {
@@ -149,20 +129,27 @@ void Socket::setTcpNoDelay() const
nodelay = true;
if (socket != -1) {
int flag = 1;
- int result = setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
+ int result = ::setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
QPID_POSIX_CHECK(result);
}
}
-void Socket::connect(const std::string& host, uint16_t port) const
+void Socket::connect(const std::string& host, const std::string& port) const
{
- SocketAddress sa(host, boost::lexical_cast<std::string>(port));
+ SocketAddress sa(host, port);
connect(sa);
}
void Socket::connect(const SocketAddress& addr) const
{
- connectname = addr.asString();
+ // The display name for an outbound connection needs to be the name that was specified
+ // for the address rather than a resolved IP address as we don't know which of
+ // the IP addresses is actually the one that will be connected to.
+ peername = addr.asString(false);
+
+ // However the string we compare with the local port must be numeric or it might not
+ // match when it should as getLocalAddress() will always be numeric
+ std::string connectname = addr.asString();
createSocket(addr);
@@ -170,7 +157,24 @@ void Socket::connect(const SocketAddress
// TODO the correct thing to do here is loop on failure until you've used all the returned addresses
if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) &&
(errno != EINPROGRESS)) {
- throw Exception(QPID_MSG(strError(errno) << ": " << connectname));
+ throw Exception(QPID_MSG(strError(errno) << ": " << peername));
+ }
+ // When connecting to a port on the same host which no longer has
+ // a process associated with it, the OS occasionally chooses the
+ // remote port (which is unoccupied) as the port to bind the local
+ // end of the socket, resulting in a "circular" connection.
+ //
+ // This seems like something the OS should prevent but I have
+ // confirmed that sporadic hangs in
+ // cluster_tests.LongTests.test_failover on RHEL5 are caused by
+ // such a circular connection.
+ //
+ // Raise an error if we see such a connection, since we know there is
+ // no listener on the peer address.
+ //
+ if (getLocalAddress() == connectname) {
+ close();
+ throw Exception(QPID_MSG("Connection refused: " << peername));
}
}
@@ -183,9 +187,9 @@ Socket::close() const
socket = -1;
}
-int Socket::listen(uint16_t port, int backlog) const
+int Socket::listen(const std::string& host, const std::string& port, int backlog) const
{
- SocketAddress sa("", boost::lexical_cast<std::string>(port));
+ SocketAddress sa(host, port);
return listen(sa, backlog);
}
@@ -195,26 +199,24 @@ int Socket::listen(const SocketAddress&
const int& socket = impl->fd;
int yes=1;
- QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
+ QPID_POSIX_CHECK(::setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0)
throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno)));
if (::listen(socket, backlog) < 0)
throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno)));
- struct sockaddr_in name;
- socklen_t namelen = sizeof(name);
- if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)
- throw QPID_POSIX_ERROR(errno);
-
- return ntohs(name.sin_port);
+ return getLocalPort(socket);
}
Socket* Socket::accept() const
{
int afd = ::accept(impl->fd, 0, 0);
- if ( afd >= 0)
- return new Socket(new IOHandlePrivate(afd));
+ if ( afd >= 0) {
+ Socket* s = new Socket(new IOHandlePrivate(afd));
+ s->localname = localname;
+ return s;
+ }
else if (errno == EAGAIN)
return 0;
else throw QPID_POSIX_ERROR(errno);
@@ -230,37 +232,20 @@ int Socket::write(const void *buf, size_
return ::write(impl->fd, buf, count);
}
-std::string Socket::getSockname() const
-{
- return getName(impl->fd, true);
-}
-
-std::string Socket::getPeername() const
-{
- return getName(impl->fd, false);
-}
-
std::string Socket::getPeerAddress() const
{
- if (connectname.empty()) {
- connectname = getName(impl->fd, false, true);
+ if (peername.empty()) {
+ peername = getName(impl->fd, false);
}
- return connectname;
+ return peername;
}
std::string Socket::getLocalAddress() const
{
- return getName(impl->fd, true, true);
-}
-
-uint16_t Socket::getLocalPort() const
-{
- return std::atoi(getService(impl->fd, true).c_str());
-}
-
-uint16_t Socket::getRemotePort() const
-{
- return std::atoi(getService(impl->fd, true).c_str());
+ if (localname.empty()) {
+ localname = getName(impl->fd, true);
+ }
+ return localname;
}
int Socket::getError() const
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/SocketAddress.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/SocketAddress.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/SocketAddress.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/SocketAddress.cpp Fri Oct 21 14:42:12 2011
@@ -21,11 +21,13 @@
#include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/posix/check.h"
+#include "qpid/Exception.h"
+#include "qpid/Msg.h"
#include <sys/socket.h>
-#include <string.h>
+#include <netinet/in.h>
#include <netdb.h>
+#include <string.h>
namespace qpid {
namespace sys {
@@ -46,15 +48,9 @@ SocketAddress::SocketAddress(const Socke
SocketAddress& SocketAddress::operator=(const SocketAddress& sa)
{
- if (&sa != this) {
- host = sa.host;
- port = sa.port;
-
- if (addrInfo) {
- ::freeaddrinfo(addrInfo);
- addrInfo = 0;
- }
- }
+ SocketAddress temp(sa);
+
+ std::swap(temp, *this);
return *this;
}
@@ -65,9 +61,61 @@ SocketAddress::~SocketAddress()
}
}
-std::string SocketAddress::asString() const
+std::string SocketAddress::asString(::sockaddr const * const addr, size_t addrlen)
+{
+ char servName[NI_MAXSERV];
+ char dispName[NI_MAXHOST];
+ if (int rc=::getnameinfo(addr, addrlen,
+ dispName, sizeof(dispName),
+ servName, sizeof(servName),
+ NI_NUMERICHOST | NI_NUMERICSERV) != 0)
+ throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
+ std::string s;
+ switch (addr->sa_family) {
+ case AF_INET: s += dispName; break;
+ case AF_INET6: s += "["; s += dispName; s+= "]"; break;
+ default: throw Exception(QPID_MSG("Unexpected socket type"));
+ }
+ s += ":";
+ s += servName;
+ return s;
+}
+
+uint16_t SocketAddress::getPort(::sockaddr const * const addr)
+{
+ switch (addr->sa_family) {
+ case AF_INET: return ntohs(((::sockaddr_in*)addr)->sin_port);
+ case AF_INET6: return ntohs(((::sockaddr_in6*)addr)->sin6_port);
+ default:throw Exception(QPID_MSG("Unexpected socket type"));
+ }
+}
+
+std::string SocketAddress::asString(bool numeric) const
{
- return host + ":" + port;
+ if (!numeric)
+ return host + ":" + port;
+ // Canonicalise into numeric id
+ const ::addrinfo& ai = getAddrInfo(*this);
+
+ return asString(ai.ai_addr, ai.ai_addrlen);
+}
+
+bool SocketAddress::nextAddress() {
+ bool r = currentAddrInfo->ai_next != 0;
+ if (r)
+ currentAddrInfo = currentAddrInfo->ai_next;
+ return r;
+}
+
+void SocketAddress::setAddrInfoPort(uint16_t port) {
+ if (!currentAddrInfo) return;
+
+ ::addrinfo& ai = *currentAddrInfo;
+ switch (ai.ai_family) {
+ case AF_INET: ((::sockaddr_in*)ai.ai_addr)->sin_port = htons(port); return;
+ case AF_INET6:((::sockaddr_in6*)ai.ai_addr)->sin6_port = htons(port); return;
+ default: throw Exception(QPID_MSG("Unexpected socket type"));
+ }
}
const ::addrinfo& getAddrInfo(const SocketAddress& sa)
@@ -75,7 +123,8 @@ const ::addrinfo& getAddrInfo(const Sock
if (!sa.addrInfo) {
::addrinfo hints;
::memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_INET; // Change this to support IPv6
+ hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for
+ hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6
hints.ai_socktype = SOCK_STREAM;
const char* node = 0;
@@ -88,10 +137,11 @@ const ::addrinfo& getAddrInfo(const Sock
int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo);
if (n != 0)
- throw Exception(QPID_MSG("Cannot resolve " << sa.host << ": " << ::gai_strerror(n)));
+ throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n)));
+ sa.currentAddrInfo = sa.addrInfo;
}
- return *sa.addrInfo;
+ return *sa.currentAddrInfo;
}
}}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Thread.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Thread.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Thread.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Thread.cpp Fri Oct 21 14:42:12 2011
@@ -37,7 +37,8 @@ void* runRunnable(void* p)
}
}
-struct ThreadPrivate {
+class ThreadPrivate {
+public:
pthread_t thread;
ThreadPrivate(Runnable* runnable) {
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Time.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Time.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Time.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/posix/Time.cpp Fri Oct 21 14:42:12 2011
@@ -27,6 +27,7 @@
#include <stdio.h>
#include <sys/time.h>
#include <unistd.h>
+#include <iomanip>
namespace {
int64_t max_abstime() { return std::numeric_limits<int64_t>::max(); }
@@ -103,6 +104,12 @@ void outputFormattedNow(std::ostream& o)
o << " ";
}
+void outputHiresNow(std::ostream& o) {
+ ::timespec time;
+ ::clock_gettime(CLOCK_REALTIME, &time);
+ o << time.tv_sec << "." << std::setw(9) << std::setfill('0') << time.tv_nsec << "s ";
+}
+
void sleep(int secs) {
::sleep(secs);
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/RdmaIO.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/RdmaIO.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/RdmaIO.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/RdmaIO.cpp Fri Oct 21 14:42:12 2011
@@ -140,8 +140,8 @@ namespace Rdma {
// Prepost recv buffers before we go any further
qp->allocateRecvBuffers(recvBufferCount, bufferSize+FrameHeaderSize);
- // Create xmit buffers
- qp->createSendBuffers(xmitBufferCount, bufferSize+FrameHeaderSize);
+ // Create xmit buffers, reserve space for frame header.
+ qp->createSendBuffers(xmitBufferCount, bufferSize, FrameHeaderSize);
}
AsynchIO::~AsynchIO() {
@@ -210,12 +210,14 @@ namespace Rdma {
}
break;
case 1:
- Buffer* ob = buff ? buff : getSendBuffer();
+ if (!buff)
+ buff = getSendBuffer();
// Add FrameHeader after frame data
FrameHeader header(credit);
- ::memcpy(ob->bytes()+ob->dataCount(), &header, FrameHeaderSize);
- ob->dataCount(ob->dataCount()+FrameHeaderSize);
- qp->postSend(ob);
+ assert(buff->dataCount() <= buff->byteCount()); // ensure app data doesn't impinge on reserved space.
+ ::memcpy(buff->bytes()+buff->dataCount(), &header, FrameHeaderSize);
+ buff->dataCount(buff->dataCount()+FrameHeaderSize);
+ qp->postSend(buff);
break;
}
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/rdma_wrap.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/rdma_wrap.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/rdma_wrap.cpp Fri Oct 21 14:42:12 2011
@@ -50,8 +50,9 @@ namespace Rdma {
return count;
}
- Buffer::Buffer(uint32_t lkey, char* bytes, const int32_t byteCount) :
- bufferSize(byteCount)
+ Buffer::Buffer(uint32_t lkey, char* bytes, const int32_t byteCount,
+ const int32_t reserve) :
+ bufferSize(byteCount + reserve), reserved(reserve)
{
sge.addr = (uintptr_t) bytes;
sge.length = 0;
@@ -163,21 +164,21 @@ namespace Rdma {
}
// Create buffers to use for writing
- void QueuePair::createSendBuffers(int sendBufferCount, int bufferSize)
+ void QueuePair::createSendBuffers(int sendBufferCount, int bufferSize, int reserved)
{
assert(!smr);
// Round up buffersize to cacheline (64 bytes)
- bufferSize = (bufferSize+63) & (~63);
+ int dataLength = (bufferSize+reserved+63) & (~63);
// Allocate memory block for all receive buffers
- char* mem = new char [sendBufferCount * bufferSize];
- smr = regMr(pd.get(), mem, sendBufferCount * bufferSize, ::IBV_ACCESS_LOCAL_WRITE);
+ char* mem = new char [sendBufferCount * dataLength];
+ smr = regMr(pd.get(), mem, sendBufferCount * dataLength, ::IBV_ACCESS_LOCAL_WRITE);
sendBuffers.reserve(sendBufferCount);
freeBuffers.reserve(sendBufferCount);
for (int i = 0; i<sendBufferCount; ++i) {
// Allocate xmit buffer
- sendBuffers.push_back(Buffer(smr->lkey, &mem[i*bufferSize], bufferSize));
+ sendBuffers.push_back(Buffer(smr->lkey, &mem[i*dataLength], bufferSize, reserved));
freeBuffers.push_back(i);
}
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/rdma_wrap.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/rdma_wrap.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/rdma_wrap.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/rdma/rdma_wrap.h Fri Oct 21 14:42:12 2011
@@ -57,8 +57,9 @@ namespace Rdma {
void dataCount(int32_t);
private:
- Buffer(uint32_t lkey, char* bytes, const int32_t byteCount);
+ Buffer(uint32_t lkey, char* bytes, const int32_t byteCount, const int32_t reserve=0);
int32_t bufferSize;
+ int32_t reserved; // for framing header
::ibv_sge sge;
};
@@ -66,8 +67,9 @@ namespace Rdma {
return (char*) sge.addr;
}
+ /** return the number of bytes available for application data */
inline int32_t Buffer::byteCount() const {
- return bufferSize;
+ return bufferSize - reserved;
}
inline int32_t Buffer::dataCount() const {
@@ -75,6 +77,8 @@ namespace Rdma {
}
inline void Buffer::dataCount(int32_t s) {
+ // catch any attempt to overflow a buffer
+ assert(s <= bufferSize + reserved);
sge.length = s;
}
@@ -136,7 +140,7 @@ namespace Rdma {
typedef boost::intrusive_ptr<QueuePair> intrusive_ptr;
// Create a buffers to use for writing
- void createSendBuffers(int sendBufferCount, int bufferSize);
+ void createSendBuffers(int sendBufferCount, int dataSize, int headerSize);
// Get a send buffer
Buffer* getSendBuffer();
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslHandler.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslHandler.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslHandler.h Fri Oct 21 14:42:12 2011
@@ -35,7 +35,7 @@ namespace sys {
namespace ssl {
class SslIO;
-class SslIOBufferBase;
+struct SslIOBufferBase;
class SslSocket;
class SslHandler : public OutputControl {
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslIo.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslIo.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslIo.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslIo.cpp Fri Oct 21 14:42:12 2011
@@ -68,29 +68,33 @@ __thread int64_t threadMaxReadTimeNs = 2
* Asynch Acceptor
*/
-SslAcceptor::SslAcceptor(const SslSocket& s, Callback callback) :
+template <class T>
+SslAcceptorTmpl<T>::SslAcceptorTmpl(const T& s, Callback callback) :
acceptedCallback(callback),
- handle(s, boost::bind(&SslAcceptor::readable, this, _1), 0, 0),
+ handle(s, boost::bind(&SslAcceptorTmpl<T>::readable, this, _1), 0, 0),
socket(s) {
s.setNonblocking();
ignoreSigpipe();
}
-SslAcceptor::~SslAcceptor()
+template <class T>
+SslAcceptorTmpl<T>::~SslAcceptorTmpl()
{
handle.stopWatch();
}
-void SslAcceptor::start(Poller::shared_ptr poller) {
+template <class T>
+void SslAcceptorTmpl<T>::start(Poller::shared_ptr poller) {
handle.startWatch(poller);
}
/*
* We keep on accepting as long as there is something to accept
*/
-void SslAcceptor::readable(DispatchHandle& h) {
- SslSocket* s;
+template <class T>
+void SslAcceptorTmpl<T>::readable(DispatchHandle& h) {
+ Socket* s;
do {
errno = 0;
// TODO: Currently we ignore the peers address, perhaps we should
@@ -110,6 +114,10 @@ void SslAcceptor::readable(DispatchHandl
h.rewatch();
}
+// Explicitly instantiate the templates we need
+template class SslAcceptorTmpl<SslSocket>;
+template class SslAcceptorTmpl<SslMuxSocket>;
+
/*
* Asynch Connector
*/
@@ -117,7 +125,7 @@ void SslAcceptor::readable(DispatchHandl
SslConnector::SslConnector(const SslSocket& s,
Poller::shared_ptr poller,
std::string hostname,
- uint16_t port,
+ std::string port,
ConnectedCallback connCb,
FailedCallback failCb) :
DispatchHandle(s,
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslIo.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslIo.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslIo.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslIo.h Fri Oct 21 14:42:12 2011
@@ -29,26 +29,30 @@
namespace qpid {
namespace sys {
+
+class Socket;
+
namespace ssl {
-
+
class SslSocket;
/*
* Asynchronous ssl acceptor: accepts connections then does a callback
* with the accepted fd
*/
-class SslAcceptor {
+template <class T>
+class SslAcceptorTmpl {
public:
- typedef boost::function1<void, const SslSocket&> Callback;
+ typedef boost::function1<void, const Socket&> Callback;
private:
Callback acceptedCallback;
qpid::sys::DispatchHandle handle;
- const SslSocket& socket;
+ const T& socket;
public:
- SslAcceptor(const SslSocket& s, Callback callback);
- ~SslAcceptor();
+ SslAcceptorTmpl(const T& s, Callback callback);
+ ~SslAcceptorTmpl();
void start(qpid::sys::Poller::shared_ptr poller);
private:
@@ -73,7 +77,7 @@ public:
SslConnector(const SslSocket& socket,
Poller::shared_ptr poller,
std::string hostname,
- uint16_t port,
+ std::string port,
ConnectedCallback connCb,
FailedCallback failCb = 0);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslSocket.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslSocket.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslSocket.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslSocket.cpp Fri Oct 21 14:42:12 2011
@@ -25,11 +25,13 @@
#include "qpid/Exception.h"
#include "qpid/sys/posix/check.h"
#include "qpid/sys/posix/PrivatePosix.h"
+#include "qpid/log/Statement.h"
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/errno.h>
+#include <poll.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
@@ -50,36 +52,6 @@ namespace sys {
namespace ssl {
namespace {
-std::string getName(int fd, bool local, bool includeService = false)
-{
- ::sockaddr_storage name; // big enough for any socket address
- ::socklen_t namelen = sizeof(name);
-
- int result = -1;
- if (local) {
- result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
- } else {
- result = ::getpeername(fd, (::sockaddr*)&name, &namelen);
- }
-
- QPID_POSIX_CHECK(result);
-
- char servName[NI_MAXSERV];
- char dispName[NI_MAXHOST];
- if (includeService) {
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName),
- servName, sizeof(servName),
- NI_NUMERICHOST | NI_NUMERICSERV) != 0)
- throw QPID_POSIX_ERROR(rc);
- return std::string(dispName) + ":" + std::string(servName);
-
- } else {
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 0, 0, NI_NUMERICHOST) != 0)
- throw QPID_POSIX_ERROR(rc);
- return dispName;
- }
-}
-
std::string getService(int fd, bool local)
{
::sockaddr_storage name; // big enough for any socket address
@@ -132,7 +104,7 @@ std::string getDomainFromSubject(std::st
}
-SslSocket::SslSocket() : IOHandle(new IOHandlePrivate()), socket(0), prototype(0)
+SslSocket::SslSocket() : socket(0), prototype(0)
{
impl->fd = ::socket (PF_INET, SOCK_STREAM, 0);
if (impl->fd < 0) throw QPID_POSIX_ERROR(errno);
@@ -144,7 +116,7 @@ SslSocket::SslSocket() : IOHandle(new IO
* returned from accept. Because we use posix accept rather than
* PR_Accept, we have to reset the handshake.
*/
-SslSocket::SslSocket(IOHandlePrivate* ioph, PRFileDesc* model) : IOHandle(ioph), socket(0), prototype(0)
+SslSocket::SslSocket(IOHandlePrivate* ioph, PRFileDesc* model) : Socket(ioph), socket(0), prototype(0)
{
socket = SSL_ImportFD(model, PR_ImportTCPSocket(impl->fd));
NSS_CHECK(SSL_ResetHandshake(socket, true));
@@ -158,7 +130,7 @@ void SslSocket::setNonblocking() const
PR_SetSocketOption(socket, &option);
}
-void SslSocket::connect(const std::string& host, uint16_t port) const
+void SslSocket::connect(const std::string& host, const std::string& port) const
{
std::stringstream namestream;
namestream << host << ":" << port;
@@ -180,7 +152,7 @@ void SslSocket::connect(const std::strin
PRHostEnt hostEntry;
PR_CHECK(PR_GetHostByName(host.data(), hostBuffer, PR_NETDB_BUF_SIZE, &hostEntry));
PRNetAddr address;
- int value = PR_EnumerateHostEnt(0, &hostEntry, port, &address);
+ int value = PR_EnumerateHostEnt(0, &hostEntry, boost::lexical_cast<PRUint16>(port), &address);
if (value < 0) {
throw Exception(QPID_MSG("Error getting address for host: " << ErrorString()));
} else if (value == 0) {
@@ -238,6 +210,7 @@ int SslSocket::listen(uint16_t port, int
SslSocket* SslSocket::accept() const
{
+ QPID_LOG(trace, "Accepting SSL connection.");
int afd = ::accept(impl->fd, 0, 0);
if ( afd >= 0) {
return new SslSocket(new IOHandlePrivate(afd), prototype);
@@ -248,36 +221,109 @@ SslSocket* SslSocket::accept() const
}
}
-int SslSocket::read(void *buf, size_t count) const
-{
- return PR_Read(socket, buf, count);
-}
+#define SSL_STREAM_MAX_WAIT_ms 20
+#define SSL_STREAM_MAX_RETRIES 2
-int SslSocket::write(const void *buf, size_t count) const
-{
- return PR_Write(socket, buf, count);
-}
+static bool isSslStream(int afd) {
+ int retries = SSL_STREAM_MAX_RETRIES;
+ unsigned char buf[5] = {};
-std::string SslSocket::getSockname() const
-{
- return getName(impl->fd, true);
+ do {
+ struct pollfd fd = {afd, POLLIN, 0};
+
+ /*
+ * Note that this is blocking the accept thread, so connections that
+ * send no data can limit the rate at which we can accept new
+ * connections.
+ */
+ if (::poll(&fd, 1, SSL_STREAM_MAX_WAIT_ms) > 0) {
+ errno = 0;
+ int result = recv(afd, buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT);
+ if (result == sizeof(buf)) {
+ break;
+ }
+ if (errno && errno != EAGAIN) {
+ int err = errno;
+ ::close(afd);
+ throw QPID_POSIX_ERROR(err);
+ }
+ }
+ } while (retries-- > 0);
+
+ if (retries < 0) {
+ return false;
+ }
+
+ /*
+ * SSLv2 Client Hello format
+ * http://www.mozilla.org/projects/security/pki/nss/ssl/draft02.html
+ *
+ * Bytes 0-1: RECORD-LENGTH
+ * Byte 2: MSG-CLIENT-HELLO (1)
+ * Byte 3: CLIENT-VERSION-MSB
+ * Byte 4: CLIENT-VERSION-LSB
+ *
+ * Allowed versions:
+ * 2.0 - SSLv2
+ * 3.0 - SSLv3
+ * 3.1 - TLS 1.0
+ * 3.2 - TLS 1.1
+ * 3.3 - TLS 1.2
+ *
+ * The version sent in the Client-Hello is the latest version supported by
+ * the client. NSS may send version 3.x in an SSLv2 header for
+ * maximum compatibility.
+ */
+ bool isSSL2Handshake = buf[2] == 1 && // MSG-CLIENT-HELLO
+ ((buf[3] == 3 && buf[4] <= 3) || // SSL 3.0 & TLS 1.0-1.2 (v3.1-3.3)
+ (buf[3] == 2 && buf[4] == 0)); // SSL 2
+
+ /*
+ * SSLv3/TLS Client Hello format
+ * RFC 2246
+ *
+ * Byte 0: ContentType (handshake - 22)
+ * Bytes 1-2: ProtocolVersion {major, minor}
+ *
+ * Allowed versions:
+ * 3.0 - SSLv3
+ * 3.1 - TLS 1.0
+ * 3.2 - TLS 1.1
+ * 3.3 - TLS 1.2
+ */
+ bool isSSL3Handshake = buf[0] == 22 && // handshake
+ (buf[1] == 3 && buf[2] <= 3); // SSL 3.0 & TLS 1.0-1.2 (v3.1-3.3)
+
+ return isSSL2Handshake || isSSL3Handshake;
}
-std::string SslSocket::getPeername() const
+Socket* SslMuxSocket::accept() const
{
- return getName(impl->fd, false);
+ int afd = ::accept(impl->fd, 0, 0);
+ if (afd >= 0) {
+ QPID_LOG(trace, "Accepting connection with optional SSL wrapper.");
+ if (isSslStream(afd)) {
+ QPID_LOG(trace, "Accepted SSL connection.");
+ return new SslSocket(new IOHandlePrivate(afd), prototype);
+ } else {
+ QPID_LOG(trace, "Accepted Plaintext connection.");
+ return new Socket(new IOHandlePrivate(afd));
+ }
+ } else if (errno == EAGAIN) {
+ return 0;
+ } else {
+ throw QPID_POSIX_ERROR(errno);
+ }
}
-std::string SslSocket::getPeerAddress() const
+int SslSocket::read(void *buf, size_t count) const
{
- if (!connectname.empty())
- return connectname;
- return getName(impl->fd, false, true);
+ return PR_Read(socket, buf, count);
}
-std::string SslSocket::getLocalAddress() const
+int SslSocket::write(const void *buf, size_t count) const
{
- return getName(impl->fd, true, true);
+ return PR_Write(socket, buf, count);
}
uint16_t SslSocket::getLocalPort() const
@@ -290,17 +336,6 @@ uint16_t SslSocket::getRemotePort() cons
return atoi(getService(impl->fd, true).c_str());
}
-int SslSocket::getError() const
-{
- int result;
- socklen_t rSize = sizeof (result);
-
- if (::getsockopt(impl->fd, SOL_SOCKET, SO_ERROR, &result, &rSize) < 0)
- throw QPID_POSIX_ERROR(errno);
-
- return result;
-}
-
void SslSocket::setTcpNoDelay(bool nodelay) const
{
if (nodelay) {
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslSocket.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslSocket.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslSocket.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/ssl/SslSocket.h Fri Oct 21 14:42:12 2011
@@ -23,6 +23,7 @@
*/
#include "qpid/sys/IOHandle.h"
+#include "qpid/sys/Socket.h"
#include <nspr.h>
#include <string>
@@ -36,7 +37,7 @@ class Duration;
namespace ssl {
-class SslSocket : public qpid::sys::IOHandle
+class SslSocket : public qpid::sys::Socket
{
public:
/** Create a socket wrapper for descriptor. */
@@ -53,7 +54,7 @@ public:
* NSSInit().*/
void setCertName(const std::string& certName);
- void connect(const std::string& host, uint16_t port) const;
+ void connect(const std::string& host, const std::string& port) const;
void close() const;
@@ -75,45 +76,13 @@ public:
int read(void *buf, size_t count) const;
int write(const void *buf, size_t count) const;
- /** Returns the "socket name" ie the address bound to
- * the near end of the socket
- */
- std::string getSockname() const;
-
- /** Returns the "peer name" ie the address bound to
- * the remote end of the socket
- */
- std::string getPeername() const;
-
- /**
- * Returns an address (host and port) for the remote end of the
- * socket
- */
- std::string getPeerAddress() const;
- /**
- * Returns an address (host and port) for the local end of the
- * socket
- */
- std::string getLocalAddress() const;
-
- /**
- * Returns the full address of the connection: local and remote host and port.
- */
- std::string getFullAddress() const { return getLocalAddress()+"-"+getPeerAddress(); }
-
uint16_t getLocalPort() const;
uint16_t getRemotePort() const;
- /**
- * Returns the error code stored in the socket. This may be used
- * to determine the result of a non-blocking connect.
- */
- int getError() const;
-
int getKeyLen() const;
std::string getClientAuthId() const;
-private:
+protected:
mutable std::string connectname;
mutable PRFileDesc* socket;
std::string certname;
@@ -126,6 +95,13 @@ private:
mutable PRFileDesc* prototype;
SslSocket(IOHandlePrivate* ioph, PRFileDesc* model);
+ friend class SslMuxSocket;
+};
+
+class SslMuxSocket : public SslSocket
+{
+public:
+ Socket* accept() const;
};
}}}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/AsynchIO.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/AsynchIO.cpp Fri Oct 21 14:42:12 2011
@@ -30,6 +30,7 @@
#include "qpid/log/Statement.h"
#include "qpid/sys/windows/check.h"
+#include "qpid/sys/windows/mingw32_compat.h"
#include <boost/thread/once.hpp>
@@ -46,16 +47,13 @@ namespace {
/*
* The function pointers for AcceptEx and ConnectEx need to be looked up
- * at run time. Make sure this is done only once.
+ * at run time.
*/
-boost::once_flag lookUpAcceptExOnce = BOOST_ONCE_INIT;
-LPFN_ACCEPTEX fnAcceptEx = 0;
-typedef void (*lookUpFunc)(const qpid::sys::Socket &);
-
-void lookUpAcceptEx() {
- SOCKET h = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+const LPFN_ACCEPTEX lookUpAcceptEx(const qpid::sys::Socket& s) {
+ SOCKET h = toSocketHandle(s);
GUID guidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes = 0;
+ LPFN_ACCEPTEX fnAcceptEx;
WSAIoctl(h,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidAcceptEx,
@@ -65,9 +63,9 @@ void lookUpAcceptEx() {
&dwBytes,
NULL,
NULL);
- closesocket(h);
if (fnAcceptEx == 0)
throw qpid::Exception(QPID_MSG("Failed to look up AcceptEx"));
+ return fnAcceptEx;
}
}
@@ -94,18 +92,15 @@ private:
AsynchAcceptor::Callback acceptedCallback;
const Socket& socket;
+ const LPFN_ACCEPTEX fnAcceptEx;
};
AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback)
: acceptedCallback(callback),
- socket(s) {
+ socket(s),
+ fnAcceptEx(lookUpAcceptEx(s)) {
s.setNonblocking();
-#if (BOOST_VERSION >= 103500) /* boost 1.35 or later reversed the args */
- boost::call_once(lookUpAcceptExOnce, lookUpAcceptEx);
-#else
- boost::call_once(lookUpAcceptEx, lookUpAcceptExOnce);
-#endif
}
AsynchAcceptor::~AsynchAcceptor()
@@ -114,7 +109,8 @@ AsynchAcceptor::~AsynchAcceptor()
}
void AsynchAcceptor::start(Poller::shared_ptr poller) {
- poller->monitorHandle(PollerHandle(socket), Poller::INPUT);
+ PollerHandle ph = PollerHandle(socket);
+ poller->monitorHandle(ph, Poller::INPUT);
restart ();
}
@@ -122,25 +118,26 @@ void AsynchAcceptor::restart(void) {
DWORD bytesReceived = 0; // Not used, needed for AcceptEx API
AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback,
this,
- toSocketHandle(socket));
+ socket);
BOOL status;
- status = ::fnAcceptEx(toSocketHandle(socket),
- toSocketHandle(*result->newSocket),
- result->addressBuffer,
- 0,
- AsynchAcceptResult::SOCKADDRMAXLEN,
- AsynchAcceptResult::SOCKADDRMAXLEN,
- &bytesReceived,
- result->overlapped());
+ status = fnAcceptEx(toSocketHandle(socket),
+ toSocketHandle(*result->newSocket),
+ result->addressBuffer,
+ 0,
+ AsynchAcceptResult::SOCKADDRMAXLEN,
+ AsynchAcceptResult::SOCKADDRMAXLEN,
+ &bytesReceived,
+ result->overlapped());
QPID_WINDOWS_CHECK_ASYNC_START(status);
}
AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb,
AsynchAcceptor *acceptor,
- SOCKET listener)
- : callback(cb), acceptor(acceptor), listener(listener) {
- newSocket.reset (new Socket());
+ const Socket& listener)
+ : callback(cb), acceptor(acceptor),
+ listener(toSocketHandle(listener)),
+ newSocket(listener.createSameTypeSocket()) {
}
void AsynchAcceptResult::success(size_t /*bytesTransferred*/) {
@@ -154,7 +151,7 @@ void AsynchAcceptResult::success(size_t
delete this;
}
-void AsynchAcceptResult::failure(int status) {
+void AsynchAcceptResult::failure(int /*status*/) {
//if (status != WSA_OPERATION_ABORTED)
// Can there be anything else? ;
delete this;
@@ -173,20 +170,20 @@ private:
FailedCallback failCallback;
const Socket& socket;
const std::string hostname;
- const uint16_t port;
+ const std::string port;
public:
AsynchConnector(const Socket& socket,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb = 0);
void start(Poller::shared_ptr poller);
};
AsynchConnector::AsynchConnector(const Socket& sock,
- std::string hname,
- uint16_t p,
+ const std::string& hname,
+ const std::string& p,
ConnectedCallback connCb,
FailedCallback failCb) :
connCallback(connCb), failCallback(failCb), socket(sock),
@@ -216,8 +213,8 @@ AsynchAcceptor* AsynchAcceptor::create(c
}
AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s,
- std::string hostname,
- uint16_t port,
+ const std::string& hostname,
+ const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb)
{
@@ -410,8 +407,9 @@ void AsynchIO::queueForDeletion() {
}
void AsynchIO::start(Poller::shared_ptr poller0) {
+ PollerHandle ph = PollerHandle(socket);
poller = poller0;
- poller->monitorHandle(PollerHandle(socket), Poller::INPUT);
+ poller->monitorHandle(ph, Poller::INPUT);
if (writeQueue.size() > 0) // Already have data queued for write
notifyPendingWrite();
startReading();
@@ -584,7 +582,6 @@ void AsynchIO::notifyIdle(void) {
void AsynchIO::startWrite(AsynchIO::BufferBase* buff) {
writeInProgress = true;
InterlockedIncrement(&opsInProgress);
- int writeCount = buff->byteCount-buff->dataCount;
AsynchWriteResult *result =
new AsynchWriteResult(boost::bind(&AsynchIO::completion, this, _1),
buff,
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/AsynchIoResult.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/AsynchIoResult.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/AsynchIoResult.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/AsynchIoResult.h Fri Oct 21 14:42:12 2011
@@ -83,22 +83,22 @@ class AsynchAcceptResult : public Asynch
public:
AsynchAcceptResult(qpid::sys::AsynchAcceptor::Callback cb,
AsynchAcceptor *acceptor,
- SOCKET listener);
+ const qpid::sys::Socket& listener);
virtual void success (size_t bytesTransferred);
virtual void failure (int error);
private:
virtual void complete(void) {} // No-op for this class.
- std::auto_ptr<qpid::sys::Socket> newSocket;
qpid::sys::AsynchAcceptor::Callback callback;
AsynchAcceptor *acceptor;
SOCKET listener;
+ std::auto_ptr<qpid::sys::Socket> newSocket;
// AcceptEx needs a place to write the local and remote addresses
// when accepting the connection. Place those here; get enough for
// IPv6 addresses, even if the socket is IPv4.
- enum { SOCKADDRMAXLEN = sizeof sockaddr_in6 + 16,
+ enum { SOCKADDRMAXLEN = sizeof(sockaddr_in6) + 16,
SOCKADDRBUFLEN = 2 * SOCKADDRMAXLEN };
char addressBuffer[SOCKADDRBUFLEN];
};
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/IocpPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/IocpPoller.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/IocpPoller.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/IocpPoller.cpp Fri Oct 21 14:42:12 2011
@@ -152,9 +152,9 @@ void Poller::monitorHandle(PollerHandle&
}
// All no-ops...
-void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {}
-void Poller::registerHandle(PollerHandle& handle) {}
-void Poller::unregisterHandle(PollerHandle& handle) {}
+void Poller::unmonitorHandle(PollerHandle& /*handle*/, Direction /*dir*/) {}
+void Poller::registerHandle(PollerHandle& /*handle*/) {}
+void Poller::unregisterHandle(PollerHandle& /*handle*/) {}
Poller::Event Poller::wait(Duration timeout) {
DWORD timeoutMs = 0;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Shlib.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Shlib.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Shlib.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Shlib.cpp Fri Oct 21 14:42:12 2011
@@ -44,7 +44,8 @@ void Shlib::unload() {
}
void* Shlib::getSymbol(const char* name) {
- void* sym = GetProcAddress(static_cast<HMODULE>(handle), name);
+ // Double cast avoids warning about casting function pointer to object
+ void *sym = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(GetProcAddress(static_cast<HMODULE>(handle), name)));
if (sym == NULL)
throw QPID_WINDOWS_ERROR(GetLastError());
return sym;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Socket.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Socket.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Socket.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Socket.cpp Fri Oct 21 14:42:12 2011
@@ -20,19 +20,18 @@
*/
#include "qpid/sys/Socket.h"
+
#include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/windows/IoHandlePrivate.h"
#include "qpid/sys/windows/check.h"
-#include "qpid/sys/Time.h"
+#include "qpid/sys/windows/IoHandlePrivate.h"
-#include <cstdlib>
-#include <string.h>
+// Ensure we get all of winsock2.h
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
#include <winsock2.h>
-#include <boost/format.hpp>
-#include <boost/lexical_cast.hpp>
-
// Need to initialize WinSock. Ideally, this would be a singleton or embedded
// in some one-time initialization function. I tried boost singleton and could
// not get it to compile (and others located in google had the same problem).
@@ -84,53 +83,30 @@ namespace sys {
namespace {
-std::string getName(SOCKET fd, bool local, bool includeService = false)
+std::string getName(SOCKET fd, bool local)
{
- sockaddr_in name; // big enough for any socket address
- socklen_t namelen = sizeof(name);
+ ::sockaddr_storage name_s; // big enough for any socket address
+ ::sockaddr* name = (::sockaddr*)&name_s;
+ ::socklen_t namelen = sizeof(name_s);
+
if (local) {
- QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen));
+ QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen));
} else {
- QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen));
+ QPID_WINSOCK_CHECK(::getpeername(fd, name, &namelen));
}
- char servName[NI_MAXSERV];
- char dispName[NI_MAXHOST];
- if (includeService) {
- if (int rc = ::getnameinfo((sockaddr*)&name, namelen,
- dispName, sizeof(dispName),
- servName, sizeof(servName),
- NI_NUMERICHOST | NI_NUMERICSERV) != 0)
- throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
- return std::string(dispName) + ":" + std::string(servName);
- } else {
- if (int rc = ::getnameinfo((sockaddr*)&name, namelen,
- dispName, sizeof(dispName),
- 0, 0,
- NI_NUMERICHOST) != 0)
- throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
- return dispName;
- }
+ return SocketAddress::asString(name, namelen);
}
-std::string getService(SOCKET fd, bool local)
+uint16_t getLocalPort(int fd)
{
- sockaddr_in name; // big enough for any socket address
- socklen_t namelen = sizeof(name);
-
- if (local) {
- QPID_WINSOCK_CHECK(::getsockname(fd, (sockaddr*)&name, &namelen));
- } else {
- QPID_WINSOCK_CHECK(::getpeername(fd, (sockaddr*)&name, &namelen));
- }
+ ::sockaddr_storage name_s; // big enough for any socket address
+ ::sockaddr* name = (::sockaddr*)&name_s;
+ ::socklen_t namelen = sizeof(name_s);
+
+ QPID_WINSOCK_CHECK(::getsockname(fd, name, &namelen));
- char servName[NI_MAXSERV];
- if (int rc = ::getnameinfo((sockaddr*)&name, namelen,
- 0, 0,
- servName, sizeof(servName),
- NI_NUMERICHOST | NI_NUMERICSERV) != 0)
- throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
- return servName;
+ return SocketAddress::getPort(name);
}
} // namespace
@@ -138,13 +114,7 @@ Socket::Socket() :
IOHandle(new IOHandlePrivate),
nonblocking(false),
nodelay(false)
-{
- SOCKET& socket = impl->fd;
- if (socket != INVALID_SOCKET) Socket::close();
- SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0);
- if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
- socket = s;
-}
+{}
Socket::Socket(IOHandlePrivate* h) :
IOHandle(h),
@@ -152,8 +122,7 @@ Socket::Socket(IOHandlePrivate* h) :
nodelay(false)
{}
-void
-Socket::createSocket(const SocketAddress& sa) const
+void Socket::createSocket(const SocketAddress& sa) const
{
SOCKET& socket = impl->fd;
if (socket != INVALID_SOCKET) Socket::close();
@@ -168,24 +137,24 @@ Socket::createSocket(const SocketAddress
if (nonblocking) setNonblocking();
if (nodelay) setTcpNoDelay();
} catch (std::exception&) {
- closesocket(s);
+ ::closesocket(s);
socket = INVALID_SOCKET;
throw;
}
}
-void Socket::setTimeout(const Duration& interval) const
-{
- const SOCKET& socket = impl->fd;
- int64_t nanosecs = interval;
- nanosecs /= (1000 * 1000); // nsecs -> usec -> msec
- int msec = 0;
- if (nanosecs > std::numeric_limits<int>::max())
- msec = std::numeric_limits<int>::max();
- else
- msec = static_cast<int>(nanosecs);
- setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&msec, sizeof(msec));
- setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&msec, sizeof(msec));
+Socket* Socket::createSameTypeSocket() const {
+ SOCKET& socket = impl->fd;
+ // Socket currently has no actual socket attached
+ if (socket == INVALID_SOCKET)
+ return new Socket;
+
+ ::sockaddr_storage sa;
+ ::socklen_t salen = sizeof(sa);
+ QPID_WINSOCK_CHECK(::getsockname(socket, (::sockaddr*)&sa, &salen));
+ SOCKET s = ::socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
+ if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ return new Socket(new IOHandlePrivate(s));
}
void Socket::setNonblocking() const {
@@ -193,30 +162,25 @@ void Socket::setNonblocking() const {
QPID_WINSOCK_CHECK(ioctlsocket(impl->fd, FIONBIO, &nonblock));
}
-void Socket::connect(const std::string& host, uint16_t port) const
+void Socket::connect(const std::string& host, const std::string& port) const
{
- SocketAddress sa(host, boost::lexical_cast<std::string>(port));
+ SocketAddress sa(host, port);
connect(sa);
}
void
Socket::connect(const SocketAddress& addr) const
{
+ peername = addr.asString(false);
+
+ createSocket(addr);
+
const SOCKET& socket = impl->fd;
- const addrinfo *addrs = &(getAddrInfo(addr));
- int error = 0;
+ int err;
WSASetLastError(0);
- while (addrs != 0) {
- if ((::connect(socket, addrs->ai_addr, addrs->ai_addrlen) == 0) ||
- (WSAGetLastError() == WSAEWOULDBLOCK))
- break;
- // Error... save this error code and see if there are other address
- // to try before throwing the exception.
- error = WSAGetLastError();
- addrs = addrs->ai_next;
- }
- if (error)
- throw qpid::Exception(QPID_MSG(strError(error) << ": " << connectname));
+ if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) != 0) &&
+ ((err = ::WSAGetLastError()) != WSAEWOULDBLOCK))
+ throw qpid::Exception(QPID_MSG(strError(err) << ": " << peername));
}
void
@@ -247,24 +211,26 @@ int Socket::read(void *buf, size_t count
return received;
}
-int Socket::listen(uint16_t port, int backlog) const
+int Socket::listen(const std::string& host, const std::string& port, int backlog) const
+{
+ SocketAddress sa(host, port);
+ return listen(sa, backlog);
+}
+
+int Socket::listen(const SocketAddress& addr, int backlog) const
{
+ createSocket(addr);
+
const SOCKET& socket = impl->fd;
BOOL yes=1;
QPID_WINSOCK_CHECK(setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char *)&yes, sizeof(yes)));
- struct sockaddr_in name;
- memset(&name, 0, sizeof(name));
- name.sin_family = AF_INET;
- name.sin_port = htons(port);
- name.sin_addr.s_addr = 0;
- if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) == SOCKET_ERROR)
- throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(WSAGetLastError())));
+
+ if (::bind(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) == SOCKET_ERROR)
+ throw Exception(QPID_MSG("Can't bind to " << addr.asString() << ": " << strError(WSAGetLastError())));
if (::listen(socket, backlog) == SOCKET_ERROR)
- throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(WSAGetLastError())));
-
- socklen_t namelen = sizeof(name);
- QPID_WINSOCK_CHECK(::getsockname(socket, (struct sockaddr*)&name, &namelen));
- return ntohs(name.sin_port);
+ throw Exception(QPID_MSG("Can't listen on " <<addr.asString() << ": " << strError(WSAGetLastError())));
+
+ return getLocalPort(socket);
}
Socket* Socket::accept() const
@@ -277,36 +243,20 @@ Socket* Socket::accept() const
else throw QPID_WINDOWS_ERROR(WSAGetLastError());
}
-std::string Socket::getSockname() const
-{
- return getName(impl->fd, true);
-}
-
-std::string Socket::getPeername() const
-{
- return getName(impl->fd, false);
-}
-
std::string Socket::getPeerAddress() const
{
- if (!connectname.empty())
- return std::string (connectname);
- return getName(impl->fd, false, true);
+ if (peername.empty()) {
+ peername = getName(impl->fd, false);
+ }
+ return peername;
}
std::string Socket::getLocalAddress() const
{
- return getName(impl->fd, true, true);
-}
-
-uint16_t Socket::getLocalPort() const
-{
- return atoi(getService(impl->fd, true).c_str());
-}
-
-uint16_t Socket::getRemotePort() const
-{
- return atoi(getService(impl->fd, true).c_str());
+ if (localname.empty()) {
+ localname = getName(impl->fd, true);
+ }
+ return localname;
}
int Socket::getError() const
Propchange: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/Socket.cpp
('svn:executable' removed)
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/SocketAddress.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/SocketAddress.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/SocketAddress.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/SocketAddress.cpp Fri Oct 21 14:42:12 2011
@@ -21,7 +21,13 @@
#include "qpid/sys/SocketAddress.h"
-#include "qpid/sys/windows/check.h"
+#include "qpid/Exception.h"
+#include "qpid/Msg.h"
+
+// Ensure we get all of winsock2.h
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
#include <winsock2.h>
#include <ws2tcpip.h>
@@ -35,37 +41,111 @@ SocketAddress::SocketAddress(const std::
port(port0),
addrInfo(0)
{
- ::addrinfo hints;
- ::memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well
- hints.ai_socktype = SOCK_STREAM;
-
- const char* node = 0;
- if (host.empty()) {
- hints.ai_flags |= AI_PASSIVE;
- } else {
- node = host.c_str();
- }
- const char* service = port.empty() ? "0" : port.c_str();
+}
+
+SocketAddress::SocketAddress(const SocketAddress& sa) :
+ host(sa.host),
+ port(sa.port),
+ addrInfo(0)
+{
+}
- int n = ::getaddrinfo(node, service, &hints, &addrInfo);
- if (n != 0)
- throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
+SocketAddress& SocketAddress::operator=(const SocketAddress& sa)
+{
+ SocketAddress temp(sa);
+
+ std::swap(temp, *this);
+ return *this;
}
SocketAddress::~SocketAddress()
{
- ::freeaddrinfo(addrInfo);
+ if (addrInfo) {
+ ::freeaddrinfo(addrInfo);
+ }
}
-std::string SocketAddress::asString() const
+std::string SocketAddress::asString(::sockaddr const * const addr, size_t addrlen)
{
- return host + ":" + port;
+ char servName[NI_MAXSERV];
+ char dispName[NI_MAXHOST];
+ if (int rc=::getnameinfo(addr, addrlen,
+ dispName, sizeof(dispName),
+ servName, sizeof(servName),
+ NI_NUMERICHOST | NI_NUMERICSERV) != 0)
+ throw qpid::Exception(QPID_MSG(gai_strerror(rc)));
+ std::string s;
+ switch (addr->sa_family) {
+ case AF_INET: s += dispName; break;
+ case AF_INET6: s += "["; s += dispName; s+= "]"; break;
+ default: throw Exception(QPID_MSG("Unexpected socket type"));
+ }
+ s += ":";
+ s += servName;
+ return s;
+}
+
+uint16_t SocketAddress::getPort(::sockaddr const * const addr)
+{
+ switch (addr->sa_family) {
+ case AF_INET: return ntohs(((::sockaddr_in*)addr)->sin_port);
+ case AF_INET6: return ntohs(((::sockaddr_in6*)addr)->sin6_port);
+ default:throw Exception(QPID_MSG("Unexpected socket type"));
+ }
+}
+
+std::string SocketAddress::asString(bool numeric) const
+{
+ if (!numeric)
+ return host + ":" + port;
+ // Canonicalise into numeric id
+ const ::addrinfo& ai = getAddrInfo(*this);
+
+ return asString(ai.ai_addr, ai.ai_addrlen);
+}
+
+bool SocketAddress::nextAddress() {
+ bool r = currentAddrInfo->ai_next != 0;
+ if (r)
+ currentAddrInfo = currentAddrInfo->ai_next;
+ return r;
+}
+
+void SocketAddress::setAddrInfoPort(uint16_t port) {
+ if (!currentAddrInfo) return;
+
+ ::addrinfo& ai = *currentAddrInfo;
+ switch (ai.ai_family) {
+ case AF_INET: ((::sockaddr_in*)ai.ai_addr)->sin_port = htons(port); return;
+ case AF_INET6:((::sockaddr_in6*)ai.ai_addr)->sin6_port = htons(port); return;
+ default: throw Exception(QPID_MSG("Unexpected socket type"));
+ }
}
const ::addrinfo& getAddrInfo(const SocketAddress& sa)
{
- return *sa.addrInfo;
+ if (!sa.addrInfo) {
+ ::addrinfo hints;
+ ::memset(&hints, 0, sizeof(hints));
+ hints.ai_flags = AI_ADDRCONFIG; // Only use protocols that we have configured interfaces for
+ hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6
+ hints.ai_socktype = SOCK_STREAM;
+
+ const char* node = 0;
+ if (sa.host.empty()) {
+ hints.ai_flags |= AI_PASSIVE;
+ } else {
+ node = sa.host.c_str();
+ }
+ const char* service = sa.port.empty() ? "0" : sa.port.c_str();
+
+ int n = ::getaddrinfo(node, service, &hints, &sa.addrInfo);
+ if (n != 0)
+ throw Exception(QPID_MSG("Cannot resolve " << sa.asString(false) << ": " << ::gai_strerror(n)));
+ sa.currentAddrInfo = sa.addrInfo;
+ }
+
+ return *sa.currentAddrInfo;
}
}}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/SslAsynchIO.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/SslAsynchIO.h?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/SslAsynchIO.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/SslAsynchIO.h Fri Oct 21 14:42:12 2011
@@ -39,9 +39,6 @@ namespace qpid {
namespace sys {
namespace windows {
-class Socket;
-class Poller;
-
/*
* SSL/Schannel shim between the frame-handling and AsynchIO layers.
* SslAsynchIO creates a regular AsynchIO object to handle I/O and this class
Modified: qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/StrError.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/StrError.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/StrError.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/sys/windows/StrError.cpp Fri Oct 21 14:42:12 2011
@@ -30,6 +30,7 @@ namespace sys {
std::string strError(int err) {
const size_t bufsize = 512;
char buf[bufsize];
+ buf[0] = 0;
if (0 == FormatMessage (FORMAT_MESSAGE_MAX_WIDTH_MASK
| FORMAT_MESSAGE_FROM_SYSTEM,
0,
@@ -39,7 +40,11 @@ std::string strError(int err) {
bufsize,
0))
{
- strerror_s (buf, bufsize, err);
+#ifdef _MSC_VER
+ strerror_s(buf, bufsize, err);
+#else
+ return std::string(strerror(err));
+#endif
}
return std::string(buf);
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org