You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/06/27 14:39:50 UTC
svn commit: r551144 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/broker/
cpp/src/tests/ python/tests_0-9/
Author: gsim
Date: Wed Jun 27 05:39:49 2007
New Revision: 551144
URL: http://svn.apache.org/viewvc?view=rev&rev=551144
Log:
Added preview of exchange- and binding- query methods that have been approved for 0-10.
Added:
incubator/qpid/trunk/qpid/python/tests_0-9/query.py (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Wed Jun 27 05:39:49 2007
@@ -40,6 +40,7 @@
channelHandler(*this),
connectionHandler(*this),
exchangeHandler(*this),
+ bindingHandler(*this),
messageHandler(*this),
queueHandler(*this),
txHandler(*this),
@@ -159,6 +160,49 @@
broker.getExchanges().destroy(name);
if(!nowait) client.deleteOk(context.getRequestId());
}
+
+void BrokerAdapter::ExchangeHandlerImpl::query(const MethodContext& context, u_int16_t /*ticket*/, const string& name)
+{
+ try {
+ Exchange::shared_ptr exchange(broker.getExchanges().get(name));
+ client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs(), context.getRequestId());
+ } catch (const ChannelException& e) {
+ client.queryOk("", false, true, FieldTable(), context.getRequestId());
+ }
+}
+
+void BrokerAdapter::BindingHandlerImpl::query(const framing::MethodContext& context,
+ u_int16_t /*ticket*/,
+ const std::string& exchangeName,
+ const std::string& queueName,
+ const std::string& key,
+ const framing::FieldTable& args)
+{
+ Exchange::shared_ptr exchange;
+ try {
+ exchange = broker.getExchanges().get(exchangeName);
+ } catch (const ChannelException&) {}
+
+ Queue::shared_ptr queue;
+ if (!queueName.empty()) {
+ queue = broker.getQueues().find(queueName);
+ }
+
+ if (!exchange) {
+ client.queryOk(true, false, false, false, false, context.getRequestId());
+ } else if (!queueName.empty() && !queue) {
+ client.queryOk(false, true, false, false, false, context.getRequestId());
+ } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) {
+ client.queryOk(false, false, false, false, false, context.getRequestId());
+ } else {
+ //need to test each specified option individually
+ bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0);
+ bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0);
+ bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args);
+
+ client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched, context.getRequestId());
+ }
+}
void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& name,
bool passive, bool durable, bool exclusive,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h Wed Jun 27 05:39:49 2007
@@ -63,6 +63,7 @@
ConnectionHandler* getConnectionHandler() { return &connectionHandler; }
BasicHandler* getBasicHandler() { return &basicHandler; }
ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
+ BindingHandler* getBindingHandler() { return &bindingHandler; }
QueueHandler* getQueueHandler() { return &queueHandler; }
TxHandler* getTxHandler() { return &txHandler; }
MessageHandler* getMessageHandler() { return &messageHandler; }
@@ -142,6 +143,24 @@
const qpid::framing::FieldTable& arguments);
void delete_(const framing::MethodContext& context, uint16_t ticket,
const std::string& exchange, bool ifUnused, bool nowait);
+ void query(const framing::MethodContext& context,
+ u_int16_t ticket,
+ const string& name);
+ };
+
+ class BindingHandlerImpl :
+ public BindingHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Binding>
+ {
+ public:
+ BindingHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+
+ void query(const framing::MethodContext& context,
+ u_int16_t ticket,
+ const std::string& exchange,
+ const std::string& queue,
+ const std::string& routingKey,
+ const framing::FieldTable& arguments);
};
class QueueHandlerImpl :
@@ -214,6 +233,7 @@
ChannelHandlerImpl channelHandler;
ConnectionHandlerImpl connectionHandler;
ExchangeHandlerImpl exchangeHandler;
+ BindingHandlerImpl bindingHandler;
MessageHandlerImpl messageHandler;
QueueHandlerImpl queueHandler;
TxHandlerImpl txHandler;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.h?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.h Wed Jun 27 05:39:49 2007
@@ -39,6 +39,8 @@
const string name;
const bool durable;
qpid::framing::FieldTable args;
+ boost::shared_ptr<Exchange> alternate;
+ uint32_t alternateUsers;
mutable uint64_t persistenceId;
public:
@@ -53,9 +55,15 @@
bool isDurable() { return durable; }
qpid::framing::FieldTable& getArgs() { return args; }
+ Exchange::shared_ptr getAlternate() { return alternate; }
+ void setAlternate(Exchange::shared_ptr _alternate) { alternate = _alternate; }
+ void incAlternateUsers() { alternateUsers++; }
+ void decAlternateUsers() { alternateUsers--; }
+
virtual string getType() const = 0;
virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
+ virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
//PersistableExchange:
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Wed Jun 27 05:39:49 2007
@@ -23,6 +23,8 @@
#include "qpid/log/Statement.h"
#include "BrokerQueue.h"
+#include "BrokerExchange.h"
+#include "DeliverableMessage.h"
#include "MessageStore.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
@@ -233,6 +235,16 @@
void Queue::destroy()
{
+ if (alternateExchange.get()) {
+ Mutex::ScopedLock locker(lock);
+ while(!messages.empty()){
+ DeliverableMessage msg(messages.front());
+ alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
+ &(msg.getMessage().getApplicationHeaders()));
+ pop();
+ }
+ }
+
if (store) {
store->destroy(*this);
}
@@ -289,3 +301,8 @@
return result.first;
}
+
+void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange)
+{
+ alternateExchange = exchange;
+}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h Wed Jun 27 05:39:49 2007
@@ -42,6 +42,7 @@
namespace broker {
class MessageStore;
class QueueRegistry;
+ class Exchange;
/**
* Thrown when exclusive access would be violated.
@@ -74,6 +75,7 @@
framing::FieldTable settings;
std::auto_ptr<QueuePolicy> policy;
QueueBindings bindings;
+ boost::shared_ptr<Exchange> alternateExchange;
void pop();
void push(Message::shared_ptr& msg);
@@ -141,6 +143,9 @@
Message::shared_ptr dequeue();
const QueuePolicy* const getPolicy();
+
+ void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
+
//PersistableQueue support:
uint64_t getPersistenceId() const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp Wed Jun 27 05:39:49 2007
@@ -31,3 +31,7 @@
queue->deliver(msg);
}
+Message& DeliverableMessage::getMessage()
+{
+ return *msg;
+}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h Wed Jun 27 05:39:49 2007
@@ -32,6 +32,7 @@
public:
DeliverableMessage(Message::shared_ptr& msg);
virtual void deliverTo(Queue::shared_ptr& queue);
+ Message& getMessage();
virtual ~DeliverableMessage(){}
};
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Wed Jun 27 05:39:49 2007
@@ -69,6 +69,25 @@
}
}
+
+bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
+{
+ if (routingKey) {
+ Bindings::iterator i = bindings.find(*routingKey);
+ return i != bindings.end() && (!queue || find(i->second.begin(), i->second.end(), queue) != i->second.end());
+ } else if (!queue) {
+ //if no queue or routing key is specified, just report whether any bindings exist
+ return bindings.size() > 0;
+ } else {
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
+ if (find(i->second.begin(), i->second.end(), queue) != i->second.end()) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
+
DirectExchange::~DirectExchange(){
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h Wed Jun 27 05:39:49 2007
@@ -32,7 +32,9 @@
namespace qpid {
namespace broker {
class DirectExchange : public virtual Exchange{
- std::map<string, std::vector<Queue::shared_ptr> > bindings;
+ typedef std::vector<Queue::shared_ptr> Queues;
+ typedef std::map<string, Queues > Bindings;
+ Bindings bindings;
qpid::sys::Mutex lock;
public:
@@ -49,6 +51,8 @@
virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
virtual ~DirectExchange();
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Wed Jun 27 05:39:49 2007
@@ -58,6 +58,12 @@
}
}
+bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const)
+{
+ return std::find(bindings.begin(), bindings.end(), queue) != bindings.end();
+}
+
+
FanOutExchange::~FanOutExchange() {}
const std::string FanOutExchange::typeName("fanout");
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h Wed Jun 27 05:39:49 2007
@@ -51,6 +51,8 @@
virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
+
virtual ~FanOutExchange();
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Wed Jun 27 05:39:49 2007
@@ -80,6 +80,17 @@
}
}
+
+bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args)
+{
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ if ( (!args || equal(i->first, *args)) && i->second == queue) {
+ return true;
+ }
+ }
+ return false;
+}
+
HeadersExchange::~HeadersExchange() {}
const std::string HeadersExchange::typeName("headers");
@@ -127,6 +138,19 @@
} else {
return false;
}
+}
+
+bool HeadersExchange::equal(const FieldTable& a, const FieldTable& b) {
+ typedef FieldTable::ValueMap Map;
+ for (Map::const_iterator i = a.getMap().begin();
+ i != a.getMap().end();
+ ++i)
+ {
+ Map::const_iterator j = b.getMap().find(i->first);
+ if (j == b.getMap().end()) return false;
+ if (!match_values(*(i->second), *(j->second))) return false;
+ }
+ return true;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Wed Jun 27 05:39:49 2007
@@ -54,9 +54,12 @@
virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
+
virtual ~HeadersExchange();
static bool match(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs);
+ static bool equal(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs);
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Wed Jun 27 05:39:49 2007
@@ -162,6 +162,31 @@
}
}
+bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
+{
+ if (routingKey && queue) {
+ TopicPattern key(*routingKey);
+ return isBound(queue, key);
+ } else if (!routingKey && !queue) {
+ return bindings.size() > 0;
+ } else if (routingKey) {
+ for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ if (i->first.match(*routingKey)) {
+ return true;
+ }
+ }
+ return false;
+ } else {
+ for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ Queue::vector& qv(i->second);
+ if (find(qv.begin(), qv.end(), queue) != qv.end()) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
+
TopicExchange::~TopicExchange() {}
const std::string TopicExchange::typeName("topic");
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Wed Jun 27 05:39:49 2007
@@ -92,6 +92,8 @@
virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
+
virtual ~TopicExchange();
};
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp Wed Jun 27 05:39:49 2007
@@ -19,10 +19,12 @@
*
*/
-#include "qpid/broker/DeliverableMessage.h"
-#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/BrokerExchange.h"
#include "qpid/broker/BrokerQueue.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/FanOutExchange.h"
+#include "qpid/broker/HeadersExchange.h"
#include "qpid/broker/TopicExchange.h"
#include "qpid_test_plugin.h"
#include <iostream>
@@ -36,6 +38,7 @@
{
CPPUNIT_TEST_SUITE(ExchangeTest);
CPPUNIT_TEST(testMe);
+ CPPUNIT_TEST(testIsBound);
CPPUNIT_TEST_SUITE_END();
public:
@@ -65,6 +68,95 @@
topic.route(msg, "abc", 0);
direct.route(msg, "abc", 0);
+ }
+
+ void testIsBound()
+ {
+ Queue::shared_ptr a(new Queue("a", true));
+ Queue::shared_ptr b(new Queue("b", true));
+ Queue::shared_ptr c(new Queue("c", true));
+ Queue::shared_ptr d(new Queue("d", true));
+
+ string k1("abc");
+ string k2("def");
+ string k3("xyz");
+
+ FanOutExchange fanout("fanout");
+ fanout.bind(a, "", 0);
+ fanout.bind(b, "", 0);
+ fanout.bind(c, "", 0);
+
+ CPPUNIT_ASSERT(fanout.isBound(a, 0, 0));
+ CPPUNIT_ASSERT(fanout.isBound(b, 0, 0));
+ CPPUNIT_ASSERT(fanout.isBound(c, 0, 0));
+ CPPUNIT_ASSERT(!fanout.isBound(d, 0, 0));
+
+ DirectExchange direct("direct");
+ direct.bind(a, k1, 0);
+ direct.bind(a, k3, 0);
+ direct.bind(b, k2, 0);
+ direct.bind(c, k1, 0);
+
+ CPPUNIT_ASSERT(direct.isBound(a, 0, 0));
+ CPPUNIT_ASSERT(direct.isBound(a, &k1, 0));
+ CPPUNIT_ASSERT(direct.isBound(a, &k3, 0));
+ CPPUNIT_ASSERT(!direct.isBound(a, &k2, 0));
+ CPPUNIT_ASSERT(direct.isBound(b, 0, 0));
+ CPPUNIT_ASSERT(direct.isBound(b, &k2, 0));
+ CPPUNIT_ASSERT(direct.isBound(c, &k1, 0));
+ CPPUNIT_ASSERT(!direct.isBound(d, 0, 0));
+ CPPUNIT_ASSERT(!direct.isBound(d, &k1, 0));
+ CPPUNIT_ASSERT(!direct.isBound(d, &k2, 0));
+ CPPUNIT_ASSERT(!direct.isBound(d, &k3, 0));
+
+ TopicExchange topic("topic");
+ topic.bind(a, k1, 0);
+ topic.bind(a, k3, 0);
+ topic.bind(b, k2, 0);
+ topic.bind(c, k1, 0);
+
+ CPPUNIT_ASSERT(topic.isBound(a, 0, 0));
+ CPPUNIT_ASSERT(topic.isBound(a, &k1, 0));
+ CPPUNIT_ASSERT(topic.isBound(a, &k3, 0));
+ CPPUNIT_ASSERT(!topic.isBound(a, &k2, 0));
+ CPPUNIT_ASSERT(topic.isBound(b, 0, 0));
+ CPPUNIT_ASSERT(topic.isBound(b, &k2, 0));
+ CPPUNIT_ASSERT(topic.isBound(c, &k1, 0));
+ CPPUNIT_ASSERT(!topic.isBound(d, 0, 0));
+ CPPUNIT_ASSERT(!topic.isBound(d, &k1, 0));
+ CPPUNIT_ASSERT(!topic.isBound(d, &k2, 0));
+ CPPUNIT_ASSERT(!topic.isBound(d, &k3, 0));
+
+ HeadersExchange headers("headers");
+ FieldTable args1;
+ args1.setString("x-match", "all");
+ args1.setString("a", "A");
+ args1.setInt("b", 1);
+ FieldTable args2;
+ args2.setString("x-match", "any");
+ args2.setString("a", "A");
+ args2.setInt("b", 1);
+ FieldTable args3;
+ args3.setString("x-match", "any");
+ args3.setString("c", "C");
+ args3.setInt("b", 6);
+
+ headers.bind(a, "", &args1);
+ headers.bind(a, "", &args3);
+ headers.bind(b, "", &args2);
+ headers.bind(c, "", &args1);
+
+ CPPUNIT_ASSERT(headers.isBound(a, 0, 0));
+ CPPUNIT_ASSERT(headers.isBound(a, 0, &args1));
+ CPPUNIT_ASSERT(headers.isBound(a, 0, &args3));
+ CPPUNIT_ASSERT(!headers.isBound(a, 0, &args2));
+ CPPUNIT_ASSERT(headers.isBound(b, 0, 0));
+ CPPUNIT_ASSERT(headers.isBound(b, 0, &args2));
+ CPPUNIT_ASSERT(headers.isBound(c, 0, &args1));
+ CPPUNIT_ASSERT(!headers.isBound(d, 0, 0));
+ CPPUNIT_ASSERT(!headers.isBound(d, 0, &args1));
+ CPPUNIT_ASSERT(!headers.isBound(d, 0, &args2));
+ CPPUNIT_ASSERT(!headers.isBound(d, 0, &args3));
}
};
Added: incubator/qpid/trunk/qpid/python/tests_0-9/query.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-9/query.py?view=auto&rev=551144
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-9/query.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-9/query.py Wed Jun 27 05:39:49 2007
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class QueryTests(TestBase):
+ """Tests for various query methods introduced in 0-10 and available in 0-9 for preview"""
+
+ def test_exchange_query(self):
+ """
+ Test that the exchange_query method works as expected
+ """
+ channel = self.channel
+ #check returned type for the standard exchanges
+ self.assertEqual("direct", channel.exchange_query(name="amq.direct").type)
+ self.assertEqual("topic", channel.exchange_query(name="amq.topic").type)
+ self.assertEqual("fanout", channel.exchange_query(name="amq.fanout").type)
+ self.assertEqual("headers", channel.exchange_query(name="amq.match").type)
+ self.assertEqual("direct", channel.exchange_query(name="").type)
+ #declare an exchange
+ channel.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False)
+ #check that the result of a query is as expected
+ response = channel.exchange_query(name="my-test-exchange")
+ self.assertEqual("direct", response.type)
+ self.assertEqual(False, response.durable)
+ self.assertEqual(False, response.not_found)
+ #delete the exchange
+ channel.exchange_delete(exchange="my-test-exchange")
+ #check that the query now reports not-found
+ self.assertEqual(True, channel.exchange_query(name="my-test-exchange").not_found)
+
+ def test_binding_query_direct(self):
+ """
+ Test that the binding_query method works as expected with the direct exchange
+ """
+ self.binding_query_with_key("amq.direct")
+
+ def test_binding_query_topic(self):
+ """
+ Test that the binding_query method works as expected with the direct exchange
+ """
+ self.binding_query_with_key("amq.topic")
+
+ def binding_query_with_key(self, exchange_name):
+ channel = self.channel
+ #setup: create two queues
+ channel.queue_declare(queue="used-queue", exclusive=True)
+ channel.queue_declare(queue="unused-queue", exclusive=True)
+
+ channel.queue_bind(exchange=exchange_name, queue="used-queue", routing_key="used-key")
+
+ # test detection of any binding to specific queue
+ response = channel.binding_query(exchange=exchange_name, queue="used-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+
+ # test detection of specific binding to any queue
+ response = channel.binding_query(exchange=exchange_name, routing_key="used-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.key_not_matched)
+
+ # test detection of specific binding to specific queue
+ response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="used-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+ self.assertEqual(False, response.key_not_matched)
+
+ # test unmatched queue, unspecified binding
+ response = channel.binding_query(exchange=exchange_name, queue="unused-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+
+ # test unspecified queue, unmatched binding
+ response = channel.binding_query(exchange=exchange_name, routing_key="unused-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.key_not_matched)
+
+ # test matched queue, unmatched binding
+ response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="unused-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+ self.assertEqual(True, response.key_not_matched)
+
+ # test unmatched queue, matched binding
+ response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="used-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assertEqual(False, response.key_not_matched)
+
+ # test unmatched queue, unmatched binding
+ response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="unused-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assertEqual(True, response.key_not_matched)
+
+ #test exchange not found
+ self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
+
+ #test queue not found
+ self.assertEqual(True, channel.binding_query(exchange=exchange_name, queue="unknown-queue").queue_not_found)
+
+
+ def test_binding_query_fanout(self):
+ """
+ Test that the binding_query method works as expected with fanout exchange
+ """
+ channel = self.channel
+ #setup
+ channel.queue_declare(queue="used-queue", exclusive=True)
+ channel.queue_declare(queue="unused-queue", exclusive=True)
+ channel.queue_bind(exchange="amq.fanout", queue="used-queue")
+
+ response = channel.binding_query(exchange="amq.fanout", queue="used-queue")
+ self.assertEqual(False, response.exchange_not_found)
+
+ def test_binding_query_header(self):
+ """
+ Test that the binding_query method works as expected with headers exchanges
+ """
+ channel = self.channel
+ #setup
+ channel.queue_declare(queue="used-queue", exclusive=True)
+ channel.queue_declare(queue="unused-queue", exclusive=True)
+ channel.queue_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} )
+
+ response = channel.binding_query(exchange="amq.match", queue="used-queue")
+ self.assertEqual(False, response.exchange_not_found)
+
+
Propchange: incubator/qpid/trunk/qpid/python/tests_0-9/query.py
------------------------------------------------------------------------------
svn:eol-style = native