You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/08/10 14:04:32 UTC

svn commit: r1371676 [7/8] - in /qpid/trunk/qpid: cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/broker/amqp_0_10/ cpp/src/qpid/ha/ cpp/src/qpid/management/ cpp/src/qpid/replication/ cpp/src/qpid/store/ cpp/src/qpid/xml/ cpp/src/tests/ tests/src/py/qpid_te...

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp Fri Aug 10 12:04:27 2012
@@ -68,7 +68,7 @@ bool ReplicationTest::isReplicated(
 
 bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Queue& q)
 {
-    return isReplicated(level, q.getSettings(), q.isAutoDelete(), q.hasExclusiveOwner());
+    return isReplicated(level, q.getSettings().storeSettings, q.isAutoDelete(), q.hasExclusiveOwner());
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Fri Aug 10 12:04:27 2012
@@ -31,6 +31,7 @@
 #include <qpid/broker/Message.h>
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/FieldValue.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/sys/Time.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/broker/ConnectionState.h"
@@ -535,7 +536,7 @@ void ManagementAgent::sendBufferLH(Buffe
     }
     if (exchange.get() == 0) return;
 
-    intrusive_ptr<Message> msg(new Message());
+    intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
     AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
     AMQFrame header((AMQHeaderBody()));
     AMQFrame content((AMQContentBody()));
@@ -547,24 +548,26 @@ void ManagementAgent::sendBufferLH(Buffe
     header.setEof(false);
     content.setBof(false);
 
-    msg->getFrames().append(method);
-    msg->getFrames().append(header);
+    transfer->getFrames().append(method);
+    transfer->getFrames().append(header);
 
     MessageProperties* props =
-        msg->getFrames().getHeaders()->get<MessageProperties>(true);
+        transfer->getFrames().getHeaders()->get<MessageProperties>(true);
     props->setContentLength(length);
 
     DeliveryProperties* dp =
-        msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
+        transfer->getFrames().getHeaders()->get<DeliveryProperties>(true);
     dp->setRoutingKey(routingKey);
 
-    msg->getFrames().append(content);
-    msg->setIsManagementMessage(true);
+    transfer->getFrames().append(content);
+
+    Message msg(transfer, transfer);
+    msg.setIsManagementMessage(true);
 
     {
         sys::Mutex::ScopedUnlock u(userLock);
 
-        DeliverableMessage deliverable (msg);
+        DeliverableMessage deliverable (msg, 0);
         try {
             exchange->route(deliverable);
         } catch(exception&) {}
@@ -602,7 +605,7 @@ void ManagementAgent::sendBufferLH(const
     }
     if (exchange.get() == 0) return;
 
-    intrusive_ptr<Message> msg(new Message());
+    intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
     AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
     AMQFrame header((AMQHeaderBody()));
     AMQFrame content((AMQContentBody(data)));
@@ -612,11 +615,11 @@ void ManagementAgent::sendBufferLH(const
     header.setEof(false);
     content.setBof(false);
 
-    msg->getFrames().append(method);
-    msg->getFrames().append(header);
+    transfer->getFrames().append(method);
+    transfer->getFrames().append(header);
 
     MessageProperties* props =
-        msg->getFrames().getHeaders()->get<MessageProperties>(true);
+        transfer->getFrames().getHeaders()->get<MessageProperties>(true);
     props->setContentLength(data.length());
     if (!cid.empty()) {
         props->setCorrelationId(cid);
@@ -625,23 +628,24 @@ void ManagementAgent::sendBufferLH(const
     props->setAppId("qmf2");
 
     for (i = headers.begin(); i != headers.end(); ++i) {
-        msg->insertCustomProperty(i->first, i->second.asString());
+        props->getApplicationHeaders().setString(i->first, i->second.asString());
     }
 
     DeliveryProperties* dp =
-        msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
+        transfer->getFrames().getHeaders()->get<DeliveryProperties>(true);
     dp->setRoutingKey(routingKey);
     if (ttl_msec) {
         dp->setTtl(ttl_msec);
-        msg->computeExpiration(broker->getExpiryPolicy());
     }
-    msg->getFrames().append(content);
-    msg->setIsManagementMessage(true);
+    transfer->getFrames().append(content);
+    Message msg(transfer, transfer);
+    msg.setIsManagementMessage(true);
+    msg.computeExpiration(broker->getExpiryPolicy());
 
     {
         sys::Mutex::ScopedUnlock u(userLock);
 
-        DeliverableMessage deliverable (msg);
+        DeliverableMessage deliverable (msg, 0);
         try {
             exchange->route(deliverable);
         } catch(exception&) {}
@@ -2135,19 +2139,20 @@ bool ManagementAgent::authorizeAgentMess
     // authorized or not.  In this case, return true (authorized) if there is no ACL in place,
     // otherwise return false;
     //
-    if (msg.encodedSize() > MA_BUFFER_SIZE)
+    if (msg.getContentSize() > MA_BUFFER_SIZE)
         return broker->getAcl() == 0;
 
-    msg.encodeContent(inBuffer);
+    inBuffer.putRawData(msg.getContent());
     uint32_t bufferLen = inBuffer.getPosition();
     inBuffer.reset();
 
+    qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
     const framing::MessageProperties* p =
-      msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+      transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
 
-    const framing::FieldTable *headers = msg.getApplicationHeaders();
+    const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
 
-    if (headers && msg.getAppId() == "qmf2")
+    if (headers && p->getAppId() == "qmf2")
     {
         mapMsg = true;
 
@@ -2238,8 +2243,9 @@ bool ManagementAgent::authorizeAgentMess
 
         // authorization failed, send reply if replyTo present
 
+        qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
         const framing::MessageProperties* p =
-            msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+            transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
         if (p && p->hasReplyTo()) {
             const framing::ReplyTo& rt = p->getReplyTo();
             string rte = rt.getExchange();
@@ -2277,8 +2283,9 @@ void ManagementAgent::dispatchAgentComma
 {
     string   rte;
     string   rtk;
+    qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
     const framing::MessageProperties* p =
-        msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+        transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
     if (p && p->hasReplyTo()) {
         const framing::ReplyTo& rt = p->getReplyTo();
         rte = rt.getExchange();
@@ -2290,19 +2297,19 @@ void ManagementAgent::dispatchAgentComma
     Buffer   inBuffer(inputBuffer, MA_BUFFER_SIZE);
     uint8_t  opcode;
 
-    if (msg.encodedSize() > MA_BUFFER_SIZE) {
+    if (msg.getContentSize() > MA_BUFFER_SIZE) {
         QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " <<
-                 msg.encodedSize());
+                 msg.getContentSize());
         return;
     }
 
-    msg.encodeContent(inBuffer);
+    inBuffer.putRawData(msg.getContent());
     uint32_t bufferLen = inBuffer.getPosition();
     inBuffer.reset();
 
     ScopedManagementContext context((const qpid::broker::ConnectionState*) msg.getPublisher());
-    const framing::FieldTable *headers = msg.getApplicationHeaders();
-    if (headers && msg.getAppId() == "qmf2")
+    const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
+    if (headers && p->getAppId() == "qmf2")
     {
         string opcode = headers->getAsString("qmf.opcode");
         string contentType = headers->getAsString("qmf.content");

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp Fri Aug 10 12:04:27 2012
@@ -43,11 +43,9 @@ ManagementDirectExchange::ManagementDire
 void ManagementDirectExchange::route(Deliverable&      msg)
 {
     bool routeIt = true;
-    const std::string& routingKey = msg.getMessage().getRoutingKey();
-    const FieldTable* args = msg.getMessage().getApplicationHeaders();
 
     if (managementAgent)
-        routeIt = managementAgent->dispatchCommand(msg, routingKey, args, false, qmfVersion);
+        routeIt = managementAgent->dispatchCommand(msg, msg.getMessage().getRoutingKey(), 0/*args - TODO*/, false, qmfVersion);
 
     if (routeIt)
         DirectExchange::route(msg);

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp Fri Aug 10 12:04:27 2012
@@ -42,12 +42,10 @@ ManagementTopicExchange::ManagementTopic
 void ManagementTopicExchange::route(Deliverable&      msg)
 {
     bool routeIt = true;
-    const std::string& routingKey = msg.getMessage().getRoutingKey();
-    const FieldTable* args = msg.getMessage().getApplicationHeaders();
 
     // Intercept management agent commands
     if (managementAgent)
-        routeIt = managementAgent->dispatchCommand(msg, routingKey, args, true, qmfVersion);
+        routeIt = managementAgent->dispatchCommand(msg, msg.getMessage().getRoutingKey(), 0/*args - TODO*/, true, qmfVersion);
 
     if (routeIt)
         TopicExchange::route(msg);

Modified: qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp Fri Aug 10 12:04:27 2012
@@ -249,7 +249,7 @@ MessageStorePlugin::destroy(const broker
 void
 MessageStorePlugin::stage(const boost::intrusive_ptr<broker::PersistableMessage>& msg)
 {
-    if (msg->getPersistenceId() == 0 && !msg->isContentReleased()) {
+    if (msg->getPersistenceId() == 0) {
         provider->second->stage(msg);
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp Fri Aug 10 12:04:27 2012
@@ -27,6 +27,7 @@
 
 #include "qpid/log/Statement.h"
 #include "qpid/broker/FedOps.h"
+#include "qpid/broker/MapHandler.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/framing/FieldValue.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -198,7 +199,52 @@ bool XmlExchange::unbind(Queue::shared_p
     }
 }
 
-bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content) 
+namespace {
+class DefineExternals : public MapHandler
+{
+  public:
+    DefineExternals(DynamicContext* c) : context(c) { assert(context); }
+    void handleUint8(const MapHandler::CharSequence& key, uint8_t value) { process(std::string(key.data, key.size), (int) value); }
+    void handleUint16(const MapHandler::CharSequence& key, uint16_t value) { process(std::string(key.data, key.size), (int) value); }
+    void handleUint32(const MapHandler::CharSequence& key, uint32_t value) { process(std::string(key.data, key.size), (int) value); }
+    void handleUint64(const MapHandler::CharSequence& key, uint64_t value) { process(std::string(key.data, key.size), (int) value); }
+    void handleInt8(const MapHandler::CharSequence& key, int8_t value) { process(std::string(key.data, key.size), (int) value); }
+    void handleInt16(const MapHandler::CharSequence& key, int16_t value) { process(std::string(key.data, key.size), (int) value); }
+    void handleInt32(const MapHandler::CharSequence& key, int32_t value) { process(std::string(key.data, key.size), (int) value); }
+    void handleInt64(const MapHandler::CharSequence& key, int64_t value) { process(std::string(key.data, key.size), (int) value); }
+    void handleFloat(const MapHandler::CharSequence& key, float value) { process(std::string(key.data, key.size), value); }
+    void handleDouble(const MapHandler::CharSequence& key, double value) { process(std::string(key.data, key.size), value); }
+    void handleString(const MapHandler::CharSequence& key, const MapHandler::CharSequence& value, const MapHandler::CharSequence& /*encoding*/)
+    {
+        process(std::string(key.data, key.size), std::string(value.data, value.size));
+    }
+    void handleVoid(const MapHandler::CharSequence&) {}
+  private:
+    void process(const std::string& key, double value)
+    {
+        QPID_LOG(trace, "XmlExchange, external variable (double): " << key << " = " << value);
+        Item::Ptr item = context->getItemFactory()->createDouble(value, context);
+        context->setExternalVariable(X(key.c_str()), item);
+    }
+    void process(const std::string& key, int value)
+    {
+        QPID_LOG(trace, "XmlExchange, external variable (int):" << key << " = " << value);
+        Item::Ptr item = context->getItemFactory()->createInteger(value, context);
+        context->setExternalVariable(X(key.c_str()), item);
+    }
+    void process(const std::string& key, const std::string& value)
+    {
+        QPID_LOG(trace, "XmlExchange, external variable (string):" << key << " = " << value);
+        Item::Ptr item = context->getItemFactory()->createString(X(value.c_str()), context);
+        context->setExternalVariable(X(key.c_str()), item);
+    }
+
+    DynamicContext* context;
+};
+
+}
+
+bool XmlExchange::matches(Query& query, Deliverable& msg, bool parse_message_content) 
 {
     std::string msgContent;
 
@@ -212,7 +258,7 @@ bool XmlExchange::matches(Query& query, 
 
         if (parse_message_content) {
 
-            msg.getMessage().getFrames().getContent(msgContent);
+            msgContent = msg.getMessage().getContent();
 
             QPID_LOG(trace, "matches: message content is [" << msgContent << "]");
 
@@ -231,28 +277,8 @@ bool XmlExchange::matches(Query& query, 
             }
         }
 
-        if (args) {
-            FieldTable::ValueMap::const_iterator v = args->begin();
-            for(; v != args->end(); ++v) {
-
-                if (v->second->convertsTo<double>()) {
-                    QPID_LOG(trace, "XmlExchange, external variable (double): " << v->first << " = " << v->second->get<double>());
-                    Item::Ptr value = context->getItemFactory()->createDouble(v->second->get<double>(), context.get());
-                    context->setExternalVariable(X(v->first.c_str()), value);
-                }              
-                else if (v->second->convertsTo<int>()) {
-                    QPID_LOG(trace, "XmlExchange, external variable (int):" << v->first << " = " << v->second->getData().getInt());
-                    Item::Ptr value = context->getItemFactory()->createInteger(v->second->get<int>(), context.get());
-                    context->setExternalVariable(X(v->first.c_str()), value);
-                }
-                else if (v->second->convertsTo<std::string>()) {
-                    QPID_LOG(trace, "XmlExchange, external variable (string):" << v->first << " = " << v->second->getData().getString().c_str());
-                    Item::Ptr value = context->getItemFactory()->createString(X(v->second->get<std::string>().c_str()), context.get());
-                    context->setExternalVariable(X(v->first.c_str()), value);
-                }
-
-            }
-        }
+        DefineExternals f(context.get());
+        msg.getMessage().processProperties(f);
 
         Result result = query->execute(context.get());
 #ifdef XQ_EFFECTIVE_BOOLEAN_VALUE_HPP
@@ -286,7 +312,6 @@ bool XmlExchange::matches(Query& query, 
 void XmlExchange::route(Deliverable& msg)
 {
     const std::string& routingKey = msg.getMessage().getRoutingKey();
-    const FieldTable* args = msg.getMessage().getApplicationHeaders();
     PreRoute pr(msg, this);
     try {
         XmlBinding::vector::ConstPtr p;
@@ -298,7 +323,7 @@ void XmlExchange::route(Deliverable& msg
         }
 
         for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) {
-            if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) { 
+            if (matches((*i)->xquery, msg, (*i)->parse_message_content)) { 
                 b->push_back(*i);
             }
         }

Modified: qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h Fri Aug 10 12:04:27 2012
@@ -65,7 +65,7 @@ class XmlExchange : public virtual Excha
 
     qpid::sys::RWlock lock;
 
-    bool matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content);
+    bool matches(Query& query, Deliverable& msg, bool parse_message_content);
 
   public:
     static const std::string typeName;

Modified: qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt Fri Aug 10 12:04:27 2012
@@ -126,6 +126,7 @@ set(unit_tests_to_build
     ExchangeTest
     HeadersExchangeTest
     MessageTest
+    QueueDepth
     QueueRegistryTest
     QueuePolicyTest
     QueueFlowLimitTest
@@ -135,16 +136,12 @@ set(unit_tests_to_build
     TimerTest
     TopicExchangeTest
     TxBufferTest
-    TxPublishTest
-    MessageBuilderTest
     ManagementTest
     MessageReplayTracker
     ConsoleTest
-    QueueEvents
     ProxyTest
     RetryList
     FrameDecoder
-    ReplicationTest
     ClientMessageTest
     PollableCondition
     Variant

Modified: qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Fri Aug 10 12:04:27 2012
@@ -621,7 +621,7 @@ QPID_AUTO_TEST_CASE(testQueueDeleted)
     fix.session.queueDeclare(arg::queue="my-queue");
     LocalQueue queue;
     fix.subs.subscribe(queue, "my-queue");
-    
+
     ScopedSuppressLogging sl;
     fix.session.queueDelete(arg::queue="my-queue");
     BOOST_CHECK_THROW(queue.get(1*qpid::sys::TIME_SEC), qpid::framing::ResourceDeletedException);

Modified: qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp Fri Aug 10 12:04:27 2012
@@ -49,7 +49,7 @@ QPID_AUTO_TEST_CASE(testSort)
 
     list<DeliveryRecord> records;
     for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); i++) {
-        DeliveryRecord r(QueuedMessage(0), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false);
+        DeliveryRecord r(QueueCursor(CONSUMER), framing::SequenceNumber(), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false);
         r.setId(*i);
         records.push_back(r);
     }

Modified: qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp Fri Aug 10 12:04:27 2012
@@ -35,7 +35,6 @@
 
 using std::string;
 
-using boost::intrusive_ptr;
 using namespace qpid::broker;
 using namespace qpid::framing;
 using namespace qpid::sys;
@@ -62,11 +61,9 @@ QPID_AUTO_TEST_CASE(testMe)
     queue.reset();
     queue2.reset();
 
-    intrusive_ptr<Message> msgPtr(MessageUtils::createMessage("exchange", "abc", false, "id"));
-    DeliverableMessage msg(msgPtr);
+    DeliverableMessage msg(MessageUtils::createMessage("exchange", "abc"), 0);
     topic.route(msg);
     direct.route(msg);
-
 }
 
 QPID_AUTO_TEST_CASE(testIsBound)
@@ -170,16 +167,6 @@ QPID_AUTO_TEST_CASE(testDeleteGetAndRede
     BOOST_CHECK_EQUAL(string("direct"), response.first->getType());
 }
 
-intrusive_ptr<Message> cmessage(std::string exchange, std::string routingKey) {
-    intrusive_ptr<Message> msg(new Message());
-    AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
-    AMQFrame header((AMQHeaderBody()));
-    msg->getFrames().append(method);
-    msg->getFrames().append(header);
-    msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
-    return msg;
-}
-
 QPID_AUTO_TEST_CASE(testSequenceOptions)
 {
     FieldTable args;
@@ -189,46 +176,35 @@ QPID_AUTO_TEST_CASE(testSequenceOptions)
     {
         DirectExchange direct("direct1", false, args);
 
-        intrusive_ptr<Message> msg1 = cmessage("e", "abc");
-        intrusive_ptr<Message> msg2 = cmessage("e", "abc");
-        intrusive_ptr<Message> msg3 = cmessage("e", "abc");
-
-        DeliverableMessage dmsg1(msg1);
-        DeliverableMessage dmsg2(msg2);
-        DeliverableMessage dmsg3(msg3);
-
-        direct.route(dmsg1);
-        direct.route(dmsg2);
-        direct.route(dmsg3);
-
-        BOOST_CHECK_EQUAL(1, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
-        BOOST_CHECK_EQUAL(2, msg2->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
-        BOOST_CHECK_EQUAL(3, msg3->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+        DeliverableMessage msg1(MessageUtils::createMessage("e", "abc"), 0);
+        DeliverableMessage msg2(MessageUtils::createMessage("e", "abc"), 0);
+        DeliverableMessage msg3(MessageUtils::createMessage("e", "abc"), 0);
+
+        direct.route(msg1);
+        direct.route(msg2);
+        direct.route(msg3);
+
+        BOOST_CHECK_EQUAL(1, msg1.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
+        BOOST_CHECK_EQUAL(2, msg2.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
+        BOOST_CHECK_EQUAL(3, msg3.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
 
         FanOutExchange fanout("fanout1", false, args);
         HeadersExchange header("headers1", false, args);
         TopicExchange topic ("topic1", false, args);
 
         // check other exchanges, that they preroute
-        intrusive_ptr<Message> msg4 = cmessage("e", "abc");
-        intrusive_ptr<Message> msg5 = cmessage("e", "abc");
-
-        // Need at least empty header for the HeadersExchange to route at all
-        msg5->insertCustomProperty("", "");
-        intrusive_ptr<Message> msg6 = cmessage("e", "abc");
+        DeliverableMessage msg4(MessageUtils::createMessage("e", "abc"), 0);
+        DeliverableMessage msg5(MessageUtils::createMessage("e", "abc"), 0);
+        DeliverableMessage msg6(MessageUtils::createMessage("e", "abc"), 0);
 
-        DeliverableMessage dmsg4(msg4);
-        DeliverableMessage dmsg5(msg5);
-        DeliverableMessage dmsg6(msg6);
+        fanout.route(msg4);
+        BOOST_CHECK_EQUAL(1, msg4.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
 
-        fanout.route(dmsg4);
-        BOOST_CHECK_EQUAL(1, msg4->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+        header.route(msg5);
+        BOOST_CHECK_EQUAL(1, msg5.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
 
-        header.route(dmsg5);
-        BOOST_CHECK_EQUAL(1, msg5->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
-
-        topic.route(dmsg6);
-        BOOST_CHECK_EQUAL(1, msg6->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+        topic.route(msg6);
+        BOOST_CHECK_EQUAL(1, msg6.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
         direct.encode(buffer);
     }
     {
@@ -237,11 +213,10 @@ QPID_AUTO_TEST_CASE(testSequenceOptions)
         buffer.reset();
         DirectExchange::shared_ptr exch_dec = Exchange::decode(exchanges, buffer);
 
-        intrusive_ptr<Message> msg1 = cmessage("e", "abc");
-        DeliverableMessage dmsg1(msg1);
-        exch_dec->route(dmsg1);
+        DeliverableMessage msg1(MessageUtils::createMessage("e", "abc"), 0);
+        exch_dec->route(msg1);
 
-        BOOST_CHECK_EQUAL(4, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+        BOOST_CHECK_EQUAL(4, msg1.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
 
     }
     delete [] buff;
@@ -256,9 +231,11 @@ QPID_AUTO_TEST_CASE(testIVEOption)
     HeadersExchange header("headers1", false, args);
     TopicExchange topic ("topic1", false, args);
 
-    intrusive_ptr<Message> msg1 = cmessage("direct1", "abc");
-    msg1->insertCustomProperty("a", "abc");
-    DeliverableMessage dmsg1(msg1);
+    qpid::types::Variant::Map properties;
+    properties["routing-key"] = "abc";
+    properties["a"] = "abc";
+    Message msg1 = MessageUtils::createMessage(properties, "my-message", "direct1");
+    DeliverableMessage dmsg1(msg1, 0);
 
     FieldTable args2;
     args2.setString("x-match", "any");
@@ -273,8 +250,6 @@ QPID_AUTO_TEST_CASE(testIVEOption)
     Queue::shared_ptr queue2(new Queue("queue2", true));
     Queue::shared_ptr queue3(new Queue("queue3", true));
 
-    BOOST_CHECK(HeadersExchange::match(args2, msg1->getProperties<MessageProperties>()->getApplicationHeaders()));
-
     BOOST_CHECK(direct.bind(queue, "abc", 0));
     BOOST_CHECK(fanout.bind(queue1, "abc", 0));
     BOOST_CHECK(header.bind(queue2, "", &args2));
@@ -287,7 +262,6 @@ QPID_AUTO_TEST_CASE(testIVEOption)
 
 }
 
-
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri Aug 10 12:04:27 2012
@@ -96,6 +96,7 @@ unit_test_SOURCES= unit_test.cpp unit_te
 	ExchangeTest.cpp \
 	HeadersExchangeTest.cpp \
 	MessageTest.cpp \
+	QueueDepth.cpp \
 	QueueRegistryTest.cpp \
 	QueuePolicyTest.cpp \
 	QueueFlowLimitTest.cpp \
@@ -105,19 +106,15 @@ unit_test_SOURCES= unit_test.cpp unit_te
 	TimerTest.cpp \
 	TopicExchangeTest.cpp \
 	TxBufferTest.cpp \
-	TxPublishTest.cpp \
-	MessageBuilderTest.cpp \
 	ConnectionOptions.h \
 	ForkedBroker.h \
 	ForkedBroker.cpp \
 	ManagementTest.cpp \
 	MessageReplayTracker.cpp \
 	ConsoleTest.cpp \
-	QueueEvents.cpp \
 	ProxyTest.cpp \
 	RetryList.cpp \
 	FrameDecoder.cpp \
-	ReplicationTest.cpp \
 	ClientMessageTest.cpp \
 	PollableCondition.cpp \
 	Variant.cpp \

Modified: qpid/trunk/qpid/cpp/src/tests/MessageTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessageTest.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessageTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessageTest.cpp Fri Aug 10 12:04:27 2012
@@ -24,6 +24,7 @@
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/FieldValue.h"
 #include "qpid/framing/Uuid.h"
+#include "MessageUtils.h"
 
 #include "unit_test.h"
 
@@ -43,49 +44,29 @@ QPID_AUTO_TEST_CASE(testEncodeDecode)
 {
     string exchange = "MyExchange";
     string routingKey = "MyRoutingKey";
+    uint64_t ttl(60);
     Uuid messageId(true);
-    string data1("abcdefg");
-    string data2("hijklmn");
+    string data("abcdefghijklmn");
 
-    boost::intrusive_ptr<Message> msg(new Message());
-
-    AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
-    AMQFrame header((AMQHeaderBody()));
-    AMQFrame content1((AMQContentBody(data1)));
-    AMQFrame content2((AMQContentBody(data2)));
-
-    msg->getFrames().append(method);
-    msg->getFrames().append(header);
-    msg->getFrames().append(content1);
-    msg->getFrames().append(content2);
-
-    MessageProperties* mProps = msg->getFrames().getHeaders()->get<MessageProperties>(true);
-    mProps->setContentLength(data1.size() + data2.size());
-    mProps->setMessageId(messageId);
-    FieldTable applicationHeaders;
-    applicationHeaders.setString("abc", "xyz");
-    mProps->setApplicationHeaders(applicationHeaders);
-    DeliveryProperties* dProps = msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
-    dProps->setRoutingKey(routingKey);
-    dProps->setDeliveryMode(PERSISTENT);
-    BOOST_CHECK(msg->isPersistent());
-
-    std::vector<char> buff(msg->encodedSize());
-    Buffer wbuffer(&buff[0], msg->encodedSize());
-    msg->encode(wbuffer);
-
-    Buffer rbuffer(&buff[0], msg->encodedSize());
-    msg = new Message();
-    msg->decodeHeader(rbuffer);
-    msg->decodeContent(rbuffer);
-    BOOST_CHECK_EQUAL(exchange, msg->getExchangeName());
-    BOOST_CHECK_EQUAL(routingKey, msg->getRoutingKey());
-    BOOST_CHECK_EQUAL((uint64_t) data1.size() + data2.size(), msg->contentSize());
-    BOOST_CHECK_EQUAL((uint64_t) data1.size() + data2.size(), msg->getProperties<MessageProperties>()->getContentLength());
-    BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
-    BOOST_CHECK_EQUAL(string("xyz"), msg->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("abc"));
-    BOOST_CHECK_EQUAL((uint8_t) PERSISTENT, msg->getProperties<DeliveryProperties>()->getDeliveryMode());
-    BOOST_CHECK(msg->isPersistent());
+    qpid::types::Variant::Map properties;
+    properties["routing-key"] = routingKey;
+    properties["ttl"] = ttl;
+    properties["durable"] = true;
+    properties["message-id"] = qpid::types::Uuid(messageId.data());
+    properties["abc"] = "xyz";
+    Message msg = MessageUtils::createMessage(properties, data);
+
+    std::string buffer;
+    encode(msg, buffer);
+    msg = Message();
+    decode(buffer, msg);
+
+    BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey());
+    BOOST_CHECK_EQUAL((uint64_t) data.size(), msg.getContentSize());
+    BOOST_CHECK_EQUAL(data, msg.getContent());
+    //BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
+    BOOST_CHECK_EQUAL(string("xyz"), msg.getPropertyAsString("abc"));
+    BOOST_CHECK(msg.isPersistent());
 }
 
 QPID_AUTO_TEST_SUITE_END()

Modified: qpid/trunk/qpid/cpp/src/tests/MessageUtils.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessageUtils.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessageUtils.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessageUtils.h Fri Aug 10 12:04:27 2012
@@ -20,9 +20,11 @@
  */
 
 #include "qpid/broker/Message.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/Uuid.h"
+#include "qpid/types/Variant.h"
 
 using namespace qpid;
 using namespace broker;
@@ -33,11 +35,46 @@ namespace tests {
 
 struct MessageUtils
 {
-    static boost::intrusive_ptr<Message> createMessage(const std::string& exchange="", const std::string& routingKey="",
-                                                       const bool durable = false, const Uuid& messageId=Uuid(true),
-                                                       uint64_t contentSize = 0)
+    static Message createMessage(const qpid::types::Variant::Map& properties, const std::string& content="", const std::string& destination = "")
     {
-        boost::intrusive_ptr<broker::Message> msg(new broker::Message());
+        boost::intrusive_ptr<broker::amqp_0_10::MessageTransfer> msg(new broker::amqp_0_10::MessageTransfer());
+
+        AMQFrame method(( MessageTransferBody(ProtocolVersion(), destination, 0, 0)));
+        AMQFrame header((AMQHeaderBody()));
+
+        msg->getFrames().append(method);
+        msg->getFrames().append(header);
+        if (content.size()) {
+            msg->getFrames().getHeaders()->get<MessageProperties>(true)->setContentLength(content.size());
+            AMQFrame data((AMQContentBody(content)));
+            msg->getFrames().append(data);
+        }
+        for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+            if (i->first == "routing-key" && !i->second.isVoid()) {
+                msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(i->second);
+            } else if (i->first == "message-id" && !i->second.isVoid()) {
+                qpid::types::Uuid id = i->second;
+                qpid::framing::Uuid id2(id.data());
+                msg->getFrames().getHeaders()->get<MessageProperties>(true)->setMessageId(id2);
+            } else if (i->first == "ttl" && !i->second.isVoid()) {
+                msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(i->second);
+            } else if (i->first == "priority" && !i->second.isVoid()) {
+                msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setPriority(i->second);
+            } else if (i->first == "durable" && !i->second.isVoid()) {
+                msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(i->second.asBool() ? 2 : 1);
+            } else {
+                msg->getFrames().getHeaders()->get<MessageProperties>(true)->getApplicationHeaders().setString(i->first, i->second);
+            }
+        }
+        return Message(msg, msg);
+    }
+
+
+    static Message createMessage(const std::string& exchange="", const std::string& routingKey="",
+                                 uint64_t ttl = 0, bool durable = false, const Uuid& messageId=Uuid(true),
+                                 const std::string& content="")
+    {
+        boost::intrusive_ptr<broker::amqp_0_10::MessageTransfer> msg(new broker::amqp_0_10::MessageTransfer());
 
         AMQFrame method(( MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
         AMQFrame header((AMQHeaderBody()));
@@ -45,18 +82,18 @@ struct MessageUtils
         msg->getFrames().append(method);
         msg->getFrames().append(header);
         MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
-        props->setContentLength(contentSize);
+        props->setContentLength(content.size());
         props->setMessageId(messageId);
         msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
         if (durable)
             msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(2);
-        return msg;
-    }
-
-    static void addContent(boost::intrusive_ptr<Message> msg, const std::string& data)
-    {
-        AMQFrame content((AMQContentBody(data)));
-        msg->getFrames().append(content);
+        if (ttl)
+            msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl);
+        if (content.size()) {
+            AMQFrame data((AMQContentBody(content)));
+            msg->getFrames().append(data);
+        }
+        return Message(msg, msg);
     }
 };
 

Added: qpid/trunk/qpid/cpp/src/tests/QueueDepth.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueDepth.cpp?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueDepth.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/QueueDepth.cpp Fri Aug 10 12:04:27 2012
@@ -0,0 +1,105 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/broker/QueueDepth.h"
+
+#include "unit_test.h"
+
+namespace qpid {
+namespace tests {
+
+QPID_AUTO_TEST_SUITE(QueueDepthTestSuite)
+
+using namespace qpid::broker;
+
+QPID_AUTO_TEST_CASE(testCompare)
+{
+    QueueDepth a(0, 0);
+    QueueDepth b(1, 1);
+    QueueDepth c(2, 2);
+    QueueDepth d(1, 1);
+
+    BOOST_CHECK(a < b);
+    BOOST_CHECK(b < c);
+    BOOST_CHECK(a < c);
+
+    BOOST_CHECK(b > a);
+    BOOST_CHECK(c > b);
+    BOOST_CHECK(c > a);
+
+    BOOST_CHECK(b == d);
+    BOOST_CHECK(d == b);
+    BOOST_CHECK(a != b);
+    BOOST_CHECK(b != a);
+
+    QueueDepth e; e.setCount(1);
+    QueueDepth f; f.setCount(2);
+    BOOST_CHECK(e < f);
+    BOOST_CHECK(f > e);
+
+    QueueDepth g; g.setSize(1);
+    QueueDepth h; h.setSize(2);
+    BOOST_CHECK(g < h);
+    BOOST_CHECK(h > g);
+}
+
+QPID_AUTO_TEST_CASE(testIncrement)
+{
+    QueueDepth a(5, 10);
+    QueueDepth b(3, 6);
+    QueueDepth c(8, 16);
+    a += b;
+    BOOST_CHECK(a == c);
+    BOOST_CHECK_EQUAL(8, a.getCount());
+    BOOST_CHECK_EQUAL(16, a.getSize());
+}
+
+QPID_AUTO_TEST_CASE(testDecrement)
+{
+    QueueDepth a(5, 10);
+    QueueDepth b(3, 6);
+    QueueDepth c(2, 4);
+    a -= b;
+    BOOST_CHECK(a == c);
+    BOOST_CHECK_EQUAL(2, a.getCount());
+    BOOST_CHECK_EQUAL(4, a.getSize());
+}
+
+QPID_AUTO_TEST_CASE(testAddition)
+{
+    QueueDepth a(5, 10);
+    QueueDepth b(3, 6);
+
+    QueueDepth c = a + b;
+    BOOST_CHECK_EQUAL(8, c.getCount());
+    BOOST_CHECK_EQUAL(16, c.getSize());
+}
+
+QPID_AUTO_TEST_CASE(testSubtraction)
+{
+    QueueDepth a(5, 10);
+    QueueDepth b(3, 6);
+
+    QueueDepth c = a - b;
+    BOOST_CHECK_EQUAL(2, c.getCount());
+    BOOST_CHECK_EQUAL(4, c.getSize());
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+}} // namespace qpid::tests

Modified: qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp Fri Aug 10 12:04:27 2012
@@ -23,8 +23,8 @@
 #include "unit_test.h"
 #include "test_tools.h"
 
-#include "qpid/broker/QueuePolicy.h"
 #include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/QueueSettings.h"
 #include "qpid/sys/Time.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/FieldValue.h"
@@ -66,21 +66,19 @@ public:
         return new TestFlow(flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
     }
 
-    static QueueFlowLimit *getQueueFlowLimit(const qpid::framing::FieldTable& settings)
+    static QueueFlowLimit *getQueueFlowLimit(const qpid::framing::FieldTable& arguments)
     {
+        QueueSettings settings;
+        settings.populate(arguments, settings.storeSettings);
         return QueueFlowLimit::createLimit(0, settings);
     }
 };
 
-
-
-QueuedMessage createMessage(uint32_t size)
+Message createMessage(uint32_t size)
 {
     static uint32_t seqNum;
-    QueuedMessage msg;
-    msg.payload = MessageUtils::createMessage();
-    msg.position = ++seqNum;
-    MessageUtils::addContent(msg.payload, std::string (size, 'x'));
+    Message msg = MessageUtils::createMessage(qpid::types::Variant::Map(), std::string (size, 'x'));
+    msg.setSequence(++seqNum);
     return msg;
 }
 }
@@ -100,7 +98,7 @@ QPID_AUTO_TEST_CASE(testFlowCount)
     BOOST_CHECK(!flow->isFlowControlActive());
     BOOST_CHECK(flow->monitorFlowControl());
 
-    std::deque<QueuedMessage> msgs;
+    std::deque<Message> msgs;
     for (size_t i = 0; i < 6; i++) {
         msgs.push_back(createMessage(10));
         flow->enqueued(msgs.back());
@@ -135,7 +133,6 @@ QPID_AUTO_TEST_CASE(testFlowCount)
     BOOST_CHECK(!flow->isFlowControlActive());  // 4 on queue, OFF
 }
 
-
 QPID_AUTO_TEST_CASE(testFlowSize)
 {
     FieldTable args;
@@ -151,7 +148,7 @@ QPID_AUTO_TEST_CASE(testFlowSize)
     BOOST_CHECK(!flow->isFlowControlActive());
     BOOST_CHECK(flow->monitorFlowControl());
 
-    std::deque<QueuedMessage> msgs;
+    std::deque<Message> msgs;
     for (size_t i = 0; i < 6; i++) {
         msgs.push_back(createMessage(10));
         flow->enqueued(msgs.back());
@@ -161,14 +158,14 @@ QPID_AUTO_TEST_CASE(testFlowSize)
     BOOST_CHECK_EQUAL(6u, flow->getFlowCount());
     BOOST_CHECK_EQUAL(60u, flow->getFlowSize());
 
-    QueuedMessage msg_9 = createMessage(9);
+    Message msg_9 = createMessage(9);
     flow->enqueued(msg_9);
     BOOST_CHECK(!flow->isFlowControlActive());  // 69 on queue
-    QueuedMessage tinyMsg_1 = createMessage(1);
+    Message tinyMsg_1 = createMessage(1);
     flow->enqueued(tinyMsg_1);
     BOOST_CHECK(!flow->isFlowControlActive());   // 70 on queue
 
-    QueuedMessage tinyMsg_2 = createMessage(1);
+    Message tinyMsg_2 = createMessage(1);
     flow->enqueued(tinyMsg_2);
     BOOST_CHECK(flow->isFlowControlActive());   // 71 on queue, ON
     msgs.push_back(createMessage(10));
@@ -233,12 +230,12 @@ QPID_AUTO_TEST_CASE(testFlowCombo)
     args.setUInt64(QueueFlowLimit::flowStopSizeKey, 200);
     args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 100);
 
-    std::deque<QueuedMessage> msgs_1;
-    std::deque<QueuedMessage> msgs_10;
-    std::deque<QueuedMessage> msgs_50;
-    std::deque<QueuedMessage> msgs_100;
+    std::deque<Message> msgs_1;
+    std::deque<Message> msgs_10;
+    std::deque<Message> msgs_50;
+    std::deque<Message> msgs_100;
 
-    QueuedMessage msg;
+    Message msg;
 
     std::auto_ptr<TestFlow> flow(TestFlow::createTestFlow(args));
     BOOST_CHECK(!flow->isFlowControlActive());        // count:0  size:0
@@ -458,7 +455,6 @@ QPID_AUTO_TEST_CASE(testFlowDisable)
     }
 }
 
-
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests

Modified: qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp Fri Aug 10 12:04:27 2012
@@ -22,12 +22,10 @@
 #include "unit_test.h"
 #include "test_tools.h"
 
-#include "qpid/broker/QueuePolicy.h"
 #include "qpid/broker/QueueFlowLimit.h"
 #include "qpid/client/QueueOptions.h"
 #include "qpid/sys/Time.h"
 #include "qpid/framing/reply_exceptions.h"
-#include "MessageUtils.h"
 #include "BrokerFixture.h"
 
 using namespace qpid::broker;
@@ -39,118 +37,10 @@ namespace tests {
 
 QPID_AUTO_TEST_SUITE(QueuePolicyTestSuite)
 
-namespace {
-QueuedMessage createMessage(uint32_t size)
-{
-    QueuedMessage msg;
-    msg.payload = MessageUtils::createMessage();
-    MessageUtils::addContent(msg.payload, std::string (size, 'x'));
-    return msg;
-}
-}
-
-QPID_AUTO_TEST_CASE(testCount)
-{
-    std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 5, 0));
-    BOOST_CHECK_EQUAL((uint64_t) 0, policy->getMaxSize());
-    BOOST_CHECK_EQUAL((uint32_t) 5, policy->getMaxCount());
-
-    QueuedMessage msg = createMessage(10);
-    for (size_t i = 0; i < 5; i++) {
-        policy->tryEnqueue(msg.payload);
-    }
-    try {
-        policy->tryEnqueue(msg.payload);
-        BOOST_FAIL("Policy did not fail on enqueuing sixth message");
-    } catch (const ResourceLimitExceededException&) {}
-
-    policy->dequeued(msg);
-    policy->tryEnqueue(msg.payload);
-
-    try {
-        policy->tryEnqueue(msg.payload);
-        BOOST_FAIL("Policy did not fail on enqueuing sixth message (after dequeue)");
-    } catch (const ResourceLimitExceededException&) {}
-}
-
-QPID_AUTO_TEST_CASE(testSize)
-{
-    std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 0, 50));
-    QueuedMessage msg = createMessage(10);
-
-    for (size_t i = 0; i < 5; i++) {
-        policy->tryEnqueue(msg.payload);
-    }
-    try {
-        policy->tryEnqueue(msg.payload);
-        BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy);
-    } catch (const ResourceLimitExceededException&) {}
-
-    policy->dequeued(msg);
-    policy->tryEnqueue(msg.payload);
-
-    try {
-        policy->tryEnqueue(msg.payload);
-        BOOST_FAIL("Policy did not fail on aggregate size exceeding 50 (after dequeue). " << *policy);
-    } catch (const ResourceLimitExceededException&) {}
-}
-
-QPID_AUTO_TEST_CASE(testBoth)
-{
-    std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 5, 50));
-    try {
-        QueuedMessage msg = createMessage(51);
-        policy->tryEnqueue(msg.payload);
-        BOOST_FAIL("Policy did not fail on single message exceeding 50. " << *policy);
-    } catch (const ResourceLimitExceededException&) {}
-
-    std::vector<QueuedMessage> messages;
-    messages.push_back(createMessage(15));
-    messages.push_back(createMessage(10));
-    messages.push_back(createMessage(11));
-    messages.push_back(createMessage(2));
-    messages.push_back(createMessage(7));
-    for (size_t i = 0; i < messages.size(); i++) {
-        policy->tryEnqueue(messages[i].payload);
-    }
-    //size = 45 at this point, count = 5
-    try {
-        QueuedMessage msg = createMessage(5);
-        policy->tryEnqueue(msg.payload);
-        BOOST_FAIL("Policy did not fail on count exceeding 6. " << *policy);
-    } catch (const ResourceLimitExceededException&) {}
-    try {
-        QueuedMessage msg = createMessage(10);
-        policy->tryEnqueue(msg.payload);
-        BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy);
-    } catch (const ResourceLimitExceededException&) {}
-
-
-    policy->dequeued(messages[0]);
-    try {
-        QueuedMessage msg = createMessage(20);
-        policy->tryEnqueue(msg.payload);
-    } catch (const ResourceLimitExceededException&) {
-        BOOST_FAIL("Policy failed incorrectly after dequeue. " << *policy);
-    }
-}
-
-QPID_AUTO_TEST_CASE(testSettings)
-{
-    //test reading and writing the policy from/to field table
-    std::auto_ptr<QueuePolicy> a(QueuePolicy::createQueuePolicy("test", 101, 303));
-    FieldTable settings;
-    a->update(settings);
-    std::auto_ptr<QueuePolicy> b(QueuePolicy::createQueuePolicy("test", settings));
-    BOOST_CHECK_EQUAL(a->getMaxCount(), b->getMaxCount());
-    BOOST_CHECK_EQUAL(a->getMaxSize(), b->getMaxSize());
-}
-
 QPID_AUTO_TEST_CASE(testRingPolicyCount)
 {
-    FieldTable args;
-    std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING);
-    policy->update(args);
+    QueueOptions args;
+    args.setSizePolicy(RING, 0, 5);
 
     SessionFixture f;
     std::string q("my-ring-queue");
@@ -183,9 +73,8 @@ QPID_AUTO_TEST_CASE(testRingPolicySize)
 
     // Ring queue, 500 bytes maxSize
 
-    FieldTable args;
-    std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 0, 500, QueuePolicy::RING);
-    policy->update(args);
+    QueueOptions args;
+    args.setSizePolicy(RING, 500, 0);
 
     SessionFixture f;
     std::string q("my-ring-queue");
@@ -255,9 +144,9 @@ QPID_AUTO_TEST_CASE(testRingPolicySize)
 
 QPID_AUTO_TEST_CASE(testStrictRingPolicy)
 {
-    FieldTable args;
-    std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING_STRICT);
-    policy->update(args);
+    QueueOptions args;
+    args.setSizePolicy(RING_STRICT, 0, 5);
+    args.setString("qpid.flow_stop_count", "0");
 
     SessionFixture f;
     std::string q("my-ring-queue");
@@ -281,9 +170,8 @@ QPID_AUTO_TEST_CASE(testStrictRingPolicy
 
 QPID_AUTO_TEST_CASE(testPolicyWithDtx)
 {
-    FieldTable args;
-    std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT);
-    policy->update(args);
+    QueueOptions args;
+    args.setSizePolicy(REJECT, 0, 5);
 
     SessionFixture f;
     std::string q("my-policy-queue");
@@ -367,9 +255,8 @@ QPID_AUTO_TEST_CASE(testFlowToDiskWithNo
 
 QPID_AUTO_TEST_CASE(testPolicyFailureOnCommit)
 {
-    FieldTable args;
-    std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT);
-    policy->update(args);
+    QueueOptions args;
+    args.setSizePolicy(REJECT, 0, 5);
 
     SessionFixture f;
     std::string q("q");

Modified: qpid/trunk/qpid/cpp/src/tests/QueueRegistryTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueRegistryTest.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueRegistryTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueRegistryTest.cpp Fri Aug 10 12:04:27 2012
@@ -19,6 +19,7 @@
 
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueSettings.h"
 #include "unit_test.h"
 #include <string>
 
@@ -36,33 +37,23 @@ QPID_AUTO_TEST_CASE(testDeclare)
     QueueRegistry reg;
     std::pair<Queue::shared_ptr,  bool> qc;
 
-    qc = reg.declare(foo, false, 0, 0);
+    qc = reg.declare(foo, QueueSettings());
     Queue::shared_ptr q = qc.first;
     BOOST_CHECK(q);
     BOOST_CHECK(qc.second); // New queue
     BOOST_CHECK_EQUAL(foo, q->getName());
 
-    qc = reg.declare(foo, false, 0, 0);
+    qc = reg.declare(foo, QueueSettings());
     BOOST_CHECK_EQUAL(q, qc.first);
     BOOST_CHECK(!qc.second);
 
-    qc = reg.declare(bar, false, 0, 0);
+    qc = reg.declare(bar, QueueSettings());
     q = qc.first;
     BOOST_CHECK(q);
     BOOST_CHECK_EQUAL(true, qc.second);
     BOOST_CHECK_EQUAL(bar, q->getName());
 }
 
-QPID_AUTO_TEST_CASE(testDeclareTmp)
-{
-    QueueRegistry reg;
-    std::pair<Queue::shared_ptr,  bool> qc;
-
-    qc = reg.declare(std::string(), false, 0, 0);
-    BOOST_CHECK(qc.second);
-    BOOST_CHECK_EQUAL(std::string("tmp_1"), qc.first->getName());
-}
-
 QPID_AUTO_TEST_CASE(testFind)
 {
     std::string foo("foo");
@@ -72,8 +63,8 @@ QPID_AUTO_TEST_CASE(testFind)
 
     BOOST_CHECK(reg.find(foo) == 0);
 
-    reg.declare(foo, false, 0, 0);
-    reg.declare(bar, false, 0, 0);
+    reg.declare(foo, QueueSettings());
+    reg.declare(bar, QueueSettings());
     Queue::shared_ptr q = reg.find(bar);
     BOOST_CHECK(q);
     BOOST_CHECK_EQUAL(bar, q->getName());
@@ -85,7 +76,7 @@ QPID_AUTO_TEST_CASE(testDestroy)
     QueueRegistry reg;
     std::pair<Queue::shared_ptr,  bool> qc;
 
-    qc = reg.declare(foo, false, 0, 0);
+    qc = reg.declare(foo, QueueSettings());
     reg.destroy(foo);
     // Queue is gone from the registry.
     BOOST_CHECK(reg.find(foo) == 0);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org