You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/27 17:40:45 UTC
svn commit: r1377715 [7/12] - in /qpid/branches/asyncstore: ./ bin/ cpp/
cpp/docs/api/ cpp/examples/old_api/tradedemo/ cpp/include/qmf/engine/
cpp/include/qpid/client/ cpp/src/ cpp/src/qmf/engine/ cpp/src/qpid/acl/
cpp/src/qpid/asyncStore/ cpp/src/qpid...
Modified: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.cpp Mon Aug 27 15:40:33 2012
@@ -43,11 +43,9 @@ ManagementDirectExchange::ManagementDire
void ManagementDirectExchange::route(Deliverable& msg)
{
bool routeIt = true;
- const std::string& routingKey = msg.getMessage().getRoutingKey();
- const FieldTable* args = msg.getMessage().getApplicationHeaders();
if (managementAgent)
- routeIt = managementAgent->dispatchCommand(msg, routingKey, args, false, qmfVersion);
+ routeIt = managementAgent->dispatchCommand(msg, msg.getMessage().getRoutingKey(), 0/*args - TODO*/, false, qmfVersion);
if (routeIt)
DirectExchange::route(msg);
Modified: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.cpp Mon Aug 27 15:40:33 2012
@@ -42,12 +42,10 @@ ManagementTopicExchange::ManagementTopic
void ManagementTopicExchange::route(Deliverable& msg)
{
bool routeIt = true;
- const std::string& routingKey = msg.getMessage().getRoutingKey();
- const FieldTable* args = msg.getMessage().getApplicationHeaders();
// Intercept management agent commands
if (managementAgent)
- routeIt = managementAgent->dispatchCommand(msg, routingKey, args, true, qmfVersion);
+ routeIt = managementAgent->dispatchCommand(msg, msg.getMessage().getRoutingKey(), 0/*args - TODO*/, true, qmfVersion);
if (routeIt)
TopicExchange::route(msg);
Modified: qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/store/MessageStorePlugin.cpp Mon Aug 27 15:40:33 2012
@@ -249,7 +249,7 @@ MessageStorePlugin::destroy(const broker
void
MessageStorePlugin::stage(const boost::intrusive_ptr<broker::PersistableMessage>& msg)
{
- if (msg->getPersistenceId() == 0 && !msg->isContentReleased()) {
+ if (msg->getPersistenceId() == 0) {
provider->second->stage(msg);
}
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIO.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIO.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIO.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIO.h Mon Aug 27 15:40:33 2012
@@ -76,8 +76,8 @@ protected:
};
struct AsynchIOBufferBase {
- char* const bytes;
- const int32_t byteCount;
+ char* bytes;
+ int32_t byteCount;
int32_t dataStart;
int32_t dataCount;
@@ -134,9 +134,21 @@ public:
BuffersEmptyCallback eCb = 0,
IdleCallback iCb = 0);
public:
+ /*
+ * Size of IO buffers - this is the maximum possible frame size + 1
+ */
+ const static uint32_t MaxBufferSize = 65536;
+
+ /*
+ * Number of IO buffers allocated - I think the code can only use 2 -
+ * 1 for reading and 1 for writing, allocate 4 for safety
+ */
+ const static uint32_t BufferCount = 4;
+
virtual void queueForDeletion() = 0;
virtual void start(boost::shared_ptr<Poller> poller) = 0;
+ virtual void createBuffers(uint32_t size = MaxBufferSize) = 0;
virtual void queueReadBuffer(BufferBase* buff) = 0;
virtual void unread(BufferBase* buff) = 0;
virtual void queueWrite(BufferBase* buff) = 0;
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp Mon Aug 27 15:40:33 2012
@@ -33,15 +33,6 @@
namespace qpid {
namespace sys {
-// Buffer definition
-struct Buff : public AsynchIO::BufferBase {
- Buff() :
- AsynchIO::BufferBase(new char[65536], 65536)
- {}
- ~Buff()
- { delete [] bytes;}
-};
-
struct ProtocolTimeoutTask : public sys::TimerTask {
AsynchIOHandler& handler;
std::string id;
@@ -79,7 +70,7 @@ AsynchIOHandler::~AsynchIOHandler() {
delete codec;
}
-void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime, int numBuffs) {
+void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime) {
aio = a;
// Start timer for this connection
@@ -87,17 +78,14 @@ void AsynchIOHandler::init(qpid::sys::As
timer.add(timeoutTimerTask);
// Give connection some buffers to use
- for (int i = 0; i < numBuffs; i++) {
- aio->queueReadBuffer(new Buff);
- }
+ aio->createBuffers();
}
void AsynchIOHandler::write(const framing::ProtocolInitiation& data)
{
QPID_LOG(debug, "SENT [" << identifier << "]: INIT(" << data << ")");
AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
- if (!buff)
- buff = new Buff;
+ assert(buff);
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);
buff->dataCount = data.encodedSize();
@@ -244,24 +232,24 @@ void AsynchIOHandler::idle(AsynchIO&){
return;
}
if (codec == 0) return;
- try {
- if (codec->canEncode()) {
- // Try and get a queued buffer if not then construct new one
- AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
- if (!buff) buff = new Buff;
+ if (!codec->canEncode()) {
+ return;
+ }
+ AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+ if (buff) {
+ try {
size_t encoded=codec->encode(buff->bytes, buff->byteCount);
buff->dataCount = encoded;
aio->queueWrite(buff);
+ if (!codec->isClosed()) {
+ return;
+ }
+ } catch (const std::exception& e) {
+ QPID_LOG(error, e.what());
}
- if (codec->isClosed()) {
- readError = true;
- aio->queueWriteClose();
- }
- } catch (const std::exception& e) {
- QPID_LOG(error, e.what());
- readError = true;
- aio->queueWriteClose();
}
+ readError = true;
+ aio->queueWriteClose();
}
}} // namespace qpid::sys
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h Mon Aug 27 15:40:33 2012
@@ -61,7 +61,7 @@ class AsynchIOHandler : public OutputCon
public:
QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f );
QPID_COMMON_EXTERN ~AsynchIOHandler();
- QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime, int numBuffs);
+ QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime);
QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp Mon Aug 27 15:40:33 2012
@@ -191,7 +191,7 @@ void SslEstablished(Poller::shared_ptr p
boost::bind(&qpid::sys::ssl::SslHandler::nobuffs, async, _1),
boost::bind(&qpid::sys::ssl::SslHandler::idle, async, _1));
- async->init(aio,timer, maxTime, 4);
+ async->init(aio,timer, maxTime);
aio->start(poller);
}
@@ -247,7 +247,7 @@ void SslMuxProtocolFactory::established(
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, brokerTimer, maxNegotiateTime, 4);
+ async->init(aio, brokerTimer, maxNegotiateTime);
aio->start(poller);
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/TCPIOPlugin.cpp Mon Aug 27 15:40:33 2012
@@ -166,7 +166,7 @@ void AsynchIOProtocolFactory::establishe
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, brokerTimer, maxNegotiateTime, 4);
+ async->init(aio, brokerTimer, maxNegotiateTime);
aio->start(poller);
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/epoll/EpollPoller.cpp Mon Aug 27 15:40:33 2012
@@ -221,8 +221,8 @@ class PollerPrivate {
}
};
- static ReadablePipe alwaysReadable;
- static int alwaysReadableFd;
+ ReadablePipe alwaysReadable;
+ int alwaysReadableFd;
class InterruptHandle: public PollerHandle {
std::queue<PollerHandle*> handles;
@@ -290,6 +290,7 @@ class PollerPrivate {
}
PollerPrivate() :
+ alwaysReadableFd(alwaysReadable.getFD()),
epollFd(::epoll_create(DefaultFds)),
isShutdown(false) {
QPID_POSIX_CHECK(epollFd);
@@ -328,9 +329,6 @@ class PollerPrivate {
}
};
-PollerPrivate::ReadablePipe PollerPrivate::alwaysReadable;
-int PollerPrivate::alwaysReadableFd = alwaysReadable.getFD();
-
void Poller::registerHandle(PollerHandle& handle) {
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/posix/AsynchIO.cpp Mon Aug 27 15:40:33 2012
@@ -40,6 +40,7 @@
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
+#include <boost/shared_array.hpp>
namespace qpid {
namespace sys {
@@ -239,6 +240,7 @@ public:
virtual void queueForDeletion();
virtual void start(Poller::shared_ptr poller);
+ virtual void createBuffers(uint32_t size);
virtual void queueReadBuffer(BufferBase* buff);
virtual void unread(BufferBase* buff);
virtual void queueWrite(BufferBase* buff);
@@ -270,6 +272,8 @@ private:
const Socket& socket;
std::deque<BufferBase*> bufferQueue;
std::deque<BufferBase*> writeQueue;
+ std::vector<BufferBase> buffers;
+ boost::shared_array<char> bufferMemory;
bool queuedClose;
/**
* This flag is used to detect and handle concurrency between
@@ -309,15 +313,7 @@ AsynchIO::AsynchIO(const Socket& s,
s.setNonblocking();
}
-struct deleter
-{
- template <typename T>
- void operator()(T *ptr){ delete ptr;}
-};
-
AsynchIO::~AsynchIO() {
- std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter());
- std::for_each( writeQueue.begin(), writeQueue.end(), deleter());
}
void AsynchIO::queueForDeletion() {
@@ -328,6 +324,19 @@ void AsynchIO::start(Poller::shared_ptr
DispatchHandle::startWatch(poller);
}
+void AsynchIO::createBuffers(uint32_t size) {
+ // Allocate all the buffer memory at once
+ bufferMemory.reset(new char[size*BufferCount]);
+
+ // Create the Buffer structs in a vector
+ // And push into the buffer queue
+ buffers.reserve(BufferCount);
+ for (uint32_t i = 0; i < BufferCount; i++) {
+ buffers.push_back(BufferBase(&bufferMemory[i*size], size));
+ queueReadBuffer(&buffers[i]);
+ }
+}
+
void AsynchIO::queueReadBuffer(BufferBase* buff) {
assert(buff);
buff->dataStart = 0;
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SystemInfo.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SystemInfo.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SystemInfo.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SystemInfo.cpp Mon Aug 27 15:40:33 2012
@@ -91,7 +91,7 @@ void SystemInfo::getLocalIpAddresses (ui
// * The scope id is illegal in URL syntax
// * Clients won't be able to use a link local address
// without adding their own (potentially different) scope id
- sockaddr_in6* sa6 = (sockaddr_in6*)(ifap->ifa_addr);
+ sockaddr_in6* sa6 = (sockaddr_in6*)((void*)ifap->ifa_addr);
if (IN6_IS_ADDR_LINKLOCAL(&sa6->sin6_addr)) break;
// Fallthrough
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.cpp Mon Aug 27 15:40:33 2012
@@ -33,15 +33,6 @@ namespace sys {
namespace ssl {
-// Buffer definition
-struct Buff : public SslIO::BufferBase {
- Buff() :
- SslIO::BufferBase(new char[65536], 65536)
- {}
- ~Buff()
- { delete [] bytes;}
-};
-
struct ProtocolTimeoutTask : public sys::TimerTask {
SslHandler& handler;
std::string id;
@@ -78,7 +69,7 @@ SslHandler::~SslHandler() {
delete codec;
}
-void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs) {
+void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime) {
aio = a;
// Start timer for this connection
@@ -86,17 +77,14 @@ void SslHandler::init(SslIO* a, Timer& t
timer.add(timeoutTimerTask);
// Give connection some buffers to use
- for (int i = 0; i < numBuffs; i++) {
- aio->queueReadBuffer(new Buff);
- }
+ aio->createBuffers();
}
void SslHandler::write(const framing::ProtocolInitiation& data)
{
QPID_LOG(debug, "SENT [" << identifier << "]: INIT(" << data << ")");
SslIO::BufferBase* buff = aio->getQueuedBuffer();
- if (!buff)
- buff = new Buff;
+ assert(buff);
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);
buff->dataCount = data.encodedSize();
@@ -205,10 +193,11 @@ void SslHandler::idle(SslIO&){
return;
}
if (codec == 0) return;
- if (codec->canEncode()) {
- // Try and get a queued buffer if not then construct new one
- SslIO::BufferBase* buff = aio->getQueuedBuffer();
- if (!buff) buff = new Buff;
+ if (!codec->canEncode()) {
+ return;
+ }
+ SslIO::BufferBase* buff = aio->getQueuedBuffer();
+ if (buff) {
size_t encoded=codec->encode(buff->bytes, buff->byteCount);
buff->dataCount = encoded;
aio->queueWrite(buff);
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.h Mon Aug 27 15:40:33 2012
@@ -60,7 +60,7 @@ class SslHandler : public OutputControl
public:
SslHandler(std::string id, ConnectionCodec::Factory* f, bool nodict);
~SslHandler();
- void init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs);
+ void init(SslIO* a, Timer& timer, uint32_t maxTime);
void setClient() { isClient = true; }
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.cpp Mon Aug 27 15:40:33 2012
@@ -197,15 +197,7 @@ SslIO::SslIO(const SslSocket& s,
s.setNonblocking();
}
-struct deleter
-{
- template <typename T>
- void operator()(T *ptr){ delete ptr;}
-};
-
SslIO::~SslIO() {
- std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter());
- std::for_each( writeQueue.begin(), writeQueue.end(), deleter());
}
void SslIO::queueForDeletion() {
@@ -216,6 +208,19 @@ void SslIO::start(Poller::shared_ptr pol
DispatchHandle::startWatch(poller);
}
+void SslIO::createBuffers(uint32_t size) {
+ // Allocate all the buffer memory at once
+ bufferMemory.reset(new char[size*BufferCount]);
+
+ // Create the Buffer structs in a vector
+ // And push into the buffer queue
+ buffers.reserve(BufferCount);
+ for (uint32_t i = 0; i < BufferCount; i++) {
+ buffers.push_back(BufferBase(&bufferMemory[i*size], size));
+ queueReadBuffer(&buffers[i]);
+ }
+}
+
void SslIO::queueReadBuffer(BufferBase* buff) {
assert(buff);
buff->dataStart = 0;
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.h Mon Aug 27 15:40:33 2012
@@ -25,6 +25,7 @@
#include "qpid/sys/SecuritySettings.h"
#include <boost/function.hpp>
+#include <boost/shared_array.hpp>
#include <deque>
namespace qpid {
@@ -87,8 +88,8 @@ private:
};
struct SslIOBufferBase {
- char* const bytes;
- const int32_t byteCount;
+ char* bytes;
+ int32_t byteCount;
int32_t dataStart;
int32_t dataCount;
@@ -127,7 +128,9 @@ public:
typedef boost::function1<void, SslIO&> IdleCallback;
typedef boost::function1<void, SslIO&> RequestCallback;
-
+ SslIO(const SslSocket& s,
+ ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
+ ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
private:
ReadCallback readCallback;
EofCallback eofCallback;
@@ -138,6 +141,8 @@ private:
const SslSocket& socket;
std::deque<BufferBase*> bufferQueue;
std::deque<BufferBase*> writeQueue;
+ std::vector<BufferBase> buffers;
+ boost::shared_array<char> bufferMemory;
bool queuedClose;
/**
* This flag is used to detect and handle concurrency between
@@ -148,12 +153,21 @@ private:
volatile bool writePending;
public:
- SslIO(const SslSocket& s,
- ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
- ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
+ /*
+ * Size of IO buffers - this is the maximum possible frame size + 1
+ */
+ const static uint32_t MaxBufferSize = 65536;
+
+ /*
+ * Number of IO buffers allocated - I think the code can only use 2 -
+ * 1 for reading and 1 for writing, allocate 4 for safety
+ */
+ const static uint32_t BufferCount = 4;
+
void queueForDeletion();
void start(qpid::sys::Poller::shared_ptr poller);
+ void createBuffers(uint32_t size = MaxBufferSize);
void queueReadBuffer(BufferBase* buff);
void unread(BufferBase* buff);
void queueWrite(BufferBase* buff);
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/windows/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/windows/AsynchIO.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/windows/AsynchIO.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/windows/AsynchIO.cpp Mon Aug 27 15:40:33 2012
@@ -40,6 +40,7 @@
#include <windows.h>
#include <boost/bind.hpp>
+#include <boost/shared_array.hpp>
namespace {
@@ -252,6 +253,7 @@ public:
/// Take any actions needed to prepare for working with the poller.
virtual void start(Poller::shared_ptr poller);
+ virtual void createBuffers(uint32_t size);
virtual void queueReadBuffer(BufferBase* buff);
virtual void unread(BufferBase* buff);
virtual void queueWrite(BufferBase* buff);
@@ -286,6 +288,8 @@ private:
* access to the buffer queue and write queue.
*/
Mutex bufferQueueLock;
+ std::vector<BufferBase> buffers;
+ boost::shared_array<char> bufferMemory;
// Number of outstanding I/O operations.
volatile LONG opsInProgress;
@@ -385,15 +389,7 @@ AsynchIO::AsynchIO(const Socket& s,
working(false) {
}
-struct deleter
-{
- template <typename T>
- void operator()(T *ptr){ delete ptr;}
-};
-
AsynchIO::~AsynchIO() {
- std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter());
- std::for_each( writeQueue.begin(), writeQueue.end(), deleter());
}
void AsynchIO::queueForDeletion() {
@@ -426,6 +422,19 @@ void AsynchIO::start(Poller::shared_ptr
startReading();
}
+void AsynchIO::createBuffers(uint32_t size) {
+ // Allocate all the buffer memory at once
+ bufferMemory.reset(new char[size*BufferCount]);
+
+ // Create the Buffer structs in a vector
+ // And push into the buffer queue
+ buffers.reserve(BufferCount);
+ for (uint32_t i = 0; i < BufferCount; i++) {
+ buffers.push_back(BufferBase(&bufferMemory[i*size], size));
+ queueReadBuffer(&buffers[i]);
+ }
+}
+
void AsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) {
assert(buff);
buff->dataStart = 0;
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/windows/SslAsynchIO.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/windows/SslAsynchIO.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/windows/SslAsynchIO.cpp Mon Aug 27 15:40:33 2012
@@ -55,7 +55,7 @@ namespace {
* the frame layer for writing into.
*/
struct SslIoBuff : public qpid::sys::AsynchIO::BufferBase {
- std::auto_ptr<qpid::sys::AsynchIO::BufferBase> aioBuff;
+ qpid::sys::AsynchIO::BufferBase* aioBuff;
SslIoBuff (qpid::sys::AsynchIO::BufferBase *base,
const SecPkgContext_StreamSizes &sizes)
@@ -66,7 +66,6 @@ namespace {
{}
~SslIoBuff() {}
- qpid::sys::AsynchIO::BufferBase* release() { return aioBuff.release(); }
};
}
@@ -101,10 +100,7 @@ SslAsynchIO::SslAsynchIO(const qpid::sys
}
SslAsynchIO::~SslAsynchIO() {
- if (leftoverPlaintext) {
- delete leftoverPlaintext;
- leftoverPlaintext = 0;
- }
+ leftoverPlaintext = 0;
}
void SslAsynchIO::queueForDeletion() {
@@ -121,6 +117,10 @@ void SslAsynchIO::start(qpid::sys::Polle
startNegotiate();
}
+void SslAsynchIO::createBuffers(uint32_t size) {
+ aio->createBuffers(size);
+}
+
void SslAsynchIO::queueReadBuffer(AsynchIO::BufferBase* buff) {
aio->queueReadBuffer(buff);
}
@@ -148,7 +148,7 @@ void SslAsynchIO::queueWrite(AsynchIO::B
// encoding was working on, and adjusting counts for, the SslIoBuff.
// Update the count of the original BufferBase before handing off to
// the I/O layer.
- buff = sslBuff->release();
+ buff = sslBuff->aioBuff;
SecBuffer buffs[4];
buffs[0].cbBuffer = schSizes.cbHeader;
buffs[0].BufferType = SECBUFFER_STREAM_HEADER;
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/windows/SslAsynchIO.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/windows/SslAsynchIO.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/windows/SslAsynchIO.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/windows/SslAsynchIO.h Mon Aug 27 15:40:33 2012
@@ -70,6 +70,7 @@ public:
virtual void queueForDeletion();
virtual void start(qpid::sys::Poller::shared_ptr poller);
+ virtual void createBuffers(uint32_t size);
virtual void queueReadBuffer(BufferBase* buff);
virtual void unread(BufferBase* buff);
virtual void queueWrite(BufferBase* buff);
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/windows/Time.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/windows/Time.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/windows/Time.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/windows/Time.cpp Mon Aug 27 15:40:33 2012
@@ -20,10 +20,12 @@
*/
#include "qpid/sys/Time.h"
+#include <cmath>
#include <ostream>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/thread_time.hpp>
#include <windows.h>
+#include <time.h>
using namespace boost::posix_time;
@@ -33,8 +35,16 @@ namespace {
// more or less. Keep track of the start value and the conversion factor to
// seconds.
bool timeInitialized = false;
-LARGE_INTEGER start;
-double freq = 1.0;
+LARGE_INTEGER start_hpc;
+double hpc_freq = 1.0;
+
+double start_time;
+
+/// Static constant to remove time skew between FILETIME and POSIX
+/// time. POSIX and Win32 use different epochs (Jan. 1, 1970 v.s.
+/// Jan. 1, 1601). The following constant defines the difference
+/// in 100ns ticks.
+const DWORDLONG FILETIME_to_timval_skew = 0x19db1ded53e8000;
}
@@ -114,23 +124,59 @@ void outputFormattedNow(std::ostream& o)
}
void outputHiresNow(std::ostream& o) {
+ ::time_t tv_sec;
+ ::tm timeinfo;
+ char time_string[100];
+
if (!timeInitialized) {
- start.QuadPart = 0;
+ // To start, get the current time from FILETIME which includes
+ // sub-second resolution. However, since FILETIME is updated a bit
+ // "bumpy" every 15 msec or so, future time displays will be the
+ // starting FILETIME plus a delta based on the high-resolution
+ // performance counter.
+ FILETIME file_time;
+ ULARGE_INTEGER start_usec;
+ ::GetSystemTimeAsFileTime(&file_time); // This is in 100ns units
+ start_usec.LowPart = file_time.dwLowDateTime;
+ start_usec.HighPart = file_time.dwHighDateTime;
+ start_usec.QuadPart -= FILETIME_to_timval_skew;
+ start_usec.QuadPart /= 10; // Convert 100ns to usec
+ tv_sec = (time_t)(start_usec.QuadPart / (1000 * 1000));
+ long tv_usec = (long)(start_usec.QuadPart % (1000 * 1000));
+ start_time = static_cast<double>(tv_sec);
+ start_time += tv_usec / 1000000.0;
+
+ start_hpc.QuadPart = 0;
LARGE_INTEGER iFreq;
iFreq.QuadPart = 1;
- QueryPerformanceCounter(&start);
+ QueryPerformanceCounter(&start_hpc);
QueryPerformanceFrequency(&iFreq);
- freq = static_cast<double>(iFreq.QuadPart);
+ hpc_freq = static_cast<double>(iFreq.QuadPart);
timeInitialized = true;
}
- LARGE_INTEGER iNow;
- iNow.QuadPart = 0;
- QueryPerformanceCounter(&iNow);
- iNow.QuadPart -= start.QuadPart;
- if (iNow.QuadPart < 0)
- iNow.QuadPart = 0;
- double now = static_cast<double>(iNow.QuadPart);
- now /= freq; // now is seconds after this
- o << std::fixed << std::setprecision(8) << std::setw(16) << std::setfill('0') << now << "s ";
+ LARGE_INTEGER hpc_now;
+ hpc_now.QuadPart = 0;
+ QueryPerformanceCounter(&hpc_now);
+ hpc_now.QuadPart -= start_hpc.QuadPart;
+ if (hpc_now.QuadPart < 0)
+ hpc_now.QuadPart = 0;
+ double now = static_cast<double>(hpc_now.QuadPart);
+ now /= hpc_freq; // now is seconds after this
+ double fnow = start_time + now;
+ double usec, sec;
+ usec = modf(fnow, &sec);
+ tv_sec = static_cast<time_t>(sec);
+#ifdef _MSC_VER
+ ::localtime_s(&timeinfo, &tv_sec);
+#else
+ timeinfo = *(::localtime(&tv_sec));
+#endif
+ ::strftime(time_string, 100,
+ "%Y-%m-%d %H:%M:%S",
+ &timeinfo);
+ // No way to set "max field width" to cleanly output the double usec so
+ // convert it back to integral number of usecs and print that.
+ unsigned long i_usec = usec * 1000 * 1000;
+ o << time_string << "." << std::setw(6) << std::setfill('0') << i_usec << " ";
}
}}
Modified: qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.cpp Mon Aug 27 15:40:33 2012
@@ -27,6 +27,7 @@
#include "qpid/log/Statement.h"
#include "qpid/broker/FedOps.h"
+#include "qpid/broker/MapHandler.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/reply_exceptions.h"
@@ -198,7 +199,52 @@ bool XmlExchange::unbind(Queue::shared_p
}
}
-bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content)
+namespace {
+class DefineExternals : public MapHandler
+{
+ public:
+ DefineExternals(DynamicContext* c) : context(c) { assert(context); }
+ void handleUint8(const MapHandler::CharSequence& key, uint8_t value) { process(std::string(key.data, key.size), (int) value); }
+ void handleUint16(const MapHandler::CharSequence& key, uint16_t value) { process(std::string(key.data, key.size), (int) value); }
+ void handleUint32(const MapHandler::CharSequence& key, uint32_t value) { process(std::string(key.data, key.size), (int) value); }
+ void handleUint64(const MapHandler::CharSequence& key, uint64_t value) { process(std::string(key.data, key.size), (int) value); }
+ void handleInt8(const MapHandler::CharSequence& key, int8_t value) { process(std::string(key.data, key.size), (int) value); }
+ void handleInt16(const MapHandler::CharSequence& key, int16_t value) { process(std::string(key.data, key.size), (int) value); }
+ void handleInt32(const MapHandler::CharSequence& key, int32_t value) { process(std::string(key.data, key.size), (int) value); }
+ void handleInt64(const MapHandler::CharSequence& key, int64_t value) { process(std::string(key.data, key.size), (int) value); }
+ void handleFloat(const MapHandler::CharSequence& key, float value) { process(std::string(key.data, key.size), value); }
+ void handleDouble(const MapHandler::CharSequence& key, double value) { process(std::string(key.data, key.size), value); }
+ void handleString(const MapHandler::CharSequence& key, const MapHandler::CharSequence& value, const MapHandler::CharSequence& /*encoding*/)
+ {
+ process(std::string(key.data, key.size), std::string(value.data, value.size));
+ }
+ void handleVoid(const MapHandler::CharSequence&) {}
+ private:
+ void process(const std::string& key, double value)
+ {
+ QPID_LOG(trace, "XmlExchange, external variable (double): " << key << " = " << value);
+ Item::Ptr item = context->getItemFactory()->createDouble(value, context);
+ context->setExternalVariable(X(key.c_str()), item);
+ }
+ void process(const std::string& key, int value)
+ {
+ QPID_LOG(trace, "XmlExchange, external variable (int):" << key << " = " << value);
+ Item::Ptr item = context->getItemFactory()->createInteger(value, context);
+ context->setExternalVariable(X(key.c_str()), item);
+ }
+ void process(const std::string& key, const std::string& value)
+ {
+ QPID_LOG(trace, "XmlExchange, external variable (string):" << key << " = " << value);
+ Item::Ptr item = context->getItemFactory()->createString(X(value.c_str()), context);
+ context->setExternalVariable(X(key.c_str()), item);
+ }
+
+ DynamicContext* context;
+};
+
+}
+
+bool XmlExchange::matches(Query& query, Deliverable& msg, bool parse_message_content)
{
std::string msgContent;
@@ -212,7 +258,7 @@ bool XmlExchange::matches(Query& query,
if (parse_message_content) {
- msg.getMessage().getFrames().getContent(msgContent);
+ msgContent = msg.getMessage().getContent();
QPID_LOG(trace, "matches: message content is [" << msgContent << "]");
@@ -231,28 +277,8 @@ bool XmlExchange::matches(Query& query,
}
}
- if (args) {
- FieldTable::ValueMap::const_iterator v = args->begin();
- for(; v != args->end(); ++v) {
-
- if (v->second->convertsTo<double>()) {
- QPID_LOG(trace, "XmlExchange, external variable (double): " << v->first << " = " << v->second->get<double>());
- Item::Ptr value = context->getItemFactory()->createDouble(v->second->get<double>(), context.get());
- context->setExternalVariable(X(v->first.c_str()), value);
- }
- else if (v->second->convertsTo<int>()) {
- QPID_LOG(trace, "XmlExchange, external variable (int):" << v->first << " = " << v->second->getData().getInt());
- Item::Ptr value = context->getItemFactory()->createInteger(v->second->get<int>(), context.get());
- context->setExternalVariable(X(v->first.c_str()), value);
- }
- else if (v->second->convertsTo<std::string>()) {
- QPID_LOG(trace, "XmlExchange, external variable (string):" << v->first << " = " << v->second->getData().getString().c_str());
- Item::Ptr value = context->getItemFactory()->createString(X(v->second->get<std::string>().c_str()), context.get());
- context->setExternalVariable(X(v->first.c_str()), value);
- }
-
- }
- }
+ DefineExternals f(context.get());
+ msg.getMessage().processProperties(f);
Result result = query->execute(context.get());
#ifdef XQ_EFFECTIVE_BOOLEAN_VALUE_HPP
@@ -286,7 +312,6 @@ bool XmlExchange::matches(Query& query,
void XmlExchange::route(Deliverable& msg)
{
const std::string& routingKey = msg.getMessage().getRoutingKey();
- const FieldTable* args = msg.getMessage().getApplicationHeaders();
PreRoute pr(msg, this);
try {
XmlBinding::vector::ConstPtr p;
@@ -298,7 +323,7 @@ void XmlExchange::route(Deliverable& msg
}
for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) {
- if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) {
+ if (matches((*i)->xquery, msg, (*i)->parse_message_content)) {
b->push_back(*i);
}
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/xml/XmlExchange.h Mon Aug 27 15:40:33 2012
@@ -65,7 +65,7 @@ class XmlExchange : public virtual Excha
qpid::sys::RWlock lock;
- bool matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content);
+ bool matches(Query& query, Deliverable& msg, bool parse_message_content);
public:
static const std::string typeName;
Propchange: qpid/branches/asyncstore/cpp/src/tests/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/tests:r1368652-1375508
Modified: qpid/branches/asyncstore/cpp/src/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/CMakeLists.txt?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/branches/asyncstore/cpp/src/tests/CMakeLists.txt Mon Aug 27 15:40:33 2012
@@ -126,6 +126,7 @@ set(unit_tests_to_build
ExchangeTest
HeadersExchangeTest
MessageTest
+ QueueDepth
QueueRegistryTest
QueuePolicyTest
QueueFlowLimitTest
@@ -135,16 +136,12 @@ set(unit_tests_to_build
TimerTest
TopicExchangeTest
TxBufferTest
- TxPublishTest
- MessageBuilderTest
ManagementTest
MessageReplayTracker
ConsoleTest
- QueueEvents
ProxyTest
RetryList
FrameDecoder
- ReplicationTest
ClientMessageTest
PollableCondition
Variant
@@ -165,10 +162,6 @@ remember_location(unit_test)
add_library (shlibtest MODULE shlibtest.cpp)
-if (BUILD_CLUSTER)
- include (cluster.cmake)
-endif (BUILD_CLUSTER)
-
# FIXME aconway 2009-11-30: enable SSL
#if SSL
#include ssl.mk
Modified: qpid/branches/asyncstore/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/ClientSessionTest.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/ClientSessionTest.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/ClientSessionTest.cpp Mon Aug 27 15:40:33 2012
@@ -621,7 +621,7 @@ QPID_AUTO_TEST_CASE(testQueueDeleted)
fix.session.queueDeclare(arg::queue="my-queue");
LocalQueue queue;
fix.subs.subscribe(queue, "my-queue");
-
+
ScopedSuppressLogging sl;
fix.session.queueDelete(arg::queue="my-queue");
BOOST_CHECK_THROW(queue.get(1*qpid::sys::TIME_SEC), qpid::framing::ResourceDeletedException);
Modified: qpid/branches/asyncstore/cpp/src/tests/DeliveryRecordTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/DeliveryRecordTest.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/DeliveryRecordTest.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/DeliveryRecordTest.cpp Mon Aug 27 15:40:33 2012
@@ -49,7 +49,7 @@ QPID_AUTO_TEST_CASE(testSort)
list<DeliveryRecord> records;
for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); i++) {
- DeliveryRecord r(QueuedMessage(0), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false);
+ DeliveryRecord r(QueueCursor(CONSUMER), framing::SequenceNumber(), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false);
r.setId(*i);
records.push_back(r);
}
Modified: qpid/branches/asyncstore/cpp/src/tests/ExchangeTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/ExchangeTest.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/ExchangeTest.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/ExchangeTest.cpp Mon Aug 27 15:40:33 2012
@@ -35,7 +35,6 @@
using std::string;
-using boost::intrusive_ptr;
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
@@ -62,11 +61,9 @@ QPID_AUTO_TEST_CASE(testMe)
queue.reset();
queue2.reset();
- intrusive_ptr<Message> msgPtr(MessageUtils::createMessage("exchange", "abc", false, "id"));
- DeliverableMessage msg(msgPtr);
+ DeliverableMessage msg(MessageUtils::createMessage("exchange", "abc"), 0);
topic.route(msg);
direct.route(msg);
-
}
QPID_AUTO_TEST_CASE(testIsBound)
@@ -170,16 +167,6 @@ QPID_AUTO_TEST_CASE(testDeleteGetAndRede
BOOST_CHECK_EQUAL(string("direct"), response.first->getType());
}
-intrusive_ptr<Message> cmessage(std::string exchange, std::string routingKey) {
- intrusive_ptr<Message> msg(new Message());
- AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
- AMQFrame header((AMQHeaderBody()));
- msg->getFrames().append(method);
- msg->getFrames().append(header);
- msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
- return msg;
-}
-
QPID_AUTO_TEST_CASE(testSequenceOptions)
{
FieldTable args;
@@ -189,46 +176,35 @@ QPID_AUTO_TEST_CASE(testSequenceOptions)
{
DirectExchange direct("direct1", false, args);
- intrusive_ptr<Message> msg1 = cmessage("e", "abc");
- intrusive_ptr<Message> msg2 = cmessage("e", "abc");
- intrusive_ptr<Message> msg3 = cmessage("e", "abc");
-
- DeliverableMessage dmsg1(msg1);
- DeliverableMessage dmsg2(msg2);
- DeliverableMessage dmsg3(msg3);
-
- direct.route(dmsg1);
- direct.route(dmsg2);
- direct.route(dmsg3);
-
- BOOST_CHECK_EQUAL(1, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
- BOOST_CHECK_EQUAL(2, msg2->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
- BOOST_CHECK_EQUAL(3, msg3->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+ DeliverableMessage msg1(MessageUtils::createMessage("e", "abc"), 0);
+ DeliverableMessage msg2(MessageUtils::createMessage("e", "abc"), 0);
+ DeliverableMessage msg3(MessageUtils::createMessage("e", "abc"), 0);
+
+ direct.route(msg1);
+ direct.route(msg2);
+ direct.route(msg3);
+
+ BOOST_CHECK_EQUAL(1, msg1.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
+ BOOST_CHECK_EQUAL(2, msg2.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
+ BOOST_CHECK_EQUAL(3, msg3.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
FanOutExchange fanout("fanout1", false, args);
HeadersExchange header("headers1", false, args);
TopicExchange topic ("topic1", false, args);
// check other exchanges, that they preroute
- intrusive_ptr<Message> msg4 = cmessage("e", "abc");
- intrusive_ptr<Message> msg5 = cmessage("e", "abc");
-
- // Need at least empty header for the HeadersExchange to route at all
- msg5->insertCustomProperty("", "");
- intrusive_ptr<Message> msg6 = cmessage("e", "abc");
+ DeliverableMessage msg4(MessageUtils::createMessage("e", "abc"), 0);
+ DeliverableMessage msg5(MessageUtils::createMessage("e", "abc"), 0);
+ DeliverableMessage msg6(MessageUtils::createMessage("e", "abc"), 0);
- DeliverableMessage dmsg4(msg4);
- DeliverableMessage dmsg5(msg5);
- DeliverableMessage dmsg6(msg6);
+ fanout.route(msg4);
+ BOOST_CHECK_EQUAL(1, msg4.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
- fanout.route(dmsg4);
- BOOST_CHECK_EQUAL(1, msg4->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+ header.route(msg5);
+ BOOST_CHECK_EQUAL(1, msg5.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
- header.route(dmsg5);
- BOOST_CHECK_EQUAL(1, msg5->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
-
- topic.route(dmsg6);
- BOOST_CHECK_EQUAL(1, msg6->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+ topic.route(msg6);
+ BOOST_CHECK_EQUAL(1, msg6.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
direct.encode(buffer);
}
{
@@ -237,11 +213,10 @@ QPID_AUTO_TEST_CASE(testSequenceOptions)
buffer.reset();
DirectExchange::shared_ptr exch_dec = Exchange::decode(exchanges, buffer);
- intrusive_ptr<Message> msg1 = cmessage("e", "abc");
- DeliverableMessage dmsg1(msg1);
- exch_dec->route(dmsg1);
+ DeliverableMessage msg1(MessageUtils::createMessage("e", "abc"), 0);
+ exch_dec->route(msg1);
- BOOST_CHECK_EQUAL(4, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+ BOOST_CHECK_EQUAL(4, msg1.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
}
delete [] buff;
@@ -256,9 +231,11 @@ QPID_AUTO_TEST_CASE(testIVEOption)
HeadersExchange header("headers1", false, args);
TopicExchange topic ("topic1", false, args);
- intrusive_ptr<Message> msg1 = cmessage("direct1", "abc");
- msg1->insertCustomProperty("a", "abc");
- DeliverableMessage dmsg1(msg1);
+ qpid::types::Variant::Map properties;
+ properties["routing-key"] = "abc";
+ properties["a"] = "abc";
+ Message msg1 = MessageUtils::createMessage(properties, "my-message", "direct1");
+ DeliverableMessage dmsg1(msg1, 0);
FieldTable args2;
args2.setString("x-match", "any");
@@ -273,8 +250,6 @@ QPID_AUTO_TEST_CASE(testIVEOption)
Queue::shared_ptr queue2(new Queue("queue2", true));
Queue::shared_ptr queue3(new Queue("queue3", true));
- BOOST_CHECK(HeadersExchange::match(args2, msg1->getProperties<MessageProperties>()->getApplicationHeaders()));
-
BOOST_CHECK(direct.bind(queue, "abc", 0));
BOOST_CHECK(fanout.bind(queue1, "abc", 0));
BOOST_CHECK(header.bind(queue2, "", &args2));
@@ -287,7 +262,6 @@ QPID_AUTO_TEST_CASE(testIVEOption)
}
-
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
Modified: qpid/branches/asyncstore/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/Makefile.am?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/asyncstore/cpp/src/tests/Makefile.am Mon Aug 27 15:40:33 2012
@@ -96,6 +96,7 @@ unit_test_SOURCES= unit_test.cpp unit_te
ExchangeTest.cpp \
HeadersExchangeTest.cpp \
MessageTest.cpp \
+ QueueDepth.cpp \
QueueRegistryTest.cpp \
QueuePolicyTest.cpp \
QueueFlowLimitTest.cpp \
@@ -105,19 +106,15 @@ unit_test_SOURCES= unit_test.cpp unit_te
TimerTest.cpp \
TopicExchangeTest.cpp \
TxBufferTest.cpp \
- TxPublishTest.cpp \
- MessageBuilderTest.cpp \
ConnectionOptions.h \
ForkedBroker.h \
ForkedBroker.cpp \
ManagementTest.cpp \
MessageReplayTracker.cpp \
ConsoleTest.cpp \
- QueueEvents.cpp \
ProxyTest.cpp \
RetryList.cpp \
FrameDecoder.cpp \
- ReplicationTest.cpp \
ClientMessageTest.cpp \
PollableCondition.cpp \
Variant.cpp \
@@ -142,7 +139,6 @@ test_store_la_SOURCES = test_store.cpp
test_store_la_LIBADD = $(lib_broker)
test_store_la_LDFLAGS = -module
-include cluster.mk
include sasl.mk
if SSL
include ssl.mk
@@ -338,7 +334,6 @@ EXTRA_DIST += \
dynamic_log_level_test \
qpid-ctrl \
CMakeLists.txt \
- cluster.cmake \
windows/DisableWin32ErrorWindows.cpp \
background.ps1 \
find_prog.ps1 \
@@ -372,14 +367,6 @@ LONG_TESTS+=start_broker \
stop_broker \
run_long_federation_sys_tests
-if HAVE_LIBCPG
-
-LONG_TESTS+= federated_cluster_test_with_node_failure \
- run_failover_soak \
- reliable_replication_test
-
-endif HAVE_LIBCPG
-
EXTRA_DIST+= \
fanout_perftest \
shared_perftest \
Modified: qpid/branches/asyncstore/cpp/src/tests/MessageTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/MessageTest.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/MessageTest.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/MessageTest.cpp Mon Aug 27 15:40:33 2012
@@ -24,6 +24,7 @@
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/Uuid.h"
+#include "MessageUtils.h"
#include "unit_test.h"
@@ -43,49 +44,29 @@ QPID_AUTO_TEST_CASE(testEncodeDecode)
{
string exchange = "MyExchange";
string routingKey = "MyRoutingKey";
+ uint64_t ttl(60);
Uuid messageId(true);
- string data1("abcdefg");
- string data2("hijklmn");
+ string data("abcdefghijklmn");
- boost::intrusive_ptr<Message> msg(new Message());
-
- AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
- AMQFrame header((AMQHeaderBody()));
- AMQFrame content1((AMQContentBody(data1)));
- AMQFrame content2((AMQContentBody(data2)));
-
- msg->getFrames().append(method);
- msg->getFrames().append(header);
- msg->getFrames().append(content1);
- msg->getFrames().append(content2);
-
- MessageProperties* mProps = msg->getFrames().getHeaders()->get<MessageProperties>(true);
- mProps->setContentLength(data1.size() + data2.size());
- mProps->setMessageId(messageId);
- FieldTable applicationHeaders;
- applicationHeaders.setString("abc", "xyz");
- mProps->setApplicationHeaders(applicationHeaders);
- DeliveryProperties* dProps = msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
- dProps->setRoutingKey(routingKey);
- dProps->setDeliveryMode(PERSISTENT);
- BOOST_CHECK(msg->isPersistent());
-
- std::vector<char> buff(msg->encodedSize());
- Buffer wbuffer(&buff[0], msg->encodedSize());
- msg->encode(wbuffer);
-
- Buffer rbuffer(&buff[0], msg->encodedSize());
- msg = new Message();
- msg->decodeHeader(rbuffer);
- msg->decodeContent(rbuffer);
- BOOST_CHECK_EQUAL(exchange, msg->getExchangeName());
- BOOST_CHECK_EQUAL(routingKey, msg->getRoutingKey());
- BOOST_CHECK_EQUAL((uint64_t) data1.size() + data2.size(), msg->contentSize());
- BOOST_CHECK_EQUAL((uint64_t) data1.size() + data2.size(), msg->getProperties<MessageProperties>()->getContentLength());
- BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
- BOOST_CHECK_EQUAL(string("xyz"), msg->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("abc"));
- BOOST_CHECK_EQUAL((uint8_t) PERSISTENT, msg->getProperties<DeliveryProperties>()->getDeliveryMode());
- BOOST_CHECK(msg->isPersistent());
+ qpid::types::Variant::Map properties;
+ properties["routing-key"] = routingKey;
+ properties["ttl"] = ttl;
+ properties["durable"] = true;
+ properties["message-id"] = qpid::types::Uuid(messageId.data());
+ properties["abc"] = "xyz";
+ Message msg = MessageUtils::createMessage(properties, data);
+
+ std::string buffer;
+ encode(msg, buffer);
+ msg = Message();
+ decode(buffer, msg);
+
+ BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey());
+ BOOST_CHECK_EQUAL((uint64_t) data.size(), msg.getContentSize());
+ BOOST_CHECK_EQUAL(data, msg.getContent());
+ //BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
+ BOOST_CHECK_EQUAL(string("xyz"), msg.getPropertyAsString("abc"));
+ BOOST_CHECK(msg.isPersistent());
}
QPID_AUTO_TEST_SUITE_END()
Modified: qpid/branches/asyncstore/cpp/src/tests/MessageUtils.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/MessageUtils.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/MessageUtils.h (original)
+++ qpid/branches/asyncstore/cpp/src/tests/MessageUtils.h Mon Aug 27 15:40:33 2012
@@ -20,9 +20,11 @@
*/
#include "qpid/broker/Message.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/Uuid.h"
+#include "qpid/types/Variant.h"
using namespace qpid;
using namespace broker;
@@ -33,11 +35,46 @@ namespace tests {
struct MessageUtils
{
- static boost::intrusive_ptr<Message> createMessage(const std::string& exchange="", const std::string& routingKey="",
- const bool durable = false, const Uuid& messageId=Uuid(true),
- uint64_t contentSize = 0)
+ static Message createMessage(const qpid::types::Variant::Map& properties, const std::string& content="", const std::string& destination = "")
{
- boost::intrusive_ptr<broker::Message> msg(new broker::Message());
+ boost::intrusive_ptr<broker::amqp_0_10::MessageTransfer> msg(new broker::amqp_0_10::MessageTransfer());
+
+ AMQFrame method(( MessageTransferBody(ProtocolVersion(), destination, 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
+
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ if (content.size()) {
+ msg->getFrames().getHeaders()->get<MessageProperties>(true)->setContentLength(content.size());
+ AMQFrame data((AMQContentBody(content)));
+ msg->getFrames().append(data);
+ }
+ for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+ if (i->first == "routing-key" && !i->second.isVoid()) {
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(i->second);
+ } else if (i->first == "message-id" && !i->second.isVoid()) {
+ qpid::types::Uuid id = i->second;
+ qpid::framing::Uuid id2(id.data());
+ msg->getFrames().getHeaders()->get<MessageProperties>(true)->setMessageId(id2);
+ } else if (i->first == "ttl" && !i->second.isVoid()) {
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(i->second);
+ } else if (i->first == "priority" && !i->second.isVoid()) {
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setPriority(i->second);
+ } else if (i->first == "durable" && !i->second.isVoid()) {
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(i->second.asBool() ? 2 : 1);
+ } else {
+ msg->getFrames().getHeaders()->get<MessageProperties>(true)->getApplicationHeaders().setString(i->first, i->second);
+ }
+ }
+ return Message(msg, msg);
+ }
+
+
+ static Message createMessage(const std::string& exchange="", const std::string& routingKey="",
+ uint64_t ttl = 0, bool durable = false, const Uuid& messageId=Uuid(true),
+ const std::string& content="")
+ {
+ boost::intrusive_ptr<broker::amqp_0_10::MessageTransfer> msg(new broker::amqp_0_10::MessageTransfer());
AMQFrame method(( MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
AMQFrame header((AMQHeaderBody()));
@@ -45,18 +82,18 @@ struct MessageUtils
msg->getFrames().append(method);
msg->getFrames().append(header);
MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
- props->setContentLength(contentSize);
+ props->setContentLength(content.size());
props->setMessageId(messageId);
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
if (durable)
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(2);
- return msg;
- }
-
- static void addContent(boost::intrusive_ptr<Message> msg, const std::string& data)
- {
- AMQFrame content((AMQContentBody(data)));
- msg->getFrames().append(content);
+ if (ttl)
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl);
+ if (content.size()) {
+ AMQFrame data((AMQContentBody(content)));
+ msg->getFrames().append(data);
+ }
+ return Message(msg, msg);
}
};
Modified: qpid/branches/asyncstore/cpp/src/tests/QueueFlowLimitTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/QueueFlowLimitTest.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/QueueFlowLimitTest.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/QueueFlowLimitTest.cpp Mon Aug 27 15:40:33 2012
@@ -23,8 +23,8 @@
#include "unit_test.h"
#include "test_tools.h"
-#include "qpid/broker/QueuePolicy.h"
#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/QueueSettings.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/FieldValue.h"
@@ -66,21 +66,19 @@ public:
return new TestFlow(flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
}
- static QueueFlowLimit *getQueueFlowLimit(const qpid::framing::FieldTable& settings)
+ static QueueFlowLimit *getQueueFlowLimit(const qpid::framing::FieldTable& arguments)
{
+ QueueSettings settings;
+ settings.populate(arguments, settings.storeSettings);
return QueueFlowLimit::createLimit(0, settings);
}
};
-
-
-QueuedMessage createMessage(uint32_t size)
+Message createMessage(uint32_t size)
{
static uint32_t seqNum;
- QueuedMessage msg;
- msg.payload = MessageUtils::createMessage();
- msg.position = ++seqNum;
- MessageUtils::addContent(msg.payload, std::string (size, 'x'));
+ Message msg = MessageUtils::createMessage(qpid::types::Variant::Map(), std::string (size, 'x'));
+ msg.setSequence(++seqNum);
return msg;
}
}
@@ -100,7 +98,7 @@ QPID_AUTO_TEST_CASE(testFlowCount)
BOOST_CHECK(!flow->isFlowControlActive());
BOOST_CHECK(flow->monitorFlowControl());
- std::deque<QueuedMessage> msgs;
+ std::deque<Message> msgs;
for (size_t i = 0; i < 6; i++) {
msgs.push_back(createMessage(10));
flow->enqueued(msgs.back());
@@ -135,7 +133,6 @@ QPID_AUTO_TEST_CASE(testFlowCount)
BOOST_CHECK(!flow->isFlowControlActive()); // 4 on queue, OFF
}
-
QPID_AUTO_TEST_CASE(testFlowSize)
{
FieldTable args;
@@ -151,7 +148,7 @@ QPID_AUTO_TEST_CASE(testFlowSize)
BOOST_CHECK(!flow->isFlowControlActive());
BOOST_CHECK(flow->monitorFlowControl());
- std::deque<QueuedMessage> msgs;
+ std::deque<Message> msgs;
for (size_t i = 0; i < 6; i++) {
msgs.push_back(createMessage(10));
flow->enqueued(msgs.back());
@@ -161,14 +158,14 @@ QPID_AUTO_TEST_CASE(testFlowSize)
BOOST_CHECK_EQUAL(6u, flow->getFlowCount());
BOOST_CHECK_EQUAL(60u, flow->getFlowSize());
- QueuedMessage msg_9 = createMessage(9);
+ Message msg_9 = createMessage(9);
flow->enqueued(msg_9);
BOOST_CHECK(!flow->isFlowControlActive()); // 69 on queue
- QueuedMessage tinyMsg_1 = createMessage(1);
+ Message tinyMsg_1 = createMessage(1);
flow->enqueued(tinyMsg_1);
BOOST_CHECK(!flow->isFlowControlActive()); // 70 on queue
- QueuedMessage tinyMsg_2 = createMessage(1);
+ Message tinyMsg_2 = createMessage(1);
flow->enqueued(tinyMsg_2);
BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue, ON
msgs.push_back(createMessage(10));
@@ -233,12 +230,12 @@ QPID_AUTO_TEST_CASE(testFlowCombo)
args.setUInt64(QueueFlowLimit::flowStopSizeKey, 200);
args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 100);
- std::deque<QueuedMessage> msgs_1;
- std::deque<QueuedMessage> msgs_10;
- std::deque<QueuedMessage> msgs_50;
- std::deque<QueuedMessage> msgs_100;
+ std::deque<Message> msgs_1;
+ std::deque<Message> msgs_10;
+ std::deque<Message> msgs_50;
+ std::deque<Message> msgs_100;
- QueuedMessage msg;
+ Message msg;
std::auto_ptr<TestFlow> flow(TestFlow::createTestFlow(args));
BOOST_CHECK(!flow->isFlowControlActive()); // count:0 size:0
@@ -458,7 +455,6 @@ QPID_AUTO_TEST_CASE(testFlowDisable)
}
}
-
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
Modified: qpid/branches/asyncstore/cpp/src/tests/QueuePolicyTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/QueuePolicyTest.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/QueuePolicyTest.cpp Mon Aug 27 15:40:33 2012
@@ -22,12 +22,10 @@
#include "unit_test.h"
#include "test_tools.h"
-#include "qpid/broker/QueuePolicy.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/client/QueueOptions.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/reply_exceptions.h"
-#include "MessageUtils.h"
#include "BrokerFixture.h"
using namespace qpid::broker;
@@ -39,118 +37,10 @@ namespace tests {
QPID_AUTO_TEST_SUITE(QueuePolicyTestSuite)
-namespace {
-QueuedMessage createMessage(uint32_t size)
-{
- QueuedMessage msg;
- msg.payload = MessageUtils::createMessage();
- MessageUtils::addContent(msg.payload, std::string (size, 'x'));
- return msg;
-}
-}
-
-QPID_AUTO_TEST_CASE(testCount)
-{
- std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 5, 0));
- BOOST_CHECK_EQUAL((uint64_t) 0, policy->getMaxSize());
- BOOST_CHECK_EQUAL((uint32_t) 5, policy->getMaxCount());
-
- QueuedMessage msg = createMessage(10);
- for (size_t i = 0; i < 5; i++) {
- policy->tryEnqueue(msg.payload);
- }
- try {
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on enqueuing sixth message");
- } catch (const ResourceLimitExceededException&) {}
-
- policy->dequeued(msg);
- policy->tryEnqueue(msg.payload);
-
- try {
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on enqueuing sixth message (after dequeue)");
- } catch (const ResourceLimitExceededException&) {}
-}
-
-QPID_AUTO_TEST_CASE(testSize)
-{
- std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 0, 50));
- QueuedMessage msg = createMessage(10);
-
- for (size_t i = 0; i < 5; i++) {
- policy->tryEnqueue(msg.payload);
- }
- try {
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy);
- } catch (const ResourceLimitExceededException&) {}
-
- policy->dequeued(msg);
- policy->tryEnqueue(msg.payload);
-
- try {
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on aggregate size exceeding 50 (after dequeue). " << *policy);
- } catch (const ResourceLimitExceededException&) {}
-}
-
-QPID_AUTO_TEST_CASE(testBoth)
-{
- std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 5, 50));
- try {
- QueuedMessage msg = createMessage(51);
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on single message exceeding 50. " << *policy);
- } catch (const ResourceLimitExceededException&) {}
-
- std::vector<QueuedMessage> messages;
- messages.push_back(createMessage(15));
- messages.push_back(createMessage(10));
- messages.push_back(createMessage(11));
- messages.push_back(createMessage(2));
- messages.push_back(createMessage(7));
- for (size_t i = 0; i < messages.size(); i++) {
- policy->tryEnqueue(messages[i].payload);
- }
- //size = 45 at this point, count = 5
- try {
- QueuedMessage msg = createMessage(5);
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on count exceeding 6. " << *policy);
- } catch (const ResourceLimitExceededException&) {}
- try {
- QueuedMessage msg = createMessage(10);
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy);
- } catch (const ResourceLimitExceededException&) {}
-
-
- policy->dequeued(messages[0]);
- try {
- QueuedMessage msg = createMessage(20);
- policy->tryEnqueue(msg.payload);
- } catch (const ResourceLimitExceededException&) {
- BOOST_FAIL("Policy failed incorrectly after dequeue. " << *policy);
- }
-}
-
-QPID_AUTO_TEST_CASE(testSettings)
-{
- //test reading and writing the policy from/to field table
- std::auto_ptr<QueuePolicy> a(QueuePolicy::createQueuePolicy("test", 101, 303));
- FieldTable settings;
- a->update(settings);
- std::auto_ptr<QueuePolicy> b(QueuePolicy::createQueuePolicy("test", settings));
- BOOST_CHECK_EQUAL(a->getMaxCount(), b->getMaxCount());
- BOOST_CHECK_EQUAL(a->getMaxSize(), b->getMaxSize());
-}
-
QPID_AUTO_TEST_CASE(testRingPolicyCount)
{
- FieldTable args;
- std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING);
- policy->update(args);
+ QueueOptions args;
+ args.setSizePolicy(RING, 0, 5);
SessionFixture f;
std::string q("my-ring-queue");
@@ -183,9 +73,8 @@ QPID_AUTO_TEST_CASE(testRingPolicySize)
// Ring queue, 500 bytes maxSize
- FieldTable args;
- std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 0, 500, QueuePolicy::RING);
- policy->update(args);
+ QueueOptions args;
+ args.setSizePolicy(RING, 500, 0);
SessionFixture f;
std::string q("my-ring-queue");
@@ -255,9 +144,9 @@ QPID_AUTO_TEST_CASE(testRingPolicySize)
QPID_AUTO_TEST_CASE(testStrictRingPolicy)
{
- FieldTable args;
- std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING_STRICT);
- policy->update(args);
+ QueueOptions args;
+ args.setSizePolicy(RING_STRICT, 0, 5);
+ args.setString("qpid.flow_stop_count", "0");
SessionFixture f;
std::string q("my-ring-queue");
@@ -281,9 +170,8 @@ QPID_AUTO_TEST_CASE(testStrictRingPolicy
QPID_AUTO_TEST_CASE(testPolicyWithDtx)
{
- FieldTable args;
- std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT);
- policy->update(args);
+ QueueOptions args;
+ args.setSizePolicy(REJECT, 0, 5);
SessionFixture f;
std::string q("my-policy-queue");
@@ -367,9 +255,8 @@ QPID_AUTO_TEST_CASE(testFlowToDiskWithNo
QPID_AUTO_TEST_CASE(testPolicyFailureOnCommit)
{
- FieldTable args;
- std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT);
- policy->update(args);
+ QueueOptions args;
+ args.setSizePolicy(REJECT, 0, 5);
SessionFixture f;
std::string q("q");
Modified: qpid/branches/asyncstore/cpp/src/tests/QueueRegistryTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/QueueRegistryTest.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/QueueRegistryTest.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/QueueRegistryTest.cpp Mon Aug 27 15:40:33 2012
@@ -19,6 +19,7 @@
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueSettings.h"
#include "unit_test.h"
#include <string>
@@ -36,33 +37,23 @@ QPID_AUTO_TEST_CASE(testDeclare)
QueueRegistry reg;
std::pair<Queue::shared_ptr, bool> qc;
- qc = reg.declare(foo, false, 0, 0);
+ qc = reg.declare(foo, QueueSettings());
Queue::shared_ptr q = qc.first;
BOOST_CHECK(q);
BOOST_CHECK(qc.second); // New queue
BOOST_CHECK_EQUAL(foo, q->getName());
- qc = reg.declare(foo, false, 0, 0);
+ qc = reg.declare(foo, QueueSettings());
BOOST_CHECK_EQUAL(q, qc.first);
BOOST_CHECK(!qc.second);
- qc = reg.declare(bar, false, 0, 0);
+ qc = reg.declare(bar, QueueSettings());
q = qc.first;
BOOST_CHECK(q);
BOOST_CHECK_EQUAL(true, qc.second);
BOOST_CHECK_EQUAL(bar, q->getName());
}
-QPID_AUTO_TEST_CASE(testDeclareTmp)
-{
- QueueRegistry reg;
- std::pair<Queue::shared_ptr, bool> qc;
-
- qc = reg.declare(std::string(), false, 0, 0);
- BOOST_CHECK(qc.second);
- BOOST_CHECK_EQUAL(std::string("tmp_1"), qc.first->getName());
-}
-
QPID_AUTO_TEST_CASE(testFind)
{
std::string foo("foo");
@@ -72,8 +63,8 @@ QPID_AUTO_TEST_CASE(testFind)
BOOST_CHECK(reg.find(foo) == 0);
- reg.declare(foo, false, 0, 0);
- reg.declare(bar, false, 0, 0);
+ reg.declare(foo, QueueSettings());
+ reg.declare(bar, QueueSettings());
Queue::shared_ptr q = reg.find(bar);
BOOST_CHECK(q);
BOOST_CHECK_EQUAL(bar, q->getName());
@@ -85,7 +76,7 @@ QPID_AUTO_TEST_CASE(testDestroy)
QueueRegistry reg;
std::pair<Queue::shared_ptr, bool> qc;
- qc = reg.declare(foo, false, 0, 0);
+ qc = reg.declare(foo, QueueSettings());
reg.destroy(foo);
// Queue is gone from the registry.
BOOST_CHECK(reg.find(foo) == 0);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org