You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by jo...@apache.org on 2010/10/21 00:15:24 UTC

svn commit: r1025780 - in /qpid/trunk/qpid/cpp/src: qpid/broker/ qpid/xml/ tests/

Author: jonathan
Date: Wed Oct 20 22:15:24 2010
New Revision: 1025780

URL: http://svn.apache.org/viewvc?rev=1025780&view=rev
Log:
Adds support for federation in the XML exchange.

Resolves QPID-2348 for the XML Exchange. Also made some changes to the file structure for fedop constants.



Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h
    qpid/trunk/qpid/cpp/src/tests/federation.py
    qpid/trunk/qpid/cpp/src/tests/run_federation_tests

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1025780&r1=1025779&r2=1025780&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Wed Oct 20 22:15:24 2010
@@ -19,6 +19,7 @@
  *
  */
 #include "qpid/broker/Bridge.h"
+#include "qpid/broker/FedOps.h"
 #include "qpid/broker/ConnectionState.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/Link.h"
@@ -37,18 +38,6 @@ using qpid::management::ManagementAgent;
 using std::string;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
-namespace 
-{
-const std::string qpidFedOp("qpid.fed.op");
-const std::string qpidFedTags("qpid.fed.tags");
-const std::string qpidFedOrigin("qpid.fed.origin");
-
-const std::string fedOpBind("B");
-const std::string fedOpUnbind("U");
-const std::string fedOpReorigin("R");
-const std::string fedOpHello("H");
-}
-
 namespace qpid {
 namespace broker {
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=1025780&r1=1025779&r2=1025780&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Wed Oct 20 22:15:24 2010
@@ -20,6 +20,7 @@
  */
 
 #include "qpid/log/Statement.h"
+#include "qpid/broker/FedOps.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/DirectExchange.h"
 #include <iostream>
@@ -32,15 +33,7 @@ namespace _qmf = qmf::org::apache::qpid:
 
 namespace 
 {
-const std::string qpidFedOp("qpid.fed.op");
-const std::string qpidFedTags("qpid.fed.tags");
-const std::string qpidFedOrigin("qpid.fed.origin");
-const std::string qpidExclusiveBinding("qpid.exclusive-binding");
-
-const std::string fedOpBind("B");
-const std::string fedOpUnbind("U");
-const std::string fedOpReorigin("R");
-const std::string fedOpHello("H");
+    const std::string qpidExclusiveBinding("qpid.exclusive-binding");
 }
 
 DirectExchange::DirectExchange(const string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b)
@@ -67,7 +60,7 @@ bool DirectExchange::bind(Queue::shared_
         fedOp = args->getAsString(qpidFedOp);
         fedTags = args->getAsString(qpidFedTags);
         fedOrigin = args->getAsString(qpidFedOrigin);
-        exclusiveBinding = args->get(qpidExclusiveBinding);
+        exclusiveBinding = args->get(qpidExclusiveBinding);  // only direct exchanges take exclusive bindings
     }
 
     bool propagate = false;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1025780&r1=1025779&r2=1025780&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Wed Oct 20 22:15:24 2010
@@ -21,6 +21,7 @@
 
 #include "qpid/broker/Exchange.h"
 #include "qpid/broker/ExchangeRegistry.h"
+#include "qpid/broker/FedOps.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Cluster.h"
 #include "qpid/management/ManagementAgent.h"
@@ -43,19 +44,10 @@ namespace _qmf = qmf::org::apache::qpid:
 
 namespace
 {
-const std::string qpidMsgSequence("qpid.msg_sequence");
-const std::string qpidSequenceCounter("qpid.sequence_counter");
-const std::string qpidIVE("qpid.ive");
-const std::string qpidFedOp("qpid.fed.op");
-const std::string qpidFedTags("qpid.fed.tags");
-const std::string qpidFedOrigin("qpid.fed.origin");
-
-const std::string fedOpBind("B");
-const std::string fedOpUnbind("U");
-const std::string fedOpReorigin("R");
-const std::string fedOpHello("H");
-
-const std::string QPID_MANAGEMENT("qpid.management");
+    const std::string qpidMsgSequence("qpid.msg_sequence");
+    const std::string qpidSequenceCounter("qpid.sequence_counter");
+    const std::string qpidIVE("qpid.ive");
+    const std::string QPID_MANAGEMENT("qpid.management");
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=1025780&r1=1025779&r2=1025780&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Wed Oct 20 22:15:24 2010
@@ -101,6 +101,13 @@ protected:
     public:
         FedBinding() : localBindings(0) {}
         bool hasLocal() const { return localBindings != 0; }
+
+        /**
+         *  Returns 'true' if and only if this is the first local
+         *  binding.
+         *
+         *  The first local binding may need to be propagated.
+         */
         bool addOrigin(const std::string& origin) {
             if (origin.empty()) {
                 localBindings++;
@@ -113,6 +120,14 @@ protected:
             originSet.erase(origin);
             return true;
         }
+
+        /**
+         *  Returns 'true' if and only if the last local binding is
+         *  deleted.
+         *
+         *  When the last local binding is deleted, it may need to
+         *  be propagated.
+         */
         bool delOrigin() {
             if (localBindings > 0)
                 localBindings--;
@@ -145,6 +160,17 @@ public:
     bool inUseAsAlternate() { return alternateUsers > 0; }
 
     virtual std::string getType() const = 0;
+
+    /**
+     *  bind() is used for two distinct purposes:
+     *
+     *  1. To create a binding, in the conventional sense
+     *
+     *  2. As a vehicle for any FedOp, currently including federated
+     *  binding, federated unbinding, federated reorigin.
+     *
+     */
+
     virtual bool bind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
     virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
     virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=1025780&r1=1025779&r2=1025780&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Wed Oct 20 22:15:24 2010
@@ -19,6 +19,7 @@
  *
  */
 #include "qpid/broker/FanOutExchange.h"
+#include "qpid/broker/FedOps.h"
 #include <algorithm>
 
 using namespace qpid::broker;
@@ -26,18 +27,6 @@ using namespace qpid::framing;
 using namespace qpid::sys;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
-namespace 
-{
-const std::string qpidFedOp("qpid.fed.op");
-const std::string qpidFedTags("qpid.fed.tags");
-const std::string qpidFedOrigin("qpid.fed.origin");
-
-const std::string fedOpBind("B");
-const std::string fedOpUnbind("U");
-const std::string fedOpReorigin("R");
-const std::string fedOpHello("H");
-}
-
 FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent, Broker* b) :
     Exchange(_name, _parent, b)
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=1025780&r1=1025779&r2=1025780&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Wed Oct 20 22:15:24 2010
@@ -94,10 +94,11 @@ bool HeadersExchange::bind(Queue::shared
         fedTags = args->getAsString(qpidFedTags);
         fedOrigin = args->getAsString(qpidFedOrigin);
     }
+
     bool propagate = false;
 
     // The federation args get propagated directly, so we need to identify
-    // the non feteration args in case a federated propagate is needed
+    // the non federation args in case a federated propagate is needed
     FieldTable extra_args;
     getNonFedArgs(args, extra_args);
     

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=1025780&r1=1025779&r2=1025780&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Wed Oct 20 22:15:24 2010
@@ -19,6 +19,7 @@
  *
  */
 #include "qpid/broker/TopicExchange.h"
+#include "qpid/broker/FedOps.h"
 #include "qpid/log/Statement.h"
 #include <algorithm>
 
@@ -37,19 +38,6 @@ namespace _qmf = qmf::org::apache::qpid:
 // - excessive string copying: should be 0 copy, match from original buffer.
 // - match/lookup: use descision tree or other more efficient structure.
 
-namespace 
-{
-const std::string qpidFedOp("qpid.fed.op");
-const std::string qpidFedTags("qpid.fed.tags");
-const std::string qpidFedOrigin("qpid.fed.origin");
-
-const std::string fedOpBind("B");
-const std::string fedOpUnbind("U");
-const std::string fedOpReorigin("R");
-const std::string fedOpHello("H");
-}
-
-
 namespace {
 // Iterate over a string of '.'-separated tokens.
 struct TokenIterator {

Modified: qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp?rev=1025780&r1=1025779&r2=1025780&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp Wed Oct 20 22:15:24 2010
@@ -26,6 +26,7 @@
 #include "qpid/broker/DeliverableMessage.h"
 
 #include "qpid/log/Statement.h"
+#include "qpid/broker/FedOps.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/framing/FieldValue.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -43,19 +44,63 @@
 #include <xqilla/context/ItemFactory.hpp>
 #include <xqilla/xqilla-simple.hpp>
 
+#include <boost/bind.hpp>
+#include <functional>
+#include <algorithm>
 #include <iostream>
 #include <sstream>
 
 using namespace qpid::framing;
 using namespace qpid::sys;
 using qpid::management::Manageable;
-using std::string;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
 namespace qpid {
-namespace broker {
+namespace broker {            
+    
+XQilla XmlBinding::xqilla;
+
+XmlBinding::XmlBinding(const std::string& key, const Queue::shared_ptr queue, const std::string& _fedOrigin, Exchange* parent, 
+                                    const ::qpid::framing::FieldTable& _arguments, const std::string& queryText )
+    :      Binding(key, queue, parent, _arguments),
+           xquery(),
+           parse_message_content(true),
+           fedOrigin(_fedOrigin)
+{ 
+    startManagement();
+
+    QPID_LOG(trace, "Creating binding with query: " << queryText );
+
+    try {  
+        Query q(xqilla.parse(X(queryText.c_str())));
+        xquery = q;
+        
+        QPID_LOG(trace, "Bound successfully with query: " << queryText );
 
+        parse_message_content = false;
 
+        if (xquery->getQueryBody()->getStaticAnalysis().areContextFlagsUsed()) {
+            parse_message_content = true;
+        }
+        else {
+            GlobalVariables &vars = const_cast<GlobalVariables&>(xquery->getVariables());
+            for (GlobalVariables::iterator it = vars.begin(); it != vars.end(); ++it) {
+                if ((*it)->getStaticAnalysis().areContextFlagsUsed()) {
+                    parse_message_content = true;
+                    break;
+                } 
+            }
+        }
+    }
+    catch (XQException& e) {
+        throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText));
+    }
+    catch (...) {
+        throw InternalErrorException(QPID_MSG("Unexpected error - Could not parse xquery:"+ queryText));
+    }    
+}
+
+     
 XmlExchange::XmlExchange(const string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b)
 {
     if (mgmtExchange != 0)
@@ -69,69 +114,83 @@ XmlExchange::XmlExchange(const std::stri
     if (mgmtExchange != 0)
         mgmtExchange->set_type (typeName);
 }
+    
+bool XmlExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args)
+{ 
+
+    // Federation uses bind for unbind and reorigin comands as well as for binds.
+    //
+    // Both federated and local binds are done in this method.  Other
+    // federated requests are done by calling the relevent methods.
+
+    string fedOp;
+    string fedTags;
+    string fedOrigin;
+    
+    if (args) 
+        fedOp = args->getAsString(qpidFedOp);
+    if (! fedOp.empty())  {
+        fedTags =  args->getAsString(qpidFedTags);
+        fedOrigin = args->getAsString(qpidFedOrigin);
+    }
 
+    if (fedOp == fedOpUnbind) {
+        return fedUnbind(fedOrigin, fedTags, queue, bindingKey, args);
+    }
+    else if (fedOp == fedOpReorigin) {
+        fedReorigin();
+        return true;
+    }
 
-      // #### TODO: The Binding should take the query text
-      // #### only. Consider encapsulating the entire block, including
-      // #### the if condition.
-      
-
-bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* bindingArguments)
-{
-    string queryText = bindingArguments->getAsString("xquery");
-
-    try {
-        RWlock::ScopedWlock l(lock);
-
-	    XmlBinding::vector& bindings(bindingsMap[routingKey]);
-	    XmlBinding::vector::ConstPtr p = bindings.snapshot();
-	    if (!p || std::find_if(p->begin(), p->end(), MatchQueue(queue)) == p->end()) {
-	        Query query(xqilla.parse(X(queryText.c_str())));
-		XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, *bindingArguments, query));
+    // OK, looks like we're really going to bind
+    
+    else if (fedOp.empty() || fedOp == fedOpBind) {
 
-	        QPID_LOG(trace, "Bound successfully with query: " << queryText );
+        string queryText = args->getAsString("xquery");
 
-                binding->parse_message_content = false;
+        RWlock::ScopedWlock l(lock);   
+ 
+        XmlBinding::vector& bindings(bindingsMap[bindingKey]);
+        XmlBinding::vector::ConstPtr p = bindings.snapshot();
+       
+        if (!p || std::find_if(p->begin(), p->end(), MatchQueueAndOrigin(queue, fedOrigin)) == p->end()) {
 
-                if (query->getQueryBody()->getStaticAnalysis().areContextFlagsUsed()) {
-                    binding->parse_message_content = true;
-                }
-                else {
-                    GlobalVariables &vars = const_cast<GlobalVariables&>(query->getVariables());
-                    for(GlobalVariables::iterator it = vars.begin(); it != vars.end(); ++it) {
-                        if ((*it)->getStaticAnalysis().areContextFlagsUsed()) {
-                            binding->parse_message_content = true;
-                            break;
-                        } 
-                    }
-                }
+            XmlBinding::shared_ptr binding(new XmlBinding (bindingKey, queue, fedOrigin, this, *args, queryText));
+            bindings.add(binding);
 
-	        bindings.add(binding);
-	        if (mgmtExchange != 0) {
-	            mgmtExchange->inc_bindingCount();
-                ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
-	        }
-	    } else {
-	        return false;
-	    }
+            if (mgmtExchange != 0) {
+                mgmtExchange->inc_bindingCount();
+            }
+        } else {
+            return false;
+        }
     }
-    catch (XQException& e) {
-        throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText));
-    }
-    catch (...) {
-        throw InternalErrorException(QPID_MSG("Unexpected error - Could not parse xquery:"+ queryText));
+    else {
+        QPID_LOG(warning, "Unknown Federation Op: " << fedOp);
     }
+ 
     routeIVE();
-	return true;
+    propagateFedOp(bindingKey, fedTags, fedOp, fedOrigin, args);
+
+    return true;
 }
 
-bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/)
+bool XmlExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args)
 {
+    /*
+     *  When called directly, no qpidFedOrigin argument will be
+     *  present. When called from federation, it will be present. 
+     *
+     *  This is a bit of a hack - the binding needs the origin, but
+     *  this interface, as originally defined, would not supply one.
+     */
+    string fedOrigin;
+    if (args) fedOrigin = args->getAsString(qpidFedOrigin);
+
     RWlock::ScopedWlock l(lock);
-    if (bindingsMap[routingKey].remove_if(MatchQueue(queue))) {
+    if (bindingsMap[bindingKey].remove_if(MatchQueueAndOrigin(queue, fedOrigin))) {
         if (mgmtExchange != 0) {
             mgmtExchange->dec_bindingCount();
-            ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount();
         }
         return true;
     } else {
@@ -141,65 +200,65 @@ bool XmlExchange::unbind(Queue::shared_p
 
 bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content) 
 {
-  string msgContent;
+    string msgContent;
 
-  try {
-      QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]");
+    try {
+        QPID_LOG(trace, "matches: query is [" << UTF8(query->getQueryText()) << "]");
+
+        boost::scoped_ptr<DynamicContext> context(query->createDynamicContext());
+        if (!context.get()) {
+            throw InternalErrorException(QPID_MSG("Query context looks munged ..."));
+        }
+
+        if (parse_message_content) {
+
+            msg.getMessage().getFrames().getContent(msgContent);
+
+            QPID_LOG(trace, "matches: message content is [" << msgContent << "]");
 
-      boost::scoped_ptr<DynamicContext> context(query->createDynamicContext());
-      if (!context.get()) {
-          throw InternalErrorException(QPID_MSG("Query context looks munged ..."));
-      }
-
-      if (parse_message_content) {
-
-          msg.getMessage().getFrames().getContent(msgContent);
-
-          QPID_LOG(trace, "matches: message content is [" << msgContent << "]");
-
-          XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(), 
-                                                      msgContent.length(), "input" );
-
-	// This will parse the document using either Xerces or FastXDM, depending
-	// on your XQilla configuration. FastXDM can be as much as 10x faster.
-
-          Sequence seq(context->parseDocument(xml));
-
-          if(!seq.isEmpty() && seq.first()->isNode()) {
-              context->setContextItem(seq.first());
-              context->setContextPosition(1);
-              context->setContextSize(1);
-          }
-      }
-
-      if (args) {
-          FieldTable::ValueMap::const_iterator v = args->begin();
-          for(; v != args->end(); ++v) {
-              // ### TODO: Do types properly
-              if (v->second->convertsTo<std::string>()) {
-                  QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str());
-                  Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get());
-                  context->setExternalVariable(X(v->first.c_str()), value);
-              }
-          }
-      }
+            XERCES_CPP_NAMESPACE::MemBufInputSource xml((const XMLByte*) msgContent.c_str(), 
+                                                        msgContent.length(), "input" );
+
+            // This will parse the document using either Xerces or FastXDM, depending
+            // on your XQilla configuration. FastXDM can be as much as 10x faster.
+
+            Sequence seq(context->parseDocument(xml));
+
+            if(!seq.isEmpty() && seq.first()->isNode()) {
+                context->setContextItem(seq.first());
+                context->setContextPosition(1);
+                context->setContextSize(1);
+            }
+        }
 
-      Result result = query->execute(context.get());
+        if (args) {
+            FieldTable::ValueMap::const_iterator v = args->begin();
+            for(; v != args->end(); ++v) {
+                // ### TODO: Do types properly
+                if (v->second->convertsTo<std::string>()) {
+                    QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str());
+                    Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get());
+                    context->setExternalVariable(X(v->first.c_str()), value);
+                }
+            }
+        }
+
+        Result result = query->execute(context.get());
 #ifdef XQ_EFFECTIVE_BOOLEAN_VALUE_HPP
-      Item::Ptr first_ = result->next(context.get());
-      Item::Ptr second_ = result->next(context.get());
-      return XQEffectiveBooleanValue::get(first_, second_, context.get(), 0);
+        Item::Ptr first_ = result->next(context.get());
+        Item::Ptr second_ = result->next(context.get());
+        return XQEffectiveBooleanValue::get(first_, second_, context.get(), 0);
 #else 
-      return result->getEffectiveBooleanValue(context.get(), 0);
+        return result->getEffectiveBooleanValue(context.get(), 0);
 #endif
-  }
-  catch (XQException& e) {
-      QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent);
-  }
-  catch (...) {
-      QPID_LOG(warning, "Unexpected error routing message: " << msgContent);
-  }
-  return 0;
+    }
+    catch (XQException& e) {
+        QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent);
+    }
+    catch (...) {
+        QPID_LOG(warning, "Unexpected error routing message: " << msgContent);
+    }
+    return 0;
 }
 
 // Future optimization: If any query in a binding for a given routing key requires
@@ -237,16 +296,16 @@ void XmlExchange::route(Deliverable& msg
 }
 
 
-bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) 
+bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const bindingKey, const FieldTable* const) 
 {
     RWlock::ScopedRlock l(lock);
-    if (routingKey) {
-        XmlBindingsMap::iterator i = bindingsMap.find(*routingKey);
+    if (bindingKey) {
+        XmlBindingsMap::iterator i = bindingsMap.find(*bindingKey);
 
         if (i == bindingsMap.end())
-	    return false;
+            return false;
         if (!queue)
-	    return true;
+            return true;
         XmlBinding::vector::ConstPtr p = i->second.snapshot();
         return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end();
     } else if (!queue) {
@@ -254,20 +313,84 @@ bool XmlExchange::isBound(Queue::shared_
         return bindingsMap.size() > 0;
     } else {
         for (XmlBindingsMap::iterator i = bindingsMap.begin(); i != bindingsMap.end(); i++) {
-	    XmlBinding::vector::ConstPtr p = i->second.snapshot();
+            XmlBinding::vector::ConstPtr p = i->second.snapshot();
             if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end()) return true;
-	}
-	return false;
+        }
+        return false;
     }
 
 }
 
-
 XmlExchange::~XmlExchange() 
 {
     bindingsMap.clear();
 }
 
+void XmlExchange::propagateFedOp(const std::string& bindingKey, const std::string& fedTags, const std::string& fedOp, const std::string& fedOrigin, const qpid::framing::FieldTable* args)
+{
+    FieldTable nonFedArgs;
+
+    if (args) {            
+        for (qpid::framing::FieldTable::ValueMap::const_iterator i=args->begin(); i != args->end(); ++i)  {
+            const string& name(i->first);
+            if (name != qpidFedOp &&
+                name != qpidFedTags &&
+                name != qpidFedOrigin)  {
+                nonFedArgs.insert((*i));
+            }
+        }
+    }
+
+    FieldTable* propArgs  = (nonFedArgs.count() > 0 ? &nonFedArgs : 0);
+    Exchange::propagateFedOp(bindingKey, fedTags, fedOp, fedOrigin, propArgs);
+}
+
+bool XmlExchange::fedUnbind(const string& fedOrigin, const string& fedTags, Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args)
+{
+    RWlock::ScopedRlock l(lock);
+
+    if (unbind(queue, bindingKey, args)) {
+        propagateFedOp(bindingKey, fedTags, fedOpUnbind, fedOrigin); 
+        return true;
+    }
+    return false;
+}
+
+void XmlExchange::fedReorigin()
+{
+    std::vector<std::string> keys2prop;
+    {
+        RWlock::ScopedRlock l(lock);   
+        for (XmlBindingsMap::iterator i = bindingsMap.begin(); i != bindingsMap.end(); ++i) {
+            XmlBinding::vector::ConstPtr p = i->second.snapshot();
+            if (std::find_if(p->begin(), p->end(), MatchOrigin(string())) != p->end()) {
+                keys2prop.push_back(i->first);
+            }
+        }
+    }   /* lock dropped */
+    for (std::vector<std::string>::const_iterator key = keys2prop.begin();
+         key != keys2prop.end(); key++) {
+        propagateFedOp( *key, string(), fedOpBind, string());
+    }
+}
+
+
+XmlExchange::MatchOrigin::MatchOrigin(const string& _origin) : origin(_origin) {}
+
+bool XmlExchange::MatchOrigin::operator()(XmlBinding::shared_ptr b)
+{
+    return b->fedOrigin == origin;
+}
+
+
+XmlExchange::MatchQueueAndOrigin::MatchQueueAndOrigin(Queue::shared_ptr _queue, const string& _origin) : queue(_queue), origin(_origin) {}
+
+bool XmlExchange::MatchQueueAndOrigin::operator()(XmlBinding::shared_ptr b)
+{
+    return b->queue == queue and b->fedOrigin == origin;
+}
+
+
 const std::string XmlExchange::typeName("xml");
  
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h?rev=1025780&r1=1025779&r2=1025780&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/xml/XmlExchange.h Wed Oct 20 22:15:24 2010
@@ -33,41 +33,45 @@
 
 #include <map>
 #include <vector>
+#include <string>
+
+using namespace std;
 
 namespace qpid {
 namespace broker {
 
 class Broker;
-class XmlExchange : public virtual Exchange {
 
-    typedef boost::shared_ptr<XQQuery> Query;
+typedef boost::shared_ptr<XQQuery> Query;
 
-    struct XmlBinding : public Exchange::Binding {
-        typedef boost::shared_ptr<XmlBinding> shared_ptr;
-        typedef qpid::sys::CopyOnWriteArray<XmlBinding::shared_ptr> vector;
-
-        boost::shared_ptr<XQQuery> xquery;
-        bool parse_message_content;
-
-        XmlBinding(const std::string& key, const Queue::shared_ptr queue, Exchange* parent, 
-		   const ::qpid::framing::FieldTable& _arguments, Query query):
-            Binding(key, queue, parent, _arguments),
-	      xquery(query),
-                parse_message_content(true) { startManagement(); }
-    };
+struct XmlBinding : public Exchange::Binding {
 
+     static XQilla xqilla;
+
+     typedef boost::shared_ptr<XmlBinding> shared_ptr;
+     typedef qpid::sys::CopyOnWriteArray<XmlBinding::shared_ptr> vector;
+    
+     Query xquery;
+     bool parse_message_content;
+     const std::string fedOrigin;   // empty for local bindings
+    
+     XmlBinding(const std::string& key, const Queue::shared_ptr queue, const std::string& fedOrigin, Exchange* parent, 
+                const ::qpid::framing::FieldTable& _arguments, const std::string& );
         
-    typedef std::map<std::string, XmlBinding::vector > XmlBindingsMap;
+};
 
+class XmlExchange : public virtual Exchange {
+
+    typedef std::map<string, XmlBinding::vector> XmlBindingsMap;
     XmlBindingsMap bindingsMap;
-    XQilla xqilla;
+
     qpid::sys::RWlock lock;
 
     bool matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args, bool parse_message_content);
 
   public:
     static const std::string typeName;
-        
+
     XmlExchange(const std::string& name, management::Manageable* parent = 0, Broker* broker = 0);
     XmlExchange(const std::string& _name, bool _durable,
 		const qpid::framing::FieldTable& _args, management::Manageable* parent = 0, Broker* broker = 0);
@@ -82,7 +86,29 @@ class XmlExchange : public virtual Excha
 
     virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args);
 
+    virtual void propagateFedOp(const std::string& bindingKey, const std::string& fedTags, const std::string& fedOp, const std::string& fedOrigin, const qpid::framing::FieldTable* args=0);
+
+    virtual bool fedUnbind(const std::string& fedOrigin, const std::string& fedTags, Queue::shared_ptr queue, const std::string& bindingKey, const  qpid::framing::FieldTable* args);
+    
+    virtual void fedReorigin();
+
+    virtual bool supportsDynamicBinding() { return true; }
+
     virtual ~XmlExchange();
+
+    struct MatchOrigin {
+        const std::string origin;
+        MatchOrigin(const std::string& origin);
+        bool operator()(XmlBinding::shared_ptr b);
+    };
+
+    struct MatchQueueAndOrigin {
+        const Queue::shared_ptr queue;
+        const std::string origin;
+        MatchQueueAndOrigin(Queue::shared_ptr queue, const std::string& origin);
+        bool operator()(XmlBinding::shared_ptr b);
+    };
+
 };
 
 

Modified: qpid/trunk/qpid/cpp/src/tests/federation.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=1025780&r1=1025779&r2=1025780&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/federation.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/federation.py Wed Oct 20 22:15:24 2010
@@ -802,6 +802,165 @@ class FederationTests(TestBase010):
 
         self.verify_cleanup()
 
+
+    def test_dynamic_headers_xml(self):
+        session = self.session
+        r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
+        r_session = r_conn.session("test_dynamic_headers_xml")
+
+        session.exchange_declare(exchange="fed.xml", type="xml")
+        r_session.exchange_declare(exchange="fed.xml", type="xml")
+
+        self.startQmf()
+        qmf = self.qmf
+
+        broker = qmf.getObjects(_class="broker")[0]
+        result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+        self.assertEqual(result.status, 0)
+
+        link = qmf.getObjects(_class="link")[0]
+        result = link.bridge(False, "fed.xml", "fed.xml", "", "", "", False, False, True, 0)
+        
+        self.assertEqual(result.status, 0) 
+        bridge = qmf.getObjects(_class="bridge")[0]
+        sleep(5)
+
+        session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="fed1", exchange="fed.xml", binding_key="key1", arguments={'xquery':'true()'})
+        self.subscribe(queue="fed1", destination="f1")
+        queue = session.incoming("f1")
+        
+        props = r_session.delivery_properties(routing_key="key1")
+        for i in range(1, 11):
+            r_session.message_transfer(destination="fed.xml", message=Message(props, "Message %d" % i))
+
+        for i in range(1, 11):
+            msg = queue.get(timeout=5)
+            content = msg.body
+            self.assertEqual("Message %d" % i, msg.body)
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message in queue: " + extra.body)
+        except Empty: None
+
+        result = bridge.close()
+        self.assertEqual(result.status, 0)
+        result = link.close()
+        self.assertEqual(result.status, 0)
+
+        self.verify_cleanup()
+
+    def test_dynamic_headers_reorigin_xml(self):
+        session = self.session
+        r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
+        r_session = r_conn.session("test_dynamic_headers_reorigin_xml")
+
+        session.exchange_declare(exchange="fed.xml_reorigin", type="xml")
+        r_session.exchange_declare(exchange="fed.xml_reorigin", type="xml")
+
+        session.exchange_declare(exchange="fed.xml_reorigin_2", type="xml")
+        r_session.exchange_declare(exchange="fed.xml_reorigin_2", type="xml")
+
+        self.startQmf()
+        qmf = self.qmf
+        broker = qmf.getObjects(_class="broker")[0]
+        result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+        self.assertEqual(result.status, 0)
+
+        session.queue_declare(queue="fed2", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="fed2", exchange="fed.xml_reorigin_2", binding_key="key2",  arguments={'xquery':'true()'})
+        self.subscribe(queue="fed2", destination="f2")
+        queue2 = session.incoming("f2")
+
+        link = qmf.getObjects(_class="link")[0]
+        result = link.bridge(False, "fed.xml_reorigin", "fed.xml_reorigin", "", "", "", False, False, True, 0)
+
+        self.assertEqual(result.status, 0) 
+        result = link.bridge(False, "fed.xml_reorigin_2", "fed.xml_reorigin_2", "", "", "", False, False, True, 0)
+        self.assertEqual(result.status, 0)
+
+        bridge = qmf.getObjects(_class="bridge")[0]
+        bridge2 = qmf.getObjects(_class="bridge")[1]
+        sleep(5)
+
+        foo=qmf.getObjects(_class="link")
+        session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)  
+        session.exchange_bind(queue="fed1", exchange="fed.xml_reorigin", binding_key="key1",  arguments={'xquery':'true()'})
+        self.subscribe(queue="fed1", destination="f1")
+        queue = session.incoming("f1")
+
+        props = r_session.delivery_properties(routing_key="key1")
+        for i in range(1, 11):
+            r_session.message_transfer(destination="fed.xml_reorigin", message=Message(props, "Message %d" % i))
+
+        for i in range(1, 11):
+            msg = queue.get(timeout=5)
+            self.assertEqual("Message %d" % i, msg.body)
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message in queue: " + extra.body)
+        except Empty: None
+
+        result = bridge.close()
+        self.assertEqual(result.status, 0)
+
+        # Extra test: don't explicitly close() bridge2.  When the link is closed,
+        # it should clean up bridge2 automagically.  verify_cleanup() will detect
+        # if bridge2 isn't cleaned up and will fail the test.
+        #
+        #result = bridge2.close()
+        #self.assertEqual(result.status, 0)
+        result = link.close()
+        self.assertEqual(result.status, 0)
+
+        self.verify_cleanup()
+
+    def test_dynamic_headers_unbind_xml(self):
+        session = self.session
+        r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
+        r_session = r_conn.session("test_dynamic_xml_unbind")
+
+        session.exchange_declare(exchange="fed.xml_unbind", type="xml")
+        r_session.exchange_declare(exchange="fed.xml_unbind", type="xml")
+
+        self.startQmf()
+        qmf = self.qmf
+
+        broker = qmf.getObjects(_class="broker")[0]
+        result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+        self.assertEqual(result.status, 0)
+
+        link = qmf.getObjects(_class="link")[0]
+        result = link.bridge(False, "fed.xml_unbind", "fed.xml_unbind", "", "", "", False, False, True, 0)
+
+        self.assertEqual(result.status, 0) 
+        bridge = qmf.getObjects(_class="bridge")[0]
+        sleep(5)
+
+        session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+        queue = qmf.getObjects(_class="queue", name="fed1")[0]
+        queue.update()
+        self.assertEqual(queue.bindingCount, 1,
+                         "bindings not accounted for (expected 1, got %d)" % queue.bindingCount)
+
+        session.exchange_bind(queue="fed1", exchange="fed.xml_unbind", binding_key="key1",  arguments={'xquery':'true()'})
+        queue.update()
+        self.assertEqual(queue.bindingCount, 2,
+                         "bindings not accounted for (expected 2, got %d)" % queue.bindingCount) 
+
+        session.exchange_unbind(queue="fed1", exchange="fed.xml_unbind", binding_key="key1")
+        queue.update()
+        self.assertEqual(queue.bindingCount, 1,
+                         "bindings not accounted for (expected 1, got %d)" % queue.bindingCount)
+
+        result = bridge.close()
+        self.assertEqual(result.status, 0)
+        result = link.close()
+        self.assertEqual(result.status, 0)
+
+        self.verify_cleanup()
+
+
     def test_dynamic_topic_nodup(self):
         """Verify that a message whose routing key matches more than one
         binding does not get duplicated to the same queue.

Modified: qpid/trunk/qpid/cpp/src/tests/run_federation_tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_federation_tests?rev=1025780&r1=1025779&r2=1025780&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_federation_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_federation_tests Wed Oct 20 22:15:24 2010
@@ -25,29 +25,37 @@ source ./test_env.sh
 
 trap stop_brokers INT TERM QUIT
 
+if [ -f ../.libs/xml.so ] ; then
+    MODULES="--load-module xml" # Load the XML exchange and run XML exchange federation tests
+    SKIPTESTS=""
+else
+    MODULES="--no-module-dir"  
+    SKIPTESTS="-i *xml"
+fi
+
 start_brokers() {
-    ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no > qpidd.port
+    ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port
     LOCAL_PORT=`cat qpidd.port`
-    ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no > qpidd.port
+    ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port
     REMOTE_PORT=`cat qpidd.port`
 
-    ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no > qpidd.port
+    ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port
     REMOTE_B1=`cat qpidd.port`
-    ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no > qpidd.port
+    ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port
     REMOTE_B2=`cat qpidd.port`
 }
 
 stop_brokers() {
-        $QPIDD_EXEC --no-module-dir -q --port $LOCAL_PORT
-        $QPIDD_EXEC --no-module-dir -q --port $REMOTE_PORT
-        $QPIDD_EXEC --no-module-dir -q --port $REMOTE_B1
-        $QPIDD_EXEC --no-module-dir -q --port $REMOTE_B2
+        $QPIDD_EXEC $MODULES -q --port $LOCAL_PORT
+        $QPIDD_EXEC $MODULES -q --port $REMOTE_PORT
+        $QPIDD_EXEC $MODULES -q --port $REMOTE_B1
+        $QPIDD_EXEC $MODULES -q --port $REMOTE_B2
 }
 
 if test -d ${PYTHON_DIR} ;  then
     start_brokers
     echo "Running federation tests using brokers on ports $LOCAL_PORT $REMOTE_PORT $REMOTE_B1 $REMOTE_B2"
-    $QPID_PYTHON_TEST -m federation -b localhost:$LOCAL_PORT -Dremote-port=$REMOTE_PORT -Dextra-brokers="$REMOTE_B1 $REMOTE_B2" $@
+    $QPID_PYTHON_TEST -m federation $SKIPTESTS -b localhost:$LOCAL_PORT -Dremote-port=$REMOTE_PORT -Dextra-brokers="$REMOTE_B1 $REMOTE_B2" $@
     RETCODE=$?
     stop_brokers
     if test x$RETCODE != x0; then



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org