You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@qpid.apache.org by as...@apache.org on 2012/08/10 19:27:29 UTC

svn commit: r1371774 - in /qpid/trunk/qpid/cpp/src/qpid: broker/windows/ client/ sys/ sys/posix/ sys/windows/

Author: astitcher
Date: Fri Aug 10 17:27:28 2012
New Revision: 1371774

URL: http://svn.apache.org/viewvc?rev=1371774&view=rev
Log:
Rearrange buffer memory ownership to avoid leaking buffer memory

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h
    qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
    qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
    qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp?rev=1371774&r1=1371773&r2=1371774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp Fri Aug 10 17:27:28 2012
@@ -281,7 +281,7 @@ void SslProtocolFactory::established(sys
                                                     boost::bind(&AsynchIOHandler::idle, async, _1));
     }
 
-    async->init(aio, brokerTimer, maxNegotiateTime, 4);
+    async->init(aio, brokerTimer, maxNegotiateTime);
     aio->start(poller);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp?rev=1371774&r1=1371773&r2=1371774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp Fri Aug 10 17:27:28 2012
@@ -46,11 +46,6 @@ using namespace qpid::framing;
 using boost::format;
 using boost::str;
 
-struct TCPConnector::Buff : public AsynchIO::BufferBase {
-    Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
-    ~Buff() { delete [] bytes;}
-};
-
 // Static constructor which registers connector here
 namespace {
     Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
@@ -118,9 +113,8 @@ void TCPConnector::connected(const Socke
 
 void TCPConnector::start(sys::AsynchIO* aio_) {
     aio = aio_;
-    for (int i = 0; i < 4; i++) {
-        aio->queueReadBuffer(new Buff(maxFrameSize));
-    }
+
+    aio->createBuffers(maxFrameSize);
 
     identifier = str(format("[%1%]") % socket.getFullAddress());
 }
@@ -226,15 +220,19 @@ void TCPConnector::writebuff(AsynchIO& /
         return;
 
     Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
-    if (codec->canEncode()) {
-        std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer());
-        if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize));
+
+    if (!codec->canEncode()) {
+        return;
+    }
+
+    AsynchIO::BufferBase* buffer = aio->getQueuedBuffer();
+    if (buffer) {
 
         size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
 
         buffer->dataStart = 0;
         buffer->dataCount = encoded;
-        aio->queueWrite(buffer.release());
+        aio->queueWrite(buffer);
     }
 }
 
@@ -307,6 +305,7 @@ size_t TCPConnector::decode(const char* 
 
 void TCPConnector::writeDataBlock(const AMQDataBlock& data) {
     AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+    assert(buff);
     framing::Buffer out(buff->bytes, buff->byteCount);
     data.encode(out);
     buff->dataCount = data.encodedSize();

Modified: qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h?rev=1371774&r1=1371773&r2=1371774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h Fri Aug 10 17:27:28 2012
@@ -50,7 +50,6 @@ namespace client {
 class TCPConnector : public Connector, public sys::Codec
 {
     typedef std::deque<framing::AMQFrame> Frames;
-    struct Buff;
 
     const uint16_t maxFrameSize;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h?rev=1371774&r1=1371773&r2=1371774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h Fri Aug 10 17:27:28 2012
@@ -76,8 +76,8 @@ protected:
 };
 
 struct AsynchIOBufferBase {
-    char* const bytes;
-    const int32_t byteCount;
+    char* bytes;
+    int32_t byteCount;
     int32_t dataStart;
     int32_t dataCount;
     
@@ -134,9 +134,21 @@ public:
                             BuffersEmptyCallback eCb = 0,
                             IdleCallback iCb = 0);
 public:
+    /*
+     * Size of IO buffers - this is the maximum possible frame size + 1
+     */
+    const static uint32_t MaxBufferSize = 65536;
+
+    /*
+     * Number of IO buffers allocated - I think the code can only use 2 -
+     * 1 for reading and 1 for writing, allocate 4 for safety
+     */
+    const static uint32_t BufferCount = 4;
+
     virtual void queueForDeletion() = 0;
 
     virtual void start(boost::shared_ptr<Poller> poller) = 0;
+    virtual void createBuffers(uint32_t size = MaxBufferSize) = 0;
     virtual void queueReadBuffer(BufferBase* buff) = 0;
     virtual void unread(BufferBase* buff) = 0;
     virtual void queueWrite(BufferBase* buff) = 0;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1371774&r1=1371773&r2=1371774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp Fri Aug 10 17:27:28 2012
@@ -33,15 +33,6 @@
 namespace qpid {
 namespace sys {
 
-// Buffer definition
-struct Buff : public AsynchIO::BufferBase {
-    Buff() :
-        AsynchIO::BufferBase(new char[65536], 65536)
-    {}
-    ~Buff()
-    { delete [] bytes;}
-};
-
 struct ProtocolTimeoutTask : public sys::TimerTask {
     AsynchIOHandler& handler;
     std::string id;
@@ -79,7 +70,7 @@ AsynchIOHandler::~AsynchIOHandler() {
     delete codec;
 }
 
-void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime, int numBuffs) {
+void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime) {
     aio = a;
 
     // Start timer for this connection
@@ -87,17 +78,14 @@ void AsynchIOHandler::init(qpid::sys::As
     timer.add(timeoutTimerTask);
 
     // Give connection some buffers to use
-    for (int i = 0; i < numBuffs; i++) {
-        aio->queueReadBuffer(new Buff);
-    }
+    aio->createBuffers();
 }
 
 void AsynchIOHandler::write(const framing::ProtocolInitiation& data)
 {
     QPID_LOG(debug, "SENT [" << identifier << "]: INIT(" << data << ")");
     AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
-    if (!buff)
-        buff = new Buff;
+    assert(buff);
     framing::Buffer out(buff->bytes, buff->byteCount);
     data.encode(out);
     buff->dataCount = data.encodedSize();
@@ -244,24 +232,24 @@ void AsynchIOHandler::idle(AsynchIO&){
         return;
     }
     if (codec == 0) return;
-    try {
-        if (codec->canEncode()) {
-            // Try and get a queued buffer if not then construct new one
-            AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
-            if (!buff) buff = new Buff;
+    if (!codec->canEncode()) {
+        return;
+    }
+    AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+    if (buff) {
+        try {
             size_t encoded=codec->encode(buff->bytes, buff->byteCount);
             buff->dataCount = encoded;
             aio->queueWrite(buff);
+            if (!codec->isClosed()) {
+                return;
+            }
+        } catch (const std::exception& e) {
+            QPID_LOG(error, e.what());
         }
-        if (codec->isClosed()) {
-            readError = true;
-            aio->queueWriteClose();
-        }
-    } catch (const std::exception& e) {
-        QPID_LOG(error, e.what());
-        readError = true;
-        aio->queueWriteClose();
     }
+    readError = true;
+    aio->queueWriteClose();
 }
 
 }} // namespace qpid::sys

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h?rev=1371774&r1=1371773&r2=1371774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h Fri Aug 10 17:27:28 2012
@@ -61,7 +61,7 @@ class AsynchIOHandler : public OutputCon
   public:
     QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f );
     QPID_COMMON_EXTERN ~AsynchIOHandler();
-    QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime, int numBuffs);
+    QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime);
 
     QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp?rev=1371774&r1=1371773&r2=1371774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp Fri Aug 10 17:27:28 2012
@@ -247,7 +247,7 @@ void SslMuxProtocolFactory::established(
        boost::bind(&AsynchIOHandler::nobuffs, async, _1),
        boost::bind(&AsynchIOHandler::idle, async, _1));
 
-    async->init(aio, brokerTimer, maxNegotiateTime, 4);
+    async->init(aio, brokerTimer, maxNegotiateTime);
     aio->start(poller);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=1371774&r1=1371773&r2=1371774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp Fri Aug 10 17:27:28 2012
@@ -166,7 +166,7 @@ void AsynchIOProtocolFactory::establishe
        boost::bind(&AsynchIOHandler::nobuffs, async, _1),
        boost::bind(&AsynchIOHandler::idle, async, _1));
 
-    async->init(aio, brokerTimer, maxNegotiateTime, 4);
+    async->init(aio, brokerTimer, maxNegotiateTime);
     aio->start(poller);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=1371774&r1=1371773&r2=1371774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Fri Aug 10 17:27:28 2012
@@ -40,6 +40,7 @@
 
 #include <boost/bind.hpp>
 #include <boost/lexical_cast.hpp>
+#include <boost/shared_array.hpp>
 
 namespace qpid {
 namespace sys {
@@ -239,6 +240,7 @@ public:
     virtual void queueForDeletion();
 
     virtual void start(Poller::shared_ptr poller);
+    virtual void createBuffers(uint32_t size);
     virtual void queueReadBuffer(BufferBase* buff);
     virtual void unread(BufferBase* buff);
     virtual void queueWrite(BufferBase* buff);
@@ -270,6 +272,8 @@ private:
     const Socket& socket;
     std::deque<BufferBase*> bufferQueue;
     std::deque<BufferBase*> writeQueue;
+    std::vector<BufferBase> buffers;
+    boost::shared_array<char> bufferMemory;
     bool queuedClose;
     /**
      * This flag is used to detect and handle concurrency between
@@ -309,15 +313,7 @@ AsynchIO::AsynchIO(const Socket& s,
     s.setNonblocking();
 }
 
-struct deleter
-{
-    template <typename T>
-    void operator()(T *ptr){ delete ptr;}
-};
-
 AsynchIO::~AsynchIO() {
-    std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter());
-    std::for_each( writeQueue.begin(), writeQueue.end(), deleter());
 }
 
 void AsynchIO::queueForDeletion() {
@@ -328,6 +324,19 @@ void AsynchIO::start(Poller::shared_ptr 
     DispatchHandle::startWatch(poller);
 }
 
+void AsynchIO::createBuffers(uint32_t size) {
+    // Allocate all the buffer memory at once
+    bufferMemory.reset(new char[size*BufferCount]);
+
+    // Create the Buffer structs in a vector
+    // And push into the buffer queue
+    buffers.reserve(BufferCount);
+    for (uint32_t i = 0; i < BufferCount; i++) {
+        buffers.push_back(BufferBase(&bufferMemory[i*size], size));
+        queueReadBuffer(&buffers[i]);
+    }
+}
+
 void AsynchIO::queueReadBuffer(BufferBase* buff) {
     assert(buff);
     buff->dataStart = 0;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=1371774&r1=1371773&r2=1371774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp Fri Aug 10 17:27:28 2012
@@ -40,6 +40,7 @@
 #include <windows.h>
 
 #include <boost/bind.hpp>
+#include <boost/shared_array.hpp>
 
 namespace {
 
@@ -252,6 +253,7 @@ public:
 
     /// Take any actions needed to prepare for working with the poller.
     virtual void start(Poller::shared_ptr poller);
+    virtual void createBuffers(uint32_t size);
     virtual void queueReadBuffer(BufferBase* buff);
     virtual void unread(BufferBase* buff);
     virtual void queueWrite(BufferBase* buff);
@@ -286,6 +288,8 @@ private:
      * access to the buffer queue and write queue.
      */
     Mutex bufferQueueLock;
+    std::vector<BufferBase> buffers;
+    boost::shared_array<char> bufferMemory;
 
     // Number of outstanding I/O operations.
     volatile LONG opsInProgress;
@@ -385,15 +389,7 @@ AsynchIO::AsynchIO(const Socket& s,
     working(false) {
 }
 
-struct deleter
-{
-    template <typename T>
-    void operator()(T *ptr){ delete ptr;}
-};
-
 AsynchIO::~AsynchIO() {
-    std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter());
-    std::for_each( writeQueue.begin(), writeQueue.end(), deleter());
 }
 
 void AsynchIO::queueForDeletion() {
@@ -426,6 +422,19 @@ void AsynchIO::start(Poller::shared_ptr 
     startReading();
 }
 
+void AsynchIO::createBuffers(uint32_t size) {
+    // Allocate all the buffer memory at once
+    bufferMemory.reset(new char[size*BufferCount]);
+
+    // Create the Buffer structs in a vector
+    // And push into the buffer queue
+    buffers.reserve(BufferCount);
+    for (uint32_t i = 0; i < BufferCount; i++) {
+        buffers.push_back(BufferBase(&bufferMemory[i*size], size));
+        queueReadBuffer(&buffers[i]);
+    }
+}
+
 void AsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) {
     assert(buff);
     buff->dataStart = 0;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp?rev=1371774&r1=1371773&r2=1371774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp Fri Aug 10 17:27:28 2012
@@ -55,7 +55,7 @@ namespace {
      * the frame layer for writing into.
      */
     struct SslIoBuff : public qpid::sys::AsynchIO::BufferBase {
-        std::auto_ptr<qpid::sys::AsynchIO::BufferBase> aioBuff;
+        qpid::sys::AsynchIO::BufferBase* aioBuff;
 
         SslIoBuff (qpid::sys::AsynchIO::BufferBase *base,
                    const SecPkgContext_StreamSizes &sizes)
@@ -66,7 +66,6 @@ namespace {
         {}
 
         ~SslIoBuff() {}
-        qpid::sys::AsynchIO::BufferBase* release() { return aioBuff.release(); }
     };
 }
 
@@ -101,10 +100,7 @@ SslAsynchIO::SslAsynchIO(const qpid::sys
 }
 
 SslAsynchIO::~SslAsynchIO() {
-    if (leftoverPlaintext) {
-        delete leftoverPlaintext;
-        leftoverPlaintext = 0;
-    }
+    leftoverPlaintext = 0;
 }
 
 void SslAsynchIO::queueForDeletion() {
@@ -121,6 +117,10 @@ void SslAsynchIO::start(qpid::sys::Polle
     startNegotiate();
 }
 
+void SslAsynchIO::createBuffers(uint32_t size) {
+    aio->createBuffers(size);
+}
+
 void SslAsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) {
     aio->queueReadBuffer(buff);
 }
@@ -148,7 +148,7 @@ void SslAsynchIO::queueWrite(AsynchIO::B
     // encoding was working on, and adjusting counts for, the SslIoBuff.
     // Update the count of the original BufferBase before handing off to
     // the I/O layer.
-    buff = sslBuff->release();
+    buff = sslBuff->aioBuff;
     SecBuffer buffs[4];
     buffs[0].cbBuffer = schSizes.cbHeader;
     buffs[0].BufferType = SECBUFFER_STREAM_HEADER;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h?rev=1371774&r1=1371773&r2=1371774&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.h Fri Aug 10 17:27:28 2012
@@ -70,6 +70,7 @@ public:
     virtual void queueForDeletion();
 
     virtual void start(qpid::sys::Poller::shared_ptr poller);
+    virtual void createBuffers(uint32_t size);
     virtual void queueReadBuffer(BufferBase* buff);
     virtual void unread(BufferBase* buff);
     virtual void queueWrite(BufferBase* buff);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org