You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2007/08/09 20:27:28 UTC
svn commit: r564333 - in /incubator/qpid/trunk/qpid: ./
cpp/src/qpid/broker/Broker.cpp cpp/src/qpid/client/Connector.cpp
cpp/src/qpid/client/Connector.h cpp/src/qpid/sys/Acceptor.h
cpp/src/qpid/sys/AsynchIOAcceptor.cpp
Author: astitcher
Date: Thu Aug 9 11:27:27 2007
New Revision: 564333
URL: http://svn.apache.org/viewvc?view=rev&rev=564333
Log:
r948@fuschia: andrew | 2007-08-09 18:46:30 +0100
r913@fuschia: andrew | 2007-08-07 12:56:10 +0100
Removed extraneous parameter when creating Broker Acceptor
r949@fuschia: andrew | 2007-08-09 18:46:30 +0100
r914@fuschia: andrew | 2007-08-07 12:56:55 +0100
Work in Progress: making the client library use the AsynchIO layer
r950@fuschia: andrew | 2007-08-09 18:46:30 +0100
r937@fuschia: andrew | 2007-08-09 02:29:41 +0100
Shutdown connection properly
r951@fuschia: andrew | 2007-08-09 18:46:31 +0100
r952@fuschia: andrew | 2007-08-09 19:27:03 +0100
Finishing touches to first cut of AsynchIO for client
Modified:
incubator/qpid/trunk/qpid/ (props changed)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Thu Aug 9 11:27:27 2007
@@ -1,2 +1,3 @@
8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1224
-c99eadab-1afc-4df6-acde-a632afdabecb:/local/qpid/trunk/qpid:899
+c99eadab-1afc-4df6-acde-a632afdabecb:/local/qpid-trunk:947
+c99eadab-1afc-4df6-acde-a632afdabecb:/local/qpid/trunk/qpid:952
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?view=diff&rev=564333&r1=564332&r2=564333
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Aug 9 11:27:27 2007
@@ -153,8 +153,7 @@
const_cast<Acceptor::shared_ptr&>(acceptor) =
Acceptor::create(config.port,
config.connectionBacklog,
- config.workerThreads,
- false);
+ config.workerThreads);
QPID_LOG(info, "Listening on port " << getPort());
}
return *acceptor;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?view=diff&rev=564333&r1=564332&r2=564333
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Thu Aug 9 11:27:27 2007
@@ -25,6 +25,12 @@
#include "qpid/framing/AMQFrame.h"
#include "Connector.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Poller.h"
+
+#include <boost/bind.hpp>
+
namespace qpid {
namespace client {
@@ -43,7 +49,6 @@
idleIn(0), idleOut(0),
timeoutHandler(0),
shutdownHandler(0),
- inbuf(receive_buffer_size),
outbuf(send_buffer_size)
{ }
@@ -56,6 +61,7 @@
void Connector::connect(const std::string& host, int port){
socket.connect(host, port);
closed = false;
+ poller = Poller::shared_ptr(new Poller);
receiver = Thread(this);
}
@@ -68,7 +74,7 @@
bool Connector::closeInternal() {
Mutex::ScopedLock l(closedLock);
if (!closed) {
- socket.close();
+ poller->shutdown();
closed = true;
return true;
}
@@ -91,6 +97,8 @@
return this;
}
+// TODO: astitcher 20070908: Writing still needs to be transferred to the asynchronous IO
+// framework.
void Connector::send(AMQFrame& frame){
writeBlock(&frame);
QPID_LOG(trace, "SENT: " << frame);
@@ -121,6 +129,10 @@
shutdownHandler->shutdown();
}
+// TODO: astitcher 20070908: This version of the code can never time out, so the idle processing
+// can never be called. The timeout processing needs to be added into the underlying Dispatcher code
+//
+// TODO: astitcher 20070908: EOF is dealt with separately now via a callback to eof
void Connector::checkIdle(ssize_t status){
if(timeoutHandler){
AbsTime t = now();
@@ -166,33 +178,65 @@
timeoutHandler = handler;
}
-void Connector::run(){
- try{
- while(!closed){
- ssize_t available = inbuf.available();
- if(available < 1){
- THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
- }
- ssize_t received = socket.recv(inbuf.start(), available);
- checkIdle(received);
- if(!closed && received > 0){
- inbuf.move(received);
- inbuf.flip();//position = 0, limit = total data read
-
- AMQFrame frame(version);
- while(frame.decode(inbuf)){
- QPID_LOG(trace, "RECV: " << frame);
- input->received(frame);
- }
- //need to compact buffer to preserve any 'extra' data
- inbuf.compact();
- }
+// Buffer definition
+struct Buff : public AsynchIO::Buffer {
+ Buff() :
+ AsynchIO::Buffer(new char[65536], 65536)
+ {}
+ ~Buff()
+ { delete [] bytes;}
+};
+
+void Connector::readbuff(AsynchIO& aio, AsynchIO::Buffer* buff) {
+ framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+
+ AMQFrame frame(version);
+ while(frame.decode(in)){
+ QPID_LOG(trace, "RECV: " << frame);
+ input->received(frame);
}
- } catch (const std::exception& e) {
+ // TODO: unreading needs to go away, and when we can cope
+ // with multiple sub-buffers in the general buffer scheme, it will
+ if (in.available() != 0) {
+ // Adjust buffer for used bytes and then "unread them"
+ buff->dataStart += buff->dataCount-in.available();
+ buff->dataCount = in.available();
+ aio.unread(buff);
+ } else {
+ // Give whole buffer back to aio subsystem
+ aio.queueReadBuffer(buff);
+ }
+}
+
+void Connector::eof(AsynchIO&) {
+ handleClosed();
+}
+
+// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
+// will never be called
+void Connector::run(){
+ try {
+ Dispatcher d(poller);
+
+ AsynchIO* aio = new AsynchIO(socket,
+ boost::bind(&Connector::readbuff, this, _1, _2),
+ boost::bind(&Connector::eof, this, _1),
+ boost::bind(&Connector::eof, this, _1));
+
+ for (int i = 0; i < 32; i++) {
+ aio->queueReadBuffer(new Buff);
+ }
+
+ aio->start(poller);
+ d.run();
+ aio->queueForDeletion();
+ socket.close();
+ } catch (const std::exception& e) {
QPID_LOG(error, e.what());
handleClosed();
}
}
+
}} // namespace qpid::client
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?view=diff&rev=564333&r1=564332&r2=564333
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Thu Aug 9 11:27:27 2007
@@ -34,9 +34,10 @@
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Socket.h"
#include "qpid/sys/Time.h"
+#include "qpid/sys/AsynchIO.h"
namespace qpid {
-
+
namespace client {
class Connector : public framing::OutputHandler,
@@ -62,7 +63,6 @@
framing::InitiationHandler* initialiser;
framing::OutputHandler* output;
- framing::Buffer inbuf;
framing::Buffer outbuf;
sys::Mutex writeLock;
@@ -70,6 +70,8 @@
sys::Socket socket;
+ sys::Poller::shared_ptr poller;
+
void checkIdle(ssize_t status);
void writeBlock(framing::AMQDataBlock* data);
void writeToSocket(char* data, size_t available);
@@ -78,7 +80,10 @@
void run();
void handleClosed();
bool closeInternal();
-
+
+ void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIO::Buffer*);
+ void eof(qpid::sys::AsynchIO&);
+
friend class Channel;
public:
Connector(framing::ProtocolVersion pVersion,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h?view=diff&rev=564333&r1=564332&r2=564333
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Acceptor.h Thu Aug 9 11:27:27 2007
@@ -33,7 +33,7 @@
class Acceptor : public qpid::SharedObject<Acceptor>
{
public:
- static Acceptor::shared_ptr create(int16_t port, int backlog, int threads, bool trace = false);
+ static Acceptor::shared_ptr create(int16_t port, int backlog, int threads);
virtual ~Acceptor() = 0;
virtual uint16_t getPort() const = 0;
virtual std::string getHost() const = 0;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp?view=diff&rev=564333&r1=564332&r2=564333
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp Thu Aug 9 11:27:27 2007
@@ -49,7 +49,7 @@
const uint16_t listeningPort;
public:
- AsynchIOAcceptor(int16_t port, int backlog, int threads, bool trace);
+ AsynchIOAcceptor(int16_t port, int backlog, int threads);
~AsynchIOAcceptor() {}
void run(ConnectionInputHandlerFactory* factory);
void shutdown();
@@ -61,13 +61,13 @@
void accepted(Poller::shared_ptr, const Socket&, ConnectionInputHandlerFactory*);
};
-Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bool trace)
+Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads)
{
return
- Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads, trace));
+ Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads));
}
-AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads, bool) :
+AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) :
poller(new Poller),
numIOThreads(threads),
listeningPort(listener.listen(port, backlog))