You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2009/09/25 20:04:59 UTC
svn commit: r818935 - in /qpid/trunk/qpid/cpp/src/qpid:
broker/DirectExchange.cpp broker/Exchange.cpp broker/Exchange.h
broker/FanOutExchange.cpp broker/HeadersExchange.cpp
broker/TopicExchange.cpp xml/XmlExchange.cpp
Author: kpvdr
Date: Fri Sep 25 18:04:59 2009
New Revision: 818935
URL: http://svn.apache.org/viewvc?rev=818935&view=rev
Log:
Exchange route() refactorization that eliminates common code in each of the Exchange subclass route() methods.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=818935&r1=818934&r2=818935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Fri Sep 25 18:04:59 2009
@@ -145,39 +145,12 @@
void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
{
PreRoute pr(msg, this);
- Queues::ConstPtr p;
+ ConstBindingList b;
{
Mutex::ScopedLock l(lock);
- p = bindings[routingKey].queues.snapshot();
- }
- int count(0);
-
- if (p) {
- for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) {
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched();
- }
- }
-
- if(!count){
- QPID_LOG(info, "DirectExchange " << getName() << " could not route message with key " << routingKey
- << "; no matching binding found");
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgDrops();
- mgmtExchange->inc_byteDrops(msg.contentSize());
- }
- } else {
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgRoutes(count);
- mgmtExchange->inc_byteRoutes(count * msg.contentSize());
- }
- }
-
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgReceives();
- mgmtExchange->inc_byteReceives(msg.contentSize());
+ b = bindings[routingKey].queues.snapshot();
}
+ doRoute(msg, b);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=818935&r1=818934&r2=818935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Fri Sep 25 18:04:59 2009
@@ -76,6 +76,36 @@
}
}
+void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
+{
+ int count = 0;
+
+ if (b.get()) {
+ for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) {
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched();
+ }
+ }
+
+ if (mgmtExchange != 0)
+ {
+ mgmtExchange->inc_msgReceives ();
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ if (count == 0)
+ {
+ //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found");
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ else
+ {
+ mgmtExchange->inc_msgRoutes (count);
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ }
+ }
+}
+
void Exchange::routeIVE(){
if (ive && lastMsg.get()){
DeliverableMessage dmsg(lastMsg);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=818935&r1=818934&r2=818935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Fri Sep 25 18:04:59 2009
@@ -79,6 +79,9 @@
Exchange* parent;
};
+ typedef boost::shared_ptr<const std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > ConstBindingList;
+ typedef boost::shared_ptr< std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > BindingList;
+ void doRoute(Deliverable& msg, ConstBindingList b);
void routeIVE();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=818935&r1=818934&r2=818935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Fri Sep 25 18:04:59 2009
@@ -106,36 +106,12 @@
return true;
}
-void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
+void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/)
+{
PreRoute pr(msg, this);
- uint32_t count(0);
-
- BindingsArray::ConstPtr p = bindings.snapshot();
- if (p.get()){
- for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
- }
- }
-
- if (mgmtExchange != 0)
- {
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
- if (count == 0)
- {
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
- }
- else
- {
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
- }
- }
+ doRoute(msg, bindings.snapshot());
}
-
+
bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const)
{
BindingsArray::ConstPtr ptr = bindings.snapshot();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=818935&r1=818934&r2=818935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Fri Sep 25 18:04:59 2009
@@ -104,7 +104,8 @@
}
-void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
+void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args)
+{
if (!args) {
//can't match if there were no headers passed in
if (mgmtExchange != 0) {
@@ -118,31 +119,17 @@
PreRoute pr(msg, this);
- uint32_t count(0);
-
- Bindings::ConstPtr p = bindings.snapshot();
- if (p.get()){
+ ConstBindingList p = bindings.snapshot();
+ BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
+ if (p.get())
+ {
for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
if (match((*i)->args, *args)) {
- msg.deliverTo((*i)->queue);
- count++;
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched();
+ b->push_back(*i);
}
}
}
-
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgReceives();
- mgmtExchange->inc_byteReceives(msg.contentSize());
- if (count == 0) {
- mgmtExchange->inc_msgDrops();
- mgmtExchange->inc_byteDrops(msg.contentSize());
- } else {
- mgmtExchange->inc_msgRoutes(count);
- mgmtExchange->inc_byteRoutes(count * msg.contentSize());
- }
- }
+ doRoute(msg, b);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=818935&r1=818934&r2=818935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Fri Sep 25 18:04:59 2009
@@ -293,44 +293,23 @@
return q != qv.end();
}
-void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
+void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
+{
Binding::vector mb;
+ BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
PreRoute pr(msg, this);
- uint32_t count(0);
-
{
- RWlock::ScopedRlock l(lock);
- for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if (match(i->first, routingKey)) {
- Binding::vector& qv(i->second.bindingVector);
- for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){
- mb.push_back(*j);
+ RWlock::ScopedRlock l(lock);
+ for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ if (match(i->first, routingKey)) {
+ Binding::vector& qv(i->second.bindingVector);
+ for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++){
+ b->push_back(*j);
+ }
}
}
}
- }
-
- for (Binding::vector::iterator j = mb.begin(); j != mb.end(); ++j) {
- msg.deliverTo((*j)->queue);
- if ((*j)->mgmtBinding != 0)
- (*j)->mgmtBinding->inc_msgMatched ();
- }
-
- if (mgmtExchange != 0)
- {
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
- if (count == 0)
- {
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
- }
- else
- {
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
- }
- }
+ doRoute(msg, b);
}
bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
Modified: qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp?rev=818935&r1=818934&r2=818935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp Fri Sep 25 18:04:59 2009
@@ -206,45 +206,22 @@
PreRoute pr(msg, this);
try {
XmlBinding::vector::ConstPtr p;
- {
+ BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
+ {
RWlock::ScopedRlock l(lock);
- p = bindingsMap[routingKey].snapshot();
- if (!p) return;
- }
- int count(0);
+ p = bindingsMap[routingKey].snapshot();
+ if (!p.get()) return;
+ }
for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) {
if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) {
- msg.deliverTo((*i)->queue);
- count++;
- QPID_LOG(trace, "Delivered to queue" );
-
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
+ b->push_back(*i);
}
- }
- if (!count) {
- QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey);
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
- }
- } else {
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
- }
- }
-
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
- }
+ }
+ doRoute(msg, b);
} catch (...) {
QPID_LOG(warning, "XMLExchange " << getName() << ": exception routing message with query " << routingKey);
}
-
-
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org