You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2007/08/09 20:27:28 UTC

svn commit: r564333 - in /incubator/qpid/trunk/qpid: ./ cpp/src/qpid/broker/Broker.cpp cpp/src/qpid/client/Connector.cpp cpp/src/qpid/client/Connector.h cpp/src/qpid/sys/Acceptor.h cpp/src/qpid/sys/AsynchIOAcceptor.cpp

Author: astitcher
Date: Thu Aug  9 11:27:27 2007
New Revision: 564333

URL: http://svn.apache.org/viewvc?view=rev&rev=564333
Log:
 r948@fuschia:  andrew | 2007-08-09 18:46:30 +0100
  r913@fuschia:  andrew | 2007-08-07 12:56:10 +0100
  Removed extraneous parameter when creating Broker Acceptor
 
 r949@fuschia:  andrew | 2007-08-09 18:46:30 +0100
  r914@fuschia:  andrew | 2007-08-07 12:56:55 +0100
  Work in Progress: making the client library use the AsynchIO layer
 
 r950@fuschia:  andrew | 2007-08-09 18:46:30 +0100
  r937@fuschia:  andrew | 2007-08-09 02:29:41 +0100
  Shutdown connection properly
 
 r951@fuschia:  andrew | 2007-08-09 18:46:31 +0100
 
 r952@fuschia:  andrew | 2007-08-09 19:27:03 +0100
 Finishing touches to first cut of AsynchIO for client

Modified:
    incubator/qpid/trunk/qpid/   (props changed)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp

Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Thu Aug  9 11:27:27 2007
@@ -1,2 +1,3 @@
 8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1224
-c99eadab-1afc-4df6-acde-a632afdabecb:/local/qpid/trunk/qpid:899
+c99eadab-1afc-4df6-acde-a632afdabecb:/local/qpid-trunk:947
+c99eadab-1afc-4df6-acde-a632afdabecb:/local/qpid/trunk/qpid:952

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?view=diff&rev=564333&r1=564332&r2=564333
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Aug  9 11:27:27 2007
@@ -153,8 +153,7 @@
         const_cast<Acceptor::shared_ptr&>(acceptor) =
             Acceptor::create(config.port,
                              config.connectionBacklog,
-                             config.workerThreads,
-                             false);
+                             config.workerThreads);
         QPID_LOG(info, "Listening on port " << getPort());
     }
     return *acceptor;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?view=diff&rev=564333&r1=564332&r2=564333
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Thu Aug  9 11:27:27 2007
@@ -25,6 +25,12 @@
 #include "qpid/framing/AMQFrame.h"
 #include "Connector.h"
 
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Poller.h"
+
+#include <boost/bind.hpp>
+
 namespace qpid {
 namespace client {
 
@@ -43,7 +49,6 @@
     idleIn(0), idleOut(0), 
     timeoutHandler(0),
     shutdownHandler(0),
-    inbuf(receive_buffer_size), 
     outbuf(send_buffer_size)
 { }
 
@@ -56,6 +61,7 @@
 void Connector::connect(const std::string& host, int port){
     socket.connect(host, port);
     closed = false;
+    poller = Poller::shared_ptr(new Poller);
     receiver = Thread(this);
 }
 
@@ -68,7 +74,7 @@
 bool Connector::closeInternal() {
     Mutex::ScopedLock l(closedLock);
     if (!closed) {
-        socket.close();
+        poller->shutdown();
         closed = true;
         return true;
     }
@@ -91,6 +97,8 @@
     return this; 
 }
 
+// TODO: astitcher 20070908: Writing still needs to be transferred to the asynchronous IO
+// framework.
 void Connector::send(AMQFrame& frame){
     writeBlock(&frame);
     QPID_LOG(trace, "SENT: " << frame);
@@ -121,6 +129,10 @@
         shutdownHandler->shutdown();
 }
 
+// TODO: astitcher 20070908: This version of the code can never time out, so the idle processing
+// can never be called. The timeout processing needs to be added into the underlying Dispatcher code
+//
+// TODO: astitcher 20070908: EOF is dealt with separately now via a callback to eof
 void Connector::checkIdle(ssize_t status){
     if(timeoutHandler){
         AbsTime t = now();
@@ -166,33 +178,65 @@
     timeoutHandler = handler;
 }
 
-void Connector::run(){
-    try{
-	while(!closed){
-            ssize_t available = inbuf.available();
-            if(available < 1){
-                THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
-            }
-            ssize_t received = socket.recv(inbuf.start(), available);
-	    checkIdle(received);
 
-	    if(!closed && received > 0){
-		inbuf.move(received);
-		inbuf.flip();//position = 0, limit = total data read
-		
-		AMQFrame frame(version);
-		while(frame.decode(inbuf)){
-                    QPID_LOG(trace, "RECV: " << frame);
-		    input->received(frame);
-		}
-                //need to compact buffer to preserve any 'extra' data
-                inbuf.compact();
-	    }
+// Buffer definition
+struct Buff : public AsynchIO::Buffer {
+    Buff() :
+        AsynchIO::Buffer(new char[65536], 65536)
+    {}
+    ~Buff()
+    { delete [] bytes;}
+};
+
+void Connector::readbuff(AsynchIO& aio, AsynchIO::Buffer* buff) {
+    framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+
+	AMQFrame frame(version);
+	while(frame.decode(in)){
+	    QPID_LOG(trace, "RECV: " << frame);
+		input->received(frame);
 	}
-    } catch (const std::exception& e) {
+	// TODO: unreading needs to go away, and when we can cope
+	// with multiple sub-buffers in the general buffer scheme, it will
+	if (in.available() != 0) {
+		// Adjust buffer for used bytes and then "unread them"
+		buff->dataStart += buff->dataCount-in.available();
+		buff->dataCount = in.available();
+		aio.unread(buff);
+	} else {
+		// Give whole buffer back to aio subsystem
+		aio.queueReadBuffer(buff);
+	}
+}
+
+void Connector::eof(AsynchIO&) {
+	handleClosed();
+}
+
+// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
+// will never be called
+void Connector::run(){
+	try {
+	    Dispatcher d(poller);
+	
+	    AsynchIO* aio = new AsynchIO(socket,
+	    	boost::bind(&Connector::readbuff, this, _1, _2),
+	    	boost::bind(&Connector::eof, this, _1),
+                boost::bind(&Connector::eof, this, _1));
+	    
+	    for (int i = 0; i < 32; i++) {
+	        aio->queueReadBuffer(new Buff);
+	    }
+	
+	    aio->start(poller);
+	    d.run();
+        aio->queueForDeletion();
+        socket.close();
+	} catch (const std::exception& e) {
         QPID_LOG(error, e.what());
         handleClosed();
     }
 }
+
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?view=diff&rev=564333&r1=564332&r2=564333
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Thu Aug  9 11:27:27 2007
@@ -34,9 +34,10 @@
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Socket.h"
 #include "qpid/sys/Time.h"
+#include "qpid/sys/AsynchIO.h"
 
 namespace qpid {
-
+	
 namespace client {
 
 class Connector : public framing::OutputHandler, 
@@ -62,7 +63,6 @@
     framing::InitiationHandler* initialiser;
     framing::OutputHandler* output;
 	
-    framing::Buffer inbuf;
     framing::Buffer outbuf;
 
     sys::Mutex writeLock;
@@ -70,6 +70,8 @@
 
     sys::Socket socket;
 
+    sys::Poller::shared_ptr poller;
+
     void checkIdle(ssize_t status);
     void writeBlock(framing::AMQDataBlock* data);
     void writeToSocket(char* data, size_t available);
@@ -78,7 +80,10 @@
     void run();
     void handleClosed();
     bool closeInternal();
-
+    
+    void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIO::Buffer*);
+    void eof(qpid::sys::AsynchIO&);
+    
   friend class Channel;
   public:
     Connector(framing::ProtocolVersion pVersion,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h?view=diff&rev=564333&r1=564332&r2=564333
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h Thu Aug  9 11:27:27 2007
@@ -33,7 +33,7 @@
 class Acceptor : public qpid::SharedObject<Acceptor>
 {
   public:
-    static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false);
+    static Acceptor::shared_ptr create(int16_t port, int backlog, int threads);
     virtual ~Acceptor() = 0;
     virtual uint16_t getPort() const = 0;
     virtual std::string getHost() const = 0;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp?view=diff&rev=564333&r1=564332&r2=564333
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp Thu Aug  9 11:27:27 2007
@@ -49,7 +49,7 @@
 	const uint16_t listeningPort;
 
 public:
-    AsynchIOAcceptor(int16_t port, int backlog, int threads, bool trace);
+    AsynchIOAcceptor(int16_t port, int backlog, int threads);
     ~AsynchIOAcceptor() {}
     void run(ConnectionInputHandlerFactory* factory);
     void shutdown();
@@ -61,13 +61,13 @@
     void accepted(Poller::shared_ptr, const Socket&, ConnectionInputHandlerFactory*);
 };
 
-Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bool trace)
+Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads)
 {
     return
-    	Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads, trace));
+    	Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads));
 }
 
-AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads, bool) :
+AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) :
  	poller(new Poller),
  	numIOThreads(threads),
  	listeningPort(listener.listen(port, backlog))