You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/27 17:40:45 UTC

svn commit: r1377715 [7/12] - in /qpid/branches/asyncstore: ./ bin/ cpp/ cpp/docs/api/ cpp/examples/old_api/tradedemo/ cpp/include/qmf/engine/ cpp/include/qpid/client/ cpp/src/ cpp/src/qmf/engine/ cpp/src/qpid/acl/ cpp/src/qpid/asyncStore/ cpp/src/qpid...

Modified: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.cpp Mon Aug 27 15:40:33 2012
@@ -43,11 +43,9 @@ ManagementDirectExchange::ManagementDire
 void ManagementDirectExchange::route(Deliverable&      msg)
 {
     bool routeIt = true;
-    const std::string& routingKey = msg.getMessage().getRoutingKey();
-    const FieldTable* args = msg.getMessage().getApplicationHeaders();
 
     if (managementAgent)
-        routeIt = managementAgent->dispatchCommand(msg, routingKey, args, false, qmfVersion);
+        routeIt = managementAgent->dispatchCommand(msg, msg.getMessage().getRoutingKey(), 0/*args - TODO*/, false, qmfVersion);
 
     if (routeIt)
         DirectExchange::route(msg);

Modified: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.cpp Mon Aug 27 15:40:33 2012
@@ -42,12 +42,10 @@ ManagementTopicExchange::ManagementTopic
 void ManagementTopicExchange::route(Deliverable&      msg)
 {
     bool routeIt = true;
-    const std::string& routingKey = msg.getMessage().getRoutingKey();
-    const FieldTable* args = msg.getMessage().getApplicationHeaders();
 
     // Intercept management agent commands
     if (managementAgent)
-        routeIt = managementAgent->dispatchCommand(msg, routingKey, args, true, qmfVersion);
+        routeIt = managementAgent->dispatchCommand(msg, msg.getMessage().getRoutingKey(), 0/*args - TODO*/, true, qmfVersion);
 
     if (routeIt)
         TopicExchange::route(msg);

Modified: qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp Mon Aug 27 15:40:33 2012
@@ -249,7 +249,7 @@ MessageStorePlugin::destroy(const broker
 void
 MessageStorePlugin::stage(const boost::intrusive_ptr<broker::PersistableMessage>& msg)
 {
-    if (msg->getPersistenceId() == 0 && !msg->isContentReleased()) {
+    if (msg->getPersistenceId() == 0) {
         provider->second->stage(msg);
     }
 }

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIO.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIO.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIO.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIO.h Mon Aug 27 15:40:33 2012
@@ -76,8 +76,8 @@ protected:
 };
 
 struct AsynchIOBufferBase {
-    char* const bytes;
-    const int32_t byteCount;
+    char* bytes;
+    int32_t byteCount;
     int32_t dataStart;
     int32_t dataCount;
     
@@ -134,9 +134,21 @@ public:
                             BuffersEmptyCallback eCb = 0,
                             IdleCallback iCb = 0);
 public:
+    /*
+     * Size of IO buffers - this is the maximum possible frame size + 1
+     */
+    const static uint32_t MaxBufferSize = 65536;
+
+    /*
+     * Number of IO buffers allocated - I think the code can only use 2 -
+     * 1 for reading and 1 for writing, allocate 4 for safety
+     */
+    const static uint32_t BufferCount = 4;
+
     virtual void queueForDeletion() = 0;
 
     virtual void start(boost::shared_ptr<Poller> poller) = 0;
+    virtual void createBuffers(uint32_t size = MaxBufferSize) = 0;
     virtual void queueReadBuffer(BufferBase* buff) = 0;
     virtual void unread(BufferBase* buff) = 0;
     virtual void queueWrite(BufferBase* buff) = 0;

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp Mon Aug 27 15:40:33 2012
@@ -33,15 +33,6 @@
 namespace qpid {
 namespace sys {
 
-// Buffer definition
-struct Buff : public AsynchIO::BufferBase {
-    Buff() :
-        AsynchIO::BufferBase(new char[65536], 65536)
-    {}
-    ~Buff()
-    { delete [] bytes;}
-};
-
 struct ProtocolTimeoutTask : public sys::TimerTask {
     AsynchIOHandler& handler;
     std::string id;
@@ -79,7 +70,7 @@ AsynchIOHandler::~AsynchIOHandler() {
     delete codec;
 }
 
-void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime, int numBuffs) {
+void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime) {
     aio = a;
 
     // Start timer for this connection
@@ -87,17 +78,14 @@ void AsynchIOHandler::init(qpid::sys::As
     timer.add(timeoutTimerTask);
 
     // Give connection some buffers to use
-    for (int i = 0; i < numBuffs; i++) {
-        aio->queueReadBuffer(new Buff);
-    }
+    aio->createBuffers();
 }
 
 void AsynchIOHandler::write(const framing::ProtocolInitiation& data)
 {
     QPID_LOG(debug, "SENT [" << identifier << "]: INIT(" << data << ")");
     AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
-    if (!buff)
-        buff = new Buff;
+    assert(buff);
     framing::Buffer out(buff->bytes, buff->byteCount);
     data.encode(out);
     buff->dataCount = data.encodedSize();
@@ -244,24 +232,24 @@ void AsynchIOHandler::idle(AsynchIO&){
         return;
     }
     if (codec == 0) return;
-    try {
-        if (codec->canEncode()) {
-            // Try and get a queued buffer if not then construct new one
-            AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
-            if (!buff) buff = new Buff;
+    if (!codec->canEncode()) {
+        return;
+    }
+    AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+    if (buff) {
+        try {
             size_t encoded=codec->encode(buff->bytes, buff->byteCount);
             buff->dataCount = encoded;
             aio->queueWrite(buff);
+            if (!codec->isClosed()) {
+                return;
+            }
+        } catch (const std::exception& e) {
+            QPID_LOG(error, e.what());
         }
-        if (codec->isClosed()) {
-            readError = true;
-            aio->queueWriteClose();
-        }
-    } catch (const std::exception& e) {
-        QPID_LOG(error, e.what());
-        readError = true;
-        aio->queueWriteClose();
     }
+    readError = true;
+    aio->queueWriteClose();
 }
 
 }} // namespace qpid::sys

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h Mon Aug 27 15:40:33 2012
@@ -61,7 +61,7 @@ class AsynchIOHandler : public OutputCon
   public:
     QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f );
     QPID_COMMON_EXTERN ~AsynchIOHandler();
-    QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime, int numBuffs);
+    QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime);
 
     QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp Mon Aug 27 15:40:33 2012
@@ -191,7 +191,7 @@ void SslEstablished(Poller::shared_ptr p
                                  boost::bind(&qpid::sys::ssl::SslHandler::nobuffs, async, _1),
                                  boost::bind(&qpid::sys::ssl::SslHandler::idle, async, _1));
 
-    async->init(aio,timer, maxTime, 4);
+    async->init(aio,timer, maxTime);
     aio->start(poller);
 }
 
@@ -247,7 +247,7 @@ void SslMuxProtocolFactory::established(
        boost::bind(&AsynchIOHandler::nobuffs, async, _1),
        boost::bind(&AsynchIOHandler::idle, async, _1));
 
-    async->init(aio, brokerTimer, maxNegotiateTime, 4);
+    async->init(aio, brokerTimer, maxNegotiateTime);
     aio->start(poller);
 }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/TCPIOPlugin.cpp Mon Aug 27 15:40:33 2012
@@ -166,7 +166,7 @@ void AsynchIOProtocolFactory::establishe
        boost::bind(&AsynchIOHandler::nobuffs, async, _1),
        boost::bind(&AsynchIOHandler::idle, async, _1));
 
-    async->init(aio, brokerTimer, maxNegotiateTime, 4);
+    async->init(aio, brokerTimer, maxNegotiateTime);
     aio->start(poller);
 }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/epoll/EpollPoller.cpp Mon Aug 27 15:40:33 2012
@@ -221,8 +221,8 @@ class PollerPrivate {
         }
     };
 
-    static ReadablePipe alwaysReadable;
-    static int alwaysReadableFd;
+    ReadablePipe alwaysReadable;
+    int alwaysReadableFd;
 
     class InterruptHandle: public PollerHandle {
         std::queue<PollerHandle*> handles;
@@ -290,6 +290,7 @@ class PollerPrivate {
     }
 
     PollerPrivate() :
+        alwaysReadableFd(alwaysReadable.getFD()),
         epollFd(::epoll_create(DefaultFds)),
         isShutdown(false) {
         QPID_POSIX_CHECK(epollFd);
@@ -328,9 +329,6 @@ class PollerPrivate {
     }
 };
 
-PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable;
-int PollerPrivate::alwaysReadableFd = alwaysReadable.getFD();
-
 void Poller::registerHandle(PollerHandle& handle) {
     PollerHandlePrivate& eh = *handle.impl;
     ScopedLock<Mutex> l(eh.lock);

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/posix/AsynchIO.cpp Mon Aug 27 15:40:33 2012
@@ -40,6 +40,7 @@
 
 #include <boost/bind.hpp>
 #include <boost/lexical_cast.hpp>
+#include <boost/shared_array.hpp>
 
 namespace qpid {
 namespace sys {
@@ -239,6 +240,7 @@ public:
     virtual void queueForDeletion();
 
     virtual void start(Poller::shared_ptr poller);
+    virtual void createBuffers(uint32_t size);
     virtual void queueReadBuffer(BufferBase* buff);
     virtual void unread(BufferBase* buff);
     virtual void queueWrite(BufferBase* buff);
@@ -270,6 +272,8 @@ private:
     const Socket& socket;
     std::deque<BufferBase*> bufferQueue;
     std::deque<BufferBase*> writeQueue;
+    std::vector<BufferBase> buffers;
+    boost::shared_array<char> bufferMemory;
     bool queuedClose;
     /**
      * This flag is used to detect and handle concurrency between
@@ -309,15 +313,7 @@ AsynchIO::AsynchIO(const Socket& s,
     s.setNonblocking();
 }
 
-struct deleter
-{
-    template <typename T>
-    void operator()(T *ptr){ delete ptr;}
-};
-
 AsynchIO::~AsynchIO() {
-    std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter());
-    std::for_each( writeQueue.begin(), writeQueue.end(), deleter());
 }
 
 void AsynchIO::queueForDeletion() {
@@ -328,6 +324,19 @@ void AsynchIO::start(Poller::shared_ptr 
     DispatchHandle::startWatch(poller);
 }
 
+void AsynchIO::createBuffers(uint32_t size) {
+    // Allocate all the buffer memory at once
+    bufferMemory.reset(new char[size*BufferCount]);
+
+    // Create the Buffer structs in a vector
+    // And push into the buffer queue
+    buffers.reserve(BufferCount);
+    for (uint32_t i = 0; i < BufferCount; i++) {
+        buffers.push_back(BufferBase(&bufferMemory[i*size], size));
+        queueReadBuffer(&buffers[i]);
+    }
+}
+
 void AsynchIO::queueReadBuffer(BufferBase* buff) {
     assert(buff);
     buff->dataStart = 0;

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SystemInfo.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SystemInfo.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SystemInfo.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SystemInfo.cpp Mon Aug 27 15:40:33 2012
@@ -91,7 +91,7 @@ void SystemInfo::getLocalIpAddresses (ui
                 // * The scope id is illegal in URL syntax
                 // * Clients won't be able to use a link local address
                 //   without adding their own (potentially different) scope id
-                sockaddr_in6* sa6 = (sockaddr_in6*)(ifap->ifa_addr);
+                sockaddr_in6* sa6 = (sockaddr_in6*)((void*)ifap->ifa_addr);
                 if (IN6_IS_ADDR_LINKLOCAL(&sa6->sin6_addr)) break;
                 // Fallthrough
             }

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.cpp Mon Aug 27 15:40:33 2012
@@ -33,15 +33,6 @@ namespace sys {
 namespace ssl {
 
 
-// Buffer definition
-struct Buff : public SslIO::BufferBase {
-    Buff() :
-        SslIO::BufferBase(new char[65536], 65536)
-    {}
-    ~Buff()
-    { delete [] bytes;}
-};
-
 struct ProtocolTimeoutTask : public sys::TimerTask {
     SslHandler& handler;
     std::string id;
@@ -78,7 +69,7 @@ SslHandler::~SslHandler() {
     delete codec;
 }
 
-void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs) {
+void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime) {
     aio = a;
 
     // Start timer for this connection
@@ -86,17 +77,14 @@ void SslHandler::init(SslIO* a, Timer& t
     timer.add(timeoutTimerTask);
 
     // Give connection some buffers to use
-    for (int i = 0; i < numBuffs; i++) {
-        aio->queueReadBuffer(new Buff);
-    }
+    aio->createBuffers();
 }
 
 void SslHandler::write(const framing::ProtocolInitiation& data)
 {
     QPID_LOG(debug, "SENT [" << identifier << "]: INIT(" << data << ")");
     SslIO::BufferBase* buff = aio->getQueuedBuffer();
-    if (!buff)
-        buff = new Buff;
+    assert(buff);
     framing::Buffer out(buff->bytes, buff->byteCount);
     data.encode(out);
     buff->dataCount = data.encodedSize();
@@ -205,10 +193,11 @@ void SslHandler::idle(SslIO&){
         return;
     }
     if (codec == 0) return;
-    if (codec->canEncode()) {
-        // Try and get a queued buffer if not then construct new one
-        SslIO::BufferBase* buff = aio->getQueuedBuffer();
-        if (!buff) buff = new Buff;
+    if (!codec->canEncode()) {
+        return;
+    }
+    SslIO::BufferBase* buff = aio->getQueuedBuffer();
+    if (buff) {
         size_t encoded=codec->encode(buff->bytes, buff->byteCount);
         buff->dataCount = encoded;
         aio->queueWrite(buff);

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.h Mon Aug 27 15:40:33 2012
@@ -60,7 +60,7 @@ class SslHandler : public OutputControl 
   public:
     SslHandler(std::string id, ConnectionCodec::Factory* f, bool nodict);
     ~SslHandler();
-    void init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs);
+    void init(SslIO* a, Timer& timer, uint32_t maxTime);
 
     void setClient() { isClient = true; }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.cpp Mon Aug 27 15:40:33 2012
@@ -197,15 +197,7 @@ SslIO::SslIO(const SslSocket& s,
     s.setNonblocking();
 }
 
-struct deleter
-{
-    template <typename T>
-    void operator()(T *ptr){ delete ptr;}
-};
-
 SslIO::~SslIO() {
-    std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter());
-    std::for_each( writeQueue.begin(), writeQueue.end(), deleter());
 }
 
 void SslIO::queueForDeletion() {
@@ -216,6 +208,19 @@ void SslIO::start(Poller::shared_ptr pol
     DispatchHandle::startWatch(poller);
 }
 
+void SslIO::createBuffers(uint32_t size) {
+    // Allocate all the buffer memory at once
+    bufferMemory.reset(new char[size*BufferCount]);
+
+    // Create the Buffer structs in a vector
+    // And push into the buffer queue
+    buffers.reserve(BufferCount);
+    for (uint32_t i = 0; i < BufferCount; i++) {
+        buffers.push_back(BufferBase(&bufferMemory[i*size], size));
+        queueReadBuffer(&buffers[i]);
+    }
+}
+
 void SslIO::queueReadBuffer(BufferBase* buff) {
     assert(buff);
     buff->dataStart = 0;

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.h Mon Aug 27 15:40:33 2012
@@ -25,6 +25,7 @@
 #include "qpid/sys/SecuritySettings.h"
 
 #include <boost/function.hpp>
+#include <boost/shared_array.hpp>
 #include <deque>
 
 namespace qpid {
@@ -87,8 +88,8 @@ private:
 };
 
 struct SslIOBufferBase {
-    char* const bytes;
-    const int32_t byteCount;
+    char* bytes;
+    int32_t byteCount;
     int32_t dataStart;
     int32_t dataCount;
     
@@ -127,7 +128,9 @@ public:
     typedef boost::function1<void, SslIO&> IdleCallback;
     typedef boost::function1<void, SslIO&> RequestCallback;
 
-
+    SslIO(const SslSocket& s,
+          ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
+          ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
 private:
     ReadCallback readCallback;
     EofCallback eofCallback;
@@ -138,6 +141,8 @@ private:
     const SslSocket& socket;
     std::deque<BufferBase*> bufferQueue;
     std::deque<BufferBase*> writeQueue;
+    std::vector<BufferBase> buffers;
+    boost::shared_array<char> bufferMemory;
     bool queuedClose;
     /**
      * This flag is used to detect and handle concurrency between
@@ -148,12 +153,21 @@ private:
     volatile bool writePending;
 
 public:
-    SslIO(const SslSocket& s,
-        ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
-        ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
+    /*
+     * Size of IO buffers - this is the maximum possible frame size + 1
+     */
+    const static uint32_t MaxBufferSize = 65536;
+
+    /*
+     * Number of IO buffers allocated - I think the code can only use 2 -
+     * 1 for reading and 1 for writing, allocate 4 for safety
+     */
+    const static uint32_t BufferCount = 4;
+
     void queueForDeletion();
 
     void start(qpid::sys::Poller::shared_ptr poller);
+    void createBuffers(uint32_t size = MaxBufferSize);
     void queueReadBuffer(BufferBase* buff);
     void unread(BufferBase* buff);
     void queueWrite(BufferBase* buff);

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/windows/AsynchIO.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/windows/AsynchIO.cpp Mon Aug 27 15:40:33 2012
@@ -40,6 +40,7 @@
 #include <windows.h>
 
 #include <boost/bind.hpp>
+#include <boost/shared_array.hpp>
 
 namespace {
 
@@ -252,6 +253,7 @@ public:
 
     /// Take any actions needed to prepare for working with the poller.
     virtual void start(Poller::shared_ptr poller);
+    virtual void createBuffers(uint32_t size);
     virtual void queueReadBuffer(BufferBase* buff);
     virtual void unread(BufferBase* buff);
     virtual void queueWrite(BufferBase* buff);
@@ -286,6 +288,8 @@ private:
      * access to the buffer queue and write queue.
      */
     Mutex bufferQueueLock;
+    std::vector<BufferBase> buffers;
+    boost::shared_array<char> bufferMemory;
 
     // Number of outstanding I/O operations.
     volatile LONG opsInProgress;
@@ -385,15 +389,7 @@ AsynchIO::AsynchIO(const Socket& s,
     working(false) {
 }
 
-struct deleter
-{
-    template <typename T>
-    void operator()(T *ptr){ delete ptr;}
-};
-
 AsynchIO::~AsynchIO() {
-    std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter());
-    std::for_each( writeQueue.begin(), writeQueue.end(), deleter());
 }
 
 void AsynchIO::queueForDeletion() {
@@ -426,6 +422,19 @@ void AsynchIO::start(Poller::shared_ptr 
     startReading();
 }
 
+void AsynchIO::createBuffers(uint32_t size) {
+    // Allocate all the buffer memory at once
+    bufferMemory.reset(new char[size*BufferCount]);
+
+    // Create the Buffer structs in a vector
+    // And push into the buffer queue
+    buffers.reserve(BufferCount);
+    for (uint32_t i = 0; i < BufferCount; i++) {
+        buffers.push_back(BufferBase(&bufferMemory[i*size], size));
+        queueReadBuffer(&buffers[i]);
+    }
+}
+
 void AsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) {
     assert(buff);
     buff->dataStart = 0;

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/windows/SslAsynchIO.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/windows/SslAsynchIO.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/windows/SslAsynchIO.cpp Mon Aug 27 15:40:33 2012
@@ -55,7 +55,7 @@ namespace {
      * the frame layer for writing into.
      */
     struct SslIoBuff : public qpid::sys::AsynchIO::BufferBase {
-        std::auto_ptr<qpid::sys::AsynchIO::BufferBase> aioBuff;
+        qpid::sys::AsynchIO::BufferBase* aioBuff;
 
         SslIoBuff (qpid::sys::AsynchIO::BufferBase *base,
                    const SecPkgContext_StreamSizes &sizes)
@@ -66,7 +66,6 @@ namespace {
         {}
 
         ~SslIoBuff() {}
-        qpid::sys::AsynchIO::BufferBase* release() { return aioBuff.release(); }
     };
 }
 
@@ -101,10 +100,7 @@ SslAsynchIO::SslAsynchIO(const qpid::sys
 }
 
 SslAsynchIO::~SslAsynchIO() {
-    if (leftoverPlaintext) {
-        delete leftoverPlaintext;
-        leftoverPlaintext = 0;
-    }
+    leftoverPlaintext = 0;
 }
 
 void SslAsynchIO::queueForDeletion() {
@@ -121,6 +117,10 @@ void SslAsynchIO::start(qpid::sys::Polle
     startNegotiate();
 }
 
+void SslAsynchIO::createBuffers(uint32_t size) {
+    aio->createBuffers(size);
+}
+
 void SslAsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) {
     aio->queueReadBuffer(buff);
 }
@@ -148,7 +148,7 @@ void SslAsynchIO::queueWrite(AsynchIO::B
     // encoding was working on, and adjusting counts for, the SslIoBuff.
     // Update the count of the original BufferBase before handing off to
     // the I/O layer.
-    buff = sslBuff->release();
+    buff = sslBuff->aioBuff;
     SecBuffer buffs[4];
     buffs[0].cbBuffer = schSizes.cbHeader;
     buffs[0].BufferType = SECBUFFER_STREAM_HEADER;

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/windows/SslAsynchIO.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/windows/SslAsynchIO.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/windows/SslAsynchIO.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/windows/SslAsynchIO.h Mon Aug 27 15:40:33 2012
@@ -70,6 +70,7 @@ public:
     virtual void queueForDeletion();
 
     virtual void start(qpid::sys::Poller::shared_ptr poller);
+    virtual void createBuffers(uint32_t size);
     virtual void queueReadBuffer(BufferBase* buff);
     virtual void unread(BufferBase* buff);
     virtual void queueWrite(BufferBase* buff);

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/windows/Time.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/windows/Time.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/windows/Time.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/windows/Time.cpp Mon Aug 27 15:40:33 2012
@@ -20,10 +20,12 @@
  */
 
 #include "qpid/sys/Time.h"
+#include <cmath>
 #include <ostream>
 #include <boost/date_time/posix_time/posix_time.hpp>
 #include <boost/thread/thread_time.hpp>
 #include <windows.h>
+#include <time.h>
 
 using namespace boost::posix_time;
 
@@ -33,8 +35,16 @@ namespace {
 // more or less. Keep track of the start value and the conversion factor to
 // seconds.
 bool timeInitialized = false;
-LARGE_INTEGER start;
-double freq = 1.0;
+LARGE_INTEGER start_hpc;
+double hpc_freq = 1.0;
+
+double start_time;
+
+/// Static constant to remove time skew between FILETIME and POSIX
+/// time.  POSIX and Win32 use different epochs (Jan. 1, 1970 v.s.
+/// Jan. 1, 1601).  The following constant defines the difference
+/// in 100ns ticks.
+const DWORDLONG FILETIME_to_timval_skew = 0x19db1ded53e8000;
 
 }
 
@@ -114,23 +124,59 @@ void outputFormattedNow(std::ostream& o)
 }
 
 void outputHiresNow(std::ostream& o) {
+    ::time_t tv_sec;
+    ::tm timeinfo;
+    char time_string[100];
+
     if (!timeInitialized) {
-        start.QuadPart = 0;
+        // To start, get the current time from FILETIME which includes
+        // sub-second resolution. However, since FILETIME is updated a bit
+        // "bumpy" every 15 msec or so, future time displays will be the
+        // starting FILETIME plus a delta based on the high-resolution
+        // performance counter.
+        FILETIME file_time;
+        ULARGE_INTEGER start_usec;
+        ::GetSystemTimeAsFileTime(&file_time);   // This is in 100ns units
+        start_usec.LowPart = file_time.dwLowDateTime;
+        start_usec.HighPart = file_time.dwHighDateTime;
+        start_usec.QuadPart -= FILETIME_to_timval_skew;
+        start_usec.QuadPart /= 10;   // Convert 100ns to usec
+        tv_sec = (time_t)(start_usec.QuadPart / (1000 * 1000));
+        long tv_usec = (long)(start_usec.QuadPart % (1000 * 1000));
+        start_time = static_cast<double>(tv_sec);
+        start_time += tv_usec / 1000000.0;
+
+        start_hpc.QuadPart = 0;
         LARGE_INTEGER iFreq;
         iFreq.QuadPart = 1;
-        QueryPerformanceCounter(&start);
+        QueryPerformanceCounter(&start_hpc);
         QueryPerformanceFrequency(&iFreq);
-        freq = static_cast<double>(iFreq.QuadPart);
+        hpc_freq = static_cast<double>(iFreq.QuadPart);
         timeInitialized = true;
     }
-    LARGE_INTEGER iNow;
-    iNow.QuadPart = 0;
-    QueryPerformanceCounter(&iNow);
-    iNow.QuadPart -= start.QuadPart;
-    if (iNow.QuadPart < 0)
-        iNow.QuadPart = 0;
-    double now = static_cast<double>(iNow.QuadPart);
-    now /= freq;                 // now is seconds after this
-    o << std::fixed << std::setprecision(8) << std::setw(16) << std::setfill('0') << now << "s ";
+    LARGE_INTEGER hpc_now;
+    hpc_now.QuadPart = 0;
+    QueryPerformanceCounter(&hpc_now);
+    hpc_now.QuadPart -= start_hpc.QuadPart;
+    if (hpc_now.QuadPart < 0)
+        hpc_now.QuadPart = 0;
+    double now = static_cast<double>(hpc_now.QuadPart);
+    now /= hpc_freq;                 // now is seconds after this
+    double fnow = start_time + now;
+    double usec, sec;
+    usec = modf(fnow, &sec);
+    tv_sec = static_cast<time_t>(sec);
+#ifdef _MSC_VER
+    ::localtime_s(&timeinfo, &tv_sec);
+#else
+    timeinfo = *(::localtime(&tv_sec));
+#endif
+    ::strftime(time_string, 100,
+               "%Y-%m-%d %H:%M:%S",
+               &timeinfo);
+    // No way to set "max field width" to cleanly output the double usec so
+    // convert it back to integral number of usecs and print that.
+    unsigned long i_usec = usec * 1000 * 1000;
+    o << time_string << "." << std::setw(6) << std::setfill('0') << i_usec << " ";
 }
 }}

Modified: qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.cpp Mon Aug 27 15:40:33 2012
@@ -27,6 +27,7 @@
 
 #include "qpid/log/Statement.h"
 #include "qpid/broker/FedOps.h"
+#include "qpid/broker/MapHandler.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/framing/FieldValue.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -198,7 +199,52 @@ bool XmlExchange::unbind(Queue::shared_p
     }
 }
 
-bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content) 
+namespace {
+class DefineExternals : public MapHandler
+{
+  public:
+    DefineExternals(DynamicContext* c) : context(c) { assert(context); }
+    void handleUint8(const MapHandler::CharSequence& key, uint8_t value) { process(std::string(key.data, key.size), (int) value); }
+    void handleUint16(const MapHandler::CharSequence& key, uint16_t value) { process(std::string(key.data, key.size), (int) value); }
+    void handleUint32(const MapHandler::CharSequence& key, uint32_t value) { process(std::string(key.data, key.size), (int) value); }
+    void handleUint64(const MapHandler::CharSequence& key, uint64_t value) { process(std::string(key.data, key.size), (int) value); }
+    void handleInt8(const MapHandler::CharSequence& key, int8_t value) { process(std::string(key.data, key.size), (int) value); }
+    void handleInt16(const MapHandler::CharSequence& key, int16_t value) { process(std::string(key.data, key.size), (int) value); }
+    void handleInt32(const MapHandler::CharSequence& key, int32_t value) { process(std::string(key.data, key.size), (int) value); }
+    void handleInt64(const MapHandler::CharSequence& key, int64_t value) { process(std::string(key.data, key.size), (int) value); }
+    void handleFloat(const MapHandler::CharSequence& key, float value) { process(std::string(key.data, key.size), value); }
+    void handleDouble(const MapHandler::CharSequence& key, double value) { process(std::string(key.data, key.size), value); }
+    void handleString(const MapHandler::CharSequence& key, const MapHandler::CharSequence& value, const MapHandler::CharSequence& /*encoding*/)
+    {
+        process(std::string(key.data, key.size), std::string(value.data, value.size));
+    }
+    void handleVoid(const MapHandler::CharSequence&) {}
+  private:
+    void process(const std::string& key, double value)
+    {
+        QPID_LOG(trace, "XmlExchange, external variable (double): " << key << " = " << value);
+        Item::Ptr item = context->getItemFactory()->createDouble(value, context);
+        context->setExternalVariable(X(key.c_str()), item);
+    }
+    void process(const std::string& key, int value)
+    {
+        QPID_LOG(trace, "XmlExchange, external variable (int):" << key << " = " << value);
+        Item::Ptr item = context->getItemFactory()->createInteger(value, context);
+        context->setExternalVariable(X(key.c_str()), item);
+    }
+    void process(const std::string& key, const std::string& value)
+    {
+        QPID_LOG(trace, "XmlExchange, external variable (string):" << key << " = " << value);
+        Item::Ptr item = context->getItemFactory()->createString(X(value.c_str()), context);
+        context->setExternalVariable(X(key.c_str()), item);
+    }
+
+    DynamicContext* context;
+};
+
+}
+
+bool XmlExchange::matches(Query& query, Deliverable& msg, bool parse_message_content) 
 {
     std::string msgContent;
 
@@ -212,7 +258,7 @@ bool XmlExchange::matches(Query& query, 
 
         if (parse_message_content) {
 
-            msg.getMessage().getFrames().getContent(msgContent);
+            msgContent = msg.getMessage().getContent();
 
             QPID_LOG(trace, "matches: message content is [" << msgContent << "]");
 
@@ -231,28 +277,8 @@ bool XmlExchange::matches(Query& query, 
             }
         }
 
-        if (args) {
-            FieldTable::ValueMap::const_iterator v = args->begin();
-            for(; v != args->end(); ++v) {
-
-                if (v->second->convertsTo<double>()) {
-                    QPID_LOG(trace, "XmlExchange, external variable (double): " << v->first << " = " << v->second->get<double>());
-                    Item::Ptr value = context->getItemFactory()->createDouble(v->second->get<double>(), context.get());
-                    context->setExternalVariable(X(v->first.c_str()), value);
-                }              
-                else if (v->second->convertsTo<int>()) {
-                    QPID_LOG(trace, "XmlExchange, external variable (int):" << v->first << " = " << v->second->getData().getInt());
-                    Item::Ptr value = context->getItemFactory()->createInteger(v->second->get<int>(), context.get());
-                    context->setExternalVariable(X(v->first.c_str()), value);
-                }
-                else if (v->second->convertsTo<std::string>()) {
-                    QPID_LOG(trace, "XmlExchange, external variable (string):" << v->first << " = " << v->second->getData().getString().c_str());
-                    Item::Ptr value = context->getItemFactory()->createString(X(v->second->get<std::string>().c_str()), context.get());
-                    context->setExternalVariable(X(v->first.c_str()), value);
-                }
-
-            }
-        }
+        DefineExternals f(context.get());
+        msg.getMessage().processProperties(f);
 
         Result result = query->execute(context.get());
 #ifdef XQ_EFFECTIVE_BOOLEAN_VALUE_HPP
@@ -286,7 +312,6 @@ bool XmlExchange::matches(Query& query, 
 void XmlExchange::route(Deliverable& msg)
 {
     const std::string& routingKey = msg.getMessage().getRoutingKey();
-    const FieldTable* args = msg.getMessage().getApplicationHeaders();
     PreRoute pr(msg, this);
     try {
         XmlBinding::vector::ConstPtr p;
@@ -298,7 +323,7 @@ void XmlExchange::route(Deliverable& msg
         }
 
         for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) {
-            if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) { 
+            if (matches((*i)->xquery, msg, (*i)->parse_message_content)) { 
                 b->push_back(*i);
             }
         }

Modified: qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.h Mon Aug 27 15:40:33 2012
@@ -65,7 +65,7 @@ class XmlExchange : public virtual Excha
 
     qpid::sys::RWlock lock;
 
-    bool matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content);
+    bool matches(Query& query, Deliverable& msg, bool parse_message_content);
 
   public:
     static const std::string typeName;

Propchange: qpid/branches/asyncstore/cpp/src/tests/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/tests:r1368652-1375508

Modified: qpid/branches/asyncstore/cpp/src/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/CMakeLists.txt?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/branches/asyncstore/cpp/src/tests/CMakeLists.txt Mon Aug 27 15:40:33 2012
@@ -126,6 +126,7 @@ set(unit_tests_to_build
     ExchangeTest
     HeadersExchangeTest
     MessageTest
+    QueueDepth
     QueueRegistryTest
     QueuePolicyTest
     QueueFlowLimitTest
@@ -135,16 +136,12 @@ set(unit_tests_to_build
     TimerTest
     TopicExchangeTest
     TxBufferTest
-    TxPublishTest
-    MessageBuilderTest
     ManagementTest
     MessageReplayTracker
     ConsoleTest
-    QueueEvents
     ProxyTest
     RetryList
     FrameDecoder
-    ReplicationTest
     ClientMessageTest
     PollableCondition
     Variant
@@ -165,10 +162,6 @@ remember_location(unit_test)
 
 add_library (shlibtest MODULE shlibtest.cpp)
 
-if (BUILD_CLUSTER)
-  include (cluster.cmake)
-endif (BUILD_CLUSTER)
-
 # FIXME aconway 2009-11-30: enable SSL
 #if SSL
 #include ssl.mk

Modified: qpid/branches/asyncstore/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/ClientSessionTest.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/ClientSessionTest.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/ClientSessionTest.cpp Mon Aug 27 15:40:33 2012
@@ -621,7 +621,7 @@ QPID_AUTO_TEST_CASE(testQueueDeleted)
     fix.session.queueDeclare(arg::queue="my-queue");
     LocalQueue queue;
     fix.subs.subscribe(queue, "my-queue");
-    
+
     ScopedSuppressLogging sl;
     fix.session.queueDelete(arg::queue="my-queue");
     BOOST_CHECK_THROW(queue.get(1*qpid::sys::TIME_SEC), qpid::framing::ResourceDeletedException);

Modified: qpid/branches/asyncstore/cpp/src/tests/DeliveryRecordTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/DeliveryRecordTest.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/DeliveryRecordTest.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/DeliveryRecordTest.cpp Mon Aug 27 15:40:33 2012
@@ -49,7 +49,7 @@ QPID_AUTO_TEST_CASE(testSort)
 
     list<DeliveryRecord> records;
     for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); i++) {
-        DeliveryRecord r(QueuedMessage(0), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false);
+        DeliveryRecord r(QueueCursor(CONSUMER), framing::SequenceNumber(), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false);
         r.setId(*i);
         records.push_back(r);
     }

Modified: qpid/branches/asyncstore/cpp/src/tests/ExchangeTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/ExchangeTest.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/ExchangeTest.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/ExchangeTest.cpp Mon Aug 27 15:40:33 2012
@@ -35,7 +35,6 @@
 
 using std::string;
 
-using boost::intrusive_ptr;
 using namespace qpid::broker;
 using namespace qpid::framing;
 using namespace qpid::sys;
@@ -62,11 +61,9 @@ QPID_AUTO_TEST_CASE(testMe)
     queue.reset();
     queue2.reset();
 
-    intrusive_ptr<Message> msgPtr(MessageUtils::createMessage("exchange", "abc", false, "id"));
-    DeliverableMessage msg(msgPtr);
+    DeliverableMessage msg(MessageUtils::createMessage("exchange", "abc"), 0);
     topic.route(msg);
     direct.route(msg);
-
 }
 
 QPID_AUTO_TEST_CASE(testIsBound)
@@ -170,16 +167,6 @@ QPID_AUTO_TEST_CASE(testDeleteGetAndRede
     BOOST_CHECK_EQUAL(string("direct"), response.first->getType());
 }
 
-intrusive_ptr<Message> cmessage(std::string exchange, std::string routingKey) {
-    intrusive_ptr<Message> msg(new Message());
-    AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
-    AMQFrame header((AMQHeaderBody()));
-    msg->getFrames().append(method);
-    msg->getFrames().append(header);
-    msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
-    return msg;
-}
-
 QPID_AUTO_TEST_CASE(testSequenceOptions)
 {
     FieldTable args;
@@ -189,46 +176,35 @@ QPID_AUTO_TEST_CASE(testSequenceOptions)
     {
         DirectExchange direct("direct1", false, args);
 
-        intrusive_ptr<Message> msg1 = cmessage("e", "abc");
-        intrusive_ptr<Message> msg2 = cmessage("e", "abc");
-        intrusive_ptr<Message> msg3 = cmessage("e", "abc");
-
-        DeliverableMessage dmsg1(msg1);
-        DeliverableMessage dmsg2(msg2);
-        DeliverableMessage dmsg3(msg3);
-
-        direct.route(dmsg1);
-        direct.route(dmsg2);
-        direct.route(dmsg3);
-
-        BOOST_CHECK_EQUAL(1, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
-        BOOST_CHECK_EQUAL(2, msg2->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
-        BOOST_CHECK_EQUAL(3, msg3->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+        DeliverableMessage msg1(MessageUtils::createMessage("e", "abc"), 0);
+        DeliverableMessage msg2(MessageUtils::createMessage("e", "abc"), 0);
+        DeliverableMessage msg3(MessageUtils::createMessage("e", "abc"), 0);
+
+        direct.route(msg1);
+        direct.route(msg2);
+        direct.route(msg3);
+
+        BOOST_CHECK_EQUAL(1, msg1.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
+        BOOST_CHECK_EQUAL(2, msg2.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
+        BOOST_CHECK_EQUAL(3, msg3.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
 
         FanOutExchange fanout("fanout1", false, args);
         HeadersExchange header("headers1", false, args);
         TopicExchange topic ("topic1", false, args);
 
         // check other exchanges, that they preroute
-        intrusive_ptr<Message> msg4 = cmessage("e", "abc");
-        intrusive_ptr<Message> msg5 = cmessage("e", "abc");
-
-        // Need at least empty header for the HeadersExchange to route at all
-        msg5->insertCustomProperty("", "");
-        intrusive_ptr<Message> msg6 = cmessage("e", "abc");
+        DeliverableMessage msg4(MessageUtils::createMessage("e", "abc"), 0);
+        DeliverableMessage msg5(MessageUtils::createMessage("e", "abc"), 0);
+        DeliverableMessage msg6(MessageUtils::createMessage("e", "abc"), 0);
 
-        DeliverableMessage dmsg4(msg4);
-        DeliverableMessage dmsg5(msg5);
-        DeliverableMessage dmsg6(msg6);
+        fanout.route(msg4);
+        BOOST_CHECK_EQUAL(1, msg4.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
 
-        fanout.route(dmsg4);
-        BOOST_CHECK_EQUAL(1, msg4->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+        header.route(msg5);
+        BOOST_CHECK_EQUAL(1, msg5.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
 
-        header.route(dmsg5);
-        BOOST_CHECK_EQUAL(1, msg5->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
-
-        topic.route(dmsg6);
-        BOOST_CHECK_EQUAL(1, msg6->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+        topic.route(msg6);
+        BOOST_CHECK_EQUAL(1, msg6.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
         direct.encode(buffer);
     }
     {
@@ -237,11 +213,10 @@ QPID_AUTO_TEST_CASE(testSequenceOptions)
         buffer.reset();
         DirectExchange::shared_ptr exch_dec = Exchange::decode(exchanges, buffer);
 
-        intrusive_ptr<Message> msg1 = cmessage("e", "abc");
-        DeliverableMessage dmsg1(msg1);
-        exch_dec->route(dmsg1);
+        DeliverableMessage msg1(MessageUtils::createMessage("e", "abc"), 0);
+        exch_dec->route(msg1);
 
-        BOOST_CHECK_EQUAL(4, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+        BOOST_CHECK_EQUAL(4, msg1.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
 
     }
     delete [] buff;
@@ -256,9 +231,11 @@ QPID_AUTO_TEST_CASE(testIVEOption)
     HeadersExchange header("headers1", false, args);
     TopicExchange topic ("topic1", false, args);
 
-    intrusive_ptr<Message> msg1 = cmessage("direct1", "abc");
-    msg1->insertCustomProperty("a", "abc");
-    DeliverableMessage dmsg1(msg1);
+    qpid::types::Variant::Map properties;
+    properties["routing-key"] = "abc";
+    properties["a"] = "abc";
+    Message msg1 = MessageUtils::createMessage(properties, "my-message", "direct1");
+    DeliverableMessage dmsg1(msg1, 0);
 
     FieldTable args2;
     args2.setString("x-match", "any");
@@ -273,8 +250,6 @@ QPID_AUTO_TEST_CASE(testIVEOption)
     Queue::shared_ptr queue2(new Queue("queue2", true));
     Queue::shared_ptr queue3(new Queue("queue3", true));
 
-    BOOST_CHECK(HeadersExchange::match(args2, msg1->getProperties<MessageProperties>()->getApplicationHeaders()));
-
     BOOST_CHECK(direct.bind(queue, "abc", 0));
     BOOST_CHECK(fanout.bind(queue1, "abc", 0));
     BOOST_CHECK(header.bind(queue2, "", &args2));
@@ -287,7 +262,6 @@ QPID_AUTO_TEST_CASE(testIVEOption)
 
 }
 
-
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests

Modified: qpid/branches/asyncstore/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/Makefile.am?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/asyncstore/cpp/src/tests/Makefile.am Mon Aug 27 15:40:33 2012
@@ -96,6 +96,7 @@ unit_test_SOURCES= unit_test.cpp unit_te
 	ExchangeTest.cpp \
 	HeadersExchangeTest.cpp \
 	MessageTest.cpp \
+	QueueDepth.cpp \
 	QueueRegistryTest.cpp \
 	QueuePolicyTest.cpp \
 	QueueFlowLimitTest.cpp \
@@ -105,19 +106,15 @@ unit_test_SOURCES= unit_test.cpp unit_te
 	TimerTest.cpp \
 	TopicExchangeTest.cpp \
 	TxBufferTest.cpp \
-	TxPublishTest.cpp \
-	MessageBuilderTest.cpp \
 	ConnectionOptions.h \
 	ForkedBroker.h \
 	ForkedBroker.cpp \
 	ManagementTest.cpp \
 	MessageReplayTracker.cpp \
 	ConsoleTest.cpp \
-	QueueEvents.cpp \
 	ProxyTest.cpp \
 	RetryList.cpp \
 	FrameDecoder.cpp \
-	ReplicationTest.cpp \
 	ClientMessageTest.cpp \
 	PollableCondition.cpp \
 	Variant.cpp \
@@ -142,7 +139,6 @@ test_store_la_SOURCES = test_store.cpp
 test_store_la_LIBADD = $(lib_broker)
 test_store_la_LDFLAGS = -module
 
-include cluster.mk
 include sasl.mk
 if SSL
 include ssl.mk
@@ -338,7 +334,6 @@ EXTRA_DIST +=								\
   dynamic_log_level_test						\
   qpid-ctrl								\
   CMakeLists.txt							\
-  cluster.cmake								\
   windows/DisableWin32ErrorWindows.cpp					\
   background.ps1							\
   find_prog.ps1								\
@@ -372,14 +367,6 @@ LONG_TESTS+=start_broker \
  stop_broker \
  run_long_federation_sys_tests
 
-if HAVE_LIBCPG
-
-LONG_TESTS+=	federated_cluster_test_with_node_failure	\
-		run_failover_soak				\
-		reliable_replication_test
-
-endif HAVE_LIBCPG
-
 EXTRA_DIST+=						\
 	fanout_perftest					\
 	shared_perftest					\

Modified: qpid/branches/asyncstore/cpp/src/tests/MessageTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/MessageTest.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/MessageTest.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/MessageTest.cpp Mon Aug 27 15:40:33 2012
@@ -24,6 +24,7 @@
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/FieldValue.h"
 #include "qpid/framing/Uuid.h"
+#include "MessageUtils.h"
 
 #include "unit_test.h"
 
@@ -43,49 +44,29 @@ QPID_AUTO_TEST_CASE(testEncodeDecode)
 {
     string exchange = "MyExchange";
     string routingKey = "MyRoutingKey";
+    uint64_t ttl(60);
     Uuid messageId(true);
-    string data1("abcdefg");
-    string data2("hijklmn");
+    string data("abcdefghijklmn");
 
-    boost::intrusive_ptr<Message> msg(new Message());
-
-    AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
-    AMQFrame header((AMQHeaderBody()));
-    AMQFrame content1((AMQContentBody(data1)));
-    AMQFrame content2((AMQContentBody(data2)));
-
-    msg->getFrames().append(method);
-    msg->getFrames().append(header);
-    msg->getFrames().append(content1);
-    msg->getFrames().append(content2);
-
-    MessageProperties* mProps = msg->getFrames().getHeaders()->get<MessageProperties>(true);
-    mProps->setContentLength(data1.size() + data2.size());
-    mProps->setMessageId(messageId);
-    FieldTable applicationHeaders;
-    applicationHeaders.setString("abc", "xyz");
-    mProps->setApplicationHeaders(applicationHeaders);
-    DeliveryProperties* dProps = msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
-    dProps->setRoutingKey(routingKey);
-    dProps->setDeliveryMode(PERSISTENT);
-    BOOST_CHECK(msg->isPersistent());
-
-    std::vector<char> buff(msg->encodedSize());
-    Buffer wbuffer(&buff[0], msg->encodedSize());
-    msg->encode(wbuffer);
-
-    Buffer rbuffer(&buff[0], msg->encodedSize());
-    msg = new Message();
-    msg->decodeHeader(rbuffer);
-    msg->decodeContent(rbuffer);
-    BOOST_CHECK_EQUAL(exchange, msg->getExchangeName());
-    BOOST_CHECK_EQUAL(routingKey, msg->getRoutingKey());
-    BOOST_CHECK_EQUAL((uint64_t) data1.size() + data2.size(), msg->contentSize());
-    BOOST_CHECK_EQUAL((uint64_t) data1.size() + data2.size(), msg->getProperties<MessageProperties>()->getContentLength());
-    BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
-    BOOST_CHECK_EQUAL(string("xyz"), msg->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("abc"));
-    BOOST_CHECK_EQUAL((uint8_t) PERSISTENT, msg->getProperties<DeliveryProperties>()->getDeliveryMode());
-    BOOST_CHECK(msg->isPersistent());
+    qpid::types::Variant::Map properties;
+    properties["routing-key"] = routingKey;
+    properties["ttl"] = ttl;
+    properties["durable"] = true;
+    properties["message-id"] = qpid::types::Uuid(messageId.data());
+    properties["abc"] = "xyz";
+    Message msg = MessageUtils::createMessage(properties, data);
+
+    std::string buffer;
+    encode(msg, buffer);
+    msg = Message();
+    decode(buffer, msg);
+
+    BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey());
+    BOOST_CHECK_EQUAL((uint64_t) data.size(), msg.getContentSize());
+    BOOST_CHECK_EQUAL(data, msg.getContent());
+    //BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
+    BOOST_CHECK_EQUAL(string("xyz"), msg.getPropertyAsString("abc"));
+    BOOST_CHECK(msg.isPersistent());
 }
 
 QPID_AUTO_TEST_SUITE_END()

Modified: qpid/branches/asyncstore/cpp/src/tests/MessageUtils.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/MessageUtils.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/MessageUtils.h (original)
+++ qpid/branches/asyncstore/cpp/src/tests/MessageUtils.h Mon Aug 27 15:40:33 2012
@@ -20,9 +20,11 @@
  */
 
 #include "qpid/broker/Message.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/Uuid.h"
+#include "qpid/types/Variant.h"
 
 using namespace qpid;
 using namespace broker;
@@ -33,11 +35,46 @@ namespace tests {
 
 struct MessageUtils
 {
-    static boost::intrusive_ptr<Message> createMessage(const std::string& exchange="", const std::string& routingKey="",
-                                                       const bool durable = false, const Uuid& messageId=Uuid(true),
-                                                       uint64_t contentSize = 0)
+    static Message createMessage(const qpid::types::Variant::Map& properties, const std::string& content="", const std::string& destination = "")
     {
-        boost::intrusive_ptr<broker::Message> msg(new broker::Message());
+        boost::intrusive_ptr<broker::amqp_0_10::MessageTransfer> msg(new broker::amqp_0_10::MessageTransfer());
+
+        AMQFrame method(( MessageTransferBody(ProtocolVersion(), destination, 0, 0)));
+        AMQFrame header((AMQHeaderBody()));
+
+        msg->getFrames().append(method);
+        msg->getFrames().append(header);
+        if (content.size()) {
+            msg->getFrames().getHeaders()->get<MessageProperties>(true)->setContentLength(content.size());
+            AMQFrame data((AMQContentBody(content)));
+            msg->getFrames().append(data);
+        }
+        for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+            if (i->first == "routing-key" && !i->second.isVoid()) {
+                msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(i->second);
+            } else if (i->first == "message-id" && !i->second.isVoid()) {
+                qpid::types::Uuid id = i->second;
+                qpid::framing::Uuid id2(id.data());
+                msg->getFrames().getHeaders()->get<MessageProperties>(true)->setMessageId(id2);
+            } else if (i->first == "ttl" && !i->second.isVoid()) {
+                msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(i->second);
+            } else if (i->first == "priority" && !i->second.isVoid()) {
+                msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setPriority(i->second);
+            } else if (i->first == "durable" && !i->second.isVoid()) {
+                msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(i->second.asBool() ? 2 : 1);
+            } else {
+                msg->getFrames().getHeaders()->get<MessageProperties>(true)->getApplicationHeaders().setString(i->first, i->second);
+            }
+        }
+        return Message(msg, msg);
+    }
+
+
+    static Message createMessage(const std::string& exchange="", const std::string& routingKey="",
+                                 uint64_t ttl = 0, bool durable = false, const Uuid& messageId=Uuid(true),
+                                 const std::string& content="")
+    {
+        boost::intrusive_ptr<broker::amqp_0_10::MessageTransfer> msg(new broker::amqp_0_10::MessageTransfer());
 
         AMQFrame method(( MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
         AMQFrame header((AMQHeaderBody()));
@@ -45,18 +82,18 @@ struct MessageUtils
         msg->getFrames().append(method);
         msg->getFrames().append(header);
         MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
-        props->setContentLength(contentSize);
+        props->setContentLength(content.size());
         props->setMessageId(messageId);
         msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
         if (durable)
             msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(2);
-        return msg;
-    }
-
-    static void addContent(boost::intrusive_ptr<Message> msg, const std::string& data)
-    {
-        AMQFrame content((AMQContentBody(data)));
-        msg->getFrames().append(content);
+        if (ttl)
+            msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl);
+        if (content.size()) {
+            AMQFrame data((AMQContentBody(content)));
+            msg->getFrames().append(data);
+        }
+        return Message(msg, msg);
     }
 };
 

Modified: qpid/branches/asyncstore/cpp/src/tests/QueueFlowLimitTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/QueueFlowLimitTest.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/QueueFlowLimitTest.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/QueueFlowLimitTest.cpp Mon Aug 27 15:40:33 2012
@@ -23,8 +23,8 @@
 #include "unit_test.h"
 #include "test_tools.h"
 
-#include "qpid/broker/QueuePolicy.h"
 #include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/QueueSettings.h"
 #include "qpid/sys/Time.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/FieldValue.h"
@@ -66,21 +66,19 @@ public:
         return new TestFlow(flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
     }
 
-    static QueueFlowLimit *getQueueFlowLimit(const qpid::framing::FieldTable& settings)
+    static QueueFlowLimit *getQueueFlowLimit(const qpid::framing::FieldTable& arguments)
     {
+        QueueSettings settings;
+        settings.populate(arguments, settings.storeSettings);
         return QueueFlowLimit::createLimit(0, settings);
     }
 };
 
-
-
-QueuedMessage createMessage(uint32_t size)
+Message createMessage(uint32_t size)
 {
     static uint32_t seqNum;
-    QueuedMessage msg;
-    msg.payload = MessageUtils::createMessage();
-    msg.position = ++seqNum;
-    MessageUtils::addContent(msg.payload, std::string (size, 'x'));
+    Message msg = MessageUtils::createMessage(qpid::types::Variant::Map(), std::string (size, 'x'));
+    msg.setSequence(++seqNum);
     return msg;
 }
 }
@@ -100,7 +98,7 @@ QPID_AUTO_TEST_CASE(testFlowCount)
     BOOST_CHECK(!flow->isFlowControlActive());
     BOOST_CHECK(flow->monitorFlowControl());
 
-    std::deque<QueuedMessage> msgs;
+    std::deque<Message> msgs;
     for (size_t i = 0; i < 6; i++) {
         msgs.push_back(createMessage(10));
         flow->enqueued(msgs.back());
@@ -135,7 +133,6 @@ QPID_AUTO_TEST_CASE(testFlowCount)
     BOOST_CHECK(!flow->isFlowControlActive());  // 4 on queue, OFF
 }
 
-
 QPID_AUTO_TEST_CASE(testFlowSize)
 {
     FieldTable args;
@@ -151,7 +148,7 @@ QPID_AUTO_TEST_CASE(testFlowSize)
     BOOST_CHECK(!flow->isFlowControlActive());
     BOOST_CHECK(flow->monitorFlowControl());
 
-    std::deque<QueuedMessage> msgs;
+    std::deque<Message> msgs;
     for (size_t i = 0; i < 6; i++) {
         msgs.push_back(createMessage(10));
         flow->enqueued(msgs.back());
@@ -161,14 +158,14 @@ QPID_AUTO_TEST_CASE(testFlowSize)
     BOOST_CHECK_EQUAL(6u, flow->getFlowCount());
     BOOST_CHECK_EQUAL(60u, flow->getFlowSize());
 
-    QueuedMessage msg_9 = createMessage(9);
+    Message msg_9 = createMessage(9);
     flow->enqueued(msg_9);
     BOOST_CHECK(!flow->isFlowControlActive());  // 69 on queue
-    QueuedMessage tinyMsg_1 = createMessage(1);
+    Message tinyMsg_1 = createMessage(1);
     flow->enqueued(tinyMsg_1);
     BOOST_CHECK(!flow->isFlowControlActive());   // 70 on queue
 
-    QueuedMessage tinyMsg_2 = createMessage(1);
+    Message tinyMsg_2 = createMessage(1);
     flow->enqueued(tinyMsg_2);
     BOOST_CHECK(flow->isFlowControlActive());   // 71 on queue, ON
     msgs.push_back(createMessage(10));
@@ -233,12 +230,12 @@ QPID_AUTO_TEST_CASE(testFlowCombo)
     args.setUInt64(QueueFlowLimit::flowStopSizeKey, 200);
     args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 100);
 
-    std::deque<QueuedMessage> msgs_1;
-    std::deque<QueuedMessage> msgs_10;
-    std::deque<QueuedMessage> msgs_50;
-    std::deque<QueuedMessage> msgs_100;
+    std::deque<Message> msgs_1;
+    std::deque<Message> msgs_10;
+    std::deque<Message> msgs_50;
+    std::deque<Message> msgs_100;
 
-    QueuedMessage msg;
+    Message msg;
 
     std::auto_ptr<TestFlow> flow(TestFlow::createTestFlow(args));
     BOOST_CHECK(!flow->isFlowControlActive());        // count:0  size:0
@@ -458,7 +455,6 @@ QPID_AUTO_TEST_CASE(testFlowDisable)
     }
 }
 
-
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests

Modified: qpid/branches/asyncstore/cpp/src/tests/QueuePolicyTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/QueuePolicyTest.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/QueuePolicyTest.cpp Mon Aug 27 15:40:33 2012
@@ -22,12 +22,10 @@
 #include "unit_test.h"
 #include "test_tools.h"
 
-#include "qpid/broker/QueuePolicy.h"
 #include "qpid/broker/QueueFlowLimit.h"
 #include "qpid/client/QueueOptions.h"
 #include "qpid/sys/Time.h"
 #include "qpid/framing/reply_exceptions.h"
-#include "MessageUtils.h"
 #include "BrokerFixture.h"
 
 using namespace qpid::broker;
@@ -39,118 +37,10 @@ namespace tests {
 
 QPID_AUTO_TEST_SUITE(QueuePolicyTestSuite)
 
-namespace {
-QueuedMessage createMessage(uint32_t size)
-{
-    QueuedMessage msg;
-    msg.payload = MessageUtils::createMessage();
-    MessageUtils::addContent(msg.payload, std::string (size, 'x'));
-    return msg;
-}
-}
-
-QPID_AUTO_TEST_CASE(testCount)
-{
-    std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 5, 0));
-    BOOST_CHECK_EQUAL((uint64_t) 0, policy->getMaxSize());
-    BOOST_CHECK_EQUAL((uint32_t) 5, policy->getMaxCount());
-
-    QueuedMessage msg = createMessage(10);
-    for (size_t i = 0; i < 5; i++) {
-        policy->tryEnqueue(msg.payload);
-    }
-    try {
-        policy->tryEnqueue(msg.payload);
-        BOOST_FAIL("Policy did not fail on enqueuing sixth message");
-    } catch (const ResourceLimitExceededException&) {}
-
-    policy->dequeued(msg);
-    policy->tryEnqueue(msg.payload);
-
-    try {
-        policy->tryEnqueue(msg.payload);
-        BOOST_FAIL("Policy did not fail on enqueuing sixth message (after dequeue)");
-    } catch (const ResourceLimitExceededException&) {}
-}
-
-QPID_AUTO_TEST_CASE(testSize)
-{
-    std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 0, 50));
-    QueuedMessage msg = createMessage(10);
-
-    for (size_t i = 0; i < 5; i++) {
-        policy->tryEnqueue(msg.payload);
-    }
-    try {
-        policy->tryEnqueue(msg.payload);
-        BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy);
-    } catch (const ResourceLimitExceededException&) {}
-
-    policy->dequeued(msg);
-    policy->tryEnqueue(msg.payload);
-
-    try {
-        policy->tryEnqueue(msg.payload);
-        BOOST_FAIL("Policy did not fail on aggregate size exceeding 50 (after dequeue). " << *policy);
-    } catch (const ResourceLimitExceededException&) {}
-}
-
-QPID_AUTO_TEST_CASE(testBoth)
-{
-    std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 5, 50));
-    try {
-        QueuedMessage msg = createMessage(51);
-        policy->tryEnqueue(msg.payload);
-        BOOST_FAIL("Policy did not fail on single message exceeding 50. " << *policy);
-    } catch (const ResourceLimitExceededException&) {}
-
-    std::vector<QueuedMessage> messages;
-    messages.push_back(createMessage(15));
-    messages.push_back(createMessage(10));
-    messages.push_back(createMessage(11));
-    messages.push_back(createMessage(2));
-    messages.push_back(createMessage(7));
-    for (size_t i = 0; i < messages.size(); i++) {
-        policy->tryEnqueue(messages[i].payload);
-    }
-    //size = 45 at this point, count = 5
-    try {
-        QueuedMessage msg = createMessage(5);
-        policy->tryEnqueue(msg.payload);
-        BOOST_FAIL("Policy did not fail on count exceeding 6. " << *policy);
-    } catch (const ResourceLimitExceededException&) {}
-    try {
-        QueuedMessage msg = createMessage(10);
-        policy->tryEnqueue(msg.payload);
-        BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy);
-    } catch (const ResourceLimitExceededException&) {}
-
-
-    policy->dequeued(messages[0]);
-    try {
-        QueuedMessage msg = createMessage(20);
-        policy->tryEnqueue(msg.payload);
-    } catch (const ResourceLimitExceededException&) {
-        BOOST_FAIL("Policy failed incorrectly after dequeue. " << *policy);
-    }
-}
-
-QPID_AUTO_TEST_CASE(testSettings)
-{
-    //test reading and writing the policy from/to field table
-    std::auto_ptr<QueuePolicy> a(QueuePolicy::createQueuePolicy("test", 101, 303));
-    FieldTable settings;
-    a->update(settings);
-    std::auto_ptr<QueuePolicy> b(QueuePolicy::createQueuePolicy("test", settings));
-    BOOST_CHECK_EQUAL(a->getMaxCount(), b->getMaxCount());
-    BOOST_CHECK_EQUAL(a->getMaxSize(), b->getMaxSize());
-}
-
 QPID_AUTO_TEST_CASE(testRingPolicyCount)
 {
-    FieldTable args;
-    std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING);
-    policy->update(args);
+    QueueOptions args;
+    args.setSizePolicy(RING, 0, 5);
 
     SessionFixture f;
     std::string q("my-ring-queue");
@@ -183,9 +73,8 @@ QPID_AUTO_TEST_CASE(testRingPolicySize)
 
     // Ring queue, 500 bytes maxSize
 
-    FieldTable args;
-    std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 0, 500, QueuePolicy::RING);
-    policy->update(args);
+    QueueOptions args;
+    args.setSizePolicy(RING, 500, 0);
 
     SessionFixture f;
     std::string q("my-ring-queue");
@@ -255,9 +144,9 @@ QPID_AUTO_TEST_CASE(testRingPolicySize)
 
 QPID_AUTO_TEST_CASE(testStrictRingPolicy)
 {
-    FieldTable args;
-    std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING_STRICT);
-    policy->update(args);
+    QueueOptions args;
+    args.setSizePolicy(RING_STRICT, 0, 5);
+    args.setString("qpid.flow_stop_count", "0");
 
     SessionFixture f;
     std::string q("my-ring-queue");
@@ -281,9 +170,8 @@ QPID_AUTO_TEST_CASE(testStrictRingPolicy
 
 QPID_AUTO_TEST_CASE(testPolicyWithDtx)
 {
-    FieldTable args;
-    std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT);
-    policy->update(args);
+    QueueOptions args;
+    args.setSizePolicy(REJECT, 0, 5);
 
     SessionFixture f;
     std::string q("my-policy-queue");
@@ -367,9 +255,8 @@ QPID_AUTO_TEST_CASE(testFlowToDiskWithNo
 
 QPID_AUTO_TEST_CASE(testPolicyFailureOnCommit)
 {
-    FieldTable args;
-    std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT);
-    policy->update(args);
+    QueueOptions args;
+    args.setSizePolicy(REJECT, 0, 5);
 
     SessionFixture f;
     std::string q("q");

Modified: qpid/branches/asyncstore/cpp/src/tests/QueueRegistryTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/QueueRegistryTest.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/QueueRegistryTest.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/QueueRegistryTest.cpp Mon Aug 27 15:40:33 2012
@@ -19,6 +19,7 @@
 
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueSettings.h"
 #include "unit_test.h"
 #include <string>
 
@@ -36,33 +37,23 @@ QPID_AUTO_TEST_CASE(testDeclare)
     QueueRegistry reg;
     std::pair<Queue::shared_ptr,  bool> qc;
 
-    qc = reg.declare(foo, false, 0, 0);
+    qc = reg.declare(foo, QueueSettings());
     Queue::shared_ptr q = qc.first;
     BOOST_CHECK(q);
     BOOST_CHECK(qc.second); // New queue
     BOOST_CHECK_EQUAL(foo, q->getName());
 
-    qc = reg.declare(foo, false, 0, 0);
+    qc = reg.declare(foo, QueueSettings());
     BOOST_CHECK_EQUAL(q, qc.first);
     BOOST_CHECK(!qc.second);
 
-    qc = reg.declare(bar, false, 0, 0);
+    qc = reg.declare(bar, QueueSettings());
     q = qc.first;
     BOOST_CHECK(q);
     BOOST_CHECK_EQUAL(true, qc.second);
     BOOST_CHECK_EQUAL(bar, q->getName());
 }
 
-QPID_AUTO_TEST_CASE(testDeclareTmp)
-{
-    QueueRegistry reg;
-    std::pair<Queue::shared_ptr,  bool> qc;
-
-    qc = reg.declare(std::string(), false, 0, 0);
-    BOOST_CHECK(qc.second);
-    BOOST_CHECK_EQUAL(std::string("tmp_1"), qc.first->getName());
-}
-
 QPID_AUTO_TEST_CASE(testFind)
 {
     std::string foo("foo");
@@ -72,8 +63,8 @@ QPID_AUTO_TEST_CASE(testFind)
 
     BOOST_CHECK(reg.find(foo) == 0);
 
-    reg.declare(foo, false, 0, 0);
-    reg.declare(bar, false, 0, 0);
+    reg.declare(foo, QueueSettings());
+    reg.declare(bar, QueueSettings());
     Queue::shared_ptr q = reg.find(bar);
     BOOST_CHECK(q);
     BOOST_CHECK_EQUAL(bar, q->getName());
@@ -85,7 +76,7 @@ QPID_AUTO_TEST_CASE(testDestroy)
     QueueRegistry reg;
     std::pair<Queue::shared_ptr,  bool> qc;
 
-    qc = reg.declare(foo, false, 0, 0);
+    qc = reg.declare(foo, QueueSettings());
     reg.destroy(foo);
     // Queue is gone from the registry.
     BOOST_CHECK(reg.find(foo) == 0);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org