You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/10/24 02:45:12 UTC

svn commit: r707515 - in /incubator/qpid/trunk/qpid: cpp/examples/direct/ cpp/examples/fanout/ cpp/examples/pub-sub/ cpp/src/qpid/broker/ cpp/src/tests/ python/commands/ specs/

Author: tross
Date: Thu Oct 23 17:45:11 2008
New Revision: 707515

URL: http://svn.apache.org/viewvc?rev=707515&view=rev
Log:
QPID-1348 - Dynamic binding for federation.  Parameterized exchange names for CPP examples

Modified:
    incubator/qpid/trunk/qpid/cpp/examples/direct/declare_queues.cpp
    incubator/qpid/trunk/qpid/cpp/examples/direct/direct_producer.cpp
    incubator/qpid/trunk/qpid/cpp/examples/fanout/fanout_producer.cpp
    incubator/qpid/trunk/qpid/cpp/examples/fanout/listener.cpp
    incubator/qpid/trunk/qpid/cpp/examples/pub-sub/topic_listener.cpp
    incubator/qpid/trunk/qpid/cpp/examples/pub-sub/topic_publisher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
    incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
    incubator/qpid/trunk/qpid/python/commands/qpid-route
    incubator/qpid/trunk/qpid/specs/management-schema.xml

Modified: incubator/qpid/trunk/qpid/cpp/examples/direct/declare_queues.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/direct/declare_queues.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/direct/declare_queues.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/direct/declare_queues.cpp Thu Oct 23 17:45:11 2008
@@ -56,6 +56,7 @@
 int main(int argc, char** argv) {
     const char* host = argc>1 ? argv[1] : "127.0.0.1";
     int port = argc>2 ? atoi(argv[2]) : 5672;
+    string exchange(argc>3 ? argv[3] : "amq.direct");
     Connection connection;
 
     try {
@@ -69,7 +70,7 @@
       // routing key is "routing_key" to this newly created queue.
 
       session.queueDeclare(arg::queue="message_queue");
-      session.exchangeBind(arg::exchange="amq.direct", arg::queue="message_queue", arg::bindingKey="routing_key");
+      session.exchangeBind(arg::exchange=exchange, arg::queue="message_queue", arg::bindingKey="routing_key");
 
   //-----------------------------------------------------------------------------
 

Modified: incubator/qpid/trunk/qpid/cpp/examples/direct/direct_producer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/direct/direct_producer.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/direct/direct_producer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/direct/direct_producer.cpp Thu Oct 23 17:45:11 2008
@@ -65,6 +65,7 @@
     const char* host = argc>1 ? argv[1] : "127.0.0.1";
     int port = argc>2 ? atoi(argv[2]) : 5672;
     int count = argc>3 ? atoi(argv[3]) : 10;
+    string exchange(argc>4 ? argv[4] : "amq.direct");
     Connection connection;
     Message message;
     try {
@@ -89,14 +90,14 @@
 	  message.setData(message_data.str());
           // Asynchronous transfer sends messages as quickly as
           // possible without waiting for confirmation.
-          // async(session).messageTransfer(arg::content=message,  arg::destination="amq.direct");
-          session.messageTransfer(arg::content=message,  arg::destination="amq.direct");
+          // async(session).messageTransfer(arg::content=message,  arg::destination=exchange);
+          session.messageTransfer(arg::content=message,  arg::destination=exchange);
 	}
 	
 	// And send a final message to indicate termination.
 
 	message.setData("That's all, folks!");
-        session.messageTransfer(arg::content=message,  arg::destination="amq.direct"); 
+        session.messageTransfer(arg::content=message,  arg::destination=exchange); 
 
   //-----------------------------------------------------------------------------
 

Modified: incubator/qpid/trunk/qpid/cpp/examples/fanout/fanout_producer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/fanout/fanout_producer.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/fanout/fanout_producer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/fanout/fanout_producer.cpp Thu Oct 23 17:45:11 2008
@@ -64,6 +64,7 @@
 int main(int argc, char** argv) {
     const char* host = argc>1 ? argv[1] : "127.0.0.1";
     int port = argc>2 ? atoi(argv[2]) : 5672;
+    string exchange = argc>3 ? argv[3] : "amq.fanout";
     Connection connection;
     Message message;
     try {
@@ -86,13 +87,13 @@
 	  message.setData(message_data.str());
           // Asynchronous transfer sends messages as quickly as
           // possible without waiting for confirmation.
-          async(session).messageTransfer(arg::content=message, arg::destination="amq.fanout");
+          async(session).messageTransfer(arg::content=message, arg::destination=exchange);
 	}
 	
 	// And send a final message to indicate termination.
 
 	message.setData("That's all, folks!");
-        session.messageTransfer(arg::content=message, arg::destination="amq.fanout"); 
+        session.messageTransfer(arg::content=message, arg::destination=exchange);
 
   //-----------------------------------------------------------------------------
 

Modified: incubator/qpid/trunk/qpid/cpp/examples/fanout/listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/fanout/listener.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/fanout/listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/fanout/listener.cpp Thu Oct 23 17:45:11 2008
@@ -60,6 +60,7 @@
 int main(int argc, char** argv) {
     const char* host = argc>1 ? argv[1] : "127.0.0.1";
     int port = argc>2 ? atoi(argv[2]) : 5672;
+    string exchange = argc>3 ? argv[3] : "amq.fanout";
     Connection connection;
     Message msg;
     try {
@@ -82,7 +83,7 @@
         session.queueDeclare(arg::queue=myQueue, arg::exclusive=true,
                              arg::autoDelete=true);
 
-        session.exchangeBind(arg::exchange="amq.fanout", arg::queue=myQueue, arg::bindingKey="my-key");
+        session.exchangeBind(arg::exchange=exchange, arg::queue=myQueue, arg::bindingKey="my-key");
 
         // Create a listener and subscribe it to my queue.
         SubscriptionManager subscriptions(session);

Modified: incubator/qpid/trunk/qpid/cpp/examples/pub-sub/topic_listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/pub-sub/topic_listener.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/pub-sub/topic_listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/pub-sub/topic_listener.cpp Thu Oct 23 17:45:11 2008
@@ -61,7 +61,7 @@
     SubscriptionManager subscriptions;
   public:
     Listener(Session& session);
-    virtual void prepareQueue(std::string queue, std::string routing_key);
+    virtual void prepareQueue(std::string queue, std::string exchange, std::string routing_key);
     virtual void received(Message& message);
     virtual void listen();
     ~Listener() { };
@@ -84,7 +84,7 @@
 }
 
 
-void Listener::prepareQueue(std::string queue, std::string routing_key) {
+void Listener::prepareQueue(std::string queue, std::string exchange, std::string routing_key) {
 
     /* Create a unique queue name for this consumer by concatenating
      * the queue name parameter with the Session ID.
@@ -106,8 +106,8 @@
      * "control" routing key, when it is finished.
      */
 
-    session.exchangeBind(arg::exchange="amq.topic", arg::queue=queue, arg::bindingKey=routing_key);
-    session.exchangeBind(arg::exchange="amq.topic", arg::queue=queue, arg::bindingKey="control");
+    session.exchangeBind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey=routing_key);
+    session.exchangeBind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey="control");
 
     /*
      * subscribe to the queue using the subscription manager.
@@ -134,6 +134,7 @@
 int main(int argc, char** argv) {
     const char* host = argc>1 ? argv[1] : "127.0.0.1";
     int port = argc>2 ? atoi(argv[2]) : 5672;
+    std::string exchange = argc>3 ? argv[3] : "amq.topic";
     Connection connection;
     try {
         connection.open(host, port);
@@ -147,12 +148,12 @@
 
         // Subscribe to messages on the queues we are interested in
 
-	listener.prepareQueue("usa", "usa.#");
-	listener.prepareQueue("europe", "europe.#");
-	listener.prepareQueue("news", "#.news");
-	listener.prepareQueue("weather", "#.weather");
+        listener.prepareQueue("usa", exchange, "usa.#");
+        listener.prepareQueue("europe", exchange, "europe.#");
+        listener.prepareQueue("news", exchange, "#.news");
+        listener.prepareQueue("weather", exchange, "#.weather");
 
-	std::cout << "Listening for messages ..." << std::endl;
+        std::cout << "Listening for messages ..." << std::endl;
 
         // Give up control and receive messages
         listener.listen();

Modified: incubator/qpid/trunk/qpid/cpp/examples/pub-sub/topic_publisher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/pub-sub/topic_publisher.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/pub-sub/topic_publisher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/pub-sub/topic_publisher.cpp Thu Oct 23 17:45:11 2008
@@ -60,7 +60,7 @@
 using std::stringstream;
 using std::string;
 
-void publish_messages(Session& session, string routing_key)
+void publish_messages(Session& session, string exchange, string routing_key)
 {
   Message message;
 
@@ -75,7 +75,7 @@
     message.setData(message_data.str());
     // Asynchronous transfer sends messages as quickly as
     // possible without waiting for confirmation.
-    async(session).messageTransfer(arg::content=message, arg::destination="amq.topic");
+    async(session).messageTransfer(arg::content=message, arg::destination=exchange);
   }
 
 }
@@ -88,18 +88,19 @@
  *
  */
 
-void no_more_messages(Session& session)
+void no_more_messages(Session& session, string exchange)
 {
   Message message;
 
   message.getDeliveryProperties().setRoutingKey("control"); 
   message.setData("That's all, folks!");
-  session.messageTransfer(arg::content=message, arg::destination="amq.topic"); 
+  session.messageTransfer(arg::content=message, arg::destination=exchange); 
 }
 
 int main(int argc, char** argv) {
     const char* host = argc>1 ? argv[1] : "127.0.0.1";
     int port = argc>2 ? atoi(argv[2]) : 5672;
+    std::string exchange = argc>3 ? argv[3] : "amq.topic";
     Connection connection;
     Message message;
     try {
@@ -108,12 +109,12 @@
 
   //--------- Main body of program --------------------------------------------
 
-	publish_messages(session, "usa.news");
-	publish_messages(session, "usa.weather");
-	publish_messages(session, "europe.news");
-	publish_messages(session, "europe.weather");
+        publish_messages(session, exchange, "usa.news");
+        publish_messages(session, exchange, "usa.weather");
+        publish_messages(session, exchange, "europe.news");
+        publish_messages(session, exchange, "europe.weather");
 
-	no_more_messages(session);
+        no_more_messages(session, exchange);
 
   //-----------------------------------------------------------------------------
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Thu Oct 23 17:45:11 2008
@@ -34,6 +34,18 @@
 using qpid::management::ManagementAgent;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
+namespace 
+{
+const std::string qpidFedOp("qpid.fed.op");
+const std::string qpidFedTags("qpid.fed.tags");
+const std::string qpidFedOrigin("qpid.fed.origin");
+
+const std::string fedOpBind("B");
+const std::string fedOpUnbind("U");
+const std::string fedOpReorigin("R");
+const std::string fedOpHello("H");
+}
+
 namespace qpid {
 namespace broker {
 
@@ -45,8 +57,9 @@
 Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
                const _qmf::ArgsLinkBridge& _args) : 
     link(_link), id(_id), args(_args), mgmtObject(0),
-    listener(l), name(Uuid(true).str()), persistenceId(0)
+    listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0)
 {
+    queueName += name;
     ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
     if (agent != 0) {
         mgmtObject = new _qmf::Bridge
@@ -65,7 +78,10 @@
 
 void Bridge::create(ConnectionState& c)
 {
+    connState = &c;
     if (args.i_srcIsLocal) {
+        if (args.i_dynamic)
+            throw Exception("Dynamic routing not supported for push routes");
         // Point the bridging commands at the local connection handler
         Connection* conn = dynamic_cast<Connection*>(&c);
         if (conn == 0)
@@ -74,7 +90,7 @@
         channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get()));
     } else {
         // Point the bridging commands at the remote peer broker
-        channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput())));
+        channelHandler.reset(new framing::ChannelHandler(id, &(connState->getOutput())));
     }
 
     session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
@@ -88,8 +104,6 @@
         peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
         peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
     } else {
-        string queue = "bridge_queue_";
-        queue += Uuid(true).str();
         FieldTable queueSettings;
 
         if (args.i_tag.size()) {
@@ -103,19 +117,26 @@
         if (args.i_excludes.size()) {
             queueSettings.setString("qpid.trace.exclude", args.i_excludes);
         } else {
-            const string& peerTag = c.getFederationPeerTag();
+            const string& peerTag = connState->getFederationPeerTag();
             if (peerTag.size())
                 queueSettings.setString("qpid.trace.exclude", peerTag);
         }
 
         bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues?
         bool autoDelete = !durable;//auto delete transient queues?
-        peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
+        peer->getQueue().declare(queueName, "", false, durable, true, autoDelete, queueSettings);
         if (!args.i_dynamic)
-            peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
-        peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable());
+            peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable());
+        peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
         peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
         peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+
+        if (args.i_dynamic) {
+            Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
+            if (exchange.get() == 0)
+                throw Exception("Exchange not found for dynamic route");
+            exchange->registerDynamicBridge(this);
+        }
     }
 }
 
@@ -123,6 +144,11 @@
 {
     peer->getMessage().cancel(args.i_dest);
     peer->getSession().detach(name);
+    if (args.i_dynamic) {
+        Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
+        if (exchange.get() != 0)
+            exchange->removeDynamicBridge(this);
+    }
 }
 
 void Bridge::destroy()
@@ -220,4 +246,46 @@
     }
 }
 
+void Bridge::propagateBinding(const string& key, const string& tagList,
+                              const string& op,  const string& origin)
+{
+    const string& localTag = link->getBroker()->getFederationTag();
+    const string& peerTag  = connState->getFederationPeerTag();
+
+    if (tagList.find(peerTag) == tagList.npos) {
+         FieldTable bindArgs;
+         string newTagList(tagList + string(tagList.empty() ? "" : ",") + localTag);
+
+         bindArgs.setString(qpidFedOp, op);
+         bindArgs.setString(qpidFedTags, newTagList);
+         if (origin.empty())
+             bindArgs.setString(qpidFedOrigin, localTag);
+         else
+             bindArgs.setString(qpidFedOrigin, origin);
+
+         peer->getExchange().bind(queueName, args.i_src, key, bindArgs);
+    }
+}
+
+void Bridge::sendReorigin()
+{
+    FieldTable bindArgs;
+
+    bindArgs.setString(qpidFedOp, fedOpReorigin);
+    bindArgs.setString(qpidFedTags, link->getBroker()->getFederationTag());
+
+    peer->getExchange().bind(queueName, args.i_src, args.i_key, bindArgs);
+}
+
+bool Bridge::containsLocalTag(const string& tagList) const
+{
+    const string& localTag = link->getBroker()->getFederationTag();
+    return (tagList.find(localTag) != tagList.npos);
+}
+
+const string& Bridge::getLocalTag() const
+{
+    return link->getBroker()->getFederationTag();
+}
+
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h Thu Oct 23 17:45:11 2008
@@ -27,6 +27,7 @@
 #include "qpid/framing/Buffer.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/management/Manageable.h"
+#include "Exchange.h"
 #include "qmf/org/apache/qpid/broker/ArgsLinkBridge.h"
 #include "qmf/org/apache/qpid/broker/Bridge.h"
 
@@ -41,7 +42,7 @@
 class Link;
 class LinkRegistry;
 
-class Bridge : public PersistableConfig, public management::Manageable
+class Bridge : public PersistableConfig, public management::Manageable, public Exchange::DynamicBridge
 {
 public:
     typedef boost::shared_ptr<Bridge> shared_ptr;
@@ -69,6 +70,12 @@
     const std::string& getName() const;
     static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
 
+    // Exchange::DynamicBridge methods
+    void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin);
+    void sendReorigin();
+    bool containsLocalTag(const std::string& tagList) const;
+    const std::string& getLocalTag() const;
+
 private:
     struct PushHandler : framing::FrameHandler {
         PushHandler(Connection* c) { conn = c; }
@@ -87,7 +94,9 @@
     qmf::org::apache::qpid::broker::Bridge*        mgmtObject;
     CancellationListener        listener;
     std::string name;
+    std::string queueName;
     mutable uint64_t  persistenceId;
+    ConnectionState* connState;
 };
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Thu Oct 23 17:45:11 2008
@@ -28,25 +28,45 @@
 using qpid::management::Manageable;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
+namespace 
+{
+const std::string qpidFedOp("qpid.fed.op");
+const std::string qpidFedTags("qpid.fed.tags");
+const std::string qpidFedOrigin("qpid.fed.origin");
+
+const std::string fedOpBind("B");
+const std::string fedOpUnbind("U");
+const std::string fedOpReorigin("R");
+const std::string fedOpHello("H");
+}
+
 DirectExchange::DirectExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent)
 {
     if (mgmtExchange != 0)
-        mgmtExchange->set_type (typeName);
+        mgmtExchange->set_type(typeName);
 }
 
-DirectExchange::DirectExchange(const std::string& _name, bool _durable,
+DirectExchange::DirectExchange(const string& _name, bool _durable,
                                const FieldTable& _args, Manageable* _parent) :
     Exchange(_name, _durable, _args, _parent)
 {
     if (mgmtExchange != 0)
-        mgmtExchange->set_type (typeName);
+        mgmtExchange->set_type(typeName);
 }
 
-bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
-    {
+bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
+{
+    string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
+    string fedTags(args ? args->getAsString(qpidFedTags) : "");
+    string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
+    bool propagate = false;
+
+    if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
         Mutex::ScopedLock l(lock);
-        Binding::shared_ptr b(new Binding (routingKey, queue, this));
-        if (bindings[routingKey].add_unless(b, MatchQueue(queue))) {
+        Binding::shared_ptr b(new Binding(routingKey, queue, this, FieldTable(), fedOrigin));
+        BoundKey& bk = bindings[routingKey];
+        if (bk.queues.add_unless(b, MatchQueue(queue))) {
+            propagate = bk.fedBinding.addOrigin(fedOrigin);
             if (mgmtExchange != 0) {
                 mgmtExchange->inc_bindingCount();
                 ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
@@ -54,30 +74,58 @@
         } else {
             return false;
         }
+    } else if (fedOp == fedOpUnbind) {
+        Mutex::ScopedLock l(lock);
+        BoundKey& bk = bindings[routingKey];
+        propagate = bk.fedBinding.delOrigin(fedOrigin);
+        if (bk.fedBinding.count() == 0)
+            unbind(queue, routingKey, 0);
+    } else if (fedOp == fedOpReorigin) {
+        for (std::map<string, BoundKey>::iterator iter = bindings.begin();
+             iter != bindings.end(); iter++) {
+            const BoundKey& bk = iter->second;
+            if (bk.fedBinding.hasLocal()) {
+                propagateFedOp(iter->first, string(), fedOpBind, string());
+            }
+        }
     }
+
     routeIVE();
+    if (propagate)
+        propagateFedOp(routingKey, fedTags, fedOp, fedOrigin);
     return true;
 }
 
-bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
-    Mutex::ScopedLock l(lock);
-    if (bindings[routingKey].remove_if(MatchQueue(queue))) {
-        if (mgmtExchange != 0) {
-            mgmtExchange->dec_bindingCount();
-            ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount();
+bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/)
+{
+    bool propagate = false;
+
+    {
+        Mutex::ScopedLock l(lock);
+        BoundKey& bk = bindings[routingKey];
+        if (bk.queues.remove_if(MatchQueue(queue))) {
+            propagate = bk.fedBinding.delOrigin();
+            if (mgmtExchange != 0) {
+                mgmtExchange->dec_bindingCount();
+                ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount();
+            }
+        } else {
+            return false;
         }
-        return true;
-    } else {
-        return false;
     }
+
+    if (propagate)
+        propagateFedOp(routingKey, string(), fedOpUnbind, string());
+    return true;
 }
 
-void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
+void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
+{
     PreRoute pr(msg, this);
     Queues::ConstPtr p;
     {
         Mutex::ScopedLock l(lock);
-        p = bindings[routingKey].snapshot();
+        p = bindings[routingKey].queues.snapshot();
     }
     int count(0);
 
@@ -85,26 +133,26 @@
         for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) {
             msg.deliverTo((*i)->queue);
             if ((*i)->mgmtBinding != 0)
-                (*i)->mgmtBinding->inc_msgMatched ();
+                (*i)->mgmtBinding->inc_msgMatched();
         }
     }
 
     if(!count){
         QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey);
         if (mgmtExchange != 0) {
-            mgmtExchange->inc_msgDrops  ();
-            mgmtExchange->inc_byteDrops (msg.contentSize ());
+            mgmtExchange->inc_msgDrops();
+            mgmtExchange->inc_byteDrops(msg.contentSize());
         }
     } else {
         if (mgmtExchange != 0) {
-            mgmtExchange->inc_msgRoutes  (count);
-            mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+            mgmtExchange->inc_msgRoutes(count);
+            mgmtExchange->inc_byteRoutes(count * msg.contentSize());
         }
     }
 
     if (mgmtExchange != 0) {
-        mgmtExchange->inc_msgReceives  ();
-        mgmtExchange->inc_byteReceives (msg.contentSize ());
+        mgmtExchange->inc_msgReceives();
+        mgmtExchange->inc_byteReceives(msg.contentSize());
     }
 }
 
@@ -120,14 +168,14 @@
         if (!queue)
             return true;
 
-        Queues::ConstPtr p = i->second.snapshot();
+        Queues::ConstPtr p = i->second.queues.snapshot();
         return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end();
     } else if (!queue) {
         //if no queue or routing key is specified, just report whether any bindings exist
         return bindings.size() > 0;
     } else {
         for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
-            Queues::ConstPtr p = i->second.snapshot();
+            Queues::ConstPtr p = i->second.queues.snapshot();
             if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end()) return true;
         }
         return false;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h Thu Oct 23 17:45:11 2008
@@ -31,33 +31,35 @@
 
 namespace qpid {
 namespace broker {
-    class DirectExchange : public virtual Exchange{
-        typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> Queues;
-        typedef std::map<string, Queues> Bindings;
-        Bindings bindings;
-        qpid::sys::Mutex lock;
+class DirectExchange : public virtual Exchange {
+    typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> Queues;
+    struct BoundKey {
+        Queues     queues;
+        FedBinding fedBinding;
+    };
+    typedef std::map<string, BoundKey> Bindings;
+    Bindings bindings;
+    qpid::sys::Mutex lock;
 
-    public:
-        static const std::string typeName;
+public:
+    static const std::string typeName;
         
-        DirectExchange(const std::string& name, management::Manageable* parent = 0);
-        DirectExchange(const string& _name, bool _durable, 
-                       const qpid::framing::FieldTable& _args, management::Manageable* parent = 0);
+    DirectExchange(const std::string& name, management::Manageable* parent = 0);
+    DirectExchange(const string& _name, bool _durable, 
+                   const qpid::framing::FieldTable& _args, management::Manageable* parent = 0);
 
-        virtual std::string getType() const { return typeName; }            
+    virtual std::string getType() const { return typeName; }            
         
-        virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
-
-        virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+    virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+    virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+    virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+    virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
 
-        virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+    virtual ~DirectExchange();
 
-        virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
-
-        virtual ~DirectExchange();
-    };
-}
-}
+    virtual bool supportsDynamicBinding() { return true; }
+};
 
+}}
 
 #endif

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Thu Oct 23 17:45:11 2008
@@ -40,6 +40,14 @@
 {
 const std::string qpidMsgSequence("qpid.msg_sequence");
 const std::string qpidIVE("qpid.ive");
+const std::string qpidFedOp("qpid.fed.op");
+const std::string qpidFedTags("qpid.fed.tags");
+const std::string qpidFedOrigin("qpid.fed.origin");
+
+const std::string fedOpBind("B");
+const std::string fedOpUnbind("U");
+const std::string fedOpReorigin("R");
+const std::string fedOpHello("H");
 }
 
 
@@ -73,7 +81,7 @@
 
 Exchange::Exchange (const string& _name, Manageable* parent) :
     name(_name), durable(false), persistenceId(0), sequence(false), 
-	sequenceNo(0), ive(false), mgmtExchange(0)
+    sequenceNo(0), ive(false), mgmtExchange(0)
 {
     if (parent != 0)
     {
@@ -89,7 +97,7 @@
 Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
                    Manageable* parent)
     : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), 
-	sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
+      sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
 {
     if (parent != 0)
     {
@@ -107,7 +115,7 @@
             }
         }
     }
-	
+
     sequence = _args.get(qpidMsgSequence);
     if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing");
 
@@ -166,8 +174,46 @@
     return (ManagementObject*) mgmtExchange;
 }
 
+void Exchange::registerDynamicBridge(DynamicBridge* db)  // Attach a federation bridge to this exchange.
+{
+    if (!supportsDynamicBinding())
+        throw Exception("Exchange type does not support dynamic binding");
+    // Before the new bridge is added, ask every already-registered bridge to send a reorigin.
+    for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();
+         iter != bridgeVector.end(); iter++)
+        (*iter)->sendReorigin();
+    // Record the new bridge, then call bind() with a null queue, an empty key and only the reorigin op set.
+    bridgeVector.push_back(db);  // NOTE(review): raw pointer is stored; presumably the caller must removeDynamicBridge() before db dies - confirm
+    FieldTable args;
+    args.setString(qpidFedOp, fedOpReorigin);
+    bind(Queue::shared_ptr(), string(), &args);
+}
+
+void Exchange::removeDynamicBridge(DynamicBridge* db)
+{
+    // Unregister db: erase its first occurrence in bridgeVector; no-op when it is absent.
+    std::vector<DynamicBridge*>::iterator pos = bridgeVector.begin();
+    while (pos != bridgeVector.end() && *pos != db)
+        ++pos;
+    if (pos != bridgeVector.end())
+        bridgeVector.erase(pos);
+}
+
+void Exchange::handleHelloRequest()  // Base-class implementation does nothing; declared virtual in Exchange.h.
+{
+}
+
+void Exchange::propagateFedOp(const string& routingKey, const string& tags, const string& op, const string& origin)
+{
+    const string myOp(op.empty() ? fedOpBind : op);  // an empty op is normalized to a bind
+    // Forward the binding operation to every registered bridge, passing the normalized op rather than the raw (possibly empty) argument.
+    for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();
+         iter != bridgeVector.end(); ++iter)
+        (*iter)->propagateBinding(routingKey, tags, myOp, origin);
+}
+
 Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* parent,
-                           FieldTable _args)
+                           FieldTable _args, const string& origin)
     : queue(_queue), key(_key), args(_args), mgmtBinding(0)
 {
     if (parent != 0)
@@ -181,6 +227,8 @@
                 management::ObjectId queueId = mo->getObjectId();
                 mgmtBinding = new _qmf::Binding
                     (agent, this, (Manageable*) parent, queueId, key, args);
+                if (!origin.empty())
+                    mgmtBinding->set_origin(origin);
                 agent->addObject (mgmtBinding);
             }
         }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Thu Oct 23 17:45:11 2008
@@ -74,7 +74,7 @@
         qmf::org::apache::qpid::broker::Binding* mgmtBinding;
 
         Binding(const std::string& key, Queue::shared_ptr queue, Exchange* parent = 0,
-                framing::FieldTable args = framing::FieldTable());
+                framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string());
         ~Binding();
         management::ManagementObject* GetManagementObject() const;
     };
@@ -85,6 +85,34 @@
         bool operator()(Exchange::Binding::shared_ptr b);
     };
 
+    // Federation bookkeeping for one binding key: a count of local bindings
+    // plus the set of remote origin tags recorded for the same binding.
+    class FedBinding {
+        uint32_t localBindings;
+        std::set<std::string> originSet;
+    public:
+        FedBinding() : localBindings(0) {}
+        bool hasLocal() const { return localBindings > 0; }
+        // An empty origin denotes a local binding: yields true only for the first one.
+        // A non-empty origin is recorded in the set and always yields true.
+        bool addOrigin(const std::string& origin) {
+            if (!origin.empty()) {
+                originSet.insert(origin);
+                return true;
+            }
+            return ++localBindings == 1;
+        }
+        // Forgets a remote origin tag; always yields true.
+        bool delOrigin(const std::string& origin) { originSet.erase(origin); return true; }
+        // Releases one local binding (never below zero); true when none remain.
+        bool delOrigin() {
+            if (localBindings != 0) --localBindings;
+            return localBindings == 0;
+        }
+        // Sum of local bindings and distinct remote origins.
+        uint32_t count() { return localBindings + originSet.size(); }
+    };
+
     qmf::org::apache::qpid::broker::Exchange* mgmtExchange;
 
 public:
@@ -121,6 +149,27 @@
 
     // Manageable entry points
     management::ManagementObject* GetManagementObject(void) const;
+
+    // Federation hooks
+    class DynamicBridge {  // Abstract interface an exchange uses to reach the bridges registered with it.
+    public:
+        virtual ~DynamicBridge() {}
+        virtual void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin) = 0;  // called by Exchange::propagateFedOp on each registered bridge
+        virtual void sendReorigin() = 0;  // called on already-registered bridges when another bridge registers
+        virtual bool containsLocalTag(const std::string& tagList) const = 0;
+        virtual const std::string& getLocalTag() const = 0;
+    };
+
+    void registerDynamicBridge(DynamicBridge* db);
+    void removeDynamicBridge(DynamicBridge* db);
+    virtual bool supportsDynamicBinding() { return false; }
+
+protected:
+    std::vector<DynamicBridge*> bridgeVector;
+
+    virtual void handleHelloRequest();
+    void propagateFedOp(const std::string& routingKey, const std::string& tags,
+                        const std::string& op,         const std::string& origin);
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Thu Oct 23 17:45:11 2008
@@ -26,6 +26,18 @@
 using namespace qpid::sys;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
+namespace  // file-local federation constants
+{
+const std::string qpidFedOp("qpid.fed.op");          // bind-argument key: requested federation operation
+const std::string qpidFedTags("qpid.fed.tags");      // bind-argument key: federation tag list
+const std::string qpidFedOrigin("qpid.fed.origin");  // bind-argument key: origin of the binding
+// Values that FanOutExchange::bind() compares against the qpid.fed.op argument.
+const std::string fedOpBind("B");
+const std::string fedOpUnbind("U");
+const std::string fedOpReorigin("R");
+const std::string fedOpHello("H");  // NOTE(review): no use of this value is visible in this change
+}
+
 FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent) :
     Exchange(_name, _parent)
 {
@@ -41,32 +53,57 @@
         mgmtExchange->set_type (typeName);
 }
 
-bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/)
+bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* args)
 {
-    Binding::shared_ptr binding (new Binding ("", queue, this));
-    if (bindings.add_unless(binding, MatchQueue(queue))) {
-        if (mgmtExchange != 0) {
-            mgmtExchange->inc_bindingCount();
-            ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+    string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
+    string fedTags(args ? args->getAsString(qpidFedTags) : "");
+    string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
+    bool propagate = false;
+
+    if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
+        Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin));
+        if (bindings.add_unless(binding, MatchQueue(queue))) {
+            propagate = fedBinding.addOrigin(fedOrigin);
+            if (mgmtExchange != 0) {
+                mgmtExchange->inc_bindingCount();
+                ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+            }
+        } else {
+            return false;
+        }
+    } else if (fedOp == fedOpUnbind) {
+        propagate = fedBinding.delOrigin(fedOrigin);
+        if (fedBinding.count() == 0)
+            unbind(queue, "", 0);
+    } else if (fedOp == fedOpReorigin) {
+        if (fedBinding.hasLocal()) {
+            propagateFedOp(string(), string(), fedOpBind, string());
         }
-        routeIVE();
-        return true;
-    } else {
-        return false;
     }
+
+    routeIVE();
+    if (propagate)
+        propagateFedOp(string(), fedTags, fedOp, fedOrigin);
+    return true;
 }
 
 bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/)
 {
+    bool propagate = false;
+
     if (bindings.remove_if(MatchQueue(queue))) {
+        propagate = fedBinding.delOrigin();
         if (mgmtExchange != 0) {
             mgmtExchange->dec_bindingCount();
             ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount();
         }
-        return true;
     } else {
         return false;
     }
+
+    if (propagate)
+        propagateFedOp(string(), string(), fedOpUnbind, string());
+    return true;
 }
 
 void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h Thu Oct 23 17:45:11 2008
@@ -34,6 +34,7 @@
 class FanOutExchange : public virtual Exchange {
     typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> BindingsArray;
     BindingsArray bindings;
+    FedBinding fedBinding;
   public:
     static const std::string typeName;
         
@@ -53,6 +54,7 @@
     virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
 
     virtual ~FanOutExchange();
+    virtual bool supportsDynamicBinding() { return true; }
 };
 
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Thu Oct 23 17:45:11 2008
@@ -336,7 +336,7 @@
     return (ManagementObject*) mgmtObject;
 }
 
-Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args, string&)
+Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args, string& text)
 {
     switch (op)
     {
@@ -350,8 +350,22 @@
         _qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args;
 
         // Durable bridges are only valid on durable links
-        if (iargs.i_durable && !durable)
-            return Manageable::STATUS_INVALID_PARAMETER;
+        if (iargs.i_durable && !durable) {
+            text = "Can't create a durable route on a non-durable link";
+            return Manageable::STATUS_USER;
+        }
+
+        if (iargs.i_dynamic) {
+            Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src);
+            if (exchange.get() == 0) {
+                text = "Exchange not found";
+                return Manageable::STATUS_USER;
+            }
+            if (!exchange->supportsDynamicBinding()) {
+                text = "Exchange type does not support dynamic routing";
+                return Manageable::STATUS_USER;
+            }
+        }
 
         std::pair<Bridge::shared_ptr, bool> result =
             links->declare (host, port, iargs.i_durable, iargs.i_src,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Thu Oct 23 17:45:11 2008
@@ -31,6 +31,18 @@
 // - excessive string copying: should be 0 copy, match from original buffer.
 // - match/lookup: use descision tree or other more efficient structure.
 
+namespace  // file-local federation constants
+{
+const std::string qpidFedOp("qpid.fed.op");          // bind-argument key: requested federation operation
+const std::string qpidFedTags("qpid.fed.tags");      // bind-argument key: federation tag list
+const std::string qpidFedOrigin("qpid.fed.origin");  // bind-argument key: origin of the binding
+// Values that TopicExchange::bind() compares against the qpid.fed.op argument.
+const std::string fedOpBind("B");
+const std::string fedOpUnbind("U");
+const std::string fedOpReorigin("R");
+const std::string fedOpHello("H");  // NOTE(review): no use of this value is visible in this change
+}
+
 Tokens& Tokens::operator=(const std::string& s) {
     clear();
     if (s.empty()) return *this;
@@ -51,6 +63,15 @@
     return *this;
 }
 
+void Tokens::key(string& keytext) const
+{
+    // Appends the tokens, joined by '.', to keytext. The caller's existing
+    // content is kept: keytext is not cleared first.
+    const char* sep = "";
+    for (std::vector<string>::const_iterator tok = begin(); tok != end(); ++tok, sep = ".")
+        keytext.append(sep).append(*tok);
+}
+
 namespace {
 const std::string hashmark("#");
 const std::string star("*");
@@ -81,7 +102,7 @@
 
 
 namespace {
-// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string.
+// TODO aconway 2006-09-20: Inefficient to convert every routingKey to a string.
 // Need StringRef class that operates on a string in place witout copy.
 // Should be applied everywhere strings are extracted from frames.
 // 
@@ -130,30 +151,63 @@
         mgmtExchange->set_type (typeName);
 }
 
-bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
-    {
+bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
+{
+    string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
+    string fedTags(args ? args->getAsString(qpidFedTags) : "");
+    string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
+    bool propagate = false;
+    bool reallyUnbind;
+    TopicPattern routingPattern(routingKey);
+
+    if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
         RWlock::ScopedWlock l(lock);
-        TopicPattern routingPattern(routingKey);
         if (isBound(queue, routingPattern)) {
             return false;
         } else {
-            Binding::shared_ptr binding (new Binding (routingKey, queue, this));
-            bindings[routingPattern].push_back(binding);
+            Binding::shared_ptr binding (new Binding (routingKey, queue, this, FieldTable(), fedOrigin));
+            BoundKey& bk = bindings[routingPattern];
+            bk.bindingVector.push_back(binding);
+            propagate = bk.fedBinding.addOrigin(fedOrigin);
             if (mgmtExchange != 0) {
                 mgmtExchange->inc_bindingCount();
                 ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
             }
         }
+    } else if (fedOp == fedOpUnbind) {
+        {
+            RWlock::ScopedWlock l(lock);
+            BoundKey& bk = bindings[routingPattern];
+            propagate = bk.fedBinding.delOrigin(fedOrigin);
+            reallyUnbind = bk.fedBinding.count() == 0;
+        }
+        if (reallyUnbind)
+            unbind(queue, routingKey, 0);
+    } else if (fedOp == fedOpReorigin) {
+        for (std::map<TopicPattern, BoundKey>::iterator iter = bindings.begin();
+             iter != bindings.end(); iter++) {
+            const BoundKey& bk = iter->second;
+            if (bk.fedBinding.hasLocal()) {
+                string propKey;
+                iter->first.key(propKey);
+                propagateFedOp(propKey, string(), fedOpBind, string());
+            }
+        }
     }
+
     routeIVE();
+    if (propagate)
+        propagateFedOp(routingKey, fedTags, fedOp, fedOrigin);
     return true;
 }
 
 bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
     RWlock::ScopedWlock l(lock);
     BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
-    Binding::vector& qv(bi->second);
     if (bi == bindings.end()) return false;
+    BoundKey& bk = bi->second;
+    Binding::vector& qv(bk.bindingVector);
+    bool propagate = false;
 
     Binding::vector::iterator q;
     for (q = qv.begin(); q != qv.end(); q++)
@@ -161,11 +215,15 @@
             break;
     if(q == qv.end()) return false;
     qv.erase(q);
+    propagate = bk.fedBinding.delOrigin();
     if(qv.empty()) bindings.erase(bi);
     if (mgmtExchange != 0) {
         mgmtExchange->dec_bindingCount();
         ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount();
     }
+
+    if (propagate)
+        propagateFedOp(routingKey, string(), fedOpUnbind, string());
     return true;
 }
 
@@ -173,7 +231,7 @@
 {
     BindingMap::iterator bi = bindings.find(pattern);
     if (bi == bindings.end()) return false;
-    Binding::vector& qv(bi->second);
+    Binding::vector& qv(bi->second.bindingVector);
     Binding::vector::iterator q;
     for (q = qv.begin(); q != qv.end(); q++)
         if ((*q)->queue == queue)
@@ -189,7 +247,7 @@
 
     for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
         if (i->first.match(tokens)) {
-            Binding::vector& qv(i->second);
+            Binding::vector& qv(i->second.bindingVector);
             for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){
                 msg.deliverTo((*j)->queue);
                 if ((*j)->mgmtBinding != 0)
@@ -230,7 +288,7 @@
         }            
     } else {
         for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
-            Binding::vector& qv(i->second);
+            Binding::vector& qv(i->second.bindingVector);
             Binding::vector::iterator q;
             for (q = qv.begin(); q != qv.end(); q++)
                 if ((*q)->queue == queue)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Thu Oct 23 17:45:11 2008
@@ -41,6 +41,7 @@
     Tokens(const std::string& s) { operator=(s); }
     /** Tokenizing assignment operator s */
     Tokens & operator=(const std::string& s);
+    void key(std::string& key) const;
     
   private:
     size_t hash;
@@ -70,8 +71,12 @@
     void normalize();
 };
 
-class TopicExchange : public virtual Exchange{
-    typedef std::map<TopicPattern, Binding::vector> BindingMap;
+class TopicExchange : public virtual Exchange {
+    struct BoundKey {
+        Binding::vector bindingVector;
+        FedBinding fedBinding;
+    };
+    typedef std::map<TopicPattern, BoundKey> BindingMap;
     BindingMap bindings;
     qpid::sys::RWlock lock;
 
@@ -94,6 +99,7 @@
     virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
 
     virtual ~TopicExchange();
+    virtual bool supportsDynamicBinding() { return true; }
 };
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/federation.py (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/federation.py Thu Oct 23 17:45:11 2008
@@ -20,7 +20,6 @@
 
 import sys
 from qpid.testlib import TestBase010, testrunner
-from qpid.management import managementChannel, managementClient
 from qpid.datatypes import Message
 from qpid.queue import Empty
 from time import sleep
@@ -54,68 +53,45 @@
 def remote_port():
     return int(scan_args("--remote-port"))
 
-class Helper:
-    def __init__(self, parent):
-        self.parent = parent
-        self.session = parent.conn.session("Helper")
-        self.mc  = managementClient(self.session.spec)
-        self.mch = self.mc.addChannel(self.session)
-        self.mc.syncWaitForStable(self.mch)
-
-    def shutdown (self):
-        self.mc.removeChannel (self.mch)
-
-    def get_objects(self, type):
-        return self.mc.syncGetObjects(self.mch, type)
-
-    def get_object(self, type, position = 1, expected = None):
-        objects = self.get_objects(type)
-        if not expected: expected = position
-        self.assertEqual(len(objects), expected)
-        return objects[(position - 1)]
-
-        
-    def call_method(self, object, method, args=None):
-        res = self.mc.syncCallMethod(self.mch, object.id, object.classKey, method, args)
-        self.assertEqual(res.status,     0)
-        self.assertEqual(res.statusText, "OK")
-        return res
-    
-    def assertEqual(self, a, b):
-        self.parent.assertEqual(a, b)
-
 class FederationTests(TestBase010):
 
     def test_bridge_create_and_close(self):
-        mgmt = Helper(self)
-        broker = mgmt.get_object("broker")
+        self.startQmf();
+        qmf = self.qmf
 
-        mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
-        link = mgmt.get_object("link")
-            
-        mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.direct", "key":"my-key"})
-        bridge = mgmt.get_object("bridge")
-            
-        mgmt.call_method(bridge, "close")
-        mgmt.call_method(link, "close")
+        broker = qmf.getObjects(_class="broker")[0]
+        result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+        self.assertEqual(result.status, 0)
+
+        link = qmf.getObjects(_class="link")[0]
+        result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False)
+        self.assertEqual(result.status, 0)
+
+        bridge = qmf.getObjects(_class="bridge")[0]
+        result = bridge.close()
+        self.assertEqual(result.status, 0)
 
-        sleep(6)
-        self.assertEqual(len(mgmt.get_objects("bridge")), 0)
-        self.assertEqual(len(mgmt.get_objects("link")), 0)
+        result = link.close()
+        self.assertEqual(result.status, 0)
 
-        mgmt.shutdown ()
+        sleep(3)
+        self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+        self.assertEqual(len(qmf.getObjects(_class="link")), 0)
 
     def test_pull_from_exchange(self):
         session = self.session
         
-        mgmt = Helper(self)
-        broker = mgmt.get_object("broker")
+        self.startQmf()
+        qmf = self.qmf
+        broker = qmf.getObjects(_class="broker")[0]
+        result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+        self.assertEqual(result.status, 0)
+
+        link = qmf.getObjects(_class="link")[0]
+        result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False)
+        self.assertEqual(result.status, 0)
 
-        mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
-        link = mgmt.get_object("link")
-        
-        mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", "key":"my-key"})
-        bridge = mgmt.get_object("bridge")
+        bridge = qmf.getObjects(_class="bridge")[0]
 
         #setup queue to receive messages from local broker
         session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
@@ -140,27 +116,29 @@
             self.fail("Got unexpected message in queue: " + extra.body)
         except Empty: None
 
-        mgmt.call_method(bridge, "close")
-        mgmt.call_method(link, "close")
-        sleep(6)
-        self.assertEqual(len(mgmt.get_objects("bridge")), 0)
-        self.assertEqual(len(mgmt.get_objects("link")), 0)
+        result = bridge.close()
+        self.assertEqual(result.status, 0)
+        result = link.close()
+        self.assertEqual(result.status, 0)
 
-        mgmt.shutdown()
+        sleep(3)
+        self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+        self.assertEqual(len(qmf.getObjects(_class="link")), 0)
 
     def test_push_to_exchange(self):
         session = self.session
         
-        mgmt = Helper(self)
-        broker = mgmt.get_object("broker")
+        self.startQmf()
+        qmf = self.qmf
+        broker = qmf.getObjects(_class="broker")[0]
+        result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+        self.assertEqual(result.status, 0)
+
+        link = qmf.getObjects(_class="link")[0]
+        result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False)
+        self.assertEqual(result.status, 0)
 
-        mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
-        link = mgmt.get_object("link")
-        
-        mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout",
-                                          "key":"my-key", "tag":"", "excludes":"", "srcIsQueue":0,
-                                          "srcIsLocal":1})
-        bridge = mgmt.get_object("bridge")
+        bridge = qmf.getObjects(_class="bridge")[0]
 
         #setup queue to receive messages from remote broker
         r_conn = self.connect(host=remote_host(), port=remote_port())
@@ -184,13 +162,14 @@
             self.fail("Got unexpected message in queue: " + extra.body)
         except Empty: None
 
-        mgmt.call_method(bridge, "close")
-        mgmt.call_method(link, "close")
-        sleep(6)
-        self.assertEqual(len(mgmt.get_objects("bridge")), 0)
-        self.assertEqual(len(mgmt.get_objects("link")), 0)
+        result = bridge.close()
+        self.assertEqual(result.status, 0)
+        result = link.close()
+        self.assertEqual(result.status, 0)
 
-        mgmt.shutdown()
+        sleep(3)
+        self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+        self.assertEqual(len(qmf.getObjects(_class="link")), 0)
 
     def test_pull_from_queue(self):
         session = self.session
@@ -209,16 +188,18 @@
         self.subscribe(queue="fed1", destination="f1")
         queue = session.incoming("f1")
 
-        mgmt = Helper(self)
-        broker = mgmt.get_object("broker")
-
-        mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
-        link = mgmt.get_object("link")
+        self.startQmf()
+        qmf = self.qmf
+        broker = qmf.getObjects(_class="broker")[0]
+        result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+        self.assertEqual(result.status, 0)
+
+        link = qmf.getObjects(_class="link")[0]
+        result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False)
+        self.assertEqual(result.status, 0)
 
-        mgmt.call_method(link, "bridge", {"durable":0, "src":"my-bridge-queue", "dest":"amq.fanout",
-                                          "key":"", "tag":"", "excludes":"", "srcIsQueue":1})
-        sleep(6)
-        bridge = mgmt.get_object("bridge")
+        bridge = qmf.getObjects(_class="bridge")[0]
+        sleep(3)
 
         #add some more messages (i.e. after bridge was created)
         for i in range(6, 11):
@@ -236,14 +217,14 @@
             self.fail("Got unexpected message in queue: " + extra.body)
         except Empty: None
 
+        result = bridge.close()
+        self.assertEqual(result.status, 0)
+        result = link.close()
+        self.assertEqual(result.status, 0)
 
-        mgmt.call_method(bridge, "close")
-        mgmt.call_method(link, "close")
-        sleep(6)
-        self.assertEqual(len(mgmt.get_objects("bridge")), 0)
-        self.assertEqual(len(mgmt.get_objects("link")), 0)
-
-        mgmt.shutdown ()
+        sleep(3)
+        self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+        self.assertEqual(len(qmf.getObjects(_class="link")), 0)
 
     def test_tracing_automatic(self):
         remoteUrl = "%s:%d" % (remote_host(), remote_port())
@@ -307,22 +288,24 @@
     def test_tracing(self):
         session = self.session
         
-        mgmt = Helper(self)
-        broker = mgmt.get_object("broker")
-
-        mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
-        link = mgmt.get_object("link")
-        
-        mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", "key":"my-key",
-                                          "tag":"my-bridge-id", "excludes":"exclude-me,also-exclude-me"})
-        sleep(6)
-        bridge = mgmt.get_object("bridge")
+        self.startQmf()
+        qmf = self.qmf
+        broker = qmf.getObjects(_class="broker")[0]
+        result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+        self.assertEqual(result.status, 0)
+
+        link = qmf.getObjects(_class="link")[0]
+        result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "my-bridge-id",
+                             "exclude-me,also-exclude-me", False, False, False)
+        self.assertEqual(result.status, 0)
+        bridge = qmf.getObjects(_class="bridge")[0]
 
         #setup queue to receive messages from local broker
         session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
         session.exchange_bind(queue="fed1", exchange="amq.fanout")
         self.subscribe(queue="fed1", destination="f1")
         queue = session.incoming("f1")
+        sleep(6)
 
         #send messages to remote broker and confirm it is routed to local broker
         r_conn = self.connect(host=remote_host(), port=remote_port())
@@ -347,13 +330,155 @@
             self.fail("Got unexpected message in queue: " + extra.body)
         except Empty: None
 
-        mgmt.call_method(bridge, "close")
-        mgmt.call_method(link, "close")
-        sleep(6)
-        self.assertEqual(len(mgmt.get_objects("bridge")), 0)
-        self.assertEqual(len(mgmt.get_objects("link")), 0)
+        result = bridge.close()
+        self.assertEqual(result.status, 0)
+        result = link.close()
+        self.assertEqual(result.status, 0)
+
+        sleep(3)
+        self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+        self.assertEqual(len(qmf.getObjects(_class="link")), 0)
+
+    def test_dynamic_fanout(self):
+        session = self.session
+        r_conn = self.connect(host=remote_host(), port=remote_port())
+        r_session = r_conn.session("test_dynamic_fanout")
+
+        session.exchange_declare(exchange="fed.fanout", type="fanout")
+        r_session.exchange_declare(exchange="fed.fanout", type="fanout")
+
+        self.startQmf()
+        qmf = self.qmf
+        broker = qmf.getObjects(_class="broker")[0]
+        result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+        self.assertEqual(result.status, 0)
+
+        link = qmf.getObjects(_class="link")[0]
+        result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True)
+        self.assertEqual(result.status, 0)
+        bridge = qmf.getObjects(_class="bridge")[0]
+        sleep(5)
+
+        session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="fed1", exchange="fed.fanout")
+        self.subscribe(queue="fed1", destination="f1")
+        queue = session.incoming("f1")
+
+        for i in range(1, 11):
+            dp = r_session.delivery_properties()
+            r_session.message_transfer(destination="fed.fanout", message=Message(dp, "Message %d" % i))
+
+        for i in range(1, 11):
+            msg = queue.get(timeout=5)
+            self.assertEqual("Message %d" % i, msg.body)
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message in queue: " + extra.body)
+        except Empty: None
+
+        result = bridge.close()
+        self.assertEqual(result.status, 0)
+        result = link.close()
+        self.assertEqual(result.status, 0)
+
+        sleep(3)
+        self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+        self.assertEqual(len(qmf.getObjects(_class="link")), 0)
+
+
+    def test_dynamic_direct(self):
+        session = self.session
+        r_conn = self.connect(host=remote_host(), port=remote_port())
+        r_session = r_conn.session("test_dynamic_direct")
+
+        session.exchange_declare(exchange="fed.direct", type="direct")
+        r_session.exchange_declare(exchange="fed.direct", type="direct")
+
+        self.startQmf()
+        qmf = self.qmf
+        broker = qmf.getObjects(_class="broker")[0]
+        result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+        self.assertEqual(result.status, 0)
+
+        link = qmf.getObjects(_class="link")[0]
+        result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True)
+        self.assertEqual(result.status, 0)
+        bridge = qmf.getObjects(_class="bridge")[0]
+        sleep(5)
+
+        session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="fed1", exchange="fed.direct", binding_key="fd-key")
+        self.subscribe(queue="fed1", destination="f1")
+        queue = session.incoming("f1")
+
+        for i in range(1, 11):
+            dp = r_session.delivery_properties(routing_key="fd-key")
+            r_session.message_transfer(destination="fed.direct", message=Message(dp, "Message %d" % i))
+
+        for i in range(1, 11):
+            msg = queue.get(timeout=5)
+            self.assertEqual("Message %d" % i, msg.body)
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message in queue: " + extra.body)
+        except Empty: None
+
+        result = bridge.close()
+        self.assertEqual(result.status, 0)
+        result = link.close()
+        self.assertEqual(result.status, 0)
+
+        sleep(3)
+        self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+        self.assertEqual(len(qmf.getObjects(_class="link")), 0)
+
+
+    def test_dynamic_topic(self):
+        session = self.session
+        r_conn = self.connect(host=remote_host(), port=remote_port())
+        r_session = r_conn.session("test_dynamic_topic")
+
+        session.exchange_declare(exchange="fed.topic", type="topic")
+        r_session.exchange_declare(exchange="fed.topic", type="topic")
+
+        self.startQmf()
+        qmf = self.qmf
+        broker = qmf.getObjects(_class="broker")[0]
+        result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+        self.assertEqual(result.status, 0)
+
+        link = qmf.getObjects(_class="link")[0]
+        result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True)
+        self.assertEqual(result.status, 0)
+        bridge = qmf.getObjects(_class="bridge")[0]
+        sleep(5)
+
+        session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="fed1", exchange="fed.topic", binding_key="ft-key.#")
+        self.subscribe(queue="fed1", destination="f1")
+        queue = session.incoming("f1")
+
+        for i in range(1, 11):
+            dp = r_session.delivery_properties(routing_key="ft-key.one.two")
+            r_session.message_transfer(destination="fed.topic", message=Message(dp, "Message %d" % i))
+
+        for i in range(1, 11):
+            msg = queue.get(timeout=5)
+            self.assertEqual("Message %d" % i, msg.body)
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message in queue: " + extra.body)
+        except Empty: None
+
+        result = bridge.close()
+        self.assertEqual(result.status, 0)
+        result = link.close()
+        self.assertEqual(result.status, 0)
+
+        sleep(3)
+        self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+        self.assertEqual(len(qmf.getObjects(_class="link")), 0)
 
-        mgmt.shutdown ()
 
     def getProperty(self, msg, name):
         for h in msg.headers:
@@ -364,7 +489,8 @@
         headers = self.getProperty(msg, "application_headers")
         if headers:
             return headers[name]
-        return None            
+        return None
+
 
 if __name__ == '__main__':
     args = sys.argv[1:]

Modified: incubator/qpid/trunk/qpid/python/commands/qpid-route
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-route?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-route (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-route Thu Oct 23 17:45:11 2008
@@ -34,6 +34,8 @@
     print "        qpid-route [OPTIONS] route del   <dest-broker> <src-broker> <exchange> <routing-key>"
     print "        qpid-route [OPTIONS] route list  [<dest-broker>]"
     print "        qpid-route [OPTIONS] route flush [<dest-broker>]"
+    print "        qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]"
+    print "        qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>"
     print
     print "Options:"
     print "    -v [ --verbose ]         Verbose output"
@@ -118,7 +120,7 @@
                 print "%-16s%-8d   %c     %-18s%s" % \
                 (link.host, link.port, YN(link.durable), link.state, link.lastError)
 
-    def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes):
+    def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes, dynamic=False):
         self.src = qmfconsole.BrokerURL(srcBroker)
         if self.dest.match(self.src.host, self.src.port):
             raise Exception("Linking broker to itself is not permitted")
@@ -155,13 +157,13 @@
 
         if _verbose:
             print "Creating inter-broker binding..."
-        res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, False, False)
-        if res.status == 4:
-            raise Exception("Can't create a durable route on a non-durable link")
+        res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, False, dynamic)
+        if res.status != 0:
+            raise Exception(res.text)
         if _verbose:
             print "Bridge method returned:", res.status, res.text
 
-    def DelRoute (self, srcBroker, exchange, routingKey):
+    def DelRoute (self, srcBroker, exchange, routingKey, dynamic=False):
         self.src = qmfconsole.BrokerURL(srcBroker)
         link = self.getLink()
         if link == None:
@@ -171,7 +173,8 @@
 
         bridges = self.qmf.getObjects(_class="bridge")
         for bridge in bridges:
-            if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey:
+            if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey \
+                    and bridge.dynamic == dynamic:
                 if _verbose:
                     print "Closing bridge..."
                 res = bridge.close()
@@ -201,7 +204,11 @@
                     myLink = link
                     break
             if myLink != None:
-                print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, bridge.key)
+                if bridge.dynamic:
+                    keyText = "<dynamic>"
+                else:
+                    keyText = bridge.key
+                print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, keyText)
 
     def ClearAllRoutes (self):
         links   = self.qmf.getObjects(_class="link")
@@ -285,6 +292,22 @@
         elif cmd == "list":
             rm.ListLinks ()
 
+    elif group == "dynamic":
+        if cmd == "add":
+            if nargs < 5 or nargs > 7:
+                Usage ()
+
+            tag = ""
+            excludes = ""
+            if nargs > 5: tag = cargs[5]     
+            if nargs > 6: excludes = cargs[6]     
+            rm.AddRoute (cargs[3], cargs[4], "", tag, excludes, dynamic=True)
+        elif cmd == "del":
+            if nargs != 5:
+                Usage ()
+            else:
+                rm.DelRoute (cargs[3], cargs[4], "", dynamic=True)
+
     elif group == "route":
         if cmd == "add":
             if nargs < 6 or nargs > 8:
@@ -294,12 +317,12 @@
             excludes = ""
             if nargs > 6: tag = cargs[6]     
             if nargs > 7: excludes = cargs[7]     
-            rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes)
+            rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes, dynamic=False)
         elif cmd == "del":
             if nargs != 6:
                 Usage ()
             else:
-                rm.DelRoute (cargs[3], cargs[4], cargs[5])
+                rm.DelRoute (cargs[3], cargs[4], cargs[5], dynamic=False)
         else:
             if   cmd == "list":
                 rm.ListRoutes ()

Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Thu Oct 23 17:45:11 2008
@@ -196,6 +196,7 @@
     <property name="queueRef"    type="objId" references="Queue"    access="RC" index="y"/>
     <property name="bindingKey"  type="sstr"  access="RC" index="y"/>
     <property name="arguments"   type="map"   access="RC"/>
+    <property name="origin"      type="sstr"  access="RO" optional="y"/>
 
     <statistic name="msgMatched" type="count64"/>
   </class>