You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2009/09/25 20:04:59 UTC

svn commit: r818935 - in /qpid/trunk/qpid/cpp/src/qpid: broker/DirectExchange.cpp broker/Exchange.cpp broker/Exchange.h broker/FanOutExchange.cpp broker/HeadersExchange.cpp broker/TopicExchange.cpp xml/XmlExchange.cpp

Author: kpvdr
Date: Fri Sep 25 18:04:59 2009
New Revision: 818935

URL: http://svn.apache.org/viewvc?rev=818935&view=rev
Log:
Refactoring of Exchange route(): the delivery and management-counter code that was duplicated in each Exchange subclass's route() method is moved into a shared Exchange::doRoute() helper.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=818935&r1=818934&r2=818935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Fri Sep 25 18:04:59 2009
@@ -145,39 +145,12 @@
 void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
 {
     PreRoute pr(msg, this);
-    Queues::ConstPtr p;
+    ConstBindingList b;
     {
         Mutex::ScopedLock l(lock);
-        p = bindings[routingKey].queues.snapshot();
-    }
-    int count(0);
-
-    if (p) {
-        for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) {
-            msg.deliverTo((*i)->queue);
-            if ((*i)->mgmtBinding != 0)
-                (*i)->mgmtBinding->inc_msgMatched();
-        }
-    }
-
-    if(!count){
-        QPID_LOG(info, "DirectExchange " << getName() << " could not route message with key " << routingKey
-                 << "; no matching binding found");
-        if (mgmtExchange != 0) {
-            mgmtExchange->inc_msgDrops();
-            mgmtExchange->inc_byteDrops(msg.contentSize());
-        }
-    } else {
-        if (mgmtExchange != 0) {
-            mgmtExchange->inc_msgRoutes(count);
-            mgmtExchange->inc_byteRoutes(count * msg.contentSize());
-        }
-    }
-
-    if (mgmtExchange != 0) {
-        mgmtExchange->inc_msgReceives();
-        mgmtExchange->inc_byteReceives(msg.contentSize());
+        b = bindings[routingKey].queues.snapshot();
     }
+    doRoute(msg, b);
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=818935&r1=818934&r2=818935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Fri Sep 25 18:04:59 2009
@@ -76,6 +76,36 @@
     }
 }
 
+void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
+{
+    int count = 0;
+
+    if (b.get()) {
+        for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) {
+            msg.deliverTo((*i)->queue);
+            if ((*i)->mgmtBinding != 0)
+                (*i)->mgmtBinding->inc_msgMatched();
+        }
+    }
+
+    if (mgmtExchange != 0)
+    {
+        mgmtExchange->inc_msgReceives  ();
+        mgmtExchange->inc_byteReceives (msg.contentSize ());
+        if (count == 0)
+        {
+            //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found");
+            mgmtExchange->inc_msgDrops  ();
+            mgmtExchange->inc_byteDrops (msg.contentSize ());
+        }
+        else
+        {
+            mgmtExchange->inc_msgRoutes  (count);
+            mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+        }
+    }
+}
+
 void Exchange::routeIVE(){
     if (ive && lastMsg.get()){
         DeliverableMessage dmsg(lastMsg);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=818935&r1=818934&r2=818935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Fri Sep 25 18:04:59 2009
@@ -79,6 +79,9 @@
         Exchange* parent;
     };
            
+    typedef boost::shared_ptr<const std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > ConstBindingList;
+    typedef boost::shared_ptr<      std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > BindingList;
+    void doRoute(Deliverable& msg, ConstBindingList b);
     void routeIVE();
            
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=818935&r1=818934&r2=818935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Fri Sep 25 18:04:59 2009
@@ -106,36 +106,12 @@
     return true;
 }
 
-void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
+void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/)
+{
     PreRoute pr(msg, this);
-    uint32_t count(0);
-
-    BindingsArray::ConstPtr p = bindings.snapshot();
-    if (p.get()){
-        for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){
-            msg.deliverTo((*i)->queue);
-            if ((*i)->mgmtBinding != 0)
-                (*i)->mgmtBinding->inc_msgMatched ();
-        }
-    }
-    
-    if (mgmtExchange != 0)
-    {
-        mgmtExchange->inc_msgReceives  ();
-        mgmtExchange->inc_byteReceives (msg.contentSize ());
-        if (count == 0)
-        {
-            mgmtExchange->inc_msgDrops  ();
-            mgmtExchange->inc_byteDrops (msg.contentSize ());
-        }
-        else
-        {
-            mgmtExchange->inc_msgRoutes  (count);
-            mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
-        }
-    }
+    doRoute(msg, bindings.snapshot());
 }
-
+    
 bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const)
 {
     BindingsArray::ConstPtr ptr = bindings.snapshot();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=818935&r1=818934&r2=818935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Fri Sep 25 18:04:59 2009
@@ -104,7 +104,8 @@
 }
 
 
-void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
+void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args)
+{
     if (!args) {
         //can't match if there were no headers passed in
         if (mgmtExchange != 0) {
@@ -118,31 +119,17 @@
 
     PreRoute pr(msg, this);
 
-    uint32_t count(0);
-
-    Bindings::ConstPtr p = bindings.snapshot();
-    if (p.get()){
+    ConstBindingList p = bindings.snapshot();
+    BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
+    if (p.get())
+    {
         for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
             if (match((*i)->args, *args)) {
-                msg.deliverTo((*i)->queue);
-                count++;
-                if ((*i)->mgmtBinding != 0)
-                    (*i)->mgmtBinding->inc_msgMatched();
+                b->push_back(*i);
             }
         }
     }
-
-    if (mgmtExchange != 0) {
-        mgmtExchange->inc_msgReceives();
-        mgmtExchange->inc_byteReceives(msg.contentSize());
-        if (count == 0) {
-            mgmtExchange->inc_msgDrops();
-            mgmtExchange->inc_byteDrops(msg.contentSize());
-        } else {
-            mgmtExchange->inc_msgRoutes(count);
-            mgmtExchange->inc_byteRoutes(count * msg.contentSize());
-        }
-    }
+    doRoute(msg, b);
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=818935&r1=818934&r2=818935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Fri Sep 25 18:04:59 2009
@@ -293,44 +293,23 @@
     return q != qv.end();
 }
 
-void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
+void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
+{
     Binding::vector mb;
+    BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
     PreRoute pr(msg, this);
-    uint32_t count(0);
-
     {
-    RWlock::ScopedRlock l(lock);
-    for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
-        if (match(i->first, routingKey)) {
-            Binding::vector& qv(i->second.bindingVector);
-            for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){
-                mb.push_back(*j);
+        RWlock::ScopedRlock l(lock);
+        for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+            if (match(i->first, routingKey)) {
+                Binding::vector& qv(i->second.bindingVector);
+                for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++){
+                    b->push_back(*j);
+                }
             }
         }
     }
-    }
-    
-    for (Binding::vector::iterator j = mb.begin(); j != mb.end(); ++j) {
-        msg.deliverTo((*j)->queue);
-        if ((*j)->mgmtBinding != 0)
-            (*j)->mgmtBinding->inc_msgMatched ();
-    }
-
-    if (mgmtExchange != 0)
-    {
-        mgmtExchange->inc_msgReceives  ();
-        mgmtExchange->inc_byteReceives (msg.contentSize ());
-        if (count == 0)
-        {
-            mgmtExchange->inc_msgDrops  ();
-            mgmtExchange->inc_byteDrops (msg.contentSize ());
-        }
-        else
-        {
-            mgmtExchange->inc_msgRoutes  (count);
-            mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
-        }
-    }
+    doRoute(msg, b);
 }
 
 bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)

Modified: qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp?rev=818935&r1=818934&r2=818935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp Fri Sep 25 18:04:59 2009
@@ -206,45 +206,22 @@
     PreRoute pr(msg, this);
     try {
         XmlBinding::vector::ConstPtr p;
-       {
+        BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
+        {
             RWlock::ScopedRlock l(lock);
-           p = bindingsMap[routingKey].snapshot();
-           if (!p) return;
-       }
-        int count(0);
+            p = bindingsMap[routingKey].snapshot();
+            if (!p.get()) return;
+        }
 
         for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) {
             if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) { 
-                msg.deliverTo((*i)->queue);
-                count++;
-                QPID_LOG(trace, "Delivered to queue" );
-
-                if ((*i)->mgmtBinding != 0)
-                    (*i)->mgmtBinding->inc_msgMatched ();
+                b->push_back(*i);
             }
-       }
-       if (!count) {
-           QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey);
-           if (mgmtExchange != 0) {
-               mgmtExchange->inc_msgDrops  ();
-               mgmtExchange->inc_byteDrops (msg.contentSize ());
-           }
-       } else {
-           if (mgmtExchange != 0) {
-               mgmtExchange->inc_msgRoutes  (count);
-               mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
-           }
-       }
-
-       if (mgmtExchange != 0) {
-           mgmtExchange->inc_msgReceives  ();
-           mgmtExchange->inc_byteReceives (msg.contentSize ());
-       }
+        }
+        doRoute(msg, b);
     } catch (...) {
         QPID_LOG(warning, "XMLExchange " << getName() << ": exception routing message with query " << routingKey);
     }
-      
-
 }
 
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org