You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/06/27 14:39:50 UTC

svn commit: r551144 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/src/tests/ python/tests_0-9/

Author: gsim
Date: Wed Jun 27 05:39:49 2007
New Revision: 551144

URL: http://svn.apache.org/viewvc?view=rev&rev=551144
Log:
Added preview of exchange- and binding- query methods that have been approved for 0-10.


Added:
    incubator/qpid/trunk/qpid/python/tests_0-9/query.py   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Wed Jun 27 05:39:49 2007
@@ -40,6 +40,7 @@
     channelHandler(*this),
     connectionHandler(*this),
     exchangeHandler(*this),
+    bindingHandler(*this),
     messageHandler(*this),
     queueHandler(*this),
     txHandler(*this),
@@ -159,6 +160,49 @@
     broker.getExchanges().destroy(name);
     if(!nowait) client.deleteOk(context.getRequestId());
 } 
+
+void BrokerAdapter::ExchangeHandlerImpl::query(const MethodContext& context, u_int16_t /*ticket*/, const string& name)
+{
+    try {
+        Exchange::shared_ptr exchange(broker.getExchanges().get(name));
+        client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs(), context.getRequestId());
+    } catch (const ChannelException& e) {
+        client.queryOk("", false, true, FieldTable(), context.getRequestId());        
+    }
+}
+
+void BrokerAdapter::BindingHandlerImpl::query(const framing::MethodContext& context,
+                                               u_int16_t /*ticket*/,
+                                               const std::string& exchangeName,
+                                               const std::string& queueName,
+                                               const std::string& key,
+                                               const framing::FieldTable& args)
+{
+    Exchange::shared_ptr exchange;
+    try {
+        exchange = broker.getExchanges().get(exchangeName);
+    } catch (const ChannelException&) {}
+
+    Queue::shared_ptr queue;
+    if (!queueName.empty()) {
+        queue = broker.getQueues().find(queueName);
+    }
+
+    if (!exchange) {
+        client.queryOk(true, false, false, false, false, context.getRequestId());
+    } else if (!queueName.empty() && !queue) {
+        client.queryOk(false, true, false, false, false, context.getRequestId());
+    } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) {
+            client.queryOk(false, false, false, false, false, context.getRequestId());
+    } else {
+        //need to test each specified option individually
+        bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0);
+        bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0);
+        bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args);
+
+        client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched, context.getRequestId());
+    }
+}
 
 void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& name, 
                                               bool passive, bool durable, bool exclusive, 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h Wed Jun 27 05:39:49 2007
@@ -63,6 +63,7 @@
     ConnectionHandler* getConnectionHandler() { return &connectionHandler; }
     BasicHandler* getBasicHandler() { return &basicHandler; }
     ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
+    BindingHandler* getBindingHandler() { return &bindingHandler; }
     QueueHandler* getQueueHandler() { return &queueHandler; }
     TxHandler* getTxHandler() { return &txHandler;  }
     MessageHandler* getMessageHandler() { return &messageHandler;  }
@@ -142,6 +143,24 @@
                      const qpid::framing::FieldTable& arguments); 
         void delete_(const framing::MethodContext& context, uint16_t ticket,
                      const std::string& exchange, bool ifUnused, bool nowait); 
+        void query(const framing::MethodContext& context,
+                   u_int16_t ticket,
+                   const string& name);
+    };
+
+    class BindingHandlerImpl : 
+        public BindingHandler,
+            public HandlerImpl<framing::AMQP_ClientProxy::Binding>
+    {
+    public:
+        BindingHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+
+        void query(const framing::MethodContext& context,
+                   u_int16_t ticket,
+                   const std::string& exchange,
+                   const std::string& queue,
+                   const std::string& routingKey,
+                   const framing::FieldTable& arguments);
     };
 
     class QueueHandlerImpl :
@@ -214,6 +233,7 @@
     ChannelHandlerImpl channelHandler;
     ConnectionHandlerImpl connectionHandler;
     ExchangeHandlerImpl exchangeHandler;
+    BindingHandlerImpl bindingHandler;
     MessageHandlerImpl messageHandler;
     QueueHandlerImpl queueHandler;
     TxHandlerImpl txHandler;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.h?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerExchange.h Wed Jun 27 05:39:49 2007
@@ -39,6 +39,8 @@
             const string name;
             const bool durable;
             qpid::framing::FieldTable args;
+            boost::shared_ptr<Exchange> alternate;
+            uint32_t alternateUsers;
             mutable uint64_t persistenceId;
 
         public:
@@ -53,9 +55,15 @@
             bool isDurable() { return durable; }
             qpid::framing::FieldTable& getArgs() { return args; }
 
+            Exchange::shared_ptr getAlternate() { return alternate; }
+            void setAlternate(Exchange::shared_ptr _alternate) { alternate = _alternate; }
+            void incAlternateUsers() { alternateUsers++; }
+            void decAlternateUsers() { alternateUsers--; }
+
             virtual string getType() const = 0;
             virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
             virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
+            virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
             virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
 
             //PersistableExchange:

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Wed Jun 27 05:39:49 2007
@@ -23,6 +23,8 @@
 
 #include "qpid/log/Statement.h"
 #include "BrokerQueue.h"
+#include "BrokerExchange.h"
+#include "DeliverableMessage.h"
 #include "MessageStore.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Time.h"
@@ -233,6 +235,16 @@
 
 void Queue::destroy()
 {
+    if (alternateExchange.get()) {
+        Mutex::ScopedLock locker(lock);
+        while(!messages.empty()){
+            DeliverableMessage msg(messages.front());
+            alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
+                                     &(msg.getMessage().getApplicationHeaders()));
+            pop();
+        }
+    }
+
     if (store) {
         store->destroy(*this);
     }
@@ -289,3 +301,8 @@
     return result.first;
 }
 
+
+void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange)
+{
+    alternateExchange = exchange;
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h Wed Jun 27 05:39:49 2007
@@ -42,6 +42,7 @@
     namespace broker {
         class MessageStore;
         class QueueRegistry;
+        class Exchange;
 
         /**
          * Thrown when exclusive access would be violated.
@@ -74,6 +75,7 @@
             framing::FieldTable settings;
             std::auto_ptr<QueuePolicy> policy;            
             QueueBindings bindings;
+            boost::shared_ptr<Exchange> alternateExchange;
 
             void pop();
             void push(Message::shared_ptr& msg);
@@ -141,6 +143,9 @@
             Message::shared_ptr dequeue();
 
             const QueuePolicy* const getPolicy();
+
+            void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
+
 
             //PersistableQueue support:
             uint64_t getPersistenceId() const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp Wed Jun 27 05:39:49 2007
@@ -31,3 +31,7 @@
     queue->deliver(msg);    
 }
 
+Message& DeliverableMessage::getMessage()
+{
+    return *msg;
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h Wed Jun 27 05:39:49 2007
@@ -32,6 +32,7 @@
         public:
             DeliverableMessage(Message::shared_ptr& msg);
             virtual void deliverTo(Queue::shared_ptr& queue);
+            Message& getMessage();
             virtual ~DeliverableMessage(){}
         };
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Wed Jun 27 05:39:49 2007
@@ -69,6 +69,25 @@
     }
 }
 
+
+bool DirectExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
+{
+    if (routingKey) {
+        Bindings::iterator i = bindings.find(*routingKey);
+        return i != bindings.end() && (!queue || find(i->second.begin(), i->second.end(), queue) != i->second.end());
+    } else if (!queue) {
+        //if no queue or routing key is specified, just report whether any bindings exist
+        return bindings.size() > 0;
+    } else {
+        for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
+            if (find(i->second.begin(), i->second.end(), queue) != i->second.end()) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
+
 DirectExchange::~DirectExchange(){
 
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h Wed Jun 27 05:39:49 2007
@@ -32,7 +32,9 @@
 namespace qpid {
 namespace broker {
     class DirectExchange : public virtual Exchange{
-        std::map<string, std::vector<Queue::shared_ptr> > bindings;
+        typedef std::vector<Queue::shared_ptr> Queues;
+        typedef std::map<string, Queues > Bindings;
+        Bindings bindings;
         qpid::sys::Mutex lock;
 
     public:
@@ -49,6 +51,8 @@
         virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
 
         virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+
+        virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
 
         virtual ~DirectExchange();
     };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Wed Jun 27 05:39:49 2007
@@ -58,6 +58,12 @@
     }
 }
 
+bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const)
+{
+    return std::find(bindings.begin(), bindings.end(), queue) != bindings.end();
+}
+
+
 FanOutExchange::~FanOutExchange() {}
 
 const std::string FanOutExchange::typeName("fanout");

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h Wed Jun 27 05:39:49 2007
@@ -51,6 +51,8 @@
 
     virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
 
+    virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
+
     virtual ~FanOutExchange();
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Wed Jun 27 05:39:49 2007
@@ -80,6 +80,17 @@
     }
 }
 
+
+bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args)
+{
+    for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+        if ( (!args || equal(i->first, *args)) && i->second == queue) {
+            return true;
+        }
+    }
+    return false;
+}
+
 HeadersExchange::~HeadersExchange() {}
 
 const std::string HeadersExchange::typeName("headers");
@@ -127,6 +138,19 @@
     } else {
         return false;
     }
+}
+
+bool HeadersExchange::equal(const FieldTable& a, const FieldTable& b) {
+    typedef FieldTable::ValueMap Map;
+    for (Map::const_iterator i = a.getMap().begin();
+         i != a.getMap().end();
+         ++i)
+    {
+        Map::const_iterator j = b.getMap().find(i->first);
+        if (j == b.getMap().end()) return false;
+        if (!match_values(*(i->second), *(j->second))) return false;
+    }
+    return true;
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Wed Jun 27 05:39:49 2007
@@ -54,9 +54,12 @@
 
     virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);
 
+    virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
+
     virtual ~HeadersExchange();
 
     static bool match(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs);
+    static bool equal(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs);
 };
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Wed Jun 27 05:39:49 2007
@@ -162,6 +162,31 @@
     }
 }
 
+bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
+{
+    if (routingKey && queue) {
+        TopicPattern key(*routingKey);
+        return isBound(queue, key);
+    } else if (!routingKey && !queue) {
+        return bindings.size() > 0;
+    } else if (routingKey) {
+        for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+            if (i->first.match(*routingKey)) {
+                return true;
+            }
+        }            
+        return false;
+    } else {
+        for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+            Queue::vector& qv(i->second);
+            if (find(qv.begin(), qv.end(), queue) != qv.end()) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
+
 TopicExchange::~TopicExchange() {}
 
 const std::string TopicExchange::typeName("topic");

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Wed Jun 27 05:39:49 2007
@@ -92,6 +92,8 @@
 
     virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);
 
+    virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
+
     virtual ~TopicExchange();
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp?view=diff&rev=551144&r1=551143&r2=551144
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ExchangeTest.cpp Wed Jun 27 05:39:49 2007
@@ -19,10 +19,12 @@
  *
  */
 
-#include "qpid/broker/DeliverableMessage.h"
-#include "qpid/broker/DirectExchange.h"
 #include "qpid/broker/BrokerExchange.h"
 #include "qpid/broker/BrokerQueue.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/FanOutExchange.h"
+#include "qpid/broker/HeadersExchange.h"
 #include "qpid/broker/TopicExchange.h"
 #include "qpid_test_plugin.h"
 #include <iostream>
@@ -36,6 +38,7 @@
 {
     CPPUNIT_TEST_SUITE(ExchangeTest);
     CPPUNIT_TEST(testMe);
+    CPPUNIT_TEST(testIsBound);
     CPPUNIT_TEST_SUITE_END();
 
   public:
@@ -65,6 +68,95 @@
         topic.route(msg, "abc", 0);
         direct.route(msg, "abc", 0);
 
+    }
+
+    void testIsBound()
+    {
+        Queue::shared_ptr a(new Queue("a", true));
+        Queue::shared_ptr b(new Queue("b", true));
+        Queue::shared_ptr c(new Queue("c", true));
+        Queue::shared_ptr d(new Queue("d", true));
+        
+        string k1("abc");
+        string k2("def");
+        string k3("xyz");
+
+        FanOutExchange fanout("fanout");
+        fanout.bind(a, "", 0);
+        fanout.bind(b, "", 0);
+        fanout.bind(c, "", 0);
+
+        CPPUNIT_ASSERT(fanout.isBound(a, 0, 0));
+        CPPUNIT_ASSERT(fanout.isBound(b, 0, 0));
+        CPPUNIT_ASSERT(fanout.isBound(c, 0, 0));
+        CPPUNIT_ASSERT(!fanout.isBound(d, 0, 0));
+
+        DirectExchange direct("direct");
+        direct.bind(a, k1, 0);
+        direct.bind(a, k3, 0);
+        direct.bind(b, k2, 0);
+        direct.bind(c, k1, 0);
+
+        CPPUNIT_ASSERT(direct.isBound(a, 0, 0));
+        CPPUNIT_ASSERT(direct.isBound(a, &k1, 0));
+        CPPUNIT_ASSERT(direct.isBound(a, &k3, 0));
+        CPPUNIT_ASSERT(!direct.isBound(a, &k2, 0));
+        CPPUNIT_ASSERT(direct.isBound(b, 0, 0));
+        CPPUNIT_ASSERT(direct.isBound(b, &k2, 0));
+        CPPUNIT_ASSERT(direct.isBound(c, &k1, 0));
+        CPPUNIT_ASSERT(!direct.isBound(d, 0, 0));
+        CPPUNIT_ASSERT(!direct.isBound(d, &k1, 0));
+        CPPUNIT_ASSERT(!direct.isBound(d, &k2, 0));
+        CPPUNIT_ASSERT(!direct.isBound(d, &k3, 0));
+
+        TopicExchange topic("topic");
+        topic.bind(a, k1, 0);
+        topic.bind(a, k3, 0);
+        topic.bind(b, k2, 0);
+        topic.bind(c, k1, 0);
+
+        CPPUNIT_ASSERT(topic.isBound(a, 0, 0));
+        CPPUNIT_ASSERT(topic.isBound(a, &k1, 0));
+        CPPUNIT_ASSERT(topic.isBound(a, &k3, 0));
+        CPPUNIT_ASSERT(!topic.isBound(a, &k2, 0));
+        CPPUNIT_ASSERT(topic.isBound(b, 0, 0));
+        CPPUNIT_ASSERT(topic.isBound(b, &k2, 0));
+        CPPUNIT_ASSERT(topic.isBound(c, &k1, 0));
+        CPPUNIT_ASSERT(!topic.isBound(d, 0, 0));
+        CPPUNIT_ASSERT(!topic.isBound(d, &k1, 0));
+        CPPUNIT_ASSERT(!topic.isBound(d, &k2, 0));
+        CPPUNIT_ASSERT(!topic.isBound(d, &k3, 0));
+
+        HeadersExchange headers("headers");
+        FieldTable args1;
+        args1.setString("x-match", "all");
+        args1.setString("a", "A");
+        args1.setInt("b", 1);
+        FieldTable args2;
+        args2.setString("x-match", "any");
+        args2.setString("a", "A");
+        args2.setInt("b", 1);
+        FieldTable args3;
+        args3.setString("x-match", "any");
+        args3.setString("c", "C");
+        args3.setInt("b", 6);
+
+        headers.bind(a, "", &args1);
+        headers.bind(a, "", &args3);
+        headers.bind(b, "", &args2);
+        headers.bind(c, "", &args1);
+        
+        CPPUNIT_ASSERT(headers.isBound(a, 0, 0));
+        CPPUNIT_ASSERT(headers.isBound(a, 0, &args1));
+        CPPUNIT_ASSERT(headers.isBound(a, 0, &args3));
+        CPPUNIT_ASSERT(!headers.isBound(a, 0, &args2));
+        CPPUNIT_ASSERT(headers.isBound(b, 0, 0));
+        CPPUNIT_ASSERT(headers.isBound(b, 0, &args2));
+        CPPUNIT_ASSERT(headers.isBound(c, 0, &args1));
+        CPPUNIT_ASSERT(!headers.isBound(d, 0, 0));
+        CPPUNIT_ASSERT(!headers.isBound(d, 0, &args1));
+        CPPUNIT_ASSERT(!headers.isBound(d, 0, &args2));
+        CPPUNIT_ASSERT(!headers.isBound(d, 0, &args3));
     }
 };
     

Added: incubator/qpid/trunk/qpid/python/tests_0-9/query.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-9/query.py?view=auto&rev=551144
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-9/query.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-9/query.py Wed Jun 27 05:39:49 2007
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class QueryTests(TestBase):
+    """Tests for various query methods introduced in 0-10 and available in 0-9 for preview"""
+
+    def test_exchange_query(self):
+        """
+        Test that the exchange_query method works as expected
+        """
+        channel = self.channel
+        #check returned type for the standard exchanges
+        self.assertEqual("direct", channel.exchange_query(name="amq.direct").type)
+        self.assertEqual("topic", channel.exchange_query(name="amq.topic").type)
+        self.assertEqual("fanout", channel.exchange_query(name="amq.fanout").type)
+        self.assertEqual("headers", channel.exchange_query(name="amq.match").type)
+        self.assertEqual("direct", channel.exchange_query(name="").type)        
+        #declare an exchange
+        channel.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False)
+        #check that the result of a query is as expected
+        response = channel.exchange_query(name="my-test-exchange")
+        self.assertEqual("direct", response.type)
+        self.assertEqual(False, response.durable)
+        self.assertEqual(False, response.not_found)
+        #delete the exchange
+        channel.exchange_delete(exchange="my-test-exchange")
+        #check that the query now reports not-found
+        self.assertEqual(True, channel.exchange_query(name="my-test-exchange").not_found)
+
+    def test_binding_query_direct(self):
+        """
+        Test that the binding_query method works as expected with the direct exchange
+        """
+        self.binding_query_with_key("amq.direct")
+
+    def test_binding_query_topic(self):
+        """
+        Test that the binding_query method works as expected with the direct exchange
+        """
+        self.binding_query_with_key("amq.topic")
+
+    def binding_query_with_key(self, exchange_name):
+        channel = self.channel
+        #setup: create two queues
+        channel.queue_declare(queue="used-queue", exclusive=True)
+        channel.queue_declare(queue="unused-queue", exclusive=True)
+        
+        channel.queue_bind(exchange=exchange_name, queue="used-queue", routing_key="used-key")
+
+        # test detection of any binding to specific queue
+        response = channel.binding_query(exchange=exchange_name, queue="used-queue")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(False, response.queue_not_matched)        
+
+        # test detection of specific binding to any queue
+        response = channel.binding_query(exchange=exchange_name, routing_key="used-key")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(False, response.key_not_matched)        
+
+        # test detection of specific binding to specific queue
+        response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="used-key")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(False, response.queue_not_matched)        
+        self.assertEqual(False, response.key_not_matched)        
+
+        # test unmatched queue, unspecified binding
+        response = channel.binding_query(exchange=exchange_name, queue="unused-queue")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(True, response.queue_not_matched)        
+
+        # test unspecified queue, unmatched binding
+        response = channel.binding_query(exchange=exchange_name, routing_key="unused-key")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(True, response.key_not_matched)        
+
+        # test matched queue, unmatched binding
+        response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="unused-key")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(False, response.queue_not_matched)        
+        self.assertEqual(True, response.key_not_matched)        
+
+        # test unmatched queue, matched binding
+        response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="used-key")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(True, response.queue_not_matched)        
+        self.assertEqual(False, response.key_not_matched)        
+
+        # test unmatched queue, unmatched binding
+        response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="unused-key")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+        self.assertEqual(True, response.queue_not_matched)        
+        self.assertEqual(True, response.key_not_matched)        
+
+        #test exchange not found
+        self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
+
+        #test queue not found
+        self.assertEqual(True, channel.binding_query(exchange=exchange_name, queue="unknown-queue").queue_not_found)
+
+
+    def test_binding_query_fanout(self):
+        """
+        Test that the binding_query method works as expected with fanout exchange
+        """
+        channel = self.channel
+        #setup
+        channel.queue_declare(queue="used-queue", exclusive=True)
+        channel.queue_declare(queue="unused-queue", exclusive=True)
+        channel.queue_bind(exchange="amq.fanout", queue="used-queue")
+
+        response = channel.binding_query(exchange="amq.fanout", queue="used-queue")
+        self.assertEqual(False, response.exchange_not_found)
+
+    def test_binding_query_header(self):
+        """
+        Test that the binding_query method works as expected with headers exchanges
+        """
+        channel = self.channel
+        #setup
+        channel.queue_declare(queue="used-queue", exclusive=True)
+        channel.queue_declare(queue="unused-queue", exclusive=True)
+        channel.queue_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} )
+
+        response = channel.binding_query(exchange="amq.match", queue="used-queue")
+        self.assertEqual(False, response.exchange_not_found)
+
+        

Propchange: incubator/qpid/trunk/qpid/python/tests_0-9/query.py
------------------------------------------------------------------------------
    svn:eol-style = native