You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2012/08/10 19:27:38 UTC

svn commit: r1371775 - in /qpid/trunk/qpid/cpp/src/qpid: client/SslConnector.cpp sys/SslPlugin.cpp sys/ssl/SslHandler.cpp sys/ssl/SslHandler.h sys/ssl/SslIo.cpp sys/ssl/SslIo.h

Author: astitcher
Date: Fri Aug 10 17:27:38 2012
New Revision: 1371775

URL: http://svn.apache.org/viewvc?rev=1371775&view=rev
Log:
SSL changes for new buffer management
- Needed to rework SslConnector to mirror
  TCPConnector in order to make changes to
  the client side, but now Unix SSL and TCP
  client implementations are much more alike.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslHandler.h
    qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslIo.h

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=1371775&r1=1371774&r2=1371775&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp Fri Aug 10 17:27:38 2012
@@ -38,7 +38,6 @@
 #include "qpid/Msg.h"
 
 #include <iostream>
-#include <map>
 #include <boost/bind.hpp>
 #include <boost/format.hpp>
 
@@ -54,53 +53,29 @@ using boost::str;
 
 class SslConnector : public Connector
 {
-    struct Buff;
-
-    /** Batch up frames for writing to aio. */
-    class Writer : public framing::FrameHandler {
-        typedef sys::ssl::SslIOBufferBase BufferBase;
-        typedef std::vector<framing::AMQFrame> Frames;
-
-        const uint16_t maxFrameSize;
-        sys::Mutex lock;
-        sys::ssl::SslIO* aio;
-        BufferBase* buffer;
-        Frames frames;
-        size_t lastEof; // Position after last EOF in frames
-        framing::Buffer encode;
-        size_t framesEncoded;
-        std::string identifier;
-        Bounds* bounds;
-
-        void writeOne();
-        void newBuffer();
-
-      public:
-
-        Writer(uint16_t maxFrameSize, Bounds*);
-        ~Writer();
-        void init(std::string id, sys::ssl::SslIO*);
-        void handle(framing::AMQFrame&);
-        void write(sys::ssl::SslIO&);
-    };
+    typedef std::deque<framing::AMQFrame> Frames;
 
     const uint16_t maxFrameSize;
+
+    sys::Mutex lock;
+    Frames frames;
+    size_t lastEof; // Position after last EOF in frames
+    uint64_t currentSize;
+    Bounds* bounds;
+
     framing::ProtocolVersion version;
     bool initiated;
-    SecuritySettings securitySettings;
-
-    sys::Mutex closedLock;
     bool closed;
 
     sys::ShutdownHandler* shutdownHandler;
     framing::InputHandler* input;
 
-    Writer writer;
-
     sys::ssl::SslSocket socket;
 
     sys::ssl::SslIO* aio;
+    std::string identifier;
     Poller::shared_ptr poller;
+    SecuritySettings securitySettings;
 
     ~SslConnector();
 
@@ -110,10 +85,7 @@ class SslConnector : public Connector
     void eof(qpid::sys::ssl::SslIO&);
     void disconnected(qpid::sys::ssl::SslIO&);
 
-    std::string identifier;
-
     void connect(const std::string& host, const std::string& port);
-    void init();
     void close();
     void send(framing::AMQFrame& frame);
     void abort() {} // TODO: Need to fix for heartbeat timeouts to work
@@ -126,17 +98,16 @@ class SslConnector : public Connector
     const SecuritySettings* getSecuritySettings();
     void socketClosed(qpid::sys::ssl::SslIO&, const qpid::sys::ssl::SslSocket&);
 
+    size_t decode(const char* buffer, size_t size);
+    size_t encode(const char* buffer, size_t size);
+    bool canEncode();
+
 public:
     SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion,
               const ConnectionSettings&,
               ConnectionImpl*);
 };
 
-struct SslConnector::Buff : public SslIO::BufferBase {
-    Buff(size_t size) : SslIO::BufferBase(new char[size], size) {}
-    ~Buff() { delete [] bytes;}
-};
-
 // Static constructor which registers connector here
 namespace {
     Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
@@ -170,12 +141,14 @@ SslConnector::SslConnector(Poller::share
                      const ConnectionSettings& settings,
                      ConnectionImpl* cimpl)
     : maxFrameSize(settings.maxFrameSize),
+      lastEof(0),
+      currentSize(0),
+      bounds(cimpl),
       version(ver),
       initiated(false),
       closed(true),
       shutdownHandler(0),
       input(0),
-      writer(maxFrameSize, cimpl),
       aio(0),
       poller(p)
 {
@@ -192,7 +165,7 @@ SslConnector::~SslConnector() {
 }
 
 void SslConnector::connect(const std::string& host, const std::string& port){
-    Mutex::ScopedLock l(closedLock);
+    Mutex::ScopedLock l(lock);
     assert(closed);
     try {
         socket.connect(host, port);
@@ -201,7 +174,6 @@ void SslConnector::connect(const std::st
         throw TransportFailure(e.what());
     }
 
-    identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
     closed = false;
     aio = new SslIO(socket,
                        boost::bind(&SslConnector::readbuff, this, _1, _2),
@@ -210,21 +182,16 @@ void SslConnector::connect(const std::st
                        boost::bind(&SslConnector::socketClosed, this, _1, _2),
                        0, // nobuffs
                        boost::bind(&SslConnector::writebuff, this, _1));
-    writer.init(identifier, aio);
-}
 
-void SslConnector::init(){
-    Mutex::ScopedLock l(closedLock);
+    aio->createBuffers(maxFrameSize);
+    identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
     ProtocolInitiation init(version);
     writeDataBlock(init);
-    for (int i = 0; i < 32; i++) {
-        aio->queueReadBuffer(new Buff(maxFrameSize));
-    }
     aio->start(poller);
 }
 
 void SslConnector::close() {
-    Mutex::ScopedLock l(closedLock);
+    Mutex::ScopedLock l(lock);
     if (!closed) {
         closed = true;
         if (aio)
@@ -260,76 +227,110 @@ const std::string& SslConnector::getIden
 }
 
 void SslConnector::send(AMQFrame& frame) {
-    writer.handle(frame);
-}
-
-SslConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b)
-{
-}
-
-SslConnector::Writer::~Writer() { delete buffer; }
-
-void SslConnector::Writer::init(std::string id, sys::ssl::SslIO* a) {
-    Mutex::ScopedLock l(lock);
-    identifier = id;
-    aio = a;
-    newBuffer();
-}
-void SslConnector::Writer::handle(framing::AMQFrame& frame) {
+    bool notifyWrite = false;
+    {
     Mutex::ScopedLock l(lock);
     frames.push_back(frame);
-    if (frame.getEof() || (bounds && bounds->getCurrentSize() >= maxFrameSize)) {
+    //only ask to write if this is the end of a frameset or if we
+    //already have a buffers worth of data
+    currentSize += frame.encodedSize();
+    if (frame.getEof()) {
         lastEof = frames.size();
-        aio->notifyPendingWrite();
+        notifyWrite = true;
+    } else {
+        notifyWrite = (currentSize >= maxFrameSize);
+    }
+    /*
+      NOTE: Moving the following line into this mutex block
+            is a workaround for BZ 570168, in which the test
+            testConcurrentSenders causes a hang about 1.5%
+            of the time.  ( To see the hang much more frequently
+            leave this line out of the mutex block, and put a
+            small usleep just before it.)
+
+            TODO mgoulish - fix the underlying cause and then
+                            move this call back outside the mutex.
+    */
+    if (notifyWrite && !closed) aio->notifyPendingWrite();
     }
-    QPID_LOG(trace, "SENT [" << identifier << "]: " << frame);
 }
 
-void SslConnector::Writer::writeOne() {
-    assert(buffer);
-    framesEncoded = 0;
+void SslConnector::writebuff(SslIO& /*aio*/)
+{
+    // It's possible to be disconnected and be writable
+    if (closed)
+        return;
 
-    buffer->dataStart = 0;
-    buffer->dataCount = encode.getPosition();
-    aio->queueWrite(buffer);
-    newBuffer();
-}
+    if (!canEncode()) {
+        return;
+    }
 
-void SslConnector::Writer::newBuffer() {
-    buffer = aio->getQueuedBuffer();
-    if (!buffer) buffer = new Buff(maxFrameSize);
-    encode = framing::Buffer(buffer->bytes, buffer->byteCount);
-    framesEncoded = 0;
+    SslIO::BufferBase* buffer = aio->getQueuedBuffer();
+    if (buffer) {
+
+        size_t encoded = encode(buffer->bytes, buffer->byteCount);
+
+        buffer->dataStart = 0;
+        buffer->dataCount = encoded;
+        aio->queueWrite(buffer);
+    }
 }
 
 // Called in IO thread.
-void SslConnector::Writer::write(sys::ssl::SslIO&) {
+bool SslConnector::canEncode()
+{
     Mutex::ScopedLock l(lock);
-    assert(buffer);
+    //have at least one full frameset or a whole buffers worth of data
+    return lastEof || currentSize >= maxFrameSize;
+}
+
+// Called in IO thread.
+size_t SslConnector::encode(const char* buffer, size_t size)
+{
+    framing::Buffer out(const_cast<char*>(buffer), size);
     size_t bytesWritten(0);
-    for (size_t i = 0; i < lastEof; ++i) {
-        AMQFrame& frame = frames[i];
-        uint32_t size = frame.encodedSize();
-        if (size > encode.available()) writeOne();
-        assert(size <= encode.available());
-        frame.encode(encode);
-        ++framesEncoded;
-        bytesWritten += size;
+    {
+        Mutex::ScopedLock l(lock);
+        while (!frames.empty() && out.available() >= frames.front().encodedSize() ) {
+            frames.front().encode(out);
+            QPID_LOG(trace, "SENT [" << identifier << "]: " << frames.front());
+            frames.pop_front();
+            if (lastEof) --lastEof;
+        }
+        bytesWritten = size - out.available();
+        currentSize -= bytesWritten;
     }
-    frames.erase(frames.begin(), frames.begin()+lastEof);
-    lastEof = 0;
     if (bounds) bounds->reduce(bytesWritten);
-    if (encode.getPosition() > 0) writeOne();
+    return bytesWritten;
 }
 
-void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff) {
-    framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff)
+{
+    int32_t decoded = decode(buff->bytes+buff->dataStart, buff->dataCount);
+    // TODO: unreading needs to go away, and when we can cope
+    // with multiple sub-buffers in the general buffer scheme, it will
+    if (decoded < buff->dataCount) {
+        // Adjust buffer for used bytes and then "unread them"
+        buff->dataStart += decoded;
+        buff->dataCount -= decoded;
+        aio.unread(buff);
+    } else {
+        // Give whole buffer back to aio subsystem
+        aio.queueReadBuffer(buff);
+    }
+}
 
+size_t SslConnector::decode(const char* buffer, size_t size)
+{
+    framing::Buffer in(const_cast<char*>(buffer), size);
     if (!initiated) {
         framing::ProtocolInitiation protocolInit;
         if (protocolInit.decode(in)) {
-            //TODO: check the version is correct
             QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
+            if(!(protocolInit==version)){
+                throw Exception(QPID_MSG("Unsupported version: " << protocolInit
+                                         << " supported version " << version));
+            }
         }
         initiated = true;
     }
@@ -338,25 +339,12 @@ void SslConnector::readbuff(SslIO& aio, 
         QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
         input->received(frame);
     }
-    // TODO: unreading needs to go away, and when we can cope
-    // with multiple sub-buffers in the general buffer scheme, it will
-    if (in.available() != 0) {
-        // Adjust buffer for used bytes and then "unread them"
-        buff->dataStart += buff->dataCount-in.available();
-        buff->dataCount = in.available();
-        aio.unread(buff);
-    } else {
-        // Give whole buffer back to aio subsystem
-        aio.queueReadBuffer(buff);
-    }
-}
-
-void SslConnector::writebuff(SslIO& aio_) {
-    writer.write(aio_);
+    return size - in.available();
 }
 
 void SslConnector::writeDataBlock(const AMQDataBlock& data) {
-    SslIO::BufferBase* buff = new Buff(maxFrameSize);
+    SslIO::BufferBase* buff = aio->getQueuedBuffer();
+    assert(buff);
     framing::Buffer out(buff->bytes, buff->byteCount);
     data.encode(out);
     buff->dataCount = data.encodedSize();

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp?rev=1371775&r1=1371774&r2=1371775&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp Fri Aug 10 17:27:38 2012
@@ -191,7 +191,7 @@ void SslEstablished(Poller::shared_ptr p
                                  boost::bind(&qpid::sys::ssl::SslHandler::nobuffs, async, _1),
                                  boost::bind(&qpid::sys::ssl::SslHandler::idle, async, _1));
 
-    async->init(aio,timer, maxTime, 4);
+    async->init(aio,timer, maxTime);
     aio->start(poller);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslHandler.cpp?rev=1371775&r1=1371774&r2=1371775&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslHandler.cpp Fri Aug 10 17:27:38 2012
@@ -33,15 +33,6 @@ namespace sys {
 namespace ssl {
 
 
-// Buffer definition
-struct Buff : public SslIO::BufferBase {
-    Buff() :
-        SslIO::BufferBase(new char[65536], 65536)
-    {}
-    ~Buff()
-    { delete [] bytes;}
-};
-
 struct ProtocolTimeoutTask : public sys::TimerTask {
     SslHandler& handler;
     std::string id;
@@ -78,7 +69,7 @@ SslHandler::~SslHandler() {
     delete codec;
 }
 
-void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs) {
+void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime) {
     aio = a;
 
     // Start timer for this connection
@@ -86,17 +77,14 @@ void SslHandler::init(SslIO* a, Timer& t
     timer.add(timeoutTimerTask);
 
     // Give connection some buffers to use
-    for (int i = 0; i < numBuffs; i++) {
-        aio->queueReadBuffer(new Buff);
-    }
+    aio->createBuffers();
 }
 
 void SslHandler::write(const framing::ProtocolInitiation& data)
 {
     QPID_LOG(debug, "SENT [" << identifier << "]: INIT(" << data << ")");
     SslIO::BufferBase* buff = aio->getQueuedBuffer();
-    if (!buff)
-        buff = new Buff;
+    assert(buff);
     framing::Buffer out(buff->bytes, buff->byteCount);
     data.encode(out);
     buff->dataCount = data.encodedSize();
@@ -205,10 +193,11 @@ void SslHandler::idle(SslIO&){
         return;
     }
     if (codec == 0) return;
-    if (codec->canEncode()) {
-        // Try and get a queued buffer if not then construct new one
-        SslIO::BufferBase* buff = aio->getQueuedBuffer();
-        if (!buff) buff = new Buff;
+    if (!codec->canEncode()) {
+        return;
+    }
+    SslIO::BufferBase* buff = aio->getQueuedBuffer();
+    if (buff) {
         size_t encoded=codec->encode(buff->bytes, buff->byteCount);
         buff->dataCount = encoded;
         aio->queueWrite(buff);

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslHandler.h?rev=1371775&r1=1371774&r2=1371775&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslHandler.h Fri Aug 10 17:27:38 2012
@@ -60,7 +60,7 @@ class SslHandler : public OutputControl 
   public:
     SslHandler(std::string id, ConnectionCodec::Factory* f, bool nodict);
     ~SslHandler();
-    void init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs);
+    void init(SslIO* a, Timer& timer, uint32_t maxTime);
 
     void setClient() { isClient = true; }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp?rev=1371775&r1=1371774&r2=1371775&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp Fri Aug 10 17:27:38 2012
@@ -197,15 +197,7 @@ SslIO::SslIO(const SslSocket& s,
     s.setNonblocking();
 }
 
-struct deleter
-{
-    template <typename T>
-    void operator()(T *ptr){ delete ptr;}
-};
-
 SslIO::~SslIO() {
-    std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter());
-    std::for_each( writeQueue.begin(), writeQueue.end(), deleter());
 }
 
 void SslIO::queueForDeletion() {
@@ -216,6 +208,19 @@ void SslIO::start(Poller::shared_ptr pol
     DispatchHandle::startWatch(poller);
 }
 
+void SslIO::createBuffers(uint32_t size) {
+    // Allocate all the buffer memory at once
+    bufferMemory.reset(new char[size*BufferCount]);
+
+    // Create the Buffer structs in a vector
+    // And push into the buffer queue
+    buffers.reserve(BufferCount);
+    for (uint32_t i = 0; i < BufferCount; i++) {
+        buffers.push_back(BufferBase(&bufferMemory[i*size], size));
+        queueReadBuffer(&buffers[i]);
+    }
+}
+
 void SslIO::queueReadBuffer(BufferBase* buff) {
     assert(buff);
     buff->dataStart = 0;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslIo.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslIo.h?rev=1371775&r1=1371774&r2=1371775&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslIo.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ssl/SslIo.h Fri Aug 10 17:27:38 2012
@@ -25,6 +25,7 @@
 #include "qpid/sys/SecuritySettings.h"
 
 #include <boost/function.hpp>
+#include <boost/shared_array.hpp>
 #include <deque>
 
 namespace qpid {
@@ -87,8 +88,8 @@ private:
 };
 
 struct SslIOBufferBase {
-    char* const bytes;
-    const int32_t byteCount;
+    char* bytes;
+    int32_t byteCount;
     int32_t dataStart;
     int32_t dataCount;
     
@@ -127,7 +128,9 @@ public:
     typedef boost::function1<void, SslIO&> IdleCallback;
     typedef boost::function1<void, SslIO&> RequestCallback;
 
-
+    SslIO(const SslSocket& s,
+          ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
+          ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
 private:
     ReadCallback readCallback;
     EofCallback eofCallback;
@@ -138,6 +141,8 @@ private:
     const SslSocket& socket;
     std::deque<BufferBase*> bufferQueue;
     std::deque<BufferBase*> writeQueue;
+    std::vector<BufferBase> buffers;
+    boost::shared_array<char> bufferMemory;
     bool queuedClose;
     /**
      * This flag is used to detect and handle concurrency between
@@ -148,12 +153,21 @@ private:
     volatile bool writePending;
 
 public:
-    SslIO(const SslSocket& s,
-        ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
-        ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
+    /*
+     * Size of IO buffers - this is the maximum possible frame size + 1
+     */
+    const static uint32_t MaxBufferSize = 65536;
+
+    /*
+     * Number of IO buffers allocated - I think the code can only use 2 -
+     * 1 for reading and 1 for writing, allocate 4 for safety
+     */
+    const static uint32_t BufferCount = 4;
+
     void queueForDeletion();
 
     void start(qpid::sys::Poller::shared_ptr poller);
+    void createBuffers(uint32_t size = MaxBufferSize);
     void queueReadBuffer(BufferBase* buff);
     void unread(BufferBase* buff);
     void queueWrite(BufferBase* buff);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org