You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2009/10/20 19:10:06 UTC
svn commit: r827735 - in /qpid/trunk/qpid/cpp: include/qpid/sys/ src/qpid/broker/ src/qpid/sys/windows/
Author: shuston
Date: Tue Oct 20 17:10:05 2009
New Revision: 827735
URL: http://svn.apache.org/viewvc?rev=827735&view=rev
Log:
Carry over recent AsynchIO-level changes to Windows.
Modified:
qpid/trunk/qpid/cpp/include/qpid/sys/IOHandle.h
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h
qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp
Modified: qpid/trunk/qpid/cpp/include/qpid/sys/IOHandle.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/sys/IOHandle.h?rev=827735&r1=827734&r2=827735&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/sys/IOHandle.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/sys/IOHandle.h Tue Oct 20 17:10:05 2009
@@ -35,6 +35,8 @@
class AsynchAcceptorPrivate;
class AsynchAcceptResult;
namespace windows {
+ class AsynchAcceptor;
+ class AsynchAcceptResult;
class AsynchIO;
}
@@ -43,8 +45,8 @@
class IOHandlePrivate;
class IOHandle {
- friend class AsynchAcceptorPrivate;
- friend class AsynchAcceptResult;
+ friend class windows::AsynchAcceptResult;
+ friend class windows::AsynchAcceptor;
friend class windows::AsynchIO;
friend class PollerHandle;
@@ -52,7 +54,7 @@
protected:
IOHandlePrivate* const impl;
- IOHandle(IOHandlePrivate*);
+ IOHandle(IOHandlePrivate*);
QPID_COMMON_EXTERN virtual ~IOHandle();
};
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=827735&r1=827734&r2=827735&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Oct 20 17:10:05 2009
@@ -201,7 +201,7 @@
void registerProtocolFactory(const std::string& name, boost::shared_ptr<sys::ProtocolFactory>);
/** Accept connections */
- void accept();
+ QPID_BROKER_EXTERN void accept();
/** Create a connection to another broker. */
void connect(const std::string& host, uint16_t port,
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=827735&r1=827734&r2=827735&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp Tue Oct 20 17:10:05 2009
@@ -74,6 +74,7 @@
namespace qpid {
namespace sys {
+namespace windows {
/*
* Asynch Acceptor
@@ -88,13 +89,13 @@
* and status of each accept operation outstanding.
*/
-class AsynchAcceptorPrivate {
+class AsynchAcceptor : public qpid::sys::AsynchAcceptor {
friend class AsynchAcceptResult;
public:
- AsynchAcceptorPrivate(const Socket& s, AsynchAcceptor::Callback callback);
- ~AsynchAcceptorPrivate();
+ AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback);
+ ~AsynchAcceptor();
void start(Poller::shared_ptr poller);
private:
@@ -104,19 +105,7 @@
const Socket& socket;
};
-AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) :
- impl(new AsynchAcceptorPrivate(s, callback))
-{}
-
-AsynchAcceptor::~AsynchAcceptor()
-{ delete impl; }
-
-void AsynchAcceptor::start(Poller::shared_ptr poller) {
- impl->start(poller);
-}
-
-AsynchAcceptorPrivate::AsynchAcceptorPrivate(const Socket& s,
- AsynchAcceptor::Callback callback)
+AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback)
: acceptedCallback(callback),
socket(s) {
@@ -128,16 +117,17 @@
#endif
}
-AsynchAcceptorPrivate::~AsynchAcceptorPrivate(void) {
+AsynchAcceptor::~AsynchAcceptor()
+{
socket.close();
}
-void AsynchAcceptorPrivate::start(Poller::shared_ptr poller) {
+void AsynchAcceptor::start(Poller::shared_ptr poller) {
poller->monitorHandle(PollerHandle(socket), Poller::INPUT);
restart ();
}
-void AsynchAcceptorPrivate::restart(void) {
+void AsynchAcceptor::restart(void) {
DWORD bytesReceived = 0; // Not used, needed for AcceptEx API
AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback,
this,
@@ -156,7 +146,7 @@
AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb,
- AsynchAcceptorPrivate *acceptor,
+ AsynchAcceptor *acceptor,
SOCKET listener)
: callback(cb), acceptor(acceptor), listener(listener) {
newSocket.reset (new Socket());
@@ -174,13 +164,11 @@
}
void AsynchAcceptResult::failure(int status) {
- //if (status != WSA_OPERATION_ABORTED)
- // Can there be anything else? ;
- delete this;
+ //if (status != WSA_OPERATION_ABORTED)
+ // Can there be anything else? ;
+ delete this;
}
-namespace windows {
-
/*
* AsynchConnector does synchronous connects for now... to do asynch the
* IocpPoller will need some extension to register an event handle as a
@@ -224,6 +212,12 @@
} // namespace windows
+AsynchAcceptor* AsynchAcceptor::create(const Socket& s,
+ Callback callback)
+{
+ return new windows::AsynchAcceptor(s, callback);
+}
+
AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s,
Poller::shared_ptr poller,
std::string hostname,
@@ -231,12 +225,12 @@
ConnectedCallback connCb,
FailedCallback failCb)
{
- return new qpid::sys::windows::AsynchConnector(s,
- poller,
- hostname,
- port,
- connCb,
- failCb);
+ return new windows::AsynchConnector(s,
+ poller,
+ hostname,
+ port,
+ connCb,
+ failCb);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h?rev=827735&r1=827734&r2=827735&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIoResult.h Tue Oct 20 17:10:05 2009
@@ -30,6 +30,7 @@
namespace qpid {
namespace sys {
+namespace windows {
/*
* AsynchIoResult defines the class that receives the result of an
@@ -73,14 +74,13 @@
int status;
};
-class AsynchAcceptorPrivate;
class AsynchAcceptResult : public AsynchResult {
- friend class AsynchAcceptorPrivate;
+ friend class AsynchAcceptor;
public:
- AsynchAcceptResult(AsynchAcceptor::Callback cb,
- AsynchAcceptorPrivate *acceptor,
+ AsynchAcceptResult(qpid::sys::AsynchAcceptor::Callback cb,
+ AsynchAcceptor *acceptor,
SOCKET listener);
virtual void success (size_t bytesTransferred);
virtual void failure (int error);
@@ -89,8 +89,8 @@
virtual void complete(void) {} // No-op for this class.
std::auto_ptr<qpid::sys::Socket> newSocket;
- AsynchAcceptor::Callback callback;
- AsynchAcceptorPrivate *acceptor;
+ qpid::sys::AsynchAcceptor::Callback callback;
+ AsynchAcceptor *acceptor;
SOCKET listener;
// AcceptEx needs a place to write the local and remote addresses
@@ -106,16 +106,16 @@
typedef boost::function1<void, AsynchIoResult *> Completer;
virtual ~AsynchIoResult() {}
- AsynchIO::BufferBase *getBuff(void) const { return iobuff; }
+ qpid::sys::AsynchIO::BufferBase *getBuff(void) const { return iobuff; }
size_t getRequested(void) const { return requested; }
const WSABUF *getWSABUF(void) const { return &wsabuf; }
protected:
- void setBuff (AsynchIO::BufferBase *buffer) { iobuff = buffer; }
+ void setBuff (qpid::sys::AsynchIO::BufferBase *buffer) { iobuff = buffer; }
protected:
AsynchIoResult(Completer cb,
- AsynchIO::BufferBase *buff, size_t length)
+ qpid::sys::AsynchIO::BufferBase *buff, size_t length)
: completionCallback(cb), iobuff(buff), requested(length) {}
virtual void complete(void) = 0;
@@ -123,7 +123,7 @@
Completer completionCallback;
private:
- AsynchIO::BufferBase *iobuff;
+ qpid::sys::AsynchIO::BufferBase *iobuff;
size_t requested; // Number of bytes in original I/O request
};
@@ -137,7 +137,7 @@
public:
AsynchReadResult(AsynchIoResult::Completer cb,
- AsynchIO::BufferBase *buff,
+ qpid::sys::AsynchIO::BufferBase *buff,
size_t length)
: AsynchIoResult(cb, buff, length) {
wsabuf.buf = buff->bytes + buff->dataCount;
@@ -149,7 +149,7 @@
// complete() updates buffer then does completion callback.
virtual void complete(void) {
- AsynchIO::BufferBase *b = getBuff();
+ qpid::sys::AsynchIO::BufferBase *b = getBuff();
b->dataStart += bytes;
b->dataCount -= bytes;
completionCallback(this);
@@ -157,7 +157,7 @@
public:
AsynchWriteResult(AsynchIoResult::Completer cb,
- AsynchIO::BufferBase *buff,
+ qpid::sys::AsynchIO::BufferBase *buff,
size_t length)
: AsynchIoResult(cb, buff, length) {
wsabuf.buf = buff ? buff->bytes : 0;
@@ -188,15 +188,15 @@
public:
AsynchCallbackRequest(AsynchIoResult::Completer cb,
- AsynchIO::RequestCallback reqCb)
+ qpid::sys::AsynchIO::RequestCallback reqCb)
: AsynchIoResult(cb, 0, 0), reqCallback(reqCb) {
wsabuf.buf = 0;
wsabuf.len = 0;
}
- AsynchIO::RequestCallback reqCallback;
+ qpid::sys::AsynchIO::RequestCallback reqCallback;
};
-}}
+}}} // qpid::sys::windows
#endif /*!_windows_asynchIoResult_h*/
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h?rev=827735&r1=827734&r2=827735&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/IoHandlePrivate.h Tue Oct 20 17:10:05 2009
@@ -22,6 +22,7 @@
*
*/
+#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/windows/AsynchIoResult.h"
#include "qpid/CommonImportExport.h"
@@ -40,13 +41,13 @@
class IOHandlePrivate {
public:
IOHandlePrivate(SOCKET f = INVALID_SOCKET,
- AsynchIoResult::Completer cb = 0,
+ windows::AsynchIoResult::Completer cb = 0,
AsynchIO::RequestCallback reqCallback = 0) :
fd(f), event(cb), cbRequest(reqCallback)
{}
SOCKET fd;
- AsynchIoResult::Completer event;
+ windows::AsynchIoResult::Completer event;
AsynchIO::RequestCallback cbRequest;
};
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp?rev=827735&r1=827734&r2=827735&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp Tue Oct 20 17:10:05 2009
@@ -42,11 +42,11 @@
friend class PollerHandle;
SOCKET fd;
- AsynchIoResult::Completer cb;
+ windows::AsynchIoResult::Completer cb;
AsynchIO::RequestCallback cbRequest;
PollerHandlePrivate(SOCKET f,
- AsynchIoResult::Completer cb0 = 0,
+ windows::AsynchIoResult::Completer cb0 = 0,
AsynchIO::RequestCallback rcb = 0)
: fd(f), cb(cb0), cbRequest(rcb)
{
@@ -133,13 +133,14 @@
assert(dir == Poller::INPUT || dir == Poller::OUTPUT);
if (dir == Poller::OUTPUT) {
- AsynchWriteWanted *result = new AsynchWriteWanted(handle.impl->cb);
+ windows::AsynchWriteWanted *result =
+ new windows::AsynchWriteWanted(handle.impl->cb);
PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped());
}
else {
- AsynchCallbackRequest *result =
- new AsynchCallbackRequest(handle.impl->cb,
- handle.impl->cbRequest);
+ windows::AsynchCallbackRequest *result =
+ new windows::AsynchCallbackRequest(handle.impl->cb,
+ handle.impl->cbRequest);
PostQueuedCompletionStatus(impl->iocp, 0, 0, result->overlapped());
}
}
@@ -155,7 +156,7 @@
DWORD numTransferred = 0;
ULONG_PTR completionKey = 0;
OVERLAPPED *overlapped = 0;
- AsynchResult *result = 0;
+ windows::AsynchResult *result = 0;
// Wait for either an I/O operation to finish (thus signaling the
// IOCP handle) or a shutdown request to be made (thus signaling the
@@ -185,7 +186,7 @@
return Event(0, SHUTDOWN);
}
- result = AsynchResult::from_overlapped(overlapped);
+ result = windows::AsynchResult::from_overlapped(overlapped);
result->success (static_cast<size_t>(numTransferred));
}
else {
@@ -193,7 +194,7 @@
// Dequeued a completion for a failed operation. Downcast back
// to the result object and inform it that the operation failed.
DWORD status = ::GetLastError();
- result = AsynchResult::from_overlapped(overlapped);
+ result = windows::AsynchResult::from_overlapped(overlapped);
result->failure (static_cast<int>(status));
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp?rev=827735&r1=827734&r2=827735&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp Tue Oct 20 17:10:05 2009
@@ -47,8 +47,8 @@
const boost::shared_ptr<sys::Poller>& poller);
~PollableConditionPrivate();
- void poke();
- void dispatch(AsynchIoResult *result);
+ void poke();
+ void dispatch(windows::AsynchIoResult *result);
private:
PollableCondition::Callback cb;
@@ -82,7 +82,7 @@
poller->monitorHandle(ph, Poller::INPUT);
}
-void PollableConditionPrivate::dispatch(AsynchIoResult *result)
+void PollableConditionPrivate::dispatch(windows::AsynchIoResult *result)
{
delete result; // Poller::monitorHandle() allocates this
cb(parent);
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp?rev=827735&r1=827734&r2=827735&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/Socket.cpp Tue Oct 20 17:10:05 2009
@@ -135,7 +135,9 @@
} // namespace
Socket::Socket() :
- IOHandle(new IOHandlePrivate)
+ IOHandle(new IOHandlePrivate),
+ nonblocking(false),
+ nodelay(false)
{
SOCKET& socket = impl->fd;
if (socket != INVALID_SOCKET) Socket::close();
@@ -145,7 +147,9 @@
}
Socket::Socket(IOHandlePrivate* h) :
- IOHandle(h)
+ IOHandle(h),
+ nonblocking(false),
+ nodelay(false)
{}
void
@@ -162,6 +166,7 @@
try {
if (nonblocking) setNonblocking();
+ if (nodelay) setTcpNoDelay();
} catch (std::exception&) {
closesocket(s);
socket = INVALID_SOCKET;
@@ -313,17 +318,16 @@
return result;
}
-void Socket::setTcpNoDelay(bool nodelay) const
+void Socket::setTcpNoDelay() const
{
- if (nodelay) {
- int flag = 1;
- int result = setsockopt(impl->fd,
- IPPROTO_TCP,
- TCP_NODELAY,
- (char *)&flag,
- sizeof(flag));
- QPID_WINSOCK_CHECK(result);
- }
+ int flag = 1;
+ int result = setsockopt(impl->fd,
+ IPPROTO_TCP,
+ TCP_NODELAY,
+ (char *)&flag,
+ sizeof(flag));
+ QPID_WINSOCK_CHECK(result);
+ nodelay = true;
}
}} // namespace qpid::sys
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org