You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2007/07/27 19:19:31 UTC

svn commit: r560323 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/client/ qpid/sys/ qpid/sys/apr/ qpid/sys/epoll/ qpid/sys/posix/ tests/

Author: astitcher
Date: Fri Jul 27 10:19:30 2007
New Revision: 560323

URL: http://svn.apache.org/viewvc?view=rev&rev=560323
Log:
* Asynchronous network IO subsystem
- This is now implemented such that it very nearly only depends on the platform
  code (Socker & Poller), this is not 100% true at present, but should be simple
  to finish.
- This is still not the default (use "./configure --disable-apr-netio" to get it)
- Interrupting the broker gives a known error
- Default for number of broker io threads is not correct (needs to be number of CPUs -
  it will run slower with too many io threads)

* EventChannel code
- Deleted all EventChannel code as it's entirely superceded by this new shiny code ;-)

* Rearranged the platform Socket implementations a bit for better abstraction

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/Socket.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelAcceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelConnection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/EventChannelThreads.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PosixAcceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/APRAcceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/Socket.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp-default

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Jul 27 10:19:30 2007
@@ -58,18 +58,12 @@
 apr_plat_hdr = \
   qpid/sys/apr/Condition.h \
   qpid/sys/apr/Mutex.h \
-  qpid/sys/apr/Socket.h \
   qpid/sys/apr/Thread.h
 
 posix_netio_src = \
-  qpid/sys/posix/EventChannel.cpp \
-  qpid/sys/posix/EventChannelAcceptor.cpp \
-  qpid/sys/posix/EventChannelConnection.cpp \
-  qpid/sys/posix/EventChannelThreads.cpp
+  qpid/sys/AsynchIOAcceptor.cpp
 
-posix_netio_hdr = \
-  qpid/sys/posix/EventChannel.h \
-  qpid/sys/posix/EventChannelThreads.h
+posix_netio_hdr = 
 
 posix_plat_src = \
   qpid/sys/Dispatcher.cpp \
@@ -86,7 +80,6 @@
   qpid/sys/posix/Condition.h \
   qpid/sys/posix/PrivatePosix.h \
   qpid/sys/posix/Mutex.h \
-  qpid/sys/posix/Socket.h \
   qpid/sys/posix/Thread.h
 
 if USE_APR_NETIO

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Fri Jul 27 10:19:30 2007
@@ -54,7 +54,6 @@
 }
 
 void Connector::connect(const std::string& host, int port){
-    socket = Socket::createTcp();
     socket.connect(host, port);
     closed = false;
     receiver = Thread(this);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h Fri Jul 27 10:19:30 2007
@@ -37,9 +37,11 @@
     virtual ~Acceptor() = 0;
     virtual uint16_t getPort() const = 0;
     virtual std::string getHost() const = 0;
-    virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory) = 0;
+    virtual void run(ConnectionInputHandlerFactory* factory) = 0;
     virtual void shutdown() = 0;
 };
+
+inline Acceptor::~Acceptor() {}
 
 }}
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h Fri Jul 27 10:19:30 2007
@@ -35,14 +35,14 @@
  */
 class AsynchAcceptor {
 public:
-    typedef boost::function1<void, int> Callback;
+    typedef boost::function1<void, const Socket&> Callback;
 
 private:
     Callback acceptedCallback;
     DispatchHandle handle;
 
 public:
-    AsynchAcceptor(int fd, Callback callback);
+    AsynchAcceptor(const Socket& s, Callback callback);
     void start(Poller::shared_ptr poller);
 
 private:
@@ -75,9 +75,9 @@
             bytes(b),
             byteCount(s),
             dataStart(0),
-            dataCount(s)
+            dataCount(0)
         {}
-
+        
         virtual ~Buffer()
         {}
     };
@@ -85,6 +85,7 @@
     typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback;
     typedef boost::function1<void, AsynchIO&> EofCallback;
     typedef boost::function1<void, AsynchIO&> DisconnectCallback;
+    typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback;
     typedef boost::function1<void, AsynchIO&> BuffersEmptyCallback;
     typedef boost::function1<void, AsynchIO&> IdleCallback;
 
@@ -92,26 +93,33 @@
     ReadCallback readCallback;
     EofCallback eofCallback;
     DisconnectCallback disCallback;
+    ClosedCallback closedCallback;
     BuffersEmptyCallback emptyCallback;
     IdleCallback idleCallback;
     std::deque<Buffer*> bufferQueue;
     std::deque<Buffer*> writeQueue;
+    bool queuedClose;
 
 public:
-    AsynchIO(int fd,
+    AsynchIO(const Socket& s,
         ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
-        BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
+        ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
     void queueForDeletion();
 
     void start(Poller::shared_ptr poller);
     void queueReadBuffer(Buffer* buff);
-    void queueWrite(Buffer* buff);
+    void queueWrite(Buffer* buff = 0);
+    void unread(Buffer* buff);
+    void queueWriteClose();
+    Buffer* getQueuedBuffer();
+    const Socket& getSocket() const { return DispatchHandle::getSocket(); }
 
 private:
     ~AsynchIO();
     void readable(DispatchHandle& handle);
     void writeable(DispatchHandle& handle);
     void disconnected(DispatchHandle& handle);
+    void close(DispatchHandle& handle);
 };
 
 }}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp?view=auto&rev=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp Fri Jul 27 10:19:30 2007
@@ -0,0 +1,308 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Acceptor.h"
+
+#include "Socket.h"
+#include "AsynchIO.h"
+#include "Mutex.h"
+#include "Thread.h"
+
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/ConnectionInputHandler.h"
+#include "qpid/sys/ConnectionInputHandlerFactory.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/log/Statement.h"
+
+#include <boost/bind.hpp>
+#include <boost/assert.hpp>
+#include <queue>
+#include <vector>
+#include <memory>
+
+namespace qpid {
+namespace sys {
+
+class AsynchIOAcceptor : public Acceptor {
+	Poller::shared_ptr poller;
+	Socket listener;
+	int numIOThreads;
+	const uint16_t listeningPort;
+
+public:
+    AsynchIOAcceptor(int16_t port, int backlog, int threads, bool trace);
+    ~AsynchIOAcceptor() {}
+    void run(ConnectionInputHandlerFactory* factory);
+    void shutdown();
+        
+    uint16_t getPort() const;
+    std::string getHost() const;
+
+private:
+    void accepted(Poller::shared_ptr, const Socket&, ConnectionInputHandlerFactory*);
+};
+
+Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bool trace)
+{
+    return
+    	Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads, trace));
+}
+
+AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads, bool) :
+ 	poller(new Poller),
+ 	numIOThreads(threads),
+ 	listeningPort(listener.listen(port, backlog))
+{}
+
+// Buffer definition
+struct Buff : public AsynchIO::Buffer {
+    Buff() :
+        AsynchIO::Buffer(new char[65536], 65536)
+    {}
+    ~Buff()
+    { delete [] bytes;}
+};
+
+class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
+	AsynchIO* aio;
+	ConnectionInputHandler* inputHandler;
+	std::queue<framing::AMQFrame> frameQueue;
+	Mutex frameQueueLock;
+	bool frameQueueClosed;
+	bool initiated;
+	
+public:
+	AsynchIOHandler() :
+		inputHandler(0),
+		frameQueueClosed(false),
+		initiated(false)
+	{}
+	
+	~AsynchIOHandler() {
+		if (inputHandler)
+			inputHandler->closed();
+		delete inputHandler;
+	}
+
+	void init(AsynchIO* a, ConnectionInputHandler* h) {
+		aio = a;
+		inputHandler = h;
+	}
+
+	// Output side
+	void send(framing::AMQFrame&);
+	void close();
+
+	// Input side
+	void readbuff(AsynchIO& aio, AsynchIO::Buffer* buff);
+	void eof(AsynchIO& aio);
+	void disconnect(AsynchIO& aio);
+	
+	// Notifications
+	void nobuffs(AsynchIO& aio);
+	void idle(AsynchIO& aio);
+	void closedSocket(AsynchIO& aio, const Socket& s);
+};
+
+void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) {
+
+	AsynchIOHandler* async = new AsynchIOHandler; 
+	ConnectionInputHandler* handler = f->create(async);
+    AsynchIO* aio = new AsynchIO(s,
+    	boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+    	boost::bind(&AsynchIOHandler::eof, async, _1),
+    	boost::bind(&AsynchIOHandler::disconnect, async, _1),
+		boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+    	boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+    	boost::bind(&AsynchIOHandler::idle, async, _1));
+	async->init(aio, handler);
+
+    // Give connection some buffers to use
+    for (int i = 0; i < 4; i++) {
+        aio->queueReadBuffer(new Buff);
+    }
+    aio->start(poller);
+}
+
+
+uint16_t AsynchIOAcceptor::getPort() const {
+    return listeningPort; // Immutable no need for lock.
+}
+
+std::string AsynchIOAcceptor::getHost() const {
+    return listener.getSockname();
+}
+
+void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) {
+	Dispatcher d(poller);
+	AsynchAcceptor
+		acceptor(listener,
+			boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact));
+	acceptor.start(poller);
+	
+	std::vector<Thread*> t(numIOThreads-1);
+
+	// Run n-1 io threads
+	for (int i=0; i<numIOThreads-1; ++i)
+		t[i] = new Thread(d);
+
+	// Run final thread
+	d.run();
+	
+	// Now wait for n-1 io threads to exit
+	for (int i=0; i>numIOThreads-1; ++i) {
+		t[i]->join();
+		delete t[i];
+	}
+}
+
+void AsynchIOAcceptor::shutdown() {
+	poller->shutdown();
+}
+
+// Output side
+void AsynchIOHandler::send(framing::AMQFrame& frame) {
+	// TODO: Need to find out if we are in the callback context,
+	// in the callback thread if so we can go further than just queuing the frame
+	// to be handled later
+	{
+	ScopedLock<Mutex> l(frameQueueLock);
+	// Ignore anything seen after closing
+	if (!frameQueueClosed)
+		frameQueue.push(frame);
+	}
+	
+	// Activate aio for writing here
+	aio->queueWrite();
+}
+
+void AsynchIOHandler::close() {
+	ScopedLock<Mutex> l(frameQueueLock);
+	frameQueueClosed = true;
+}
+
+// Input side
+void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::Buffer* buff) {
+	framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+    if(initiated){
+        framing::AMQFrame frame;
+        try{
+            while(frame.decode(in)) {
+                QPID_LOG(debug, "RECV: " << frame);
+		        inputHandler->received(frame);
+    		}
+		}catch(const std::exception& e){
+	    	QPID_LOG(error, e.what());
+		}
+	}else{
+	    framing::ProtocolInitiation protocolInit;
+	    if(protocolInit.decode(in)){
+	        QPID_LOG(debug, "INIT [" << aio << "]");
+	        inputHandler->initiated(protocolInit);
+	        initiated = true;
+	    }
+	}
+	// TODO: unreading needs to go away, and when we can cope
+	// with multiple sub-buffers in the general buffer scheme, it will
+	if (in.available() != 0) {
+		// Adjust buffer for used bytes and then "unread them"
+		buff->dataStart += buff->dataCount-in.available();
+		buff->dataCount = in.available();
+		aio->unread(buff);
+	} else {
+		// Give whole buffer back to aio subsystem
+		aio->queueReadBuffer(buff);
+	}
+}
+
+void AsynchIOHandler::eof(AsynchIO&) {
+	inputHandler->closed();
+	aio->queueWriteClose();
+}
+
+void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) {
+	delete &s;
+	aio->queueForDeletion();
+	delete this;
+}
+
+void AsynchIOHandler::disconnect(AsynchIO& a) {
+	// treat the same as eof
+	eof(a);
+}
+
+// Notifications
+void AsynchIOHandler::nobuffs(AsynchIO&) {
+}
+
+void AsynchIOHandler::idle(AsynchIO&){
+	ScopedLock<Mutex> l(frameQueueLock);
+	
+	if (frameQueue.empty()) {
+		// At this point we know that we're write idling the connection
+		// so we could note that somewhere or do something special
+		return;
+	}
+	
+	// Try and get a queued buffer if not then construct new one
+	AsynchIO::Buffer* buff = aio->getQueuedBuffer();
+	if (!buff)
+		buff = new Buff;
+	std::auto_ptr<framing::Buffer> out(new framing::Buffer(buff->bytes, buff->byteCount));
+	int buffUsed = 0;
+
+	while (!frameQueue.empty()) {
+		framing::AMQFrame frame = frameQueue.front();
+		frameQueue.pop();
+
+		// Encode output frame
+		int frameSize = frame.size();
+		if (frameSize > buff->byteCount)
+			THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer.");
+		
+		// If we've filled current buffer then flush and get new one
+		if (frameSize > int(out->available())) {
+			buff->dataCount = buffUsed;
+			aio->queueWrite(buff);
+
+			buff = aio->getQueuedBuffer();
+			if (!buff)
+				buff = new Buff;
+			out.reset(new framing::Buffer(buff->bytes, buff->byteCount));
+			buffUsed = 0;			
+		}
+
+		frame.encode(*out);
+		buffUsed += frameSize;
+		QPID_LOG(debug, "SENT: " << frame);
+	}
+
+	buff->dataCount = buffUsed;
+	aio->queueWrite(buff);
+
+	if (frameQueueClosed) {
+		aio->queueWriteClose();
+	}
+	
+}
+
+}} // namespace qpid::sys

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.cpp Fri Jul 27 10:19:30 2007
@@ -94,10 +94,11 @@
     ScopedLock<Mutex> lock(stateLock);
     switch(state) {
     case IDLE:
+    case DELAYED_IDLE:
         break;
     case DELAYED_R:
     case DELAYED_W:
-    case CALLBACK:
+    case DELAYED_INACTIVE:
         state = r ?
             (w ? DELAYED_RW : DELAYED_R) :
             DELAYED_W;
@@ -132,6 +133,7 @@
     ScopedLock<Mutex> lock(stateLock);
     switch(state) {
     case IDLE:
+    case DELAYED_IDLE:
         break;
     case DELAYED_R:
     case DELAYED_RW:
@@ -140,7 +142,7 @@
     case DELAYED_W:
         state = DELAYED_RW;
         break;
-    case CALLBACK:
+    case DELAYED_INACTIVE:
         state = DELAYED_R;
         break;
     case ACTIVE_R:
@@ -168,6 +170,7 @@
     ScopedLock<Mutex> lock(stateLock);
     switch(state) {
     case IDLE:
+    case DELAYED_IDLE:
         break;
     case DELAYED_W:
     case DELAYED_RW:
@@ -176,7 +179,7 @@
     case DELAYED_R:
         state = DELAYED_RW;
         break;
-    case CALLBACK:
+    case DELAYED_INACTIVE:
         state = DELAYED_W;
         break;
     case INACTIVE:
@@ -204,15 +207,16 @@
     ScopedLock<Mutex> lock(stateLock);
     switch(state) {
     case IDLE:
+    case DELAYED_IDLE:
         break;
     case DELAYED_R:
-        state = CALLBACK;
+        state = DELAYED_INACTIVE;
         break;
     case DELAYED_RW:
         state = DELAYED_W;    
         break;
     case DELAYED_W:
-    case CALLBACK:
+    case DELAYED_INACTIVE:
     case DELAYED_DELETE:
         break;
     case ACTIVE_R:
@@ -239,15 +243,16 @@
     ScopedLock<Mutex> lock(stateLock);
     switch(state) {
     case IDLE:
+    case DELAYED_IDLE:
         break;
     case DELAYED_W:
-        state = CALLBACK;
+        state = DELAYED_INACTIVE;
         break;
     case DELAYED_RW:
         state = DELAYED_R;
         break;
     case DELAYED_R:
-    case CALLBACK:
+    case DELAYED_INACTIVE:
     case DELAYED_DELETE:
         break;
     case ACTIVE_W:
@@ -270,12 +275,13 @@
     ScopedLock<Mutex> lock(stateLock);
     switch (state) {
     case IDLE:
+    case DELAYED_IDLE:
         break;
     case DELAYED_R:
     case DELAYED_W:
     case DELAYED_RW:
-    case CALLBACK:
-        state = CALLBACK;
+    case DELAYED_INACTIVE:
+        state = DELAYED_INACTIVE;
         break;
     case DELAYED_DELETE:
         break;
@@ -289,32 +295,46 @@
 
 void DispatchHandle::stopWatch() {
     ScopedLock<Mutex> lock(stateLock);
-    if ( state == IDLE) {
+    switch (state) {
+    case IDLE:
+    case DELAYED_IDLE:
+    case DELAYED_DELETE:
     	return;
+    case DELAYED_R:
+    case DELAYED_W:
+    case DELAYED_RW:
+    case DELAYED_INACTIVE:
+    	state = DELAYED_IDLE;
+    	break;
+    default:
+	    state = IDLE;
+	    break;
     }
     assert(poller);
     poller->delFd(*this);
     poller.reset();
-    state = IDLE;
 }
 
 // The slightly strange switch structure
 // is to ensure that the lock is released before
 // we do the delete
 void DispatchHandle::doDelete() {
+	// Ensure that we're no longer watching anything
+	stopWatch();
+
     // If we're in the middle of a callback defer the delete
     {
     ScopedLock<Mutex> lock(stateLock);
     switch (state) {
-    case DELAYED_R:
-    case DELAYED_W:
-    case DELAYED_RW:
-    case CALLBACK:
+    case DELAYED_IDLE:
     case DELAYED_DELETE:
         state = DELAYED_DELETE;
         return;
+    case IDLE:
+    	break;
     default:
-        break;
+    	// Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states
+        assert(false);
     }
     }
     // If we're not then do it right away
@@ -359,7 +379,7 @@
     case Poller::DISCONNECTED:
         {
         ScopedLock<Mutex> lock(stateLock);
-        state = CALLBACK;
+        state = DELAYED_INACTIVE;
         }
         if (disconnectedCallback) {
             disconnectedCallback(*this);
@@ -386,10 +406,11 @@
         poller->modFd(*this, Poller::INOUT);
         state = ACTIVE_RW;
         return;
-    case CALLBACK:
+    case DELAYED_INACTIVE:
         state = INACTIVE;
         return;
-    case IDLE:
+    case DELAYED_IDLE:
+    	state = IDLE;
     	return;
     default:
         // This should be impossible

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Dispatcher.h Fri Jul 27 10:19:30 2007
@@ -49,12 +49,13 @@
     Mutex stateLock;
     enum {
         IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW,
-        CALLBACK, DELAYED_R, DELAYED_W, DELAYED_RW, DELAYED_DELETE
+        DELAYED_IDLE, DELAYED_INACTIVE, DELAYED_R, DELAYED_W, DELAYED_RW,
+        DELAYED_DELETE
     } state;
 
 public:
-    DispatchHandle(int fd, Callback rCb, Callback wCb, Callback dCb) :
-      PollerHandle(fd),
+    DispatchHandle(const Socket& s, Callback rCb, Callback wCb, Callback dCb) :
+      PollerHandle(s),
       readableCallback(rCb),
       writableCallback(wCb),
       disconnectedCallback(dCb),

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Poller.h Fri Jul 27 10:19:30 2007
@@ -23,6 +23,7 @@
  */
 
 #include "Time.h"
+#include "Socket.h"
 
 #include <stdint.h>
 
@@ -39,14 +40,14 @@
 class PollerHandle {
     friend class Poller;
 
-    PollerHandlePrivate* impl;
-    const int fd;
+    PollerHandlePrivate* const impl;
+    const Socket& socket;
 
 public:
-    PollerHandle(int fd0);
+    PollerHandle(const Socket& s);
     virtual ~PollerHandle();
-
-    int getFD() const { return fd; }
+    
+    const Socket& getSocket() const {return socket;}
 };
 
 /**
@@ -55,7 +56,7 @@
  */
 class PollerPrivate;
 class Poller {
-    PollerPrivate* impl;
+    PollerPrivate* const impl;
 
 public:
     typedef boost::shared_ptr<Poller> shared_ptr;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h Fri Jul 27 10:19:30 2007
@@ -21,11 +21,74 @@
  * under the License.
  *
  */
+#include <string>
+#include "qpid/sys/Time.h"
 
-#ifdef USE_APR_PLATFORM
-#include "apr/Socket.h"
-#else
-#include "posix/Socket.h"
-#endif
+struct sockaddr;
 
+namespace qpid {
+namespace sys {
+
+class SocketPrivate;
+class Socket
+{
+	friend class Poller;
+
+	SocketPrivate* const impl;
+
+public:
+    /** Create a socket wrapper for descriptor. */
+    Socket();
+    ~Socket();
+
+    /** Create an initialized TCP socket */
+    void createTcp() const;
+    
+    /** Set timeout for read and write */
+    void setTimeout(const Duration& interval) const;
+    
+    /** Set socket non blocking */
+    void setNonblocking() const;
+
+    void connect(const std::string& host, int port) const;
+
+    void close() const;
+
+    enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 } ErrorCode;
+
+    /** Returns bytes sent or an ErrorCode value < 0. */
+    ssize_t send(const void* data, size_t size) const;
+
+    /**
+     * Returns bytes received, an ErrorCode value < 0 or 0
+     * if the connection closed in an orderly manner.
+     */
+    ssize_t recv(void* data, size_t size) const;
+
+    /** Bind to a port and start listening.
+     *@param port 0 means choose an available port.
+     *@param backlog maximum number of pending connections.
+     *@return The bound port.
+     */
+    int listen(int port = 0, int backlog = 10) const;
+    
+    /** Returns the "socket name" ie the address bound to 
+     * the near end of the socket
+     */
+    std::string getSockname() const;
+
+    /** Accept a connection from a socket that is already listening
+     * and has an incoming connection
+     */
+    Socket* accept(struct sockaddr *addr, socklen_t *addrlen) const;
+
+    // TODO The following are raw operations, maybe they need better wrapping? 
+    int read(void *buf, size_t count) const;
+    int write(const void *buf, size_t count) const;
+
+private:
+	Socket(SocketPrivate*);
+};
+
+}}
 #endif  /*!_sys_Socket_h*/

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/APRAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/APRAcceptor.cpp?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/APRAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/APRAcceptor.cpp Fri Jul 27 10:19:30 2007
@@ -56,8 +56,6 @@
 {
     return Acceptor::shared_ptr(new APRAcceptor(port, backlog, threads, trace));
 }
-// Must define Acceptor virtual dtor.
-Acceptor::~Acceptor() {}
 
 APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) :
     port(port_),

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/Socket.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/Socket.cpp?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/Socket.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/Socket.cpp Fri Jul 27 10:19:30 2007
@@ -20,31 +20,56 @@
  */
 
 
-#include "Socket.h"
+#include "qpid/sys/Socket.h"
+
 #include "APRBase.h"
 #include "APRPool.h"
 
+#include <apr_network_io.h>
+
+namespace qpid {
+namespace sys {
+
+class SocketPrivate {
+public:
+	SocketPrivate(apr_socket_t* s = 0) :
+		socket(s)
+	{}
+
+	apr_socket_t* socket;
+};
+
+Socket::Socket() :
+	impl(new SocketPrivate)
+{
+	createTcp();
+}
+
+Socket::Socket(SocketPrivate* sp) :
+	impl(sp)
+{}
 
-using namespace qpid::sys;
+Socket::~Socket() {
+	delete impl;
+}
 
-Socket Socket::createTcp() {
-    Socket s;
+void Socket::createTcp() const {
+	apr_socket_t*& socket = impl->socket;
+    apr_socket_t* s;
     CHECK_APR_SUCCESS(
         apr_socket_create(
-            &s.socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP,
+            &s, APR_INET, SOCK_STREAM, APR_PROTO_TCP,
             APRPool::get()));
-    return s;
-}
-
-Socket::Socket(apr_socket_t* s) {
     socket = s;
 }
 
-void Socket::setTimeout(const Duration& interval) {
+void Socket::setTimeout(const Duration& interval) const {
+	apr_socket_t*& socket = impl->socket;
     apr_socket_timeout_set(socket, interval/TIME_USEC);
 }
 
-void Socket::connect(const std::string& host, int port) {
+void Socket::connect(const std::string& host, int port) const {
+	apr_socket_t*& socket = impl->socket;
     apr_sockaddr_t* address;
     CHECK_APR_SUCCESS(
         apr_sockaddr_info_get(
@@ -53,14 +78,16 @@
     CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
 }
 
-void Socket::close() {
+void Socket::close() const {
+	apr_socket_t*& socket = impl->socket;
     if (socket == 0) return;
     CHECK_APR_SUCCESS(apr_socket_close(socket));
     socket = 0;
 }
 
-ssize_t Socket::send(const void* data, size_t size)
+ssize_t Socket::send(const void* data, size_t size) const
 {
+	apr_socket_t*& socket = impl->socket;
     apr_size_t sent = size;
     apr_status_t status =
         apr_socket_send(socket, reinterpret_cast<const char*>(data), &sent);
@@ -70,8 +97,9 @@
     return sent;
 }
 
-ssize_t Socket::recv(void* data, size_t size)
+ssize_t Socket::recv(void* data, size_t size) const
 {
+	apr_socket_t*& socket = impl->socket;
     apr_size_t received = size;
     apr_status_t status =
         apr_socket_recv(socket, reinterpret_cast<char*>(data), &received);
@@ -83,4 +111,4 @@
     return received;
 }
 
-
+}} // namespace qpid::sys

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Fri Jul 27 10:19:30 2007
@@ -22,6 +22,7 @@
 #include "qpid/sys/Poller.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/posix/check.h"
+#include "qpid/sys/posix/PrivatePosix.h"
 
 #include <sys/epoll.h>
 #include <errno.h>
@@ -88,11 +89,11 @@
     }
 };
 
-PollerHandle::PollerHandle(int fd0) :
+PollerHandle::PollerHandle(const Socket& s) :
     impl(new PollerHandlePrivate),
-    fd(fd0)
+    socket(s)
 {}
-    
+
 PollerHandle::~PollerHandle() {
     delete impl;
 }
@@ -186,7 +187,7 @@
     }
     epe.data.ptr = &handle;
     
-    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, handle.getFD(), &epe));
+    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, toFd(handle.socket.impl), &epe));
     
     // Record monitoring state of this fd
     eh.events = epe.events;
@@ -197,7 +198,7 @@
     PollerHandlePrivate& eh = *handle.impl;
     ScopedLock<Mutex> l(eh.lock);
     assert(!eh.isIdle());
-    int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, handle.getFD(), 0);
+    int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, toFd(handle.socket.impl), 0);
     // Ignore EBADF since deleting a nonexistent fd has the overall required result!
     // And allows the case where a sloppy program closes the fd and then does the delFd()
     if (rc == -1 && errno != EBADF) {
@@ -216,7 +217,7 @@
     epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
     epe.data.ptr = &handle;
     
-    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe));
+    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, toFd(handle.socket.impl), &epe));
     
     // Record monitoring state of this fd
     eh.events = epe.events;
@@ -232,7 +233,7 @@
     epe.events = eh.events;        
     epe.data.ptr = &handle;
 
-    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, handle.getFD(), &epe));
+    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, toFd(handle.socket.impl), &epe));
 
     eh.setActive();
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Fri Jul 27 10:19:30 2007
@@ -24,8 +24,6 @@
 #include "check.h"
 
 #include <unistd.h>
-#include <fcntl.h>
-#include <sys/types.h>
 #include <sys/socket.h>
 #include <signal.h>
 #include <errno.h>
@@ -37,13 +35,6 @@
 namespace {
 
 /*
- * Make file descriptor non-blocking
- */
-void nonblocking(int fd) {
-    QPID_POSIX_CHECK(::fcntl(fd, F_SETFL, O_NONBLOCK));
-}
-
-/*
  * Make *process* not generate SIGPIPE when writing to closed
  * pipe/socket (necessary as default action is to terminate process)
  */
@@ -57,11 +48,11 @@
  * Asynch Acceptor
  */
 
-AsynchAcceptor::AsynchAcceptor(int fd, Callback callback) :
+AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) :
     acceptedCallback(callback),
-    handle(fd, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) {
+    handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) {
 
-    nonblocking(fd);
+    s.setNonblocking();
     ignoreSigpipe();
 }
 
@@ -73,18 +64,16 @@
  * We keep on accepting as long as there is something to accept
  */
 void AsynchAcceptor::readable(DispatchHandle& h) {
-    int afd;
+    Socket* s;
     do {
         errno = 0;
         // TODO: Currently we ignore the peers address, perhaps we should
         // log it or use it for connection acceptance.
-        afd = ::accept(h.getFD(), 0, 0);
-        if (afd >= 0) {
-            acceptedCallback(afd);
-        } else if (errno == EAGAIN) {
-            break;
+        s = h.getSocket().accept(0, 0);
+        if (s) {
+            acceptedCallback(*s);
         } else {
-            QPID_POSIX_CHECK(afd);
+        	break;
         }
     } while (true);
 
@@ -94,21 +83,23 @@
 /*
  * Asynch reader/writer
  */
-AsynchIO::AsynchIO(int fd,
+AsynchIO::AsynchIO(const Socket& s,
     ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
-    BuffersEmptyCallback eCb, IdleCallback iCb) :
+    ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
 
-    DispatchHandle(fd, 
+    DispatchHandle(s, 
         boost::bind(&AsynchIO::readable, this, _1),
         boost::bind(&AsynchIO::writeable, this, _1),
         boost::bind(&AsynchIO::disconnected, this, _1)),
     readCallback(rCb),
     eofCallback(eofCb),
     disCallback(disCb),
+    closedCallback(cCb),
     emptyCallback(eCb),
-    idleCallback(iCb) {
+    idleCallback(iCb),
+    queuedClose(false) {
 
-    nonblocking(fd);
+    s.setNonblocking();
 }
 
 struct deleter
@@ -131,15 +122,58 @@
 }
 
 void AsynchIO::queueReadBuffer(Buffer* buff) {
+	assert(buff);
+    buff->dataStart = 0;
+    buff->dataCount = 0;
+    bufferQueue.push_back(buff);
+    DispatchHandle::rewatchRead();
+}
+
+void AsynchIO::unread(Buffer* buff) {
+	assert(buff);
+	if (buff->dataStart != 0) {
+		memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount);
+		buff->dataStart = 0;
+	}
     bufferQueue.push_front(buff);
     DispatchHandle::rewatchRead();
 }
 
+// Either queue for writing or announce that there is something to write
+// and we should ask for it
 void AsynchIO::queueWrite(Buffer* buff) {
-    writeQueue.push_front(buff);
+	// If no buffer then don't queue anything
+	// (but still wake up for writing) 
+	if (buff) {
+		// If we've already closed the socket then throw the write away
+		if (queuedClose) {
+			bufferQueue.push_front(buff);
+			return;
+		} else {
+    		writeQueue.push_front(buff);
+		}
+	}
     DispatchHandle::rewatchWrite();
 }
 
+void AsynchIO::queueWriteClose() {
+	queuedClose = true;
+}
+
+/** Return a queued buffer if there are enough
+ * to spare
+ */
+AsynchIO::Buffer* AsynchIO::getQueuedBuffer() {
+	// Always keep at least one buffer (it might have data that was "unread" in it)
+	if (bufferQueue.size()<=1)
+		return 0;
+	Buffer* buff = bufferQueue.back();
+	buff->dataStart = 0;
+	buff->dataCount = 0;
+	bufferQueue.pop_back();
+	return buff;
+}
+
 /*
  * We keep on reading as long as we have something to read and a buffer to put
  * it in
@@ -149,19 +183,19 @@
         // (Try to) get a buffer
         if (!bufferQueue.empty()) {
             // Read into buffer
-            Buffer* buff = bufferQueue.back();
-            bufferQueue.pop_back();
+            Buffer* buff = bufferQueue.front();
+            bufferQueue.pop_front();
             errno = 0;
-            int rc = ::read(h.getFD(), buff->bytes, buff->byteCount);
+            int readCount = buff->byteCount-buff->dataCount;
+            int rc = h.getSocket().read(buff->bytes + buff->dataCount, readCount);
             if (rc == 0) {
                 eofCallback(*this);
                 h.unwatchRead();
                 return;
             } else if (rc > 0) {
-                buff->dataStart = 0;
-                buff->dataCount = rc;
+                buff->dataCount += rc;
                 readCallback(*this, buff);
-                if (rc != buff->byteCount) {
+                if (rc != readCount) {
                     // If we didn't fill the read buffer then time to stop reading
                     return;
                 }
@@ -209,7 +243,7 @@
             writeQueue.pop_back();
             errno = 0;
             assert(buff->dataStart+buff->dataCount <= buff->byteCount);
-            int rc = ::write(h.getFD(), buff->bytes+buff->dataStart, buff->dataCount);
+            int rc = h.getSocket().write(buff->bytes+buff->dataStart, buff->dataCount);
             if (rc >= 0) {
                 // If we didn't write full buffer put rest back
                 if (rc != buff->dataCount) {
@@ -238,12 +272,17 @@
                 }
             }
         } else {
+        	// If we're waiting to close the socket then can do it now as there is nothing to write
+        	if (queuedClose) {
+        		close(h);
+        		return;
+        	}
             // Fd is writable, but nothing to write
             if (idleCallback) {
                 idleCallback(*this);
             }
             // If we still have no buffers to write we can't do anything more
-            if (writeQueue.empty()) {
+            if (writeQueue.empty() && !queuedClose) {
                 h.unwatchWrite();
                 return;
             }
@@ -252,8 +291,25 @@
 }
         
 void AsynchIO::disconnected(DispatchHandle& h) {
+	// If we've already queued close do it before callback
+	if (queuedClose) {
+		close(h);
+	}
+	
     if (disCallback) {
         disCallback(*this);
         h.unwatch();
     }
 }
+
+/*
+ * Close the socket and callback to say we've done it
+ */
+void AsynchIO::close(DispatchHandle& h) {
+	h.stopWatch();
+	h.getSocket().close();
+	if (closedCallback) {
+		closedCallback(*this, getSocket());
+	}
+}
+

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h Fri Jul 27 10:19:30 2007
@@ -30,9 +30,14 @@
 namespace qpid {
 namespace sys {
 
+// Private Time related implementation details
 struct timespec& toTimespec(struct timespec& ts, const Duration& t);
 struct timeval& toTimeval(struct timeval& tv, const Duration& t);
 Duration toTime(const struct timespec& ts);
+
+// Private socket related implementation details
+class SocketPrivate;
+int toFd(const SocketPrivate* s);
 
 }}
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp Fri Jul 27 10:19:30 2007
@@ -25,6 +25,8 @@
 #include "check.h"
 #include "PrivatePosix.h"
 
+#include <fcntl.h>
+#include <sys/types.h>
 #include <sys/socket.h>
 #include <sys/errno.h>
 #include <netinet/in.h>
@@ -32,30 +34,63 @@
 
 #include <boost/format.hpp>
 
-using namespace qpid::sys;
+namespace qpid {
+namespace sys {
 
-Socket Socket::createTcp() 
+class SocketPrivate {
+public:
+	SocketPrivate(int f = -1) :
+		fd(f)
+	{}
+
+	int fd;
+};
+
+Socket::Socket() :
+	impl(new SocketPrivate)
+{
+	createTcp();
+}
+
+Socket::Socket(SocketPrivate* sp) :
+	impl(sp)
+{}
+
+Socket::~Socket() {
+	delete impl;
+}
+
+void Socket::createTcp() const
 {
+	int& socket = impl->fd;
+	if (socket != -1) Socket::close();
     int s = ::socket (PF_INET, SOCK_STREAM, 0);
     if (s < 0) throw QPID_POSIX_ERROR(errno);
-    return s;
+    socket = s;
 }
 
-Socket::Socket(int descriptor) : socket(descriptor) {}
-
-void Socket::setTimeout(const Duration& interval)
+void Socket::setTimeout(const Duration& interval) const
 {
+	const int& socket = impl->fd;
     struct timeval tv;
     toTimeval(tv, interval);
     setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
     setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
 }
 
-void Socket::connect(const std::string& host, int port)
+void Socket::setNonblocking() const {
+    QPID_POSIX_CHECK(::fcntl(impl->fd, F_SETFL, O_NONBLOCK));
+}
+
+
+void Socket::connect(const std::string& host, int port) const
 {
+	const int& socket = impl->fd;
     struct sockaddr_in name;
     name.sin_family = AF_INET;
     name.sin_port = htons(port);
+    // TODO: Be good to make this work for IPv6 as well as IPv4
+    // Use more modern lookup functions
     struct hostent* hp = gethostbyname ( host.c_str() );
     if (hp == 0) throw QPID_POSIX_ERROR(errno);
     memcpy(&name.sin_addr.s_addr, hp->h_addr_list[0], hp->h_length);
@@ -64,16 +99,18 @@
 }
 
 void
-Socket::close()
+Socket::close() const
 {
-    if (socket == 0) return;
+	int& socket = impl->fd;
+    if (socket == -1) return;
     if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno);
-    socket = 0;
+    socket = -1;
 }
 
 ssize_t
-Socket::send(const void* data, size_t size)
+Socket::send(const void* data, size_t size) const
 {
+	const int& socket = impl->fd;
     ssize_t sent = ::send(socket, data, size, 0);
     if (sent < 0) {
         if (errno == ECONNRESET) return SOCKET_EOF;
@@ -84,8 +121,9 @@
 }
 
 ssize_t
-Socket::recv(void* data, size_t size)
+Socket::recv(void* data, size_t size) const
 {
+	const int& socket = impl->fd;
     ssize_t received = ::recv(socket, data, size, 0);
     if (received < 0) {
         if (errno == ETIMEDOUT) return SOCKET_TIMEOUT;
@@ -94,8 +132,9 @@
     return received;
 }
 
-int Socket::listen(int port, int backlog) 
+int Socket::listen(int port, int backlog) const
 {
+	const int& socket = impl->fd;
     struct sockaddr_in name;
     name.sin_family = AF_INET;
     name.sin_port = htons(port);
@@ -111,8 +150,45 @@
 
     return ntohs(name.sin_port);
 }
+
+Socket* Socket::accept(struct sockaddr *addr, socklen_t *addrlen) const
+{
+	int afd = ::accept(impl->fd, addr, addrlen);
+	if ( afd >= 0)
+		return new Socket(new SocketPrivate(afd));
+	else if (errno == EAGAIN)
+		return 0;
+    else throw QPID_POSIX_ERROR(errno);
+}
+
+int Socket::read(void *buf, size_t count) const
+{
+	return ::read(impl->fd, buf, count);
+}
+
+int Socket::write(const void *buf, size_t count) const
+{
+	return ::write(impl->fd, buf, count);
+}
+
+std::string Socket::getSockname() const
+{
+    ::sockaddr_storage name; // big enough for any socket address    
+    ::socklen_t namelen = sizeof(name);
+
+	const int& socket = impl->fd;
+    if (::getsockname(socket, (::sockaddr*)&name, &namelen) < 0)
+        throw QPID_POSIX_ERROR(errno);
     
-int Socket::fd() const
+    char dispName[NI_MAXHOST];
+    if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName), 0, 0, NI_NUMERICHOST) != 0)
+    	throw QPID_POSIX_ERROR(rc);
+    return dispName;
+}
+
+int toFd(const SocketPrivate* s)
 {
-    return socket;
+    return s->fd;
 }
+
+}} // namespace qpid::sys

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp-default
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp-default?view=diff&rev=560323&r1=560322&r2=560323
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp-default (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp-default Fri Jul 27 10:19:30 2007
@@ -7,4 +7,12 @@
    obj:*/libcpg.so.2.0.0
 }
 
+{
+   Uninitialised value problem in dlopen
+   Memcheck:Cond
+   fun:_dl_relocate_object
+   fun:*dl_*
+   obj:/lib64/ld-2.6.so
+   obj:*
+}