You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2007/07/27 19:19:31 UTC
svn commit: r560323 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/client/
qpid/sys/ qpid/sys/apr/ qpid/sys/epoll/ qpid/sys/posix/ tests/
Author: astitcher
Date: Fri Jul 27 10:19:30 2007
New Revision: 560323
URL: http://svn.apache.org/viewvc?view=rev&rev=560323
Log:
* Asynchronous network IO subsystem
- This is now implemented such that it very nearly only depends on the platform
code (Socker & Poller), this is not 100% true at present, but should be simple
to finish.
- This is still not the default (use "./configure --disable-apr-netio" to get it)
- Interrupting the broker gives a known error
- Default for number of broker io threads is not correct (needs to be number of CPUs -
it will run slower with too many io threads)
* EventChannel code
- Deleted all EventChannel code as it's entirely superceded by this new shiny code ;-)
* Rearranged the platform Socket implementations a bit for better abstraction
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
Removed:
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/Socket.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PosixAcceptor.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.h
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/APRAcceptor.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/Socket.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp-default
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Jul 27 10:19:30 2007
@@ -58,18 +58,12 @@
apr_plat_hdr = \
qpid/sys/apr/Condition.h \
qpid/sys/apr/Mutex.h \
- qpid/sys/apr/Socket.h \
qpid/sys/apr/Thread.h
posix_netio_src = \
- qpid/sys/posix/EventChannel.cpp \
- qpid/sys/posix/EventChannelAcceptor.cpp \
- qpid/sys/posix/EventChannelConnection.cpp \
- qpid/sys/posix/EventChannelThreads.cpp
+ qpid/sys/AsynchIOAcceptor.cpp
-posix_netio_hdr = \
- qpid/sys/posix/EventChannel.h \
- qpid/sys/posix/EventChannelThreads.h
+posix_netio_hdr =
posix_plat_src = \
qpid/sys/Dispatcher.cpp \
@@ -86,7 +80,6 @@
qpid/sys/posix/Condition.h \
qpid/sys/posix/PrivatePosix.h \
qpid/sys/posix/Mutex.h \
- qpid/sys/posix/Socket.h \
qpid/sys/posix/Thread.h
if USE_APR_NETIO
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Fri Jul 27 10:19:30 2007
@@ -54,7 +54,6 @@
}
void Connector::connect(const std::string& host, int port){
- socket = Socket::createTcp();
socket.connect(host, port);
closed = false;
receiver = Thread(this);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h Fri Jul 27 10:19:30 2007
@@ -37,9 +37,11 @@
virtual ~Acceptor() = 0;
virtual uint16_t getPort() const = 0;
virtual std::string getHost() const = 0;
- virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory) = 0;
+ virtual void run(ConnectionInputHandlerFactory* factory) = 0;
virtual void shutdown() = 0;
};
+
+inline Acceptor::~Acceptor() {}
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h Fri Jul 27 10:19:30 2007
@@ -35,14 +35,14 @@
*/
class AsynchAcceptor {
public:
- typedef boost::function1<void, int> Callback;
+ typedef boost::function1<void, const Socket&> Callback;
private:
Callback acceptedCallback;
DispatchHandle handle;
public:
- AsynchAcceptor(int fd, Callback callback);
+ AsynchAcceptor(const Socket& s, Callback callback);
void start(Poller::shared_ptr poller);
private:
@@ -75,9 +75,9 @@
bytes(b),
byteCount(s),
dataStart(0),
- dataCount(s)
+ dataCount(0)
{}
-
+
virtual ~Buffer()
{}
};
@@ -85,6 +85,7 @@
typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback;
typedef boost::function1<void, AsynchIO&> EofCallback;
typedef boost::function1<void, AsynchIO&> DisconnectCallback;
+ typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback;
typedef boost::function1<void, AsynchIO&> BuffersEmptyCallback;
typedef boost::function1<void, AsynchIO&> IdleCallback;
@@ -92,26 +93,33 @@
ReadCallback readCallback;
EofCallback eofCallback;
DisconnectCallback disCallback;
+ ClosedCallback closedCallback;
BuffersEmptyCallback emptyCallback;
IdleCallback idleCallback;
std::deque<Buffer*> bufferQueue;
std::deque<Buffer*> writeQueue;
+ bool queuedClose;
public:
- AsynchIO(int fd,
+ AsynchIO(const Socket& s,
ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
- BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
+ ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
void queueForDeletion();
void start(Poller::shared_ptr poller);
void queueReadBuffer(Buffer* buff);
- void queueWrite(Buffer* buff);
+ void queueWrite(Buffer* buff = 0);
+ void unread(Buffer* buff);
+ void queueWriteClose();
+ Buffer* getQueuedBuffer();
+ const Socket& getSocket() const { return DispatchHandle::getSocket(); }
private:
~AsynchIO();
void readable(DispatchHandle& handle);
void writeable(DispatchHandle& handle);
void disconnected(DispatchHandle& handle);
+ void close(DispatchHandle& handle);
};
}}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp?view=auto&rev=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp Fri Jul 27 10:19:30 2007
@@ -0,0 +1,308 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Acceptor.h"
+
+#include "Socket.h"
+#include "AsynchIO.h"
+#include "Mutex.h"
+#include "Thread.h"
+
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/ConnectionInputHandler.h"
+#include "qpid/sys/ConnectionInputHandlerFactory.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/log/Statement.h"
+
+#include <boost/bind.hpp>
+#include <boost/assert.hpp>
+#include <queue>
+#include <vector>
+#include <memory>
+
+namespace qpid {
+namespace sys {
+
+class AsynchIOAcceptor : public Acceptor {
+ Poller::shared_ptr poller;
+ Socket listener;
+ int numIOThreads;
+ const uint16_t listeningPort;
+
+public:
+ AsynchIOAcceptor(int16_t port, int backlog, int threads, bool trace);
+ ~AsynchIOAcceptor() {}
+ void run(ConnectionInputHandlerFactory* factory);
+ void shutdown();
+
+ uint16_t getPort() const;
+ std::string getHost() const;
+
+private:
+ void accepted(Poller::shared_ptr, const Socket&, ConnectionInputHandlerFactory*);
+};
+
+Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bool trace)
+{
+ return
+ Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads, trace));
+}
+
+AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads, bool) :
+ poller(new Poller),
+ numIOThreads(threads),
+ listeningPort(listener.listen(port, backlog))
+{}
+
+// Buffer definition
+struct Buff : public AsynchIO::Buffer {
+ Buff() :
+ AsynchIO::Buffer(new char[65536], 65536)
+ {}
+ ~Buff()
+ { delete [] bytes;}
+};
+
+class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
+ AsynchIO* aio;
+ ConnectionInputHandler* inputHandler;
+ std::queue<framing::AMQFrame> frameQueue;
+ Mutex frameQueueLock;
+ bool frameQueueClosed;
+ bool initiated;
+
+public:
+ AsynchIOHandler() :
+ inputHandler(0),
+ frameQueueClosed(false),
+ initiated(false)
+ {}
+
+ ~AsynchIOHandler() {
+ if (inputHandler)
+ inputHandler->closed();
+ delete inputHandler;
+ }
+
+ void init(AsynchIO* a, ConnectionInputHandler* h) {
+ aio = a;
+ inputHandler = h;
+ }
+
+ // Output side
+ void send(framing::AMQFrame&);
+ void close();
+
+ // Input side
+ void readbuff(AsynchIO& aio, AsynchIO::Buffer* buff);
+ void eof(AsynchIO& aio);
+ void disconnect(AsynchIO& aio);
+
+ // Notifications
+ void nobuffs(AsynchIO& aio);
+ void idle(AsynchIO& aio);
+ void closedSocket(AsynchIO& aio, const Socket& s);
+};
+
+void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) {
+
+ AsynchIOHandler* async = new AsynchIOHandler;
+ ConnectionInputHandler* handler = f->create(async);
+ AsynchIO* aio = new AsynchIO(s,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+ async->init(aio, handler);
+
+ // Give connection some buffers to use
+ for (int i = 0; i < 4; i++) {
+ aio->queueReadBuffer(new Buff);
+ }
+ aio->start(poller);
+}
+
+
+uint16_t AsynchIOAcceptor::getPort() const {
+ return listeningPort; // Immutable no need for lock.
+}
+
+std::string AsynchIOAcceptor::getHost() const {
+ return listener.getSockname();
+}
+
+void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) {
+ Dispatcher d(poller);
+ AsynchAcceptor
+ acceptor(listener,
+ boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact));
+ acceptor.start(poller);
+
+ std::vector<Thread*> t(numIOThreads-1);
+
+ // Run n-1 io threads
+ for (int i=0; i<numIOThreads-1; ++i)
+ t[i] = new Thread(d);
+
+ // Run final thread
+ d.run();
+
+ // Now wait for n-1 io threads to exit
+ for (int i=0; i>numIOThreads-1; ++i) {
+ t[i]->join();
+ delete t[i];
+ }
+}
+
+void AsynchIOAcceptor::shutdown() {
+ poller->shutdown();
+}
+
+// Output side
+void AsynchIOHandler::send(framing::AMQFrame& frame) {
+ // TODO: Need to find out if we are in the callback context,
+ // in the callback thread if so we can go further than just queuing the frame
+ // to be handled later
+ {
+ ScopedLock<Mutex> l(frameQueueLock);
+ // Ignore anything seen after closing
+ if (!frameQueueClosed)
+ frameQueue.push(frame);
+ }
+
+ // Activate aio for writing here
+ aio->queueWrite();
+}
+
+void AsynchIOHandler::close() {
+ ScopedLock<Mutex> l(frameQueueLock);
+ frameQueueClosed = true;
+}
+
+// Input side
+void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::Buffer* buff) {
+ framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+ if(initiated){
+ framing::AMQFrame frame;
+ try{
+ while(frame.decode(in)) {
+ QPID_LOG(debug, "RECV: " << frame);
+ inputHandler->received(frame);
+ }
+ }catch(const std::exception& e){
+ QPID_LOG(error, e.what());
+ }
+ }else{
+ framing::ProtocolInitiation protocolInit;
+ if(protocolInit.decode(in)){
+ QPID_LOG(debug, "INIT [" << aio << "]");
+ inputHandler->initiated(protocolInit);
+ initiated = true;
+ }
+ }
+ // TODO: unreading needs to go away, and when we can cope
+ // with multiple sub-buffers in the general buffer scheme, it will
+ if (in.available() != 0) {
+ // Adjust buffer for used bytes and then "unread them"
+ buff->dataStart += buff->dataCount-in.available();
+ buff->dataCount = in.available();
+ aio->unread(buff);
+ } else {
+ // Give whole buffer back to aio subsystem
+ aio->queueReadBuffer(buff);
+ }
+}
+
+void AsynchIOHandler::eof(AsynchIO&) {
+ inputHandler->closed();
+ aio->queueWriteClose();
+}
+
+void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) {
+ delete &s;
+ aio->queueForDeletion();
+ delete this;
+}
+
+void AsynchIOHandler::disconnect(AsynchIO& a) {
+ // treat the same as eof
+ eof(a);
+}
+
+// Notifications
+void AsynchIOHandler::nobuffs(AsynchIO&) {
+}
+
+void AsynchIOHandler::idle(AsynchIO&){
+ ScopedLock<Mutex> l(frameQueueLock);
+
+ if (frameQueue.empty()) {
+ // At this point we know that we're write idling the connection
+ // so we could note that somewhere or do something special
+ return;
+ }
+
+ // Try and get a queued buffer if not then construct new one
+ AsynchIO::Buffer* buff = aio->getQueuedBuffer();
+ if (!buff)
+ buff = new Buff;
+ std::auto_ptr<framing::Buffer> out(new framing::Buffer(buff->bytes, buff->byteCount));
+ int buffUsed = 0;
+
+ while (!frameQueue.empty()) {
+ framing::AMQFrame frame = frameQueue.front();
+ frameQueue.pop();
+
+ // Encode output frame
+ int frameSize = frame.size();
+ if (frameSize > buff->byteCount)
+ THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer.");
+
+ // If we've filled current buffer then flush and get new one
+ if (frameSize > int(out->available())) {
+ buff->dataCount = buffUsed;
+ aio->queueWrite(buff);
+
+ buff = aio->getQueuedBuffer();
+ if (!buff)
+ buff = new Buff;
+ out.reset(new framing::Buffer(buff->bytes, buff->byteCount));
+ buffUsed = 0;
+ }
+
+ frame.encode(*out);
+ buffUsed += frameSize;
+ QPID_LOG(debug, "SENT: " << frame);
+ }
+
+ buff->dataCount = buffUsed;
+ aio->queueWrite(buff);
+
+ if (frameQueueClosed) {
+ aio->queueWriteClose();
+ }
+
+}
+
+}} // namespace qpid::sys
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp Fri Jul 27 10:19:30 2007
@@ -94,10 +94,11 @@
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
+ case DELAYED_IDLE:
break;
case DELAYED_R:
case DELAYED_W:
- case CALLBACK:
+ case DELAYED_INACTIVE:
state = r ?
(w ? DELAYED_RW : DELAYED_R) :
DELAYED_W;
@@ -132,6 +133,7 @@
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
+ case DELAYED_IDLE:
break;
case DELAYED_R:
case DELAYED_RW:
@@ -140,7 +142,7 @@
case DELAYED_W:
state = DELAYED_RW;
break;
- case CALLBACK:
+ case DELAYED_INACTIVE:
state = DELAYED_R;
break;
case ACTIVE_R:
@@ -168,6 +170,7 @@
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
+ case DELAYED_IDLE:
break;
case DELAYED_W:
case DELAYED_RW:
@@ -176,7 +179,7 @@
case DELAYED_R:
state = DELAYED_RW;
break;
- case CALLBACK:
+ case DELAYED_INACTIVE:
state = DELAYED_W;
break;
case INACTIVE:
@@ -204,15 +207,16 @@
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
+ case DELAYED_IDLE:
break;
case DELAYED_R:
- state = CALLBACK;
+ state = DELAYED_INACTIVE;
break;
case DELAYED_RW:
state = DELAYED_W;
break;
case DELAYED_W:
- case CALLBACK:
+ case DELAYED_INACTIVE:
case DELAYED_DELETE:
break;
case ACTIVE_R:
@@ -239,15 +243,16 @@
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
+ case DELAYED_IDLE:
break;
case DELAYED_W:
- state = CALLBACK;
+ state = DELAYED_INACTIVE;
break;
case DELAYED_RW:
state = DELAYED_R;
break;
case DELAYED_R:
- case CALLBACK:
+ case DELAYED_INACTIVE:
case DELAYED_DELETE:
break;
case ACTIVE_W:
@@ -270,12 +275,13 @@
ScopedLock<Mutex> lock(stateLock);
switch (state) {
case IDLE:
+ case DELAYED_IDLE:
break;
case DELAYED_R:
case DELAYED_W:
case DELAYED_RW:
- case CALLBACK:
- state = CALLBACK;
+ case DELAYED_INACTIVE:
+ state = DELAYED_INACTIVE;
break;
case DELAYED_DELETE:
break;
@@ -289,32 +295,46 @@
void DispatchHandle::stopWatch() {
ScopedLock<Mutex> lock(stateLock);
- if ( state == IDLE) {
+ switch (state) {
+ case IDLE:
+ case DELAYED_IDLE:
+ case DELAYED_DELETE:
return;
+ case DELAYED_R:
+ case DELAYED_W:
+ case DELAYED_RW:
+ case DELAYED_INACTIVE:
+ state = DELAYED_IDLE;
+ break;
+ default:
+ state = IDLE;
+ break;
}
assert(poller);
poller->delFd(*this);
poller.reset();
- state = IDLE;
}
// The slightly strange switch structure
// is to ensure that the lock is released before
// we do the delete
void DispatchHandle::doDelete() {
+ // Ensure that we're no longer watching anything
+ stopWatch();
+
// If we're in the middle of a callback defer the delete
{
ScopedLock<Mutex> lock(stateLock);
switch (state) {
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_RW:
- case CALLBACK:
+ case DELAYED_IDLE:
case DELAYED_DELETE:
state = DELAYED_DELETE;
return;
+ case IDLE:
+ break;
default:
- break;
+ // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states
+ assert(false);
}
}
// If we're not then do it right away
@@ -359,7 +379,7 @@
case Poller::DISCONNECTED:
{
ScopedLock<Mutex> lock(stateLock);
- state = CALLBACK;
+ state = DELAYED_INACTIVE;
}
if (disconnectedCallback) {
disconnectedCallback(*this);
@@ -386,10 +406,11 @@
poller->modFd(*this, Poller::INOUT);
state = ACTIVE_RW;
return;
- case CALLBACK:
+ case DELAYED_INACTIVE:
state = INACTIVE;
return;
- case IDLE:
+ case DELAYED_IDLE:
+ state = IDLE;
return;
default:
// This should be impossible
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h Fri Jul 27 10:19:30 2007
@@ -49,12 +49,13 @@
Mutex stateLock;
enum {
IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW,
- CALLBACK, DELAYED_R, DELAYED_W, DELAYED_RW, DELAYED_DELETE
+ DELAYED_IDLE, DELAYED_INACTIVE, DELAYED_R, DELAYED_W, DELAYED_RW,
+ DELAYED_DELETE
} state;
public:
- DispatchHandle(int fd, Callback rCb, Callback wCb, Callback dCb) :
- PollerHandle(fd),
+ DispatchHandle(const Socket& s, Callback rCb, Callback wCb, Callback dCb) :
+ PollerHandle(s),
readableCallback(rCb),
writableCallback(wCb),
disconnectedCallback(dCb),
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h Fri Jul 27 10:19:30 2007
@@ -23,6 +23,7 @@
*/
#include "Time.h"
+#include "Socket.h"
#include <stdint.h>
@@ -39,14 +40,14 @@
class PollerHandle {
friend class Poller;
- PollerHandlePrivate* impl;
- const int fd;
+ PollerHandlePrivate* const impl;
+ const Socket& socket;
public:
- PollerHandle(int fd0);
+ PollerHandle(const Socket& s);
virtual ~PollerHandle();
-
- int getFD() const { return fd; }
+
+ const Socket& getSocket() const {return socket;}
};
/**
@@ -55,7 +56,7 @@
*/
class PollerPrivate;
class Poller {
- PollerPrivate* impl;
+ PollerPrivate* const impl;
public:
typedef boost::shared_ptr<Poller> shared_ptr;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h Fri Jul 27 10:19:30 2007
@@ -21,11 +21,74 @@
* under the License.
*
*/
+#include <string>
+#include "qpid/sys/Time.h"
-#ifdef USE_APR_PLATFORM
-#include "apr/Socket.h"
-#else
-#include "posix/Socket.h"
-#endif
+struct sockaddr;
+namespace qpid {
+namespace sys {
+
+class SocketPrivate;
+class Socket
+{
+ friend class Poller;
+
+ SocketPrivate* const impl;
+
+public:
+ /** Create a socket wrapper for descriptor. */
+ Socket();
+ ~Socket();
+
+ /** Create an initialized TCP socket */
+ void createTcp() const;
+
+ /** Set timeout for read and write */
+ void setTimeout(const Duration& interval) const;
+
+ /** Set socket non blocking */
+ void setNonblocking() const;
+
+ void connect(const std::string& host, int port) const;
+
+ void close() const;
+
+ enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode;
+
+ /** Returns bytes sent or an ErrorCode value < 0. */
+ ssize_t send(const void* data, size_t size) const;
+
+ /**
+ * Returns bytes received, an ErrorCode value < 0 or 0
+ * if the connection closed in an orderly manner.
+ */
+ ssize_t recv(void* data, size_t size) const;
+
+ /** Bind to a port and start listening.
+ *@param port 0 means choose an available port.
+ *@param backlog maximum number of pending connections.
+ *@return The bound port.
+ */
+ int listen(int port = 0, int backlog = 10) const;
+
+ /** Returns the "socket name" ie the address bound to
+ * the near end of the socket
+ */
+ std::string getSockname() const;
+
+ /** Accept a connection from a socket that is already listening
+ * and has an incoming connection
+ */
+ Socket* accept(struct sockaddr *addr, socklen_t *addrlen) const;
+
+ // TODO The following are raw operations, maybe they need better wrapping?
+ int read(void *buf, size_t count) const;
+ int write(const void *buf, size_t count) const;
+
+private:
+ Socket(SocketPrivate*);
+};
+
+}}
#endif /*!_sys_Socket_h*/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/APRAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/APRAcceptor.cpp?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/APRAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/APRAcceptor.cpp Fri Jul 27 10:19:30 2007
@@ -56,8 +56,6 @@
{
return Acceptor::shared_ptr(new APRAcceptor(port, backlog, threads, trace));
}
-// Must define Acceptor virtual dtor.
-Acceptor::~Acceptor() {}
APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) :
port(port_),
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/Socket.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/Socket.cpp?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/Socket.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/Socket.cpp Fri Jul 27 10:19:30 2007
@@ -20,31 +20,56 @@
*/
-#include "Socket.h"
+#include "qpid/sys/Socket.h"
+
#include "APRBase.h"
#include "APRPool.h"
+#include <apr_network_io.h>
+
+namespace qpid {
+namespace sys {
+
+class SocketPrivate {
+public:
+ SocketPrivate(apr_socket_t* s = 0) :
+ socket(s)
+ {}
+
+ apr_socket_t* socket;
+};
+
+Socket::Socket() :
+ impl(new SocketPrivate)
+{
+ createTcp();
+}
+
+Socket::Socket(SocketPrivate* sp) :
+ impl(sp)
+{}
-using namespace qpid::sys;
+Socket::~Socket() {
+ delete impl;
+}
-Socket Socket::createTcp() {
- Socket s;
+void Socket::createTcp() const {
+ apr_socket_t*& socket = impl->socket;
+ apr_socket_t* s;
CHECK_APR_SUCCESS(
apr_socket_create(
- &s.socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP,
+ &s, APR_INET, SOCK_STREAM, APR_PROTO_TCP,
APRPool::get()));
- return s;
-}
-
-Socket::Socket(apr_socket_t* s) {
socket = s;
}
-void Socket::setTimeout(const Duration& interval) {
+void Socket::setTimeout(const Duration& interval) const {
+ apr_socket_t*& socket = impl->socket;
apr_socket_timeout_set(socket, interval/TIME_USEC);
}
-void Socket::connect(const std::string& host, int port) {
+void Socket::connect(const std::string& host, int port) const {
+ apr_socket_t*& socket = impl->socket;
apr_sockaddr_t* address;
CHECK_APR_SUCCESS(
apr_sockaddr_info_get(
@@ -53,14 +78,16 @@
CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
}
-void Socket::close() {
+void Socket::close() const {
+ apr_socket_t*& socket = impl->socket;
if (socket == 0) return;
CHECK_APR_SUCCESS(apr_socket_close(socket));
socket = 0;
}
-ssize_t Socket::send(const void* data, size_t size)
+ssize_t Socket::send(const void* data, size_t size) const
{
+ apr_socket_t*& socket = impl->socket;
apr_size_t sent = size;
apr_status_t status =
apr_socket_send(socket, reinterpret_cast<const char*>(data), &sent);
@@ -70,8 +97,9 @@
return sent;
}
-ssize_t Socket::recv(void* data, size_t size)
+ssize_t Socket::recv(void* data, size_t size) const
{
+ apr_socket_t*& socket = impl->socket;
apr_size_t received = size;
apr_status_t status =
apr_socket_recv(socket, reinterpret_cast<char*>(data), &received);
@@ -83,4 +111,4 @@
return received;
}
-
+}} // namespace qpid::sys
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Fri Jul 27 10:19:30 2007
@@ -22,6 +22,7 @@
#include "qpid/sys/Poller.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/posix/check.h"
+#include "qpid/sys/posix/PrivatePosix.h"
#include <sys/epoll.h>
#include <errno.h>
@@ -88,11 +89,11 @@
}
};
-PollerHandle::PollerHandle(int fd0) :
+PollerHandle::PollerHandle(const Socket& s) :
impl(new PollerHandlePrivate),
- fd(fd0)
+ socket(s)
{}
-
+
PollerHandle::~PollerHandle() {
delete impl;
}
@@ -186,7 +187,7 @@
}
epe.data.ptr = &handle;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, handle.getFD(), &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, toFd(handle.socket.impl), &epe));
// Record monitoring state of this fd
eh.events = epe.events;
@@ -197,7 +198,7 @@
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
assert(!eh.isIdle());
- int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, handle.getFD(), 0);
+ int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, toFd(handle.socket.impl), 0);
// Ignore EBADF since deleting a nonexistent fd has the overall required result!
// And allows the case where a sloppy program closes the fd and then does the delFd()
if (rc == -1 && errno != EBADF) {
@@ -216,7 +217,7 @@
epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
epe.data.ptr = &handle;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, toFd(handle.socket.impl), &epe));
// Record monitoring state of this fd
eh.events = epe.events;
@@ -232,7 +233,7 @@
epe.events = eh.events;
epe.data.ptr = &handle;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, toFd(handle.socket.impl), &epe));
eh.setActive();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Fri Jul 27 10:19:30 2007
@@ -24,8 +24,6 @@
#include "check.h"
#include <unistd.h>
-#include <fcntl.h>
-#include <sys/types.h>
#include <sys/socket.h>
#include <signal.h>
#include <errno.h>
@@ -37,13 +35,6 @@
namespace {
/*
- * Make file descriptor non-blocking
- */
-void nonblocking(int fd) {
- QPID_POSIX_CHECK(::fcntl(fd, F_SETFL, O_NONBLOCK));
-}
-
-/*
* Make *process* not generate SIGPIPE when writing to closed
* pipe/socket (necessary as default action is to terminate process)
*/
@@ -57,11 +48,11 @@
* Asynch Acceptor
*/
-AsynchAcceptor::AsynchAcceptor(int fd, Callback callback) :
+AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) :
acceptedCallback(callback),
- handle(fd, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) {
+ handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) {
- nonblocking(fd);
+ s.setNonblocking();
ignoreSigpipe();
}
@@ -73,18 +64,16 @@
* We keep on accepting as long as there is something to accept
*/
void AsynchAcceptor::readable(DispatchHandle& h) {
- int afd;
+ Socket* s;
do {
errno = 0;
// TODO: Currently we ignore the peers address, perhaps we should
// log it or use it for connection acceptance.
- afd = ::accept(h.getFD(), 0, 0);
- if (afd >= 0) {
- acceptedCallback(afd);
- } else if (errno == EAGAIN) {
- break;
+ s = h.getSocket().accept(0, 0);
+ if (s) {
+ acceptedCallback(*s);
} else {
- QPID_POSIX_CHECK(afd);
+ break;
}
} while (true);
@@ -94,21 +83,23 @@
/*
* Asynch reader/writer
*/
-AsynchIO::AsynchIO(int fd,
+AsynchIO::AsynchIO(const Socket& s,
ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
- BuffersEmptyCallback eCb, IdleCallback iCb) :
+ ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
- DispatchHandle(fd,
+ DispatchHandle(s,
boost::bind(&AsynchIO::readable, this, _1),
boost::bind(&AsynchIO::writeable, this, _1),
boost::bind(&AsynchIO::disconnected, this, _1)),
readCallback(rCb),
eofCallback(eofCb),
disCallback(disCb),
+ closedCallback(cCb),
emptyCallback(eCb),
- idleCallback(iCb) {
+ idleCallback(iCb),
+ queuedClose(false) {
- nonblocking(fd);
+ s.setNonblocking();
}
struct deleter
@@ -131,15 +122,58 @@
}
void AsynchIO::queueReadBuffer(Buffer* buff) {
+ assert(buff);
+ buff->dataStart = 0;
+ buff->dataCount = 0;
+ bufferQueue.push_back(buff);
+ DispatchHandle::rewatchRead();
+}
+
+void AsynchIO::unread(Buffer* buff) {
+ assert(buff);
+ if (buff->dataStart != 0) {
+ memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount);
+ buff->dataStart = 0;
+ }
bufferQueue.push_front(buff);
DispatchHandle::rewatchRead();
}
+// Either queue for writing or announce that there is something to write
+// and we should ask for it
void AsynchIO::queueWrite(Buffer* buff) {
- writeQueue.push_front(buff);
+ // If no buffer then don't queue anything
+ // (but still wake up for writing)
+ if (buff) {
+ // If we've already closed the socket then throw the write away
+ if (queuedClose) {
+ bufferQueue.push_front(buff);
+ return;
+ } else {
+ writeQueue.push_front(buff);
+ }
+ }
DispatchHandle::rewatchWrite();
}
+void AsynchIO::queueWriteClose() {
+ queuedClose = true;
+}
+
+/** Return a queued buffer if there are enough
+ * to spare
+ */
+AsynchIO::Buffer* AsynchIO::getQueuedBuffer() {
+ // Always keep at least one buffer (it might have data that was "unread" in it)
+ if (bufferQueue.size()<=1)
+ return 0;
+ Buffer* buff = bufferQueue.back();
+ buff->dataStart = 0;
+ buff->dataCount = 0;
+ bufferQueue.pop_back();
+ return buff;
+}
+
/*
* We keep on reading as long as we have something to read and a buffer to put
* it in
@@ -149,19 +183,19 @@
// (Try to) get a buffer
if (!bufferQueue.empty()) {
// Read into buffer
- Buffer* buff = bufferQueue.back();
- bufferQueue.pop_back();
+ Buffer* buff = bufferQueue.front();
+ bufferQueue.pop_front();
errno = 0;
- int rc = ::read(h.getFD(), buff->bytes, buff->byteCount);
+ int readCount = buff->byteCount-buff->dataCount;
+ int rc = h.getSocket().read(buff->bytes + buff->dataCount, readCount);
if (rc == 0) {
eofCallback(*this);
h.unwatchRead();
return;
} else if (rc > 0) {
- buff->dataStart = 0;
- buff->dataCount = rc;
+ buff->dataCount += rc;
readCallback(*this, buff);
- if (rc != buff->byteCount) {
+ if (rc != readCount) {
// If we didn't fill the read buffer then time to stop reading
return;
}
@@ -209,7 +243,7 @@
writeQueue.pop_back();
errno = 0;
assert(buff->dataStart+buff->dataCount <= buff->byteCount);
- int rc = ::write(h.getFD(), buff->bytes+buff->dataStart, buff->dataCount);
+ int rc = h.getSocket().write(buff->bytes+buff->dataStart, buff->dataCount);
if (rc >= 0) {
// If we didn't write full buffer put rest back
if (rc != buff->dataCount) {
@@ -238,12 +272,17 @@
}
}
} else {
+ // If we're waiting to close the socket then can do it now as there is nothing to write
+ if (queuedClose) {
+ close(h);
+ return;
+ }
// Fd is writable, but nothing to write
if (idleCallback) {
idleCallback(*this);
}
// If we still have no buffers to write we can't do anything more
- if (writeQueue.empty()) {
+ if (writeQueue.empty() && !queuedClose) {
h.unwatchWrite();
return;
}
@@ -252,8 +291,25 @@
}
void AsynchIO::disconnected(DispatchHandle& h) {
+ // If we've already queued close do it before callback
+ if (queuedClose) {
+ close(h);
+ }
+
if (disCallback) {
disCallback(*this);
h.unwatch();
}
}
+
+/*
+ * Close the socket and callback to say we've done it
+ */
+void AsynchIO::close(DispatchHandle& h) {
+ h.stopWatch();
+ h.getSocket().close();
+ if (closedCallback) {
+ closedCallback(*this, getSocket());
+ }
+}
+
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h Fri Jul 27 10:19:30 2007
@@ -30,9 +30,14 @@
namespace qpid {
namespace sys {
+// Private Time related implementation details
struct timespec& toTimespec(struct timespec& ts, const Duration& t);
struct timeval& toTimeval(struct timeval& tv, const Duration& t);
Duration toTime(const struct timespec& ts);
+
+// Private socket related implementation details
+class SocketPrivate;
+int toFd(const SocketPrivate* s);
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp Fri Jul 27 10:19:30 2007
@@ -25,6 +25,8 @@
#include "check.h"
#include "PrivatePosix.h"
+#include <fcntl.h>
+#include <sys/types.h>
#include <sys/socket.h>
#include <sys/errno.h>
#include <netinet/in.h>
@@ -32,30 +34,63 @@
#include <boost/format.hpp>
-using namespace qpid::sys;
+namespace qpid {
+namespace sys {
-Socket Socket::createTcp()
+class SocketPrivate {
+public:
+ SocketPrivate(int f = -1) :
+ fd(f)
+ {}
+
+ int fd;
+};
+
+Socket::Socket() :
+ impl(new SocketPrivate)
+{
+ createTcp();
+}
+
+Socket::Socket(SocketPrivate* sp) :
+ impl(sp)
+{}
+
+Socket::~Socket() {
+ delete impl;
+}
+
+void Socket::createTcp() const
{
+ int& socket = impl->fd;
+ if (socket != -1) Socket::close();
int s = ::socket (PF_INET, SOCK_STREAM, 0);
if (s < 0) throw QPID_POSIX_ERROR(errno);
- return s;
+ socket = s;
}
-Socket::Socket(int descriptor) : socket(descriptor) {}
-
-void Socket::setTimeout(const Duration& interval)
+void Socket::setTimeout(const Duration& interval) const
{
+ const int& socket = impl->fd;
struct timeval tv;
toTimeval(tv, interval);
setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
}
-void Socket::connect(const std::string& host, int port)
+void Socket::setNonblocking() const {
+ QPID_POSIX_CHECK(::fcntl(impl->fd, F_SETFL, O_NONBLOCK));
+}
+
+
+void Socket::connect(const std::string& host, int port) const
{
+ const int& socket = impl->fd;
struct sockaddr_in name;
name.sin_family = AF_INET;
name.sin_port = htons(port);
+ // TODO: Be good to make this work for IPv6 as well as IPv4
+ // Use more modern lookup functions
struct hostent* hp = gethostbyname ( host.c_str() );
if (hp == 0) throw QPID_POSIX_ERROR(errno);
memcpy(&name.sin_addr.s_addr, hp->h_addr_list[0], hp->h_length);
@@ -64,16 +99,18 @@
}
void
-Socket::close()
+Socket::close() const
{
- if (socket == 0) return;
+ int& socket = impl->fd;
+ if (socket == -1) return;
if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno);
- socket = 0;
+ socket = -1;
}
ssize_t
-Socket::send(const void* data, size_t size)
+Socket::send(const void* data, size_t size) const
{
+ const int& socket = impl->fd;
ssize_t sent = ::send(socket, data, size, 0);
if (sent < 0) {
if (errno == ECONNRESET) return SOCKET_EOF;
@@ -84,8 +121,9 @@
}
ssize_t
-Socket::recv(void* data, size_t size)
+Socket::recv(void* data, size_t size) const
{
+ const int& socket = impl->fd;
ssize_t received = ::recv(socket, data, size, 0);
if (received < 0) {
if (errno == ETIMEDOUT) return SOCKET_TIMEOUT;
@@ -94,8 +132,9 @@
return received;
}
-int Socket::listen(int port, int backlog)
+int Socket::listen(int port, int backlog) const
{
+ const int& socket = impl->fd;
struct sockaddr_in name;
name.sin_family = AF_INET;
name.sin_port = htons(port);
@@ -111,8 +150,45 @@
return ntohs(name.sin_port);
}
+
+Socket* Socket::accept(struct sockaddr *addr, socklen_t *addrlen) const
+{
+ int afd = ::accept(impl->fd, addr, addrlen);
+ if ( afd >= 0)
+ return new Socket(new SocketPrivate(afd));
+ else if (errno == EAGAIN)
+ return 0;
+ else throw QPID_POSIX_ERROR(errno);
+}
+
+int Socket::read(void *buf, size_t count) const
+{
+ return ::read(impl->fd, buf, count);
+}
+
+int Socket::write(const void *buf, size_t count) const
+{
+ return ::write(impl->fd, buf, count);
+}
+
+std::string Socket::getSockname() const
+{
+ ::sockaddr_storage name; // big enough for any socket address
+ ::socklen_t namelen = sizeof(name);
+
+ const int& socket = impl->fd;
+ if (::getsockname(socket, (::sockaddr*)&name, &namelen) < 0)
+ throw QPID_POSIX_ERROR(errno);
-int Socket::fd() const
+ char dispName[NI_MAXHOST];
+ if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 0, 0, NI_NUMERICHOST) != 0)
+ throw QPID_POSIX_ERROR(rc);
+ return dispName;
+}
+
+int toFd(const SocketPrivate* s)
{
- return socket;
+ return s->fd;
}
+
+}} // namespace qpid::sys
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp-default
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp-default?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp-default (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp-default Fri Jul 27 10:19:30 2007
@@ -7,4 +7,12 @@
obj:*/libcpg.so.2.0.0
}
+{
+ Uninitialised value problem in dlopen
+ Memcheck:Cond
+ fun:_dl_relocate_object
+ fun:*dl_*
+ obj:/lib64/ld-2.6.so
+ obj:*
+}