You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2012/03/05 20:12:33 UTC
svn commit: r1297183 - in /qpid/trunk/qpid/cpp/src: qpid/broker/
qpid/cluster/ qpid/ha/ qpid/management/ qpid/replication/ qpid/xml/ tests/
Author: astitcher
Date: Mon Mar 5 19:12:32 2012
New Revision: 1297183
URL: http://svn.apache.org/viewvc?rev=1297183&view=rev
Log:
QPID-3883: Using application headers in messages causes a very large slowdown
Change Exchange route interface not to require a fieldtable
- Exchanges that actually use the fieldtable for routing
need to extract it directly from the message themselves.
This avoids the need to extract the fieldtable from the message
unnecessarily.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.h
qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.h
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.h
qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.h
qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h
qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h
qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Mon Mar 5 19:12:32 2012
@@ -153,8 +153,9 @@ bool DirectExchange::unbind(Queue::share
return true;
}
-void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
+void DirectExchange::route(Deliverable& msg)
{
+ const string& routingKey = msg.getMessage().getRoutingKey();
PreRoute pr(msg, this);
ConstBindingList b;
{
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h Mon Mar 5 19:12:32 2012
@@ -57,9 +57,7 @@ public:
const std::string& routingKey,
const qpid::framing::FieldTable* args);
virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- QPID_BROKER_EXTERN virtual void route(Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ QPID_BROKER_EXTERN virtual void route(Deliverable& msg);
QPID_BROKER_EXTERN virtual bool isBound(boost::shared_ptr<Queue> queue,
const std::string* const routingKey,
const qpid::framing::FieldTable* const args);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Mon Mar 5 19:12:32 2012
@@ -159,7 +159,7 @@ void Exchange::doRoute(Deliverable& msg,
void Exchange::routeIVE(){
if (ive && lastMsg.get()){
DeliverableMessage dmsg(lastMsg);
- route(dmsg, lastMsg->getRoutingKey(), lastMsg->getApplicationHeaders());
+ route(dmsg);
}
}
@@ -402,9 +402,9 @@ void Exchange::setProperties(const boost
bool Exchange::routeWithAlternate(Deliverable& msg)
{
- route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders());
+ route(msg);
if (!msg.delivered && alternate) {
- alternate->route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders());
+ alternate->route(msg);
}
return msg.delivered;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Mon Mar 5 19:12:32 2012
@@ -196,7 +196,7 @@ public:
virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
QPID_BROKER_EXTERN virtual void setProperties(const boost::intrusive_ptr<Message>&);
- virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
+ virtual void route(Deliverable& msg) = 0;
//PersistableExchange:
QPID_BROKER_EXTERN void setPersistenceId(uint64_t id) const;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Mon Mar 5 19:12:32 2012
@@ -101,7 +101,7 @@ bool FanOutExchange::unbind(Queue::share
return true;
}
-void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/)
+void FanOutExchange::route(Deliverable& msg)
{
PreRoute pr(msg, this);
doRoute(msg, bindings.snapshot());
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h Mon Mar 5 19:12:32 2012
@@ -54,9 +54,7 @@ class FanOutExchange : public virtual Ex
virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- QPID_BROKER_EXTERN virtual void route(Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ QPID_BROKER_EXTERN virtual void route(Deliverable& msg);
QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue,
const std::string* const routingKey,
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Mon Mar 5 19:12:32 2012
@@ -191,8 +191,9 @@ bool HeadersExchange::unbind(Queue::shar
}
-void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args)
+void HeadersExchange::route(Deliverable& msg)
{
+ const FieldTable* args = msg.getMessage().getApplicationHeaders();
if (!args) {
//can't match if there were no headers passed in
if (mgmtExchange != 0) {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Mon Mar 5 19:12:32 2012
@@ -98,9 +98,7 @@ class HeadersExchange : public virtual E
virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- QPID_BROKER_EXTERN virtual void route(Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ QPID_BROKER_EXTERN virtual void route(Deliverable& msg);
QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue,
const std::string* const routingKey,
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Mar 5 19:12:32 2012
@@ -215,7 +215,7 @@ void Queue::deliver(boost::intrusive_ptr
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
- alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders());
+ alternateExchange->route(deliverable);
}
} else if (isLocal(msg)) {
//drop message
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Mar 5 19:12:32 2012
@@ -489,14 +489,14 @@ void SemanticState::route(intrusive_ptr<
exchangeName << " with routing-key " << msg->getRoutingKey()));
}
- cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
+ cacheExchange->route(strategy);
if (!strategy.delivered) {
//TODO:if discard-unroutable, just drop it
//TODO:else if accept-mode is explicit, reject it
//else route it to alternate exchange
if (cacheExchange->getAlternate()) {
- cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
+ cacheExchange->getAlternate()->route(strategy);
}
if (!strategy.delivered) {
msg->destroy();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Mon Mar 5 19:12:32 2012
@@ -350,8 +350,9 @@ TopicExchange::BindingKey *TopicExchange
return (q != qv.end()) ? bk : 0;
}
-void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
+void TopicExchange::route(Deliverable& msg)
{
+ const string& routingKey = msg.getMessage().getRoutingKey();
// Note: PERFORMANCE CRITICAL!!!
BindingList b;
std::map<std::string, BindingList>::iterator it;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Mon Mar 5 19:12:32 2012
@@ -185,9 +185,7 @@ class TopicExchange : public virtual Exc
virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- QPID_BROKER_EXTERN virtual void route(Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ QPID_BROKER_EXTERN virtual void route(Deliverable& msg);
QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue,
const std::string* const routingKey,
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.cpp Mon Mar 5 19:12:32 2012
@@ -62,7 +62,8 @@ bool CredentialsExchange::check(MemberId
return valid;
}
-void CredentialsExchange::route(broker::Deliverable& msg, const string& /*routingKey*/, const framing::FieldTable* args) {
+void CredentialsExchange::route(broker::Deliverable& msg) {
+ const framing::FieldTable* args = msg.getMessage().getApplicationHeaders();
sys::Mutex::ScopedLock l(lock);
const broker::ConnectionState* connection =
static_cast<const broker::ConnectionState*>(msg.getMessage().getPublisher());
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.h?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/CredentialsExchange.h Mon Mar 5 19:12:32 2012
@@ -50,7 +50,7 @@ class CredentialsExchange : public broke
bool check(MemberId member);
/** Throw an exception if the calling connection is not the cluster user. Store credentials in msg. */
- void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args);
+ void route(broker::Deliverable& msg);
// Exchange overrides
std::string getType() const;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp Mon Mar 5 19:12:32 2012
@@ -80,7 +80,7 @@ bool FailoverExchange::isBound(Queue::sh
return queues.find(queue) != queues.end();
}
-void FailoverExchange::route(Deliverable&, const string& , const framing::FieldTable* ) {
+void FailoverExchange::route(Deliverable&) {
QPID_LOG(warning, "Message received by exchange " << typeName << " ignoring");
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.h?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/FailoverExchange.h Mon Mar 5 19:12:32 2012
@@ -54,7 +54,7 @@ class FailoverExchange : public broker::
bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args);
bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args);
bool isBound(boost::shared_ptr<broker::Queue> queue, const std::string* const routingKey, const framing::FieldTable* const args);
- void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args);
+ void route(broker::Deliverable& msg);
private:
void sendUpdate(const boost::shared_ptr<broker::Queue>&);
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp Mon Mar 5 19:12:32 2012
@@ -40,9 +40,9 @@ UpdateDataExchange::UpdateDataExchange(C
Exchange(EXCHANGE_NAME, &cluster)
{}
-void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey,
- const qpid::framing::FieldTable* )
+void UpdateDataExchange::route(broker::Deliverable& msg)
{
+ const std::string& routingKey = msg.getMessage().getRoutingKey();
std::string data = msg.getMessage().getFrames().getContent();
if (routingKey == MANAGEMENT_AGENTS_KEY) managementAgents = data;
else if (routingKey == MANAGEMENT_SCHEMAS_KEY) managementSchemas = data;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h Mon Mar 5 19:12:32 2012
@@ -50,8 +50,7 @@ class UpdateDataExchange : public broker
UpdateDataExchange(Cluster& parent);
- void route(broker::Deliverable& msg, const std::string& routingKey,
- const framing::FieldTable* args);
+ void route(broker::Deliverable& msg);
// Not implemented
std::string getType() const { return EXCHANGE_TYPE; }
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Mon Mar 5 19:12:32 2012
@@ -226,7 +226,8 @@ void BrokerReplicator::initializeBridge(
}
// FIXME aconway 2011-12-02: error handling in route.
-void BrokerReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) {
+void BrokerReplicator::route(Deliverable& msg) {
+ const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders();
Variant::List list;
try {
if (!isQMFv2(msg.getMessage()) || !headers)
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Mon Mar 5 19:12:32 2012
@@ -58,7 +58,7 @@ class BrokerReplicator : public broker::
// Exchange methods
bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
- void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
+ void route(broker::Deliverable&);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
private:
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Mon Mar 5 19:12:32 2012
@@ -138,8 +138,9 @@ void QueueReplicator::dequeue(SequenceNu
}
// Called in connection thread of the queues bridge to primary.
-void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable*)
+void QueueReplicator::route(Deliverable& msg)
{
+ const std::string& key = msg.getMessage().getRoutingKey();
sys::Mutex::ScopedLock l(lock);
if (key == DEQUEUE_EVENT_KEY) {
SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h Mon Mar 5 19:12:32 2012
@@ -66,7 +66,7 @@ class QueueReplicator : public broker::E
bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
- void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
+ void route(broker::Deliverable&);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
private:
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Mon Mar 5 19:12:32 2012
@@ -564,7 +564,7 @@ void ManagementAgent::sendBufferLH(Buffe
DeliverableMessage deliverable (msg);
try {
- exchange->route(deliverable, routingKey, 0);
+ exchange->route(deliverable);
} catch(exception&) {}
}
buf.reset();
@@ -641,7 +641,7 @@ void ManagementAgent::sendBufferLH(const
DeliverableMessage deliverable (msg);
try {
- exchange->route(deliverable, routingKey, 0);
+ exchange->route(deliverable);
} catch(exception&) {}
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp Mon Mar 5 19:12:32 2012
@@ -40,17 +40,17 @@ ManagementDirectExchange::ManagementDire
DirectExchange(_name, _durable, _args, _parent, b),
managementAgent(0) {}
-void ManagementDirectExchange::route(Deliverable& msg,
- const string& routingKey,
- const FieldTable* args)
+void ManagementDirectExchange::route(Deliverable& msg)
{
bool routeIt = true;
+ const string& routingKey = msg.getMessage().getRoutingKey();
+ const FieldTable* args = msg.getMessage().getApplicationHeaders();
if (managementAgent)
routeIt = managementAgent->dispatchCommand(msg, routingKey, args, false, qmfVersion);
if (routeIt)
- DirectExchange::route(msg, routingKey, args);
+ DirectExchange::route(msg);
}
void ManagementDirectExchange::setManagmentAgent(ManagementAgent* agent, int qv)
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.h?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.h Mon Mar 5 19:12:32 2012
@@ -43,9 +43,7 @@ class ManagementDirectExchange : public
virtual std::string getType() const { return typeName; }
- virtual void route(Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg);
void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion);
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp Mon Mar 5 19:12:32 2012
@@ -39,18 +39,18 @@ ManagementTopicExchange::ManagementTopic
TopicExchange(_name, _durable, _args, _parent, b),
managementAgent(0) {}
-void ManagementTopicExchange::route(Deliverable& msg,
- const string& routingKey,
- const FieldTable* args)
+void ManagementTopicExchange::route(Deliverable& msg)
{
bool routeIt = true;
+ const string& routingKey = msg.getMessage().getRoutingKey();
+ const FieldTable* args = msg.getMessage().getApplicationHeaders();
// Intercept management agent commands
if (managementAgent)
routeIt = managementAgent->dispatchCommand(msg, routingKey, args, true, qmfVersion);
if (routeIt)
- TopicExchange::route(msg, routingKey, args);
+ TopicExchange::route(msg);
}
bool ManagementTopicExchange::bind(Queue::shared_ptr queue,
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.h?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.h Mon Mar 5 19:12:32 2012
@@ -43,9 +43,7 @@ class ManagementTopicExchange : public v
virtual std::string getType() const { return typeName; }
- virtual void route(Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg);
virtual bool bind(Queue::shared_ptr queue,
const std::string& routingKey,
Modified: qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp Mon Mar 5 19:12:32 2012
@@ -80,7 +80,7 @@ void ReplicatingEventListener::route(boo
try {
if (exchange) {
DeliverableMessage deliverable(msg);
- exchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders());
+ exchange->route(deliverable);
} else if (queue) {
queue->deliver(msg);
} else {
Modified: qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp Mon Mar 5 19:12:32 2012
@@ -50,8 +50,9 @@ ReplicationExchange::ReplicationExchange
std::string ReplicationExchange::getType() const { return typeName; }
-void ReplicationExchange::route(Deliverable& msg, const std::string& /*routingKey*/, const FieldTable* args)
+void ReplicationExchange::route(Deliverable& msg)
{
+ const FieldTable* args = msg.getMessage().getApplicationHeaders();
if (mgmtExchange != 0) {
mgmtExchange->inc_msgReceives();
mgmtExchange->inc_byteReceives(msg.contentSize());
Modified: qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h Mon Mar 5 19:12:32 2012
@@ -52,7 +52,7 @@ class ReplicationExchange : public qpid:
std::string getType() const;
- void route(qpid::broker::Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ void route(qpid::broker::Deliverable& msg);
bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
Modified: qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp Mon Mar 5 19:12:32 2012
@@ -283,8 +283,10 @@ bool XmlExchange::matches(Query& query,
// But for very large messages, if all these queries are on the first part of the data,
// it could still be a big win.
-void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* args)
+void XmlExchange::route(Deliverable& msg)
{
+ const string& routingKey = msg.getMessage().getRoutingKey();
+ const FieldTable* args = msg.getMessage().getApplicationHeaders();
PreRoute pr(msg, this);
try {
XmlBinding::vector::ConstPtr p;
Modified: qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h Mon Mar 5 19:12:32 2012
@@ -82,7 +82,7 @@ class XmlExchange : public virtual Excha
virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg);
virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args);
Modified: qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp Mon Mar 5 19:12:32 2012
@@ -60,10 +60,10 @@ QPID_AUTO_TEST_CASE(testMe)
queue.reset();
queue2.reset();
- intrusive_ptr<Message> msgPtr(MessageUtils::createMessage("exchange", "key", false, "id"));
+ intrusive_ptr<Message> msgPtr(MessageUtils::createMessage("exchange", "abc", false, "id"));
DeliverableMessage msg(msgPtr);
- topic.route(msg, "abc", 0);
- direct.route(msg, "abc", 0);
+ topic.route(msg);
+ direct.route(msg);
}
@@ -187,17 +187,17 @@ QPID_AUTO_TEST_CASE(testSequenceOptions)
{
DirectExchange direct("direct1", false, args);
- intrusive_ptr<Message> msg1 = cmessage("e", "A");
- intrusive_ptr<Message> msg2 = cmessage("e", "B");
- intrusive_ptr<Message> msg3 = cmessage("e", "C");
+ intrusive_ptr<Message> msg1 = cmessage("e", "abc");
+ intrusive_ptr<Message> msg2 = cmessage("e", "abc");
+ intrusive_ptr<Message> msg3 = cmessage("e", "abc");
DeliverableMessage dmsg1(msg1);
DeliverableMessage dmsg2(msg2);
DeliverableMessage dmsg3(msg3);
- direct.route(dmsg1, "abc", 0);
- direct.route(dmsg2, "abc", 0);
- direct.route(dmsg3, "abc", 0);
+ direct.route(dmsg1);
+ direct.route(dmsg2);
+ direct.route(dmsg3);
BOOST_CHECK_EQUAL(1, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
BOOST_CHECK_EQUAL(2, msg2->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
@@ -208,22 +208,24 @@ QPID_AUTO_TEST_CASE(testSequenceOptions)
TopicExchange topic ("topic1", false, args);
// check other exchanges, that they preroute
- intrusive_ptr<Message> msg4 = cmessage("e", "A");
- intrusive_ptr<Message> msg5 = cmessage("e", "B");
- intrusive_ptr<Message> msg6 = cmessage("e", "C");
+ intrusive_ptr<Message> msg4 = cmessage("e", "abc");
+ intrusive_ptr<Message> msg5 = cmessage("e", "abc");
+
+ // Need at least empty header for the HeadersExchange to route at all
+ msg5->insertCustomProperty("", "");
+ intrusive_ptr<Message> msg6 = cmessage("e", "abc");
DeliverableMessage dmsg4(msg4);
DeliverableMessage dmsg5(msg5);
DeliverableMessage dmsg6(msg6);
- fanout.route(dmsg4, "abc", 0);
+ fanout.route(dmsg4);
BOOST_CHECK_EQUAL(1, msg4->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
- FieldTable headers;
- header.route(dmsg5, "abc", &headers);
+ header.route(dmsg5);
BOOST_CHECK_EQUAL(1, msg5->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
- topic.route(dmsg6, "abc", 0);
+ topic.route(dmsg6);
BOOST_CHECK_EQUAL(1, msg6->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
direct.encode(buffer);
}
@@ -233,9 +235,9 @@ QPID_AUTO_TEST_CASE(testSequenceOptions)
buffer.reset();
DirectExchange::shared_ptr exch_dec = Exchange::decode(exchanges, buffer);
- intrusive_ptr<Message> msg1 = cmessage("e", "A");
+ intrusive_ptr<Message> msg1 = cmessage("e", "abc");
DeliverableMessage dmsg1(msg1);
- exch_dec->route(dmsg1, "abc", 0);
+ exch_dec->route(dmsg1);
BOOST_CHECK_EQUAL(4, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
@@ -260,10 +262,10 @@ QPID_AUTO_TEST_CASE(testIVEOption)
args2.setString("x-match", "any");
args2.setString("a", "abc");
- direct.route(dmsg1, "abc", 0);
- fanout.route(dmsg1, "abc", 0);
- header.route(dmsg1, "abc", &args2);
- topic.route(dmsg1, "abc", 0);
+ direct.route(dmsg1);
+ fanout.route(dmsg1);
+ header.route(dmsg1);
+ topic.route(dmsg1);
Queue::shared_ptr queue(new Queue("queue", true));
Queue::shared_ptr queue1(new Queue("queue1", true));
Queue::shared_ptr queue2(new Queue("queue2", true));
Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=1297183&r1=1297182&r2=1297183&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Mon Mar 5 19:12:32 2012
@@ -254,8 +254,8 @@ QPID_AUTO_TEST_CASE(testBound){
//ensure the remaining exchanges don't still have the queue bound to them:
FailOnDeliver deliverable;
- exchange1->route(deliverable, key, &args);
- exchange3->route(deliverable, key, &args);
+ exchange1->route(deliverable);
+ exchange3->route(deliverable);
}
QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
@@ -1151,7 +1151,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
intrusive_ptr<Message> msg01 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
DeliverableMessage dmsg01(msg01);
- sbtFanout1.route(dmsg01, "", 0); // Brings queue 1 to capacity limit
+ sbtFanout1.route(dmsg01); // Brings queue 1 to capacity limit
msg01->tryReleaseContent();
BOOST_CHECK_EQUAL(msg01->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
@@ -1160,7 +1160,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
DeliverableMessage dmsg02(msg02);
{
ScopedSuppressLogging sl; // suppress expected error messages.
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg02, "", 0), ResourceLimitExceededException);
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg02), ResourceLimitExceededException);
}
msg02->tryReleaseContent();
BOOST_CHECK_EQUAL(msg02->isContentReleased(), false);
@@ -1170,7 +1170,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
DeliverableMessage dmsg03(msg03);
{
ScopedSuppressLogging sl; // suppress expected error messages.
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg03, "", 0), ResourceLimitExceededException);
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg03), ResourceLimitExceededException);
}
msg03->tryReleaseContent();
BOOST_CHECK_EQUAL(msg03->isContentReleased(), false);
@@ -1180,7 +1180,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
DeliverableMessage dmsg04(msg04);
{
ScopedSuppressLogging sl; // suppress expected error messages.
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg04, "", 0), ResourceLimitExceededException);
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg04), ResourceLimitExceededException);
}
msg04->tryReleaseContent();
BOOST_CHECK_EQUAL(msg04->isContentReleased(), false);
@@ -1190,7 +1190,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
DeliverableMessage dmsg05(msg05);
{
ScopedSuppressLogging sl; // suppress expected error messages.
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg05, "", 0), ResourceLimitExceededException);
+ BOOST_CHECK_THROW(sbtFanout1.route(dmsg05), ResourceLimitExceededException);
}
msg05->tryReleaseContent();
BOOST_CHECK_EQUAL(msg05->isContentReleased(), false);
@@ -1205,35 +1205,35 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
intrusive_ptr<Message> msg06 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
DeliverableMessage dmsg06(msg06);
- sbdFanout2.route(dmsg06, "", 0); // Brings queue 2 to capacity limit
+ sbdFanout2.route(dmsg06); // Brings queue 2 to capacity limit
msg06->tryReleaseContent();
BOOST_CHECK_EQUAL(msg06->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, dq2->getMessageCount());
intrusive_ptr<Message> msg07 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
DeliverableMessage dmsg07(msg07);
- sbdFanout2.route(dmsg07, "", 0);
+ sbdFanout2.route(dmsg07);
msg07->tryReleaseContent();
BOOST_CHECK_EQUAL(msg07->isContentReleased(), true);
BOOST_CHECK_EQUAL(2u, dq2->getMessageCount());
intrusive_ptr<Message> msg08 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
DeliverableMessage dmsg08(msg08);
- sbdFanout2.route(dmsg08, "", 0);
+ sbdFanout2.route(dmsg08);
msg08->tryReleaseContent();
BOOST_CHECK_EQUAL(msg08->isContentReleased(), true);
BOOST_CHECK_EQUAL(3u, dq2->getMessageCount());
intrusive_ptr<Message> msg09 = mkMsg(testStore); // transient no content
DeliverableMessage dmsg09(msg09);
- sbdFanout2.route(dmsg09, "", 0);
+ sbdFanout2.route(dmsg09);
msg09->tryReleaseContent();
BOOST_CHECK_EQUAL(msg09->isContentReleased(), true);
BOOST_CHECK_EQUAL(4u, dq2->getMessageCount());
intrusive_ptr<Message> msg10 = mkMsg(testStore, "", true); // durable no content
DeliverableMessage dmsg10(msg10);
- sbdFanout2.route(dmsg10, "", 0);
+ sbdFanout2.route(dmsg10);
msg10->tryReleaseContent();
BOOST_CHECK_EQUAL(msg10->isContentReleased(), true);
BOOST_CHECK_EQUAL(5u, dq2->getMessageCount());
@@ -1253,7 +1253,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
intrusive_ptr<Message> msg11 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
DeliverableMessage dmsg11(msg11);
- mbdFanout3.route(dmsg11, "", 0); // Brings queues 3 and 4 to capacity limit
+ mbdFanout3.route(dmsg11); // Brings queues 3 and 4 to capacity limit
msg11->tryReleaseContent();
BOOST_CHECK_EQUAL(msg11->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, dq3->getMessageCount());
@@ -1262,7 +1262,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
intrusive_ptr<Message> msg12 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
DeliverableMessage dmsg12(msg12);
- mbdFanout3.route(dmsg12, "", 0);
+ mbdFanout3.route(dmsg12);
msg12->tryReleaseContent();
BOOST_CHECK_EQUAL(msg12->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point!
BOOST_CHECK_EQUAL(2u, dq3->getMessageCount());
@@ -1271,7 +1271,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
intrusive_ptr<Message> msg13 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
DeliverableMessage dmsg13(msg13);
- mbdFanout3.route(dmsg13, "", 0);
+ mbdFanout3.route(dmsg13);
msg13->tryReleaseContent();
BOOST_CHECK_EQUAL(msg13->isContentReleased(), true);
BOOST_CHECK_EQUAL(3u, dq3->getMessageCount());
@@ -1280,7 +1280,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
intrusive_ptr<Message> msg14 = mkMsg(testStore); // transient no content
DeliverableMessage dmsg14(msg14);
- mbdFanout3.route(dmsg14, "", 0);
+ mbdFanout3.route(dmsg14);
msg14->tryReleaseContent();
BOOST_CHECK_EQUAL(msg14->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point!
BOOST_CHECK_EQUAL(4u, dq3->getMessageCount());
@@ -1289,7 +1289,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
intrusive_ptr<Message> msg15 = mkMsg(testStore, "", true); // durable no content
DeliverableMessage dmsg15(msg15);
- mbdFanout3.route(dmsg15, "", 0);
+ mbdFanout3.route(dmsg15);
msg15->tryReleaseContent();
BOOST_CHECK_EQUAL(msg15->isContentReleased(), true);
BOOST_CHECK_EQUAL(5u, dq3->getMessageCount());
@@ -1307,7 +1307,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
intrusive_ptr<Message> msg16 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
DeliverableMessage dmsg16(msg16);
- mbdFanout3.route(dmsg16, "", 0);
+ mbdFanout3.route(dmsg16);
msg16->tryReleaseContent();
BOOST_CHECK_EQUAL(msg16->isContentReleased(), false);
BOOST_CHECK_EQUAL(6u, dq3->getMessageCount());
@@ -1316,7 +1316,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
intrusive_ptr<Message> msg17 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
DeliverableMessage dmsg17(msg17);
- mbdFanout3.route(dmsg17, "", 0);
+ mbdFanout3.route(dmsg17);
msg17->tryReleaseContent();
BOOST_CHECK_EQUAL(msg17->isContentReleased(), false);
BOOST_CHECK_EQUAL(7u, dq3->getMessageCount());
@@ -1325,7 +1325,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
intrusive_ptr<Message> msg18 = mkMsg(testStore); // transient no content
DeliverableMessage dmsg18(msg18);
- mbdFanout3.route(dmsg18, "", 0);
+ mbdFanout3.route(dmsg18);
msg18->tryReleaseContent();
BOOST_CHECK_EQUAL(msg18->isContentReleased(), false);
BOOST_CHECK_EQUAL(8u, dq3->getMessageCount());
@@ -1334,7 +1334,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
intrusive_ptr<Message> msg19 = mkMsg(testStore, "", true); // durable no content
DeliverableMessage dmsg19(msg19);
- mbdFanout3.route(dmsg19, "", 0);
+ mbdFanout3.route(dmsg19);
msg19->tryReleaseContent();
BOOST_CHECK_EQUAL(msg19->isContentReleased(), false);
BOOST_CHECK_EQUAL(9u, dq3->getMessageCount());
@@ -1357,7 +1357,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
intrusive_ptr<Message> msg20 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
DeliverableMessage dmsg20(msg20);
- mbmFanout4.route(dmsg20, "", 0); // Brings queue 7 to capacity limit
+ mbmFanout4.route(dmsg20); // Brings queue 7 to capacity limit
msg20->tryReleaseContent();
BOOST_CHECK_EQUAL(msg20->isContentReleased(), false);
BOOST_CHECK_EQUAL(1u, dq7->getMessageCount());
@@ -1366,7 +1366,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
intrusive_ptr<Message> msg21 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
DeliverableMessage dmsg21(msg21);
- mbmFanout4.route(dmsg21, "", 0);
+ mbmFanout4.route(dmsg21);
msg21->tryReleaseContent();
BOOST_CHECK_EQUAL(msg21->isContentReleased(), false);
BOOST_CHECK_EQUAL(2u, dq7->getMessageCount()); // over limit
@@ -1375,7 +1375,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
intrusive_ptr<Message> msg22 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
DeliverableMessage dmsg22(msg22);
- mbmFanout4.route(dmsg22, "", 0);
+ mbmFanout4.route(dmsg22);
msg22->tryReleaseContent();
BOOST_CHECK_EQUAL(msg22->isContentReleased(), false);
BOOST_CHECK_EQUAL(3u, dq7->getMessageCount()); // over limit
@@ -1384,7 +1384,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
intrusive_ptr<Message> msg23 = mkMsg(testStore); // transient no content
DeliverableMessage dmsg23(msg23);
- mbmFanout4.route(dmsg23, "", 0);
+ mbmFanout4.route(dmsg23);
msg23->tryReleaseContent();
BOOST_CHECK_EQUAL(msg23->isContentReleased(), false);
BOOST_CHECK_EQUAL(4u, dq7->getMessageCount()); // over limit
@@ -1393,7 +1393,7 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
intrusive_ptr<Message> msg24 = mkMsg(testStore, "", true); // durable no content
DeliverableMessage dmsg24(msg24);
- mbmFanout4.route(dmsg24, "", 0);
+ mbmFanout4.route(dmsg24);
msg24->tryReleaseContent();
BOOST_CHECK_EQUAL(msg24->isContentReleased(), false);
BOOST_CHECK_EQUAL(5u, dq7->getMessageCount()); // over limit
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org