You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/10/24 02:45:12 UTC
svn commit: r707515 - in /incubator/qpid/trunk/qpid: cpp/examples/direct/
cpp/examples/fanout/ cpp/examples/pub-sub/ cpp/src/qpid/broker/
cpp/src/tests/ python/commands/ specs/
Author: tross
Date: Thu Oct 23 17:45:11 2008
New Revision: 707515
URL: http://svn.apache.org/viewvc?rev=707515&view=rev
Log:
QPID-1348 - Dynamic binding for federation. Parameterized exchange names for CPP examples
Modified:
incubator/qpid/trunk/qpid/cpp/examples/direct/declare_queues.cpp
incubator/qpid/trunk/qpid/cpp/examples/direct/direct_producer.cpp
incubator/qpid/trunk/qpid/cpp/examples/fanout/fanout_producer.cpp
incubator/qpid/trunk/qpid/cpp/examples/fanout/listener.cpp
incubator/qpid/trunk/qpid/cpp/examples/pub-sub/topic_listener.cpp
incubator/qpid/trunk/qpid/cpp/examples/pub-sub/topic_publisher.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
incubator/qpid/trunk/qpid/python/commands/qpid-route
incubator/qpid/trunk/qpid/specs/management-schema.xml
Modified: incubator/qpid/trunk/qpid/cpp/examples/direct/declare_queues.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/direct/declare_queues.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/direct/declare_queues.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/direct/declare_queues.cpp Thu Oct 23 17:45:11 2008
@@ -56,6 +56,7 @@
int main(int argc, char** argv) {
const char* host = argc>1 ? argv[1] : "127.0.0.1";
int port = argc>2 ? atoi(argv[2]) : 5672;
+ string exchange(argc>3 ? argv[3] : "amq.direct");
Connection connection;
try {
@@ -69,7 +70,7 @@
// routing key is "routing_key" to this newly created queue.
session.queueDeclare(arg::queue="message_queue");
- session.exchangeBind(arg::exchange="amq.direct", arg::queue="message_queue", arg::bindingKey="routing_key");
+ session.exchangeBind(arg::exchange=exchange, arg::queue="message_queue", arg::bindingKey="routing_key");
//-----------------------------------------------------------------------------
Modified: incubator/qpid/trunk/qpid/cpp/examples/direct/direct_producer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/direct/direct_producer.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/direct/direct_producer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/direct/direct_producer.cpp Thu Oct 23 17:45:11 2008
@@ -65,6 +65,7 @@
const char* host = argc>1 ? argv[1] : "127.0.0.1";
int port = argc>2 ? atoi(argv[2]) : 5672;
int count = argc>3 ? atoi(argv[3]) : 10;
+ string exchange(argc>4 ? argv[4] : "amq.direct");
Connection connection;
Message message;
try {
@@ -89,14 +90,14 @@
message.setData(message_data.str());
// Asynchronous transfer sends messages as quickly as
// possible without waiting for confirmation.
- // async(session).messageTransfer(arg::content=message, arg::destination="amq.direct");
- session.messageTransfer(arg::content=message, arg::destination="amq.direct");
+ // async(session).messageTransfer(arg::content=message, arg::destination=exchange);
+ session.messageTransfer(arg::content=message, arg::destination=exchange);
}
// And send a final message to indicate termination.
message.setData("That's all, folks!");
- session.messageTransfer(arg::content=message, arg::destination="amq.direct");
+ session.messageTransfer(arg::content=message, arg::destination=exchange);
//-----------------------------------------------------------------------------
Modified: incubator/qpid/trunk/qpid/cpp/examples/fanout/fanout_producer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/fanout/fanout_producer.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/fanout/fanout_producer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/fanout/fanout_producer.cpp Thu Oct 23 17:45:11 2008
@@ -64,6 +64,7 @@
int main(int argc, char** argv) {
const char* host = argc>1 ? argv[1] : "127.0.0.1";
int port = argc>2 ? atoi(argv[2]) : 5672;
+ string exchange = argc>3 ? argv[3] : "amq.fanout";
Connection connection;
Message message;
try {
@@ -86,13 +87,13 @@
message.setData(message_data.str());
// Asynchronous transfer sends messages as quickly as
// possible without waiting for confirmation.
- async(session).messageTransfer(arg::content=message, arg::destination="amq.fanout");
+ async(session).messageTransfer(arg::content=message, arg::destination=exchange);
}
// And send a final message to indicate termination.
message.setData("That's all, folks!");
- session.messageTransfer(arg::content=message, arg::destination="amq.fanout");
+ session.messageTransfer(arg::content=message, arg::destination=exchange);
//-----------------------------------------------------------------------------
Modified: incubator/qpid/trunk/qpid/cpp/examples/fanout/listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/fanout/listener.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/fanout/listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/fanout/listener.cpp Thu Oct 23 17:45:11 2008
@@ -60,6 +60,7 @@
int main(int argc, char** argv) {
const char* host = argc>1 ? argv[1] : "127.0.0.1";
int port = argc>2 ? atoi(argv[2]) : 5672;
+ string exchange = argc>3 ? argv[3] : "amq.fanout";
Connection connection;
Message msg;
try {
@@ -82,7 +83,7 @@
session.queueDeclare(arg::queue=myQueue, arg::exclusive=true,
arg::autoDelete=true);
- session.exchangeBind(arg::exchange="amq.fanout", arg::queue=myQueue, arg::bindingKey="my-key");
+ session.exchangeBind(arg::exchange=exchange, arg::queue=myQueue, arg::bindingKey="my-key");
// Create a listener and subscribe it to my queue.
SubscriptionManager subscriptions(session);
Modified: incubator/qpid/trunk/qpid/cpp/examples/pub-sub/topic_listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/pub-sub/topic_listener.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/pub-sub/topic_listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/pub-sub/topic_listener.cpp Thu Oct 23 17:45:11 2008
@@ -61,7 +61,7 @@
SubscriptionManager subscriptions;
public:
Listener(Session& session);
- virtual void prepareQueue(std::string queue, std::string routing_key);
+ virtual void prepareQueue(std::string queue, std::string exchange, std::string routing_key);
virtual void received(Message& message);
virtual void listen();
~Listener() { };
@@ -84,7 +84,7 @@
}
-void Listener::prepareQueue(std::string queue, std::string routing_key) {
+void Listener::prepareQueue(std::string queue, std::string exchange, std::string routing_key) {
/* Create a unique queue name for this consumer by concatenating
* the queue name parameter with the Session ID.
@@ -106,8 +106,8 @@
* "control" routing key, when it is finished.
*/
- session.exchangeBind(arg::exchange="amq.topic", arg::queue=queue, arg::bindingKey=routing_key);
- session.exchangeBind(arg::exchange="amq.topic", arg::queue=queue, arg::bindingKey="control");
+ session.exchangeBind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey=routing_key);
+ session.exchangeBind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey="control");
/*
* subscribe to the queue using the subscription manager.
@@ -134,6 +134,7 @@
int main(int argc, char** argv) {
const char* host = argc>1 ? argv[1] : "127.0.0.1";
int port = argc>2 ? atoi(argv[2]) : 5672;
+ std::string exchange = argc>3 ? argv[3] : "amq.topic";
Connection connection;
try {
connection.open(host, port);
@@ -147,12 +148,12 @@
// Subscribe to messages on the queues we are interested in
- listener.prepareQueue("usa", "usa.#");
- listener.prepareQueue("europe", "europe.#");
- listener.prepareQueue("news", "#.news");
- listener.prepareQueue("weather", "#.weather");
+ listener.prepareQueue("usa", exchange, "usa.#");
+ listener.prepareQueue("europe", exchange, "europe.#");
+ listener.prepareQueue("news", exchange, "#.news");
+ listener.prepareQueue("weather", exchange, "#.weather");
- std::cout << "Listening for messages ..." << std::endl;
+ std::cout << "Listening for messages ..." << std::endl;
// Give up control and receive messages
listener.listen();
Modified: incubator/qpid/trunk/qpid/cpp/examples/pub-sub/topic_publisher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/pub-sub/topic_publisher.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/pub-sub/topic_publisher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/pub-sub/topic_publisher.cpp Thu Oct 23 17:45:11 2008
@@ -60,7 +60,7 @@
using std::stringstream;
using std::string;
-void publish_messages(Session& session, string routing_key)
+void publish_messages(Session& session, string exchange, string routing_key)
{
Message message;
@@ -75,7 +75,7 @@
message.setData(message_data.str());
// Asynchronous transfer sends messages as quickly as
// possible without waiting for confirmation.
- async(session).messageTransfer(arg::content=message, arg::destination="amq.topic");
+ async(session).messageTransfer(arg::content=message, arg::destination=exchange);
}
}
@@ -88,18 +88,19 @@
*
*/
-void no_more_messages(Session& session)
+void no_more_messages(Session& session, string exchange)
{
Message message;
message.getDeliveryProperties().setRoutingKey("control");
message.setData("That's all, folks!");
- session.messageTransfer(arg::content=message, arg::destination="amq.topic");
+ session.messageTransfer(arg::content=message, arg::destination=exchange);
}
int main(int argc, char** argv) {
const char* host = argc>1 ? argv[1] : "127.0.0.1";
int port = argc>2 ? atoi(argv[2]) : 5672;
+ std::string exchange = argc>3 ? argv[3] : "amq.topic";
Connection connection;
Message message;
try {
@@ -108,12 +109,12 @@
//--------- Main body of program --------------------------------------------
- publish_messages(session, "usa.news");
- publish_messages(session, "usa.weather");
- publish_messages(session, "europe.news");
- publish_messages(session, "europe.weather");
+ publish_messages(session, exchange, "usa.news");
+ publish_messages(session, exchange, "usa.weather");
+ publish_messages(session, exchange, "europe.news");
+ publish_messages(session, exchange, "europe.weather");
- no_more_messages(session);
+ no_more_messages(session, exchange);
//-----------------------------------------------------------------------------
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Thu Oct 23 17:45:11 2008
@@ -34,6 +34,18 @@
using qpid::management::ManagementAgent;
namespace _qmf = qmf::org::apache::qpid::broker;
+// File-local federation constants (internal linkage).
+namespace {
+
+// Keys this file writes into the argument table of a federated exchange bind.
+const std::string qpidFedOp = "qpid.fed.op";
+const std::string qpidFedTags = "qpid.fed.tags";
+const std::string qpidFedOrigin = "qpid.fed.origin";
+
+// Operation codes stored under qpidFedOp.
+const std::string fedOpBind = "B";
+const std::string fedOpUnbind = "U";
+const std::string fedOpReorigin = "R";
+const std::string fedOpHello = "H";
+
+} // namespace
+
namespace qpid {
namespace broker {
@@ -45,8 +57,9 @@
Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
const _qmf::ArgsLinkBridge& _args) :
link(_link), id(_id), args(_args), mgmtObject(0),
- listener(l), name(Uuid(true).str()), persistenceId(0)
+ listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0)
{
+ queueName += name;
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0) {
mgmtObject = new _qmf::Bridge
@@ -65,7 +78,10 @@
void Bridge::create(ConnectionState& c)
{
+ connState = &c;
if (args.i_srcIsLocal) {
+ if (args.i_dynamic)
+ throw Exception("Dynamic routing not supported for push routes");
// Point the bridging commands at the local connection handler
Connection* conn = dynamic_cast<Connection*>(&c);
if (conn == 0)
@@ -74,7 +90,7 @@
channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get()));
} else {
// Point the bridging commands at the remote peer broker
- channelHandler.reset(new framing::ChannelHandler(id, &(c.getOutput())));
+ channelHandler.reset(new framing::ChannelHandler(id, &(connState->getOutput())));
}
session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
@@ -88,8 +104,6 @@
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
} else {
- string queue = "bridge_queue_";
- queue += Uuid(true).str();
FieldTable queueSettings;
if (args.i_tag.size()) {
@@ -103,19 +117,26 @@
if (args.i_excludes.size()) {
queueSettings.setString("qpid.trace.exclude", args.i_excludes);
} else {
- const string& peerTag = c.getFederationPeerTag();
+ const string& peerTag = connState->getFederationPeerTag();
if (peerTag.size())
queueSettings.setString("qpid.trace.exclude", peerTag);
}
bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues?
bool autoDelete = !durable;//auto delete transient queues?
- peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings);
+ peer->getQueue().declare(queueName, "", false, durable, true, autoDelete, queueSettings);
if (!args.i_dynamic)
- peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
- peer->getMessage().subscribe(queue, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable());
+ peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+
+ if (args.i_dynamic) {
+ Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
+ if (exchange.get() == 0)
+ throw Exception("Exchange not found for dynamic route");
+ exchange->registerDynamicBridge(this);
+ }
}
}
@@ -123,6 +144,11 @@
{
peer->getMessage().cancel(args.i_dest);
peer->getSession().detach(name);
+ if (args.i_dynamic) {
+ Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
+ if (exchange.get() != 0)
+ exchange->removeDynamicBridge(this);
+ }
}
void Bridge::destroy()
@@ -220,4 +246,46 @@
}
}
+// Forward a binding operation on the source exchange to the peer broker,
+// unless the peer's federation tag already appears in tagList.
+//
+//   key     - binding key to use on the peer
+//   tagList - comma-separated tags accumulated so far (may be empty)
+//   op      - value sent under "qpid.fed.op"
+//   origin  - value sent under "qpid.fed.origin"; the local tag is
+//             substituted when this is empty
+//
+// NOTE(review): the tag test is a plain substring search, and an empty peer
+// tag matches every list (find("") returns 0), in which case nothing is
+// sent. Confirm both are intended.
+void Bridge::propagateBinding(const string& key, const string& tagList,
+                              const string& op, const string& origin)
+{
+    const string& localTag = link->getBroker()->getFederationTag();
+    const string& peerTag = connState->getFederationPeerTag();
+
+    if (tagList.find(peerTag) != tagList.npos)
+        return;
+
+    // Append our own tag to the list that travels with the bind.
+    string extendedTags(tagList);
+    if (!extendedTags.empty())
+        extendedTags += ",";
+    extendedTags += localTag;
+
+    FieldTable fedArgs;
+    fedArgs.setString(qpidFedOp, op);
+    fedArgs.setString(qpidFedTags, extendedTags);
+    fedArgs.setString(qpidFedOrigin, origin.empty() ? localTag : origin);
+
+    peer->getExchange().bind(queueName, args.i_src, key, fedArgs);
+}
+
+// Send the peer a bind on the bridge queue whose "qpid.fed.op" is the
+// re-origin code and whose tag list holds only the local federation tag.
+void Bridge::sendReorigin()
+{
+    FieldTable reoriginArgs;
+    reoriginArgs.setString(qpidFedOp, fedOpReorigin);
+
+    const string& localTag = link->getBroker()->getFederationTag();
+    reoriginArgs.setString(qpidFedTags, localTag);
+
+    peer->getExchange().bind(queueName, args.i_src, args.i_key, reoriginArgs);
+}
+
+// True when the local broker's federation tag occurs anywhere in tagList
+// (substring search, not a per-element comparison).
+bool Bridge::containsLocalTag(const string& tagList) const
+{
+    const string::size_type where = tagList.find(link->getBroker()->getFederationTag());
+    return where != string::npos;
+}
+
+// Federation tag of the broker that owns this bridge's link.
+const string& Bridge::getLocalTag() const
+{
+    const string& tag = link->getBroker()->getFederationTag();
+    return tag;
+}
+
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h Thu Oct 23 17:45:11 2008
@@ -27,6 +27,7 @@
#include "qpid/framing/Buffer.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/management/Manageable.h"
+#include "Exchange.h"
#include "qmf/org/apache/qpid/broker/ArgsLinkBridge.h"
#include "qmf/org/apache/qpid/broker/Bridge.h"
@@ -41,7 +42,7 @@
class Link;
class LinkRegistry;
-class Bridge : public PersistableConfig, public management::Manageable
+class Bridge : public PersistableConfig, public management::Manageable, public Exchange::DynamicBridge
{
public:
typedef boost::shared_ptr<Bridge> shared_ptr;
@@ -69,6 +70,12 @@
const std::string& getName() const;
static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+ // Exchange::DynamicBridge methods
+ void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin);
+ void sendReorigin();
+ bool containsLocalTag(const std::string& tagList) const;
+ const std::string& getLocalTag() const;
+
private:
struct PushHandler : framing::FrameHandler {
PushHandler(Connection* c) { conn = c; }
@@ -87,7 +94,9 @@
qmf::org::apache::qpid::broker::Bridge* mgmtObject;
CancellationListener listener;
std::string name;
+ std::string queueName;
mutable uint64_t persistenceId;
+ ConnectionState* connState;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Thu Oct 23 17:45:11 2008
@@ -28,25 +28,45 @@
using qpid::management::Manageable;
namespace _qmf = qmf::org::apache::qpid::broker;
+// File-local federation constants (internal linkage).
+namespace {
+
+// Argument-table keys that DirectExchange::bind reads from the bind arguments.
+const std::string qpidFedOp = "qpid.fed.op";
+const std::string qpidFedTags = "qpid.fed.tags";
+const std::string qpidFedOrigin = "qpid.fed.origin";
+
+// Operation codes compared against the value found under qpidFedOp.
+const std::string fedOpBind = "B";
+const std::string fedOpUnbind = "U";
+const std::string fedOpReorigin = "R";
+const std::string fedOpHello = "H";
+
+} // namespace
+
DirectExchange::DirectExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent)
{
if (mgmtExchange != 0)
- mgmtExchange->set_type (typeName);
+ mgmtExchange->set_type(typeName);
}
-DirectExchange::DirectExchange(const std::string& _name, bool _durable,
+DirectExchange::DirectExchange(const string& _name, bool _durable,
const FieldTable& _args, Manageable* _parent) :
Exchange(_name, _durable, _args, _parent)
{
if (mgmtExchange != 0)
- mgmtExchange->set_type (typeName);
+ mgmtExchange->set_type(typeName);
}
-bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
- {
+bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
+{
+ string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
+ string fedTags(args ? args->getAsString(qpidFedTags) : "");
+ string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
+ bool propagate = false;
+
+ if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
Mutex::ScopedLock l(lock);
- Binding::shared_ptr b(new Binding (routingKey, queue, this));
- if (bindings[routingKey].add_unless(b, MatchQueue(queue))) {
+ Binding::shared_ptr b(new Binding(routingKey, queue, this, FieldTable(), fedOrigin));
+ BoundKey& bk = bindings[routingKey];
+ if (bk.queues.add_unless(b, MatchQueue(queue))) {
+ propagate = bk.fedBinding.addOrigin(fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
@@ -54,30 +74,58 @@
} else {
return false;
}
+ } else if (fedOp == fedOpUnbind) {
+ Mutex::ScopedLock l(lock);
+ BoundKey& bk = bindings[routingKey];
+ propagate = bk.fedBinding.delOrigin(fedOrigin);
+ if (bk.fedBinding.count() == 0)
+ unbind(queue, routingKey, 0);
+ } else if (fedOp == fedOpReorigin) {
+ for (std::map<string, BoundKey>::iterator iter = bindings.begin();
+ iter != bindings.end(); iter++) {
+ const BoundKey& bk = iter->second;
+ if (bk.fedBinding.hasLocal()) {
+ propagateFedOp(iter->first, string(), fedOpBind, string());
+ }
+ }
}
+
routeIVE();
+ if (propagate)
+ propagateFedOp(routingKey, fedTags, fedOp, fedOrigin);
return true;
}
-bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
- Mutex::ScopedLock l(lock);
- if (bindings[routingKey].remove_if(MatchQueue(queue))) {
- if (mgmtExchange != 0) {
- mgmtExchange->dec_bindingCount();
- ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount();
+bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/)
+{
+ bool propagate = false;
+
+ {
+ Mutex::ScopedLock l(lock);
+ BoundKey& bk = bindings[routingKey];
+ if (bk.queues.remove_if(MatchQueue(queue))) {
+ propagate = bk.fedBinding.delOrigin();
+ if (mgmtExchange != 0) {
+ mgmtExchange->dec_bindingCount();
+ ((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount();
+ }
+ } else {
+ return false;
}
- return true;
- } else {
- return false;
}
+
+ if (propagate)
+ propagateFedOp(routingKey, string(), fedOpUnbind, string());
+ return true;
}
-void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
+void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
+{
PreRoute pr(msg, this);
Queues::ConstPtr p;
{
Mutex::ScopedLock l(lock);
- p = bindings[routingKey].snapshot();
+ p = bindings[routingKey].queues.snapshot();
}
int count(0);
@@ -85,26 +133,26 @@
for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) {
msg.deliverTo((*i)->queue);
if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
+ (*i)->mgmtBinding->inc_msgMatched();
}
}
if(!count){
QPID_LOG(warning, "DirectExchange " << getName() << " could not route message with key " << routingKey);
if (mgmtExchange != 0) {
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
+ mgmtExchange->inc_msgDrops();
+ mgmtExchange->inc_byteDrops(msg.contentSize());
}
} else {
if (mgmtExchange != 0) {
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+ mgmtExchange->inc_msgRoutes(count);
+ mgmtExchange->inc_byteRoutes(count * msg.contentSize());
}
}
if (mgmtExchange != 0) {
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
+ mgmtExchange->inc_msgReceives();
+ mgmtExchange->inc_byteReceives(msg.contentSize());
}
}
@@ -120,14 +168,14 @@
if (!queue)
return true;
- Queues::ConstPtr p = i->second.snapshot();
+ Queues::ConstPtr p = i->second.queues.snapshot();
return p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end();
} else if (!queue) {
//if no queue or routing key is specified, just report whether any bindings exist
return bindings.size() > 0;
} else {
for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
- Queues::ConstPtr p = i->second.snapshot();
+ Queues::ConstPtr p = i->second.queues.snapshot();
if (p && std::find_if(p->begin(), p->end(), MatchQueue(queue)) != p->end()) return true;
}
return false;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h Thu Oct 23 17:45:11 2008
@@ -31,33 +31,35 @@
namespace qpid {
namespace broker {
- class DirectExchange : public virtual Exchange{
- typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> Queues;
- typedef std::map<string, Queues> Bindings;
- Bindings bindings;
- qpid::sys::Mutex lock;
+class DirectExchange : public virtual Exchange {
+ typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> Queues;
+ struct BoundKey {
+ Queues queues;
+ FedBinding fedBinding;
+ };
+ typedef std::map<string, BoundKey> Bindings;
+ Bindings bindings;
+ qpid::sys::Mutex lock;
- public:
- static const std::string typeName;
+public:
+ static const std::string typeName;
- DirectExchange(const std::string& name, management::Manageable* parent = 0);
- DirectExchange(const string& _name, bool _durable,
- const qpid::framing::FieldTable& _args, management::Manageable* parent = 0);
+ DirectExchange(const std::string& name, management::Manageable* parent = 0);
+ DirectExchange(const string& _name, bool _durable,
+ const qpid::framing::FieldTable& _args, management::Manageable* parent = 0);
- virtual std::string getType() const { return typeName; }
+ virtual std::string getType() const { return typeName; }
- virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
-
- virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
- virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual ~DirectExchange();
- virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
-
- virtual ~DirectExchange();
- };
-}
-}
+ virtual bool supportsDynamicBinding() { return true; }
+};
+}}
#endif
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Thu Oct 23 17:45:11 2008
@@ -40,6 +40,14 @@
{
const std::string qpidMsgSequence("qpid.msg_sequence");
const std::string qpidIVE("qpid.ive");
+const std::string qpidFedOp("qpid.fed.op");
+const std::string qpidFedTags("qpid.fed.tags");
+const std::string qpidFedOrigin("qpid.fed.origin");
+
+const std::string fedOpBind("B");
+const std::string fedOpUnbind("U");
+const std::string fedOpReorigin("R");
+const std::string fedOpHello("H");
}
@@ -73,7 +81,7 @@
Exchange::Exchange (const string& _name, Manageable* parent) :
name(_name), durable(false), persistenceId(0), sequence(false),
- sequenceNo(0), ive(false), mgmtExchange(0)
+ sequenceNo(0), ive(false), mgmtExchange(0)
{
if (parent != 0)
{
@@ -89,7 +97,7 @@
Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
Manageable* parent)
: name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0),
- sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
+ sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
{
if (parent != 0)
{
@@ -107,7 +115,7 @@
}
}
}
-
+
sequence = _args.get(qpidMsgSequence);
if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing");
@@ -166,8 +174,46 @@
return (ManagementObject*) mgmtExchange;
}
+// Attach a dynamic (federation) bridge to this exchange.
+//
+// Throws Exception when the exchange type reports that it does not support
+// dynamic binding. Otherwise every bridge registered so far is told to send
+// a re-origin, the new bridge is appended, and the exchange's own bind() is
+// invoked with a re-origin op, a null queue and an empty key.
+void Exchange::registerDynamicBridge(DynamicBridge* db)
+{
+    if (!supportsDynamicBinding())
+        throw Exception("Exchange type does not support dynamic binding");
+
+    // Only the bridges that were present before this call are notified.
+    std::vector<DynamicBridge*>::iterator it = bridgeVector.begin();
+    while (it != bridgeVector.end()) {
+        (*it)->sendReorigin();
+        ++it;
+    }
+
+    bridgeVector.push_back(db);
+
+    FieldTable reoriginArgs;
+    reoriginArgs.setString(qpidFedOp, fedOpReorigin);
+    bind(Queue::shared_ptr(), string(), &reoriginArgs);
+}
+
+// Detach a previously registered dynamic bridge. Only the first matching
+// entry is removed; an unknown pointer is ignored.
+void Exchange::removeDynamicBridge(DynamicBridge* db)
+{
+    std::vector<DynamicBridge*>::iterator it = bridgeVector.begin();
+    while (it != bridgeVector.end() && *it != db)
+        ++it;
+
+    if (it != bridgeVector.end())
+        bridgeVector.erase(it);
+}
+
+// Base-class implementation does nothing. The method is declared virtual
+// in Exchange.h, so derived exchange types may override it.
+void Exchange::handleHelloRequest()
+{
+    // no-op
+}
+
+// Hand a federation operation to every dynamic bridge registered on this
+// exchange.
+//
+//   routingKey - binding key the operation applies to
+//   tags       - comma-separated federation tags accumulated so far
+//   op         - operation code; an empty string is treated as a bind ("B")
+//   origin     - originating broker tag (may be empty)
+void Exchange::propagateFedOp(const string& routingKey, const string& tags, const string& op, const string& origin)
+{
+    // Normalise a missing op to "bind". The original computed this value
+    // but then forwarded the raw `op`, leaving the default unused.
+    const string myOp(op.empty() ? fedOpBind : op);
+
+    for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();
+         iter != bridgeVector.end(); ++iter)
+        (*iter)->propagateBinding(routingKey, tags, myOp, origin);
+}
+
Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* parent,
- FieldTable _args)
+ FieldTable _args, const string& origin)
: queue(_queue), key(_key), args(_args), mgmtBinding(0)
{
if (parent != 0)
@@ -181,6 +227,8 @@
management::ObjectId queueId = mo->getObjectId();
mgmtBinding = new _qmf::Binding
(agent, this, (Manageable*) parent, queueId, key, args);
+ if (!origin.empty())
+ mgmtBinding->set_origin(origin);
agent->addObject (mgmtBinding);
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Thu Oct 23 17:45:11 2008
@@ -74,7 +74,7 @@
qmf::org::apache::qpid::broker::Binding* mgmtBinding;
Binding(const std::string& key, Queue::shared_ptr queue, Exchange* parent = 0,
- framing::FieldTable args = framing::FieldTable());
+ framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string());
~Binding();
management::ManagementObject* GetManagementObject() const;
};
@@ -85,6 +85,34 @@
bool operator()(Exchange::Binding::shared_ptr b);
};
+    // Federation bookkeeping for one binding key: a count of bindings made
+    // with an empty origin ("local") plus the set of distinct non-empty
+    // origin tags. DirectExchange::bind/unbind use the bool results below as
+    // their "propagate this operation to the dynamic bridges" flag.
+    class FedBinding {
+        uint32_t localBindings;
+        std::set<std::string> originSet;
+    public:
+        FedBinding() : localBindings(0) {}
+
+        // True while at least one empty-origin binding is recorded.
+        bool hasLocal() const { return localBindings != 0; }
+
+        // Record a binding. With an empty origin the local count is bumped
+        // and the result is true only for the first local binding. With a
+        // non-empty origin the tag is added to the set and the result is
+        // always true, even if the tag was already present.
+        bool addOrigin(const std::string& origin) {
+            if (origin.empty()) {
+                localBindings++;
+                return localBindings == 1;
+            }
+            originSet.insert(origin);
+            return true;
+        }
+
+        // Forget a non-empty origin tag. Always returns true, whether or
+        // not the tag was present.
+        bool delOrigin(const std::string& origin) {
+            originSet.erase(origin);
+            return true;
+        }
+
+        // Drop one local binding (never below zero). Returns true when no
+        // local bindings remain afterwards.
+        bool delOrigin() {
+            if (localBindings > 0)
+                localBindings--;
+            return localBindings == 0;
+        }
+
+        // Local bindings plus distinct remote origins. Const-qualified so
+        // it can be called through a const reference.
+        uint32_t count() const {
+            return localBindings + static_cast<uint32_t>(originSet.size());
+        }
+    };
+
qmf::org::apache::qpid::broker::Exchange* mgmtExchange;
public:
@@ -121,6 +149,27 @@
// Manageable entry points
management::ManagementObject* GetManagementObject(void) const;
+
+ // Federation hooks
+ class DynamicBridge {
+ public:
+ virtual ~DynamicBridge() {}
+ virtual void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin) = 0;
+ virtual void sendReorigin() = 0;
+ virtual bool containsLocalTag(const std::string& tagList) const = 0;
+ virtual const std::string& getLocalTag() const = 0;
+ };
+
+ void registerDynamicBridge(DynamicBridge* db);
+ void removeDynamicBridge(DynamicBridge* db);
+ virtual bool supportsDynamicBinding() { return false; }
+
+protected:
+ std::vector<DynamicBridge*> bridgeVector;
+
+ virtual void handleHelloRequest();
+ void propagateFedOp(const std::string& routingKey, const std::string& tags,
+ const std::string& op, const std::string& origin);
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Thu Oct 23 17:45:11 2008
@@ -26,6 +26,18 @@
using namespace qpid::sys;
namespace _qmf = qmf::org::apache::qpid::broker;
+namespace
+{
+const std::string qpidFedOp("qpid.fed.op");
+const std::string qpidFedTags("qpid.fed.tags");
+const std::string qpidFedOrigin("qpid.fed.origin");
+
+const std::string fedOpBind("B");
+const std::string fedOpUnbind("U");
+const std::string fedOpReorigin("R");
+const std::string fedOpHello("H");
+}
+
FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent) :
Exchange(_name, _parent)
{
@@ -41,32 +53,57 @@
mgmtExchange->set_type (typeName);
}
-bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/)
+bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* args)
{
- Binding::shared_ptr binding (new Binding ("", queue, this));
- if (bindings.add_unless(binding, MatchQueue(queue))) {
- if (mgmtExchange != 0) {
- mgmtExchange->inc_bindingCount();
- ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+ string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
+ string fedTags(args ? args->getAsString(qpidFedTags) : "");
+ string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
+ bool propagate = false;
+
+ if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
+ Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin));
+ if (bindings.add_unless(binding, MatchQueue(queue))) {
+ propagate = fedBinding.addOrigin(fedOrigin);
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_bindingCount();
+ ((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
+ }
+ } else {
+ return false;
+ }
+ } else if (fedOp == fedOpUnbind) {
+ propagate = fedBinding.delOrigin(fedOrigin);
+ if (fedBinding.count() == 0)
+ unbind(queue, "", 0);
+ } else if (fedOp == fedOpReorigin) {
+ if (fedBinding.hasLocal()) {
+ propagateFedOp(string(), string(), fedOpBind, string());
}
- routeIVE();
- return true;
- } else {
- return false;
}
+
+ routeIVE();
+ if (propagate)
+ propagateFedOp(string(), fedTags, fedOp, fedOrigin);
+ return true;
}
bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, const FieldTable* /*args*/)
{
+ bool propagate = false;
+
if (bindings.remove_if(MatchQueue(queue))) {
+ propagate = fedBinding.delOrigin();
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount();
}
- return true;
} else {
return false;
}
+
+ if (propagate)
+ propagateFedOp(string(), string(), fedOpUnbind, string());
+ return true;
}
void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h Thu Oct 23 17:45:11 2008
@@ -34,6 +34,7 @@
class FanOutExchange : public virtual Exchange {
typedef qpid::sys::CopyOnWriteArray<Binding::shared_ptr> BindingsArray;
BindingsArray bindings;
+ FedBinding fedBinding;
public:
static const std::string typeName;
@@ -53,6 +54,7 @@
virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
virtual ~FanOutExchange();
+ virtual bool supportsDynamicBinding() { return true; }
};
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Thu Oct 23 17:45:11 2008
@@ -336,7 +336,7 @@
return (ManagementObject*) mgmtObject;
}
-Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args, string&)
+Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args, string& text)
{
switch (op)
{
@@ -350,8 +350,22 @@
_qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args;
// Durable bridges are only valid on durable links
- if (iargs.i_durable && !durable)
- return Manageable::STATUS_INVALID_PARAMETER;
+ if (iargs.i_durable && !durable) {
+ text = "Can't create a durable route on a non-durable link";
+ return Manageable::STATUS_USER;
+ }
+
+ if (iargs.i_dynamic) {
+ Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src);
+ if (exchange.get() == 0) {
+ text = "Exchange not found";
+ return Manageable::STATUS_USER;
+ }
+ if (!exchange->supportsDynamicBinding()) {
+ text = "Exchange type does not support dynamic routing";
+ return Manageable::STATUS_USER;
+ }
+ }
std::pair<Bridge::shared_ptr, bool> result =
links->declare (host, port, iargs.i_durable, iargs.i_src,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Thu Oct 23 17:45:11 2008
@@ -31,6 +31,18 @@
// - excessive string copying: should be 0 copy, match from original buffer.
// - match/lookup: use descision tree or other more efficient structure.
+namespace
+{
+const std::string qpidFedOp("qpid.fed.op");
+const std::string qpidFedTags("qpid.fed.tags");
+const std::string qpidFedOrigin("qpid.fed.origin");
+
+const std::string fedOpBind("B");
+const std::string fedOpUnbind("U");
+const std::string fedOpReorigin("R");
+const std::string fedOpHello("H");
+}
+
Tokens& Tokens::operator=(const std::string& s) {
clear();
if (s.empty()) return *this;
@@ -51,6 +63,15 @@
return *this;
}
+void Tokens::key(string& keytext) const
+{
+ for (std::vector<string>::const_iterator iter = begin(); iter != end(); iter++) {
+ if (iter != begin())
+ keytext += ".";
+ keytext += *iter;
+ }
+}
+
namespace {
const std::string hashmark("#");
const std::string star("*");
@@ -81,7 +102,7 @@
namespace {
-// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string.
+// TODO aconway 2006-09-20: Inefficient to convert every routingKey to a string.
// Need StringRef class that operates on a string in place witout copy.
// Should be applied everywhere strings are extracted from frames.
//
@@ -130,30 +151,63 @@
mgmtExchange->set_type (typeName);
}
-bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
- {
+bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
+{
+ string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
+ string fedTags(args ? args->getAsString(qpidFedTags) : "");
+ string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
+ bool propagate = false;
+ bool reallyUnbind;
+ TopicPattern routingPattern(routingKey);
+
+ if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
RWlock::ScopedWlock l(lock);
- TopicPattern routingPattern(routingKey);
if (isBound(queue, routingPattern)) {
return false;
} else {
- Binding::shared_ptr binding (new Binding (routingKey, queue, this));
- bindings[routingPattern].push_back(binding);
+ Binding::shared_ptr binding (new Binding (routingKey, queue, this, FieldTable(), fedOrigin));
+ BoundKey& bk = bindings[routingPattern];
+ bk.bindingVector.push_back(binding);
+ propagate = bk.fedBinding.addOrigin(fedOrigin);
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
}
}
+ } else if (fedOp == fedOpUnbind) {
+ {
+ RWlock::ScopedWlock l(lock);
+ BoundKey& bk = bindings[routingPattern];
+ propagate = bk.fedBinding.delOrigin(fedOrigin);
+ reallyUnbind = bk.fedBinding.count() == 0;
+ }
+ if (reallyUnbind)
+ unbind(queue, routingKey, 0);
+ } else if (fedOp == fedOpReorigin) {
+ for (std::map<TopicPattern, BoundKey>::iterator iter = bindings.begin();
+ iter != bindings.end(); iter++) {
+ const BoundKey& bk = iter->second;
+ if (bk.fedBinding.hasLocal()) {
+ string propKey;
+ iter->first.key(propKey);
+ propagateFedOp(propKey, string(), fedOpBind, string());
+ }
+ }
}
+
routeIVE();
+ if (propagate)
+ propagateFedOp(routingKey, fedTags, fedOp, fedOrigin);
return true;
}
bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedWlock l(lock);
BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
- Binding::vector& qv(bi->second);
if (bi == bindings.end()) return false;
+ BoundKey& bk = bi->second;
+ Binding::vector& qv(bk.bindingVector);
+ bool propagate = false;
Binding::vector::iterator q;
for (q = qv.begin(); q != qv.end(); q++)
@@ -161,11 +215,15 @@
break;
if(q == qv.end()) return false;
qv.erase(q);
+ propagate = bk.fedBinding.delOrigin();
if(qv.empty()) bindings.erase(bi);
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount();
}
+
+ if (propagate)
+ propagateFedOp(routingKey, string(), fedOpUnbind, string());
return true;
}
@@ -173,7 +231,7 @@
{
BindingMap::iterator bi = bindings.find(pattern);
if (bi == bindings.end()) return false;
- Binding::vector& qv(bi->second);
+ Binding::vector& qv(bi->second.bindingVector);
Binding::vector::iterator q;
for (q = qv.begin(); q != qv.end(); q++)
if ((*q)->queue == queue)
@@ -189,7 +247,7 @@
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
if (i->first.match(tokens)) {
- Binding::vector& qv(i->second);
+ Binding::vector& qv(i->second.bindingVector);
for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){
msg.deliverTo((*j)->queue);
if ((*j)->mgmtBinding != 0)
@@ -230,7 +288,7 @@
}
} else {
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- Binding::vector& qv(i->second);
+ Binding::vector& qv(i->second.bindingVector);
Binding::vector::iterator q;
for (q = qv.begin(); q != qv.end(); q++)
if ((*q)->queue == queue)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Thu Oct 23 17:45:11 2008
@@ -41,6 +41,7 @@
Tokens(const std::string& s) { operator=(s); }
/** Tokenizing assignment operator s */
Tokens & operator=(const std::string& s);
+ void key(std::string& key) const;
private:
size_t hash;
@@ -70,8 +71,12 @@
void normalize();
};
-class TopicExchange : public virtual Exchange{
- typedef std::map<TopicPattern, Binding::vector> BindingMap;
+class TopicExchange : public virtual Exchange {
+ struct BoundKey {
+ Binding::vector bindingVector;
+ FedBinding fedBinding;
+ };
+ typedef std::map<TopicPattern, BoundKey> BindingMap;
BindingMap bindings;
qpid::sys::RWlock lock;
@@ -94,6 +99,7 @@
virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
virtual ~TopicExchange();
+ virtual bool supportsDynamicBinding() { return true; }
};
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/federation.py (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/federation.py Thu Oct 23 17:45:11 2008
@@ -20,7 +20,6 @@
import sys
from qpid.testlib import TestBase010, testrunner
-from qpid.management import managementChannel, managementClient
from qpid.datatypes import Message
from qpid.queue import Empty
from time import sleep
@@ -54,68 +53,45 @@
def remote_port():
return int(scan_args("--remote-port"))
-class Helper:
- def __init__(self, parent):
- self.parent = parent
- self.session = parent.conn.session("Helper")
- self.mc = managementClient(self.session.spec)
- self.mch = self.mc.addChannel(self.session)
- self.mc.syncWaitForStable(self.mch)
-
- def shutdown (self):
- self.mc.removeChannel (self.mch)
-
- def get_objects(self, type):
- return self.mc.syncGetObjects(self.mch, type)
-
- def get_object(self, type, position = 1, expected = None):
- objects = self.get_objects(type)
- if not expected: expected = position
- self.assertEqual(len(objects), expected)
- return objects[(position - 1)]
-
-
- def call_method(self, object, method, args=None):
- res = self.mc.syncCallMethod(self.mch, object.id, object.classKey, method, args)
- self.assertEqual(res.status, 0)
- self.assertEqual(res.statusText, "OK")
- return res
-
- def assertEqual(self, a, b):
- self.parent.assertEqual(a, b)
-
class FederationTests(TestBase010):
def test_bridge_create_and_close(self):
- mgmt = Helper(self)
- broker = mgmt.get_object("broker")
+ self.startQmf();
+ qmf = self.qmf
- mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
- link = mgmt.get_object("link")
-
- mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.direct", "key":"my-key"})
- bridge = mgmt.get_object("bridge")
-
- mgmt.call_method(bridge, "close")
- mgmt.call_method(link, "close")
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False)
+ self.assertEqual(result.status, 0)
+
+ bridge = qmf.getObjects(_class="bridge")[0]
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
- sleep(6)
- self.assertEqual(len(mgmt.get_objects("bridge")), 0)
- self.assertEqual(len(mgmt.get_objects("link")), 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
- mgmt.shutdown ()
+ sleep(3)
+ self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+ self.assertEqual(len(qmf.getObjects(_class="link")), 0)
def test_pull_from_exchange(self):
session = self.session
- mgmt = Helper(self)
- broker = mgmt.get_object("broker")
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False)
+ self.assertEqual(result.status, 0)
- mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
- link = mgmt.get_object("link")
-
- mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", "key":"my-key"})
- bridge = mgmt.get_object("bridge")
+ bridge = qmf.getObjects(_class="bridge")[0]
#setup queue to receive messages from local broker
session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
@@ -140,27 +116,29 @@
self.fail("Got unexpected message in queue: " + extra.body)
except Empty: None
- mgmt.call_method(bridge, "close")
- mgmt.call_method(link, "close")
- sleep(6)
- self.assertEqual(len(mgmt.get_objects("bridge")), 0)
- self.assertEqual(len(mgmt.get_objects("link")), 0)
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
- mgmt.shutdown()
+ sleep(3)
+ self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+ self.assertEqual(len(qmf.getObjects(_class="link")), 0)
def test_push_to_exchange(self):
session = self.session
- mgmt = Helper(self)
- broker = mgmt.get_object("broker")
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False)
+ self.assertEqual(result.status, 0)
- mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
- link = mgmt.get_object("link")
-
- mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout",
- "key":"my-key", "tag":"", "excludes":"", "srcIsQueue":0,
- "srcIsLocal":1})
- bridge = mgmt.get_object("bridge")
+ bridge = qmf.getObjects(_class="bridge")[0]
#setup queue to receive messages from remote broker
r_conn = self.connect(host=remote_host(), port=remote_port())
@@ -184,13 +162,14 @@
self.fail("Got unexpected message in queue: " + extra.body)
except Empty: None
- mgmt.call_method(bridge, "close")
- mgmt.call_method(link, "close")
- sleep(6)
- self.assertEqual(len(mgmt.get_objects("bridge")), 0)
- self.assertEqual(len(mgmt.get_objects("link")), 0)
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
- mgmt.shutdown()
+ sleep(3)
+ self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+ self.assertEqual(len(qmf.getObjects(_class="link")), 0)
def test_pull_from_queue(self):
session = self.session
@@ -209,16 +188,18 @@
self.subscribe(queue="fed1", destination="f1")
queue = session.incoming("f1")
- mgmt = Helper(self)
- broker = mgmt.get_object("broker")
-
- mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
- link = mgmt.get_object("link")
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False)
+ self.assertEqual(result.status, 0)
- mgmt.call_method(link, "bridge", {"durable":0, "src":"my-bridge-queue", "dest":"amq.fanout",
- "key":"", "tag":"", "excludes":"", "srcIsQueue":1})
- sleep(6)
- bridge = mgmt.get_object("bridge")
+ bridge = qmf.getObjects(_class="bridge")[0]
+ sleep(3)
#add some more messages (i.e. after bridge was created)
for i in range(6, 11):
@@ -236,14 +217,14 @@
self.fail("Got unexpected message in queue: " + extra.body)
except Empty: None
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
- mgmt.call_method(bridge, "close")
- mgmt.call_method(link, "close")
- sleep(6)
- self.assertEqual(len(mgmt.get_objects("bridge")), 0)
- self.assertEqual(len(mgmt.get_objects("link")), 0)
-
- mgmt.shutdown ()
+ sleep(3)
+ self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+ self.assertEqual(len(qmf.getObjects(_class="link")), 0)
def test_tracing_automatic(self):
remoteUrl = "%s:%d" % (remote_host(), remote_port())
@@ -307,22 +288,24 @@
def test_tracing(self):
session = self.session
- mgmt = Helper(self)
- broker = mgmt.get_object("broker")
-
- mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
- link = mgmt.get_object("link")
-
- mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", "key":"my-key",
- "tag":"my-bridge-id", "excludes":"exclude-me,also-exclude-me"})
- sleep(6)
- bridge = mgmt.get_object("bridge")
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "my-bridge-id",
+ "exclude-me,also-exclude-me", False, False, False)
+ self.assertEqual(result.status, 0)
+ bridge = qmf.getObjects(_class="bridge")[0]
#setup queue to receive messages from local broker
session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
session.exchange_bind(queue="fed1", exchange="amq.fanout")
self.subscribe(queue="fed1", destination="f1")
queue = session.incoming("f1")
+ sleep(6)
#send messages to remote broker and confirm it is routed to local broker
r_conn = self.connect(host=remote_host(), port=remote_port())
@@ -347,13 +330,155 @@
self.fail("Got unexpected message in queue: " + extra.body)
except Empty: None
- mgmt.call_method(bridge, "close")
- mgmt.call_method(link, "close")
- sleep(6)
- self.assertEqual(len(mgmt.get_objects("bridge")), 0)
- self.assertEqual(len(mgmt.get_objects("link")), 0)
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
+
+ sleep(3)
+ self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+ self.assertEqual(len(qmf.getObjects(_class="link")), 0)
+
+ def test_dynamic_fanout(self):
+ session = self.session
+ r_conn = self.connect(host=remote_host(), port=remote_port())
+ r_session = r_conn.session("test_dynamic_fanout")
+
+ session.exchange_declare(exchange="fed.fanout", type="fanout")
+ r_session.exchange_declare(exchange="fed.fanout", type="fanout")
+
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True)
+ self.assertEqual(result.status, 0)
+ bridge = qmf.getObjects(_class="bridge")[0]
+ sleep(5)
+
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed1", exchange="fed.fanout")
+ self.subscribe(queue="fed1", destination="f1")
+ queue = session.incoming("f1")
+
+ for i in range(1, 11):
+ dp = r_session.delivery_properties()
+ r_session.message_transfer(destination="fed.fanout", message=Message(dp, "Message %d" % i))
+
+ for i in range(1, 11):
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
+
+ sleep(3)
+ self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+ self.assertEqual(len(qmf.getObjects(_class="link")), 0)
+
+
+ def test_dynamic_direct(self):
+ session = self.session
+ r_conn = self.connect(host=remote_host(), port=remote_port())
+ r_session = r_conn.session("test_dynamic_direct")
+
+ session.exchange_declare(exchange="fed.direct", type="direct")
+ r_session.exchange_declare(exchange="fed.direct", type="direct")
+
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True)
+ self.assertEqual(result.status, 0)
+ bridge = qmf.getObjects(_class="bridge")[0]
+ sleep(5)
+
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed1", exchange="fed.direct", binding_key="fd-key")
+ self.subscribe(queue="fed1", destination="f1")
+ queue = session.incoming("f1")
+
+ for i in range(1, 11):
+ dp = r_session.delivery_properties(routing_key="fd-key")
+ r_session.message_transfer(destination="fed.direct", message=Message(dp, "Message %d" % i))
+
+ for i in range(1, 11):
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
+
+ sleep(3)
+ self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+ self.assertEqual(len(qmf.getObjects(_class="link")), 0)
+
+
+ def test_dynamic_topic(self):
+ session = self.session
+ r_conn = self.connect(host=remote_host(), port=remote_port())
+ r_session = r_conn.session("test_dynamic_topic")
+
+ session.exchange_declare(exchange="fed.topic", type="topic")
+ r_session.exchange_declare(exchange="fed.topic", type="topic")
+
+ self.startQmf()
+ qmf = self.qmf
+ broker = qmf.getObjects(_class="broker")[0]
+ result = broker.connect(remote_host(), remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+ self.assertEqual(result.status, 0)
+
+ link = qmf.getObjects(_class="link")[0]
+ result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True)
+ self.assertEqual(result.status, 0)
+ bridge = qmf.getObjects(_class="bridge")[0]
+ sleep(5)
+
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed1", exchange="fed.topic", binding_key="ft-key.#")
+ self.subscribe(queue="fed1", destination="f1")
+ queue = session.incoming("f1")
+
+ for i in range(1, 11):
+ dp = r_session.delivery_properties(routing_key="ft-key.one.two")
+ r_session.message_transfer(destination="fed.topic", message=Message(dp, "Message %d" % i))
+
+ for i in range(1, 11):
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ result = bridge.close()
+ self.assertEqual(result.status, 0)
+ result = link.close()
+ self.assertEqual(result.status, 0)
+
+ sleep(3)
+ self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
+ self.assertEqual(len(qmf.getObjects(_class="link")), 0)
- mgmt.shutdown ()
def getProperty(self, msg, name):
for h in msg.headers:
@@ -364,7 +489,8 @@
headers = self.getProperty(msg, "application_headers")
if headers:
return headers[name]
- return None
+ return None
+
if __name__ == '__main__':
args = sys.argv[1:]
Modified: incubator/qpid/trunk/qpid/python/commands/qpid-route
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-route?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-route (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-route Thu Oct 23 17:45:11 2008
@@ -34,6 +34,8 @@
print " qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>"
print " qpid-route [OPTIONS] route list [<dest-broker>]"
print " qpid-route [OPTIONS] route flush [<dest-broker>]"
+ print " qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]"
+ print " qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>"
print
print "Options:"
print " -v [ --verbose ] Verbose output"
@@ -118,7 +120,7 @@
print "%-16s%-8d %c %-18s%s" % \
(link.host, link.port, YN(link.durable), link.state, link.lastError)
- def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes):
+ def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes, dynamic=False):
self.src = qmfconsole.BrokerURL(srcBroker)
if self.dest.match(self.src.host, self.src.port):
raise Exception("Linking broker to itself is not permitted")
@@ -155,13 +157,13 @@
if _verbose:
print "Creating inter-broker binding..."
- res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, False, False)
- if res.status == 4:
- raise Exception("Can't create a durable route on a non-durable link")
+ res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, False, dynamic)
+ if res.status != 0:
+ raise Exception(res.text)
if _verbose:
print "Bridge method returned:", res.status, res.text
- def DelRoute (self, srcBroker, exchange, routingKey):
+ def DelRoute (self, srcBroker, exchange, routingKey, dynamic=False):
self.src = qmfconsole.BrokerURL(srcBroker)
link = self.getLink()
if link == None:
@@ -171,7 +173,8 @@
bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
- if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey:
+ if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey \
+ and bridge.dynamic == dynamic:
if _verbose:
print "Closing bridge..."
res = bridge.close()
@@ -201,7 +204,11 @@
myLink = link
break
if myLink != None:
- print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, bridge.key)
+ if bridge.dynamic:
+ keyText = "<dynamic>"
+ else:
+ keyText = bridge.key
+ print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, keyText)
def ClearAllRoutes (self):
links = self.qmf.getObjects(_class="link")
@@ -285,6 +292,22 @@
elif cmd == "list":
rm.ListLinks ()
+ elif group == "dynamic":
+ if cmd == "add":
+ if nargs < 5 or nargs > 7:
+ Usage ()
+
+ tag = ""
+ excludes = ""
+ if nargs > 5: tag = cargs[5]
+ if nargs > 6: excludes = cargs[6]
+ rm.AddRoute (cargs[3], cargs[4], "", tag, excludes, dynamic=True)
+ elif cmd == "del":
+ if nargs != 5:
+ Usage ()
+ else:
+ rm.DelRoute (cargs[3], cargs[4], "", dynamic=True)
+
elif group == "route":
if cmd == "add":
if nargs < 6 or nargs > 8:
@@ -294,12 +317,12 @@
excludes = ""
if nargs > 6: tag = cargs[6]
if nargs > 7: excludes = cargs[7]
- rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes)
+ rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes, dynamic=False)
elif cmd == "del":
if nargs != 6:
Usage ()
else:
- rm.DelRoute (cargs[3], cargs[4], cargs[5])
+ rm.DelRoute (cargs[3], cargs[4], cargs[5], dynamic=False)
else:
if cmd == "list":
rm.ListRoutes ()
Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=707515&r1=707514&r2=707515&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Thu Oct 23 17:45:11 2008
@@ -196,6 +196,7 @@
<property name="queueRef" type="objId" references="Queue" access="RC" index="y"/>
<property name="bindingKey" type="sstr" access="RC" index="y"/>
<property name="arguments" type="map" access="RC"/>
+ <property name="origin" type="sstr" access="RO" optional="y"/>
<statistic name="msgMatched" type="count64"/>
</class>