You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/11/12 14:42:50 UTC

svn commit: r1541059 - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/broker/amqp/ qpid/sys/ tests/

Author: gsim
Date: Tue Nov 12 13:42:50 2013
New Revision: 1541059

URL: http://svn.apache.org/r1541059
Log:
QPID-5251: allow policies to be specified that will create topics or queues on demand if they match the specified pattern

Added:
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/NodePolicy.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/NodePolicy.h
    qpid/trunk/qpid/cpp/src/tests/policies.py
Modified:
    qpid/trunk/qpid/cpp/src/amqp.cmake
    qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.h
    qpid/trunk/qpid/cpp/src/qpid/broker/management-schema.xml
    qpid/trunk/qpid/cpp/src/qpid/sys/regex.h
    qpid/trunk/qpid/cpp/src/tests/swig_python_tests

Modified: qpid/trunk/qpid/cpp/src/amqp.cmake
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/amqp.cmake?rev=1541059&r1=1541058&r2=1541059&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/amqp.cmake (original)
+++ qpid/trunk/qpid/cpp/src/amqp.cmake Tue Nov 12 13:42:50 2013
@@ -104,6 +104,8 @@ if (BUILD_AMQP)
          qpid/broker/amqp/ManagedOutgoingLink.cpp
          qpid/broker/amqp/Message.h
          qpid/broker/amqp/Message.cpp
+         qpid/broker/amqp/NodePolicy.h
+         qpid/broker/amqp/NodePolicy.cpp
          qpid/broker/amqp/NodeProperties.h
          qpid/broker/amqp/NodeProperties.cpp
          qpid/broker/amqp/Outgoing.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.cpp?rev=1541059&r1=1541058&r2=1541059&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.cpp Tue Nov 12 13:42:50 2013
@@ -33,6 +33,7 @@ PersistableObject::PersistableObject(con
 PersistableObject::PersistableObject() : id(0) {}
 PersistableObject::~PersistableObject() {}
 const std::string& PersistableObject::getName() const { return name; }
+const std::string& PersistableObject::getType() const { return type; }
 void PersistableObject::setPersistenceId(uint64_t i) const { id = i; }
 uint64_t PersistableObject::getPersistenceId() const { return id; }
 void PersistableObject::encode(framing::Buffer& buffer) const

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.h?rev=1541059&r1=1541058&r2=1541059&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableObject.h Tue Nov 12 13:42:50 2013
@@ -41,6 +41,7 @@ class PersistableObject : public Persist
     QPID_BROKER_EXTERN PersistableObject(const std::string& name, const std::string& type, const qpid::types::Variant::Map properties);
     QPID_BROKER_EXTERN virtual ~PersistableObject();
     QPID_BROKER_EXTERN const std::string& getName() const;
+    QPID_BROKER_EXTERN const std::string& getType() const;
     QPID_BROKER_EXTERN void setPersistenceId(uint64_t id) const;
     QPID_BROKER_EXTERN uint64_t getPersistenceId() const;
     QPID_BROKER_EXTERN void encode(framing::Buffer& buffer) const;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp?rev=1541059&r1=1541058&r2=1541059&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp Tue Nov 12 13:42:50 2013
@@ -62,6 +62,7 @@ const std::string FILTER("qpid.filter");
 const std::string LIFETIME_POLICY("qpid.lifetime-policy");
 const std::string DELETE_IF_UNUSED_KEY("delete-if-unused");
 const std::string DELETE_IF_UNUSED_AND_EMPTY_KEY("delete-if-unused-and-empty");
+const std::string MANUAL("manual");
 
 const std::string LVQ_LEGACY("qpid.last_value_queue");
 const std::string LVQ_LEGACY_KEY("qpid.LVQ_key");
@@ -227,8 +228,12 @@ bool QueueSettings::handle(const std::st
     } else if (key == LIFETIME_POLICY) {
         if (value.asString() == DELETE_IF_UNUSED_KEY) {
             lifetime = DELETE_IF_UNUSED;
+            autodelete = true;
         } else if (value.asString() == DELETE_IF_UNUSED_AND_EMPTY_KEY) {
             lifetime = DELETE_IF_UNUSED_AND_EMPTY;
+            autodelete = true;
+        } else if (value.asString() == MANUAL) {
+            autodelete = false;
         } else {
             QPID_LOG(warning, "Invalid value for " << LIFETIME_POLICY << ": " << value);
         }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp?rev=1541059&r1=1541058&r2=1541059&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp Tue Nov 12 13:42:50 2013
@@ -23,10 +23,11 @@
 namespace qpid {
 namespace broker {
 namespace amqp {
-BrokerContext::BrokerContext(Broker& b, Interconnects& i, TopicRegistry& t, const std::string& d) : broker(b), interconnects(i), topics(t), domain(d) {}
-BrokerContext::BrokerContext(BrokerContext& c) : broker(c.broker), interconnects(c.interconnects), topics(c.topics), domain(c.domain) {}
+BrokerContext::BrokerContext(Broker& b, Interconnects& i, TopicRegistry& t, NodePolicyRegistry& np, const std::string& d) : broker(b), interconnects(i), topics(t), nodePolicies(np), domain(d) {}
+BrokerContext::BrokerContext(BrokerContext& c) : broker(c.broker), interconnects(c.interconnects), topics(c.topics), nodePolicies(c.nodePolicies), domain(c.domain) {}
 Broker& BrokerContext::getBroker() { return broker; }
 Interconnects& BrokerContext::getInterconnects() { return interconnects; }
 TopicRegistry& BrokerContext::getTopics() { return topics; }
+NodePolicyRegistry& BrokerContext::getNodePolicies() { return nodePolicies; }
 std::string BrokerContext::getDomain() { return domain; }
 }}} // namespace qpid::broker::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h?rev=1541059&r1=1541058&r2=1541059&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h Tue Nov 12 13:42:50 2013
@@ -29,22 +29,25 @@ class Broker;
 namespace amqp {
 class Interconnects;
 class TopicRegistry;
+class NodePolicyRegistry;
 /**
  * Context providing access to broker scoped entities.
  */
 class BrokerContext
 {
   public:
-    BrokerContext(Broker&, Interconnects&, TopicRegistry&, const std::string&);
+    BrokerContext(Broker&, Interconnects&, TopicRegistry&, NodePolicyRegistry&, const std::string&);
     BrokerContext(BrokerContext&);
     Broker& getBroker();
     Interconnects& getInterconnects();
     TopicRegistry& getTopics();
+    NodePolicyRegistry& getNodePolicies();
     std::string getDomain();
   private:
     Broker& broker;
     Interconnects& interconnects;
     TopicRegistry& topics;
+    NodePolicyRegistry& nodePolicies;
     std::string domain;
 };
 }}} // namespace qpid::broker::amqp

Added: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/NodePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/NodePolicy.cpp?rev=1541059&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/NodePolicy.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/NodePolicy.cpp Tue Nov 12 13:42:50 2013
@@ -0,0 +1,325 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/amqp/NodePolicy.h"
+#include "qpid/broker/amqp/Connection.h"
+#include "qpid/broker/amqp/Topic.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/types/Exception.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
+
+namespace _qmf = qmf::org::apache::qpid::broker;
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+namespace {
+const std::string DURABLE("durable");
+const std::string AUTO_DELETE("auto-delete");
+const std::string LIFETIME_POLICY("qpid.lifetime-policy");
+const std::string MANUAL("manual");
+const std::string UNUSED("delete-if-unused");
+const std::string UNUSED_AND_EMPTY("delete-if-unused-and-empty");
+const std::string QUEUE_POLICY("QueuePolicy");
+const std::string TOPIC_POLICY("TopicPolicy");
+const std::string QUEUE("queue");
+const std::string TOPIC("topic");
+const std::string ALTERNATE_EXCHANGE("alternate-exchange");
+const std::string EXCHANGE_TYPE("exchange-type");
+const std::string QPID_MSG_SEQUENCE("qpid.msg_sequence");
+const std::string QPID_IVE("qpid.ive");
+const std::string EMPTY;
+
+template <typename T>
+T get(const std::string& k, const qpid::types::Variant::Map& m, T defaultValue)
+{
+    qpid::types::Variant::Map::const_iterator i = m.find(k);
+    if (i == m.end()) return defaultValue;
+    else return i->second;
+}
+
+std::string getProperty(const std::string& k, const qpid::types::Variant::Map& m)
+{
+    return get(k, m, EMPTY);
+}
+
+bool testProperty(const std::string& k, const qpid::types::Variant::Map& m)
+{
+    return get(k, m, false);
+}
+
+qpid::types::Variant::Map filterForQueue(const qpid::types::Variant::Map& properties)
+{
+    qpid::types::Variant::Map filtered = properties;
+    filtered.erase(DURABLE);
+    filtered.erase(AUTO_DELETE);
+    filtered.erase(ALTERNATE_EXCHANGE);
+    return filtered;
+}
+qpid::types::Variant::Map filterForTopic(const qpid::types::Variant::Map& properties)
+{
+    qpid::types::Variant::Map filtered = properties;
+    filtered.erase(DURABLE);
+    filtered.erase(EXCHANGE_TYPE);
+    filtered.erase(AUTO_DELETE);
+    filtered.erase(QPID_IVE);
+    filtered.erase(QPID_MSG_SEQUENCE);
+    return filtered;
+}
+void copy(const std::string& key, const qpid::types::Variant::Map& from, qpid::types::Variant::Map& to)
+{
+    qpid::types::Variant::Map::const_iterator i = from.find(key);
+    if (i != from.end()) to.insert(*i);
+}
+
+}
+NodePolicy::NodePolicy(const std::string& type, const std::string& ptrn, const qpid::types::Variant::Map& props)
+    : PersistableObject(ptrn, type, props), pattern(ptrn),
+      durable(testProperty(DURABLE, props)),
+      alternateExchange(getProperty(ALTERNATE_EXCHANGE, props)),
+      compiled(pattern) {}
+
+NodePolicy::~NodePolicy() {}
+
+const std::string& NodePolicy::getPattern() const
+{
+    return pattern;
+}
+
+bool NodePolicy::isDurable() const
+{
+    return durable;
+}
+
+bool NodePolicy::match(const std::string& name) const
+{
+    return qpid::sys::regex_match(name, compiled);
+}
+
+QueuePolicy::QueuePolicy(Broker& broker, const std::string& pattern, const qpid::types::Variant::Map& props)
+    : NodePolicy(QUEUE_POLICY, pattern, props),
+      queueSettings(durable, testProperty(AUTO_DELETE, props))
+{
+    qpid::types::Variant::Map unused;
+    qpid::types::Variant::Map filtered = filterForQueue(props);
+    //if queue is not durable and neither lifetime policy nor
+    //autodelete were explicitly specified, clean it up when not
+    //needed by default:
+    if (!queueSettings.durable && props.find(LIFETIME_POLICY) == props.end() && props.find(AUTO_DELETE) == props.end()) {
+        filtered[LIFETIME_POLICY] = UNUSED_AND_EMPTY;
+    }
+    queueSettings.populate(filtered, unused);
+    qpid::amqp_0_10::translate(filtered, queueSettings.storeSettings);
+
+    qpid::management::ManagementAgent* agent = broker.getManagementAgent();
+    if (agent != 0) {
+        policy = _qmf::QueuePolicy::shared_ptr(new _qmf::QueuePolicy(agent, this, pattern));
+        policy->set_properties(props);
+        agent->addObject(policy);
+    }
+}
+QueuePolicy::~QueuePolicy()
+{
+    if (policy != 0) policy->resourceDestroy();
+}
+
+
+std::pair<boost::shared_ptr<Queue>, boost::shared_ptr<Topic> > QueuePolicy::create(const std::string& name, Connection& connection)
+{
+    std::pair<boost::shared_ptr<Queue>, boost::shared_ptr<Topic> > result;
+    result.first = connection.getBroker().createQueue(name, queueSettings, 0/*not exclusive*/, alternateExchange, connection.getUserId(), connection.getId()).first;
+    return result;
+}
+
+boost::shared_ptr<qpid::management::ManagementObject> QueuePolicy::GetManagementObject() const
+{
+    return policy;
+}
+
+TopicPolicy::TopicPolicy(Broker& broker, const std::string& pattern, const qpid::types::Variant::Map& props)
+    : NodePolicy(TOPIC_POLICY, pattern, props), exchangeType(getProperty(EXCHANGE_TYPE, props)),
+      autodelete(get(AUTO_DELETE, props, !durable))
+{
+    qpid::types::Variant::Map::const_iterator i = props.find(LIFETIME_POLICY);
+    if (i != props.end()) {
+        if (i->second == MANUAL) {
+            autodelete = false;
+        } else if (i->second == UNUSED || i->second == UNUSED_AND_EMPTY/*though empty doesn't mean much for an exchange*/) {
+            autodelete = true;
+        } else {
+            QPID_LOG(warning, "Did not recognise lifetime policy " << i->second << " in topic policy for " << pattern);
+        }
+    }
+    topicSettings = filterForTopic(props);
+    copy(QPID_IVE, props, exchangeSettings);
+    copy(QPID_MSG_SEQUENCE, props, exchangeSettings);
+    if (exchangeType.empty()) exchangeType = TOPIC;
+
+    qpid::management::ManagementAgent* agent = broker.getManagementAgent();
+    if (agent != 0) {
+        policy = _qmf::TopicPolicy::shared_ptr(new _qmf::TopicPolicy(agent, this, pattern));
+        policy->set_properties(props);
+        agent->addObject(policy);
+    }
+}
+
+TopicPolicy::~TopicPolicy()
+{
+    if (policy != 0) policy->resourceDestroy();
+}
+
+std::pair<boost::shared_ptr<Queue>, boost::shared_ptr<Topic> > TopicPolicy::create(const std::string& name, Connection& connection)
+{
+    std::pair<boost::shared_ptr<Queue>, boost::shared_ptr<Topic> > result;
+    qpid::framing::FieldTable args;
+    qpid::amqp_0_10::translate(exchangeSettings, args);
+    boost::shared_ptr<Exchange> exchange = connection.getBroker().createExchange(name, exchangeType, isDurable(), autodelete, alternateExchange,
+                                                                                 args, connection.getUserId(), connection.getId()).first;
+    result.second = connection.getTopics().createTopic(connection.getBroker(), name, exchange, topicSettings);
+    return result;
+}
+
+boost::shared_ptr<qpid::management::ManagementObject> TopicPolicy::GetManagementObject() const
+{
+    return policy;
+}
+
+boost::shared_ptr<NodePolicy> NodePolicyRegistry::createQueuePolicy(Broker& broker, const std::string& name, const qpid::types::Variant::Map& properties)
+{
+    boost::shared_ptr<NodePolicy> nodePolicy(new QueuePolicy(broker, name, properties));
+    add(nodePolicy);
+    return nodePolicy;
+}
+
+boost::shared_ptr<NodePolicy> NodePolicyRegistry::createTopicPolicy(Broker& broker, const std::string& name, const qpid::types::Variant::Map& properties)
+{
+    boost::shared_ptr<NodePolicy> nodePolicy(new TopicPolicy(broker, name, properties));
+    add(nodePolicy);
+    return nodePolicy;
+}
+
+boost::shared_ptr<NodePolicy> NodePolicyRegistry::createNodePolicy(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties)
+{
+    if (type == QUEUE_POLICY) {
+         return createQueuePolicy(broker, name, properties);
+    } else if (type == TOPIC_POLICY) {
+         return createTopicPolicy(broker, name, properties);
+    } else {
+        return boost::shared_ptr<NodePolicy>();
+    }
+}
+
+bool NodePolicyRegistry::createObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+                                      const std::string& /*userId*/, const std::string& /*connectionId*/)
+{
+    boost::shared_ptr<NodePolicy> nodePolicy = createNodePolicy(broker, type, name, properties);
+    if (nodePolicy) {
+        if (nodePolicy->isDurable()) broker.getStore().create(*nodePolicy);
+        return true;
+    } else {
+        return false;
+    }
+}
+bool NodePolicyRegistry::deleteObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map&,
+                                      const std::string& /*userId*/, const std::string& /*connectionId*/)
+{
+    if (type == QUEUE_POLICY || type == TOPIC_POLICY) {
+        boost::shared_ptr<NodePolicy> nodePolicy = remove(name, type);
+        if (nodePolicy) {
+            if (nodePolicy->isDurable()) broker.getStore().destroy(*nodePolicy);
+            return true;
+        } else {
+            return false;
+        }
+    } else {
+        return false;
+    }
+}
+bool NodePolicyRegistry::recoverObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+                                       uint64_t persistenceId)
+{
+
+    boost::shared_ptr<NodePolicy> nodePolicy = createNodePolicy(broker, type, name, properties);
+    if (nodePolicy) {
+        nodePolicy->setPersistenceId(persistenceId);
+        return true;
+    } else {
+        return false;
+    }
+}
+
+void NodePolicyRegistry::add(boost::shared_ptr<NodePolicy> nodePolicy)
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    NodePolicies::const_iterator i = nodePolicies.find(nodePolicy->getName());
+    if (i == nodePolicies.end()) {
+        nodePolicies.insert(NodePolicies::value_type(nodePolicy->getName(), nodePolicy));
+    } else {
+        if (i->second->getType() != nodePolicy->getType()) {
+            throw qpid::types::Exception(QPID_MSG("Cannot create object of type " << nodePolicy->getType() << " with key "
+                                                  << nodePolicy->getName() << " as an object of type " << i->second->getType() << " already exists with the same key"));
+        } else {
+            throw qpid::types::Exception(QPID_MSG("An object of type " << nodePolicy->getType() << " with key " << nodePolicy->getName() << " already exists"));
+        }
+    }
+}
+boost::shared_ptr<NodePolicy> NodePolicyRegistry::remove(const std::string& pattern, const std::string& type)
+{
+    boost::shared_ptr<NodePolicy> result;
+    qpid::sys::Mutex::ScopedLock l(lock);
+    NodePolicies::iterator i = nodePolicies.find(pattern);
+    if (i != nodePolicies.end()) {
+        if (i->second->getType() != type) {
+            throw qpid::types::Exception(QPID_MSG("Object with key " << i->first << " is of type " << i->second->getType() << " not " << type));
+        }
+        result = i->second;
+        nodePolicies.erase(i);
+    }
+    return result;
+}
+boost::shared_ptr<NodePolicy> NodePolicyRegistry::get(const std::string& pattern)
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    NodePolicies::const_iterator i = nodePolicies.find(pattern);
+    if (i == nodePolicies.end()) {
+        return boost::shared_ptr<NodePolicy>();
+    } else {
+        return i->second;
+    }
+}
+
+boost::shared_ptr<NodePolicy> NodePolicyRegistry::match(const std::string& name)
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    boost::shared_ptr<NodePolicy> best;
+    for (NodePolicies::const_iterator i = nodePolicies.begin(); i != nodePolicies.end(); ++i) {
+        //where multiple policies match, pick the one with the longest
+        //pattern as a crude guesstimate of the more specific one
+        if (i->second->match(name) && (!best || i->first.size() > best->getPattern().size())) {
+            best = i->second;
+        }
+    }
+    return best;
+}
+
+}}} // namespace qpid::broker::amqp

Added: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/NodePolicy.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/NodePolicy.h?rev=1541059&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/NodePolicy.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/NodePolicy.h Tue Nov 12 13:42:50 2013
@@ -0,0 +1,117 @@
+#ifndef QPID_BROKER_AMQP_NODEPOLICY_H
+#define QPID_BROKER_AMQP_NODEPOLICY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/ObjectFactory.h"
+#include "qpid/broker/PersistableObject.h"
+#include "qpid/broker/QueueSettings.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/regex.h"
+#include "qpid/types/Variant.h"
+#include "qpid/management/Manageable.h"
+#include "qmf/org/apache/qpid/broker/QueuePolicy.h"
+#include "qmf/org/apache/qpid/broker/TopicPolicy.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+class Broker;
+class Queue;
+namespace amqp {
+class Connection;
+class Topic;
+
+/**
+ * Policy for creation of nodes 'on-demand'
+ */
+class NodePolicy : public PersistableObject, public management::Manageable
+{
+  public:
+    NodePolicy(const std::string& type, const std::string& ptrn, const qpid::types::Variant::Map& props);
+    virtual ~NodePolicy();
+    const std::string& getPattern() const;
+    bool match(const std::string&) const;
+    bool isDurable() const;
+    virtual std::pair<boost::shared_ptr<Queue>, boost::shared_ptr<Topic> > create(const std::string&, Connection&) = 0;
+    virtual boost::shared_ptr<qpid::management::ManagementObject> GetManagementObject() const = 0;
+  protected:
+    NodePolicy(Broker&, const std::string& type, const std::string& pattern, const qpid::types::Variant::Map& properties);
+    const std::string pattern;
+    bool durable;
+    std::string alternateExchange;
+    qpid::sys::regex compiled;
+};
+
+class QueuePolicy : public NodePolicy
+{
+  public:
+    QueuePolicy(Broker&, const std::string& pattern, const qpid::types::Variant::Map& properties);
+    ~QueuePolicy();
+    std::pair<boost::shared_ptr<Queue>, boost::shared_ptr<Topic> > create(const std::string&, Connection&);
+    boost::shared_ptr<qpid::management::ManagementObject> GetManagementObject() const;
+  private:
+    qpid::broker::QueueSettings queueSettings;
+    qmf::org::apache::qpid::broker::QueuePolicy::shared_ptr policy;
+};
+
+class TopicPolicy : public NodePolicy
+{
+  public:
+    TopicPolicy(Broker&, const std::string& pattern, const qpid::types::Variant::Map& properties);
+    ~TopicPolicy();
+    std::pair<boost::shared_ptr<Queue>, boost::shared_ptr<Topic> > create(const std::string&, Connection&);
+    boost::shared_ptr<qpid::management::ManagementObject> GetManagementObject() const;
+  private:
+    qpid::types::Variant::Map topicSettings;
+    std::string exchangeType;
+    bool autodelete;
+    qpid::types::Variant::Map exchangeSettings;
+    qmf::org::apache::qpid::broker::TopicPolicy::shared_ptr policy;
+};
+
+class NodePolicyRegistry : public ObjectFactory
+{
+  public:
+    bool createObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+                              const std::string& userId, const std::string& connectionId);
+    bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+                              const std::string& userId, const std::string& connectionId);
+    bool recoverObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+                       uint64_t persistenceId);
+
+    boost::shared_ptr<NodePolicy> match(const std::string& name);
+    boost::shared_ptr<NodePolicy> createQueuePolicy(Broker&, const std::string& name, const qpid::types::Variant::Map& properties);
+    boost::shared_ptr<NodePolicy> createTopicPolicy(Broker&, const std::string& name, const qpid::types::Variant::Map& properties);
+  private:
+    typedef std::map<std::string, boost::shared_ptr<NodePolicy> > NodePolicies;
+    qpid::sys::Mutex lock;
+    NodePolicies nodePolicies;
+
+    boost::shared_ptr<NodePolicy> createNodePolicy(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties);
+    void add(boost::shared_ptr<NodePolicy> nodePolicy);
+    boost::shared_ptr<NodePolicy> remove(const std::string& pattern, const std::string& type);
+    boost::shared_ptr<NodePolicy> get(const std::string& pattern);
+};
+
+}}} // namespace qpid::broker::amqp
+
+#endif  /*!QPID_BROKER_AMQP_NODEPOLICY_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp?rev=1541059&r1=1541058&r2=1541059&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp Tue Nov 12 13:42:50 2013
@@ -29,6 +29,7 @@
 #include "qpid/broker/amqp/Connection.h"
 #include "qpid/broker/amqp/Interconnects.h"
 #include "qpid/broker/amqp/Message.h"
+#include "qpid/broker/amqp/NodePolicy.h"
 #include "qpid/broker/amqp/Sasl.h"
 #include "qpid/broker/amqp/Topic.h"
 #include "qpid/broker/amqp/Translation.h"
@@ -44,22 +45,27 @@ namespace amqp {
 
 struct Options : public qpid::Options {
     std::string domain;
+    std::vector<std::string> queuePatterns;
+    std::vector<std::string> topicPatterns;
 
     Options() : qpid::Options("AMQP 1.0 Options") {
         addOptions()
-            ("domain", optValue(domain, "DOMAIN"), "Domain of this broker");
+            ("domain", optValue(domain, "DOMAIN"), "Domain of this broker")
+            ("queue-patterns", optValue(queuePatterns, "PATTERN"), "Pattern for on-demand queues")
+            ("topic-patterns", optValue(topicPatterns, "PATTERN"), "Pattern for on-demand topics");
     }
 };
 
 class ProtocolImpl : public BrokerContext, public Protocol
 {
   public:
-    ProtocolImpl(Interconnects* interconnects, TopicRegistry* topics, Broker& broker, const std::string& domain)
-        : BrokerContext(broker, *interconnects, *topics, domain)
+    ProtocolImpl(Interconnects* interconnects, TopicRegistry* topics, NodePolicyRegistry* policies, Broker& broker, const std::string& domain)
+        : BrokerContext(broker, *interconnects, *topics, *policies, domain)
     {
         interconnects->setContext(*this);
         broker.getObjectFactoryRegistry().add(interconnects);//registry deletes on shutdown
         broker.getObjectFactoryRegistry().add(topics);//registry deletes on shutdown
+        broker.getObjectFactoryRegistry().add(policies);//registry deletes on shutdown
     }
     qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
     boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const qpid::broker::Message&);
@@ -71,18 +77,33 @@ struct ProtocolPlugin : public Plugin
 {
     Options options;
     Options* getOptions() { return &options; }
+    NodePolicyRegistry* policies;
+
+    ProtocolPlugin() : policies(0) {}
 
     void earlyInitialize(Plugin::Target& target)
     {
         //need to register protocol before recovery from store
         broker::Broker* broker = dynamic_cast<qpid::broker::Broker*>(&target);
         if (broker) {
-            ProtocolImpl* impl = new ProtocolImpl(new Interconnects(), new TopicRegistry(), *broker, options.domain);
+            policies = new NodePolicyRegistry();
+            ProtocolImpl* impl = new ProtocolImpl(new Interconnects(), new TopicRegistry(), policies, *broker, options.domain);
             broker->getProtocolRegistry().add("AMQP 1.0", impl);//registry deletes on shutdown
         }
     }
 
-    void initialize(Plugin::Target&) {}
+    void initialize(Plugin::Target& target)
+    {
+        broker::Broker* broker = dynamic_cast<qpid::broker::Broker*>(&target);
+        if (broker) {
+            for (std::vector<std::string>::const_iterator i = options.queuePatterns.begin(); i != options.queuePatterns.end(); ++i) {
+                policies->createQueuePolicy(*broker, *i, qpid::types::Variant::Map());
+            }
+            for (std::vector<std::string>::const_iterator i = options.topicPatterns.begin(); i != options.topicPatterns.end(); ++i) {
+                policies->createTopicPolicy(*broker, *i, qpid::types::Variant::Map());
+            }
+        }
+    }
 };
 
 ProtocolPlugin instance; // Static initialization

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1541059&r1=1541058&r2=1541059&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Tue Nov 12 13:42:50 2013
@@ -26,6 +26,7 @@
 #include "Domain.h"
 #include "Exception.h"
 #include "Interconnects.h"
+#include "NodePolicy.h"
 #include "Relay.h"
 #include "Topic.h"
 #include "qpid/amqp/descriptors.h"
@@ -260,19 +261,36 @@ Session::ResolvedNode Session::resolve(c
                 node.queue = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first;
             }
         } else {
-            size_t i = name.find('@');
-            if (i != std::string::npos && (i+1) < name.length()) {
-                std::string domain = name.substr(i+1);
-                std::string local = name.substr(0, i);
-                std::string id = (boost::format("%1%-%2%") % name % qpid::types::Uuid(true).str()).str();
-                //does this domain exist?
-                boost::shared_ptr<Domain> d = connection.getInterconnects().findDomain(domain);
-                if (d) {
-                    node.relay = boost::shared_ptr<Relay>(new Relay(1000));
-                    if (incoming) {
-                        d->connect(false, id, name, local, connection, node.relay);
-                    } else {
-                        d->connect(true, id, local, name, connection, node.relay);
+            boost::shared_ptr<NodePolicy> nodePolicy = connection.getNodePolicies().match(name);
+            if (nodePolicy) {
+                std::pair<boost::shared_ptr<Queue>, boost::shared_ptr<Topic> > result = nodePolicy->create(name, connection);
+                node.queue = result.first;
+                node.topic = result.second;
+                if (node.topic) node.exchange = node.topic->getExchange();
+
+                if (node.queue) {
+                    QPID_LOG(info, "Created queue " << name << " from policy with pattern " << nodePolicy->getPattern());
+                } else if (node.topic) {
+                    QPID_LOG(info, "Created topic " << name << " from policy with pattern " << nodePolicy->getPattern());
+                } else {
+                    QPID_LOG(debug, "Created neither a topic nor a queue for " << name << " from policy with pattern " << nodePolicy->getPattern());
+                }
+
+            } else {
+                size_t i = name.find('@');
+                if (i != std::string::npos && (i+1) < name.length()) {
+                    std::string domain = name.substr(i+1);
+                    std::string local = name.substr(0, i);
+                    std::string id = (boost::format("%1%-%2%") % name % qpid::types::Uuid(true).str()).str();
+                    //does this domain exist?
+                    boost::shared_ptr<Domain> d = connection.getInterconnects().findDomain(domain);
+                    if (d) {
+                        node.relay = boost::shared_ptr<Relay>(new Relay(1000));
+                        if (incoming) {
+                            d->connect(false, id, name, local, connection, node.relay);
+                        } else {
+                            d->connect(true, id, local, name, connection, node.relay);
+                        }
                     }
                 }
             }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.cpp?rev=1541059&r1=1541058&r2=1541059&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.cpp Tue Nov 12 13:42:50 2013
@@ -58,8 +58,8 @@ qpid::types::Variant::Map filter(const q
 }
 }
 
-Topic::Topic(Broker& broker, const std::string& n, const qpid::types::Variant::Map& properties)
-    : PersistableObject(n, TOPIC, properties), name(n), durable(testProperty(DURABLE, properties)), exchange(broker.getExchanges().get(getProperty(EXCHANGE, properties))),
+Topic::Topic(Broker& broker, const std::string& n, boost::shared_ptr<Exchange> e, const qpid::types::Variant::Map& properties)
+    : PersistableObject(n, TOPIC, properties), name(n), durable(testProperty(DURABLE, properties)), exchange(e),
       alternateExchange(getProperty(ALTERNATE_EXCHANGE, properties))
 {
     if (exchange->getName().empty()) throw qpid::Exception("Exchange must be specified.");
@@ -107,9 +107,9 @@ const std::string& Topic::getAlternateEx
 {
     return alternateExchange;
 }
-boost::shared_ptr<Topic> TopicRegistry::createTopic(Broker& broker, const std::string& name, const qpid::types::Variant::Map& properties)
+boost::shared_ptr<Topic> TopicRegistry::createTopic(Broker& broker, const std::string& name, boost::shared_ptr<Exchange> exchange, const qpid::types::Variant::Map& properties)
 {
-    boost::shared_ptr<Topic> topic(new Topic(broker, name, properties));
+    boost::shared_ptr<Topic> topic(new Topic(broker, name, exchange, properties));
     add(topic);
     topic->getExchange()->setDeletionListener(name, boost::bind(&TopicRegistry::remove, this, name));
     return topic;
@@ -119,7 +119,7 @@ bool TopicRegistry::createObject(Broker&
                                  const std::string& /*userId*/, const std::string& /*connectionId*/)
 {
     if (type == TOPIC) {
-        boost::shared_ptr<Topic> topic = createTopic(broker, name, props);
+        boost::shared_ptr<Topic> topic = createTopic(broker, name, broker.getExchanges().get(getProperty(EXCHANGE, props)), props);
         if (topic->isDurable()) broker.getStore().create(*topic);
         return true;
     } else {
@@ -147,7 +147,7 @@ bool TopicRegistry::recoverObject(Broker
                    uint64_t persistenceId)
 {
     if (type == TOPIC) {
-        boost::shared_ptr<Topic> topic = createTopic(broker, name, properties);
+        boost::shared_ptr<Topic> topic = createTopic(broker, name, broker.getExchanges().get(getProperty(EXCHANGE, properties)), properties);
         topic->setPersistenceId(persistenceId);
         return true;
     } else {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.h?rev=1541059&r1=1541058&r2=1541059&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Topic.h Tue Nov 12 13:42:50 2013
@@ -47,7 +47,7 @@ namespace amqp {
 class Topic : public PersistableObject, public management::Manageable
 {
   public:
-    Topic(Broker&, const std::string& name, const qpid::types::Variant::Map& properties);
+    Topic(Broker&, const std::string& name, boost::shared_ptr<Exchange>, const qpid::types::Variant::Map& properties);
     ~Topic();
     const std::string& getName() const;
     const QueueSettings& getPolicy() const;
@@ -77,12 +77,12 @@ class TopicRegistry : public ObjectFacto
     bool add(boost::shared_ptr<Topic> topic);
     boost::shared_ptr<Topic> remove(const std::string& name);
     boost::shared_ptr<Topic> get(const std::string& name);
+    boost::shared_ptr<Topic> createTopic(Broker&, const std::string& name, boost::shared_ptr<Exchange> exchange, const qpid::types::Variant::Map& properties);
   private:
     typedef std::map<std::string, boost::shared_ptr<Topic> > Topics;
     qpid::sys::Mutex lock;
     Topics topics;
 
-    boost::shared_ptr<Topic> createTopic(Broker&, const std::string& name, const qpid::types::Variant::Map& properties);
 };
 
 }}} // namespace qpid::broker::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/management-schema.xml?rev=1541059&r1=1541058&r2=1541059&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/management-schema.xml (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/management-schema.xml Tue Nov 12 13:42:50 2013
@@ -437,6 +437,24 @@
     <property name="durable"        type="bool"     access="RC"/>
     <property name="properties"     type="map"      access="RO"/>
   </class>
+  <!--
+  ===============================================================
+  AMQP 1.0 QueuePolicy
+  ===============================================================
+  -->
+  <class name="QueuePolicy">
+    <property name="name"           type="sstr"     access="RC" index="y"/>
+    <property name="properties"     type="map"      access="RO"/>
+  </class>
+  <!--
+  ===============================================================
+  AMQP 1.0 TopicPolicy
+  ===============================================================
+  -->
+  <class name="TopicPolicy">
+    <property name="name"           type="sstr"     access="RC" index="y"/>
+    <property name="properties"     type="map"      access="RO"/>
+  </class>
 
 
   <!--

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/regex.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/regex.h?rev=1541059&r1=1541058&r2=1541059&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/regex.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/regex.h Tue Nov 12 13:42:50 2013
@@ -56,7 +56,7 @@ public:
     friend bool regex_match(const std::string& s, const regex& re);
 };
 
-bool regex_match(const std::string& s, const regex& re) {
+inline bool regex_match(const std::string& s, const regex& re) {
     return ::regexec(&(re.re), s.c_str(), 0, 0, 0)==0;
 }
 

Added: qpid/trunk/qpid/cpp/src/tests/policies.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/policies.py?rev=1541059&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/policies.py (added)
+++ qpid/trunk/qpid/cpp/src/tests/policies.py Tue Nov 12 13:42:50 2013
@@ -0,0 +1,209 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.tests.messaging.implementation import *
+from qpid.tests.messaging import VersionTest
+
+class Mgmt:
+    """
+    Simple QMF management utility (qpidtoollibs uses
+    qpid.messaging.Message rather than swigged version)
+    """
+    def __init__(self, conn):
+        self.conn = conn
+        self.sess = self.conn.session()
+        self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \
+            str(uuid4())
+        self.reply_rx = self.sess.receiver(self.reply_to)
+        self.reply_rx.capacity = 10
+        self.tx = self.sess.sender("qmf.default.direct/broker")
+        self.next_correlator = 1
+
+    def list(self, class_name):
+        props = {'method'             : 'request',
+                 'qmf.opcode'         : '_query_request',
+                 'x-amqp-0-10.app-id' : 'qmf2'}
+        correlator = str(self.next_correlator)
+        self.next_correlator += 1
+
+        content = {'_what'      : 'OBJECT',
+                 '_schema_id' : {'_class_name' : class_name.lower()}}
+
+        message = Message(content, reply_to=self.reply_to, correlation_id=correlator,
+                          properties=props, subject="broker")
+        self.tx.send(message)
+
+
+        response = self.reply_rx.fetch(10)
+        if response.properties['qmf.opcode'] != '_query_response':
+            raise Exception("bad response")
+        items = []
+        done = False
+        while not done:
+            for item in response.content:
+                items.append(item['_values'])
+            if 'partial' in response.properties:
+                response = self.reply_rx.fetch(10)
+            else:
+                done = True
+            self.sess.acknowledge()
+        return items
+
+    def do_qmf_method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10):
+        props = {'method'             : 'request',
+                 'qmf.opcode'         : '_method_request',
+                 'x-amqp-0-10.app-id' : 'qmf2'}
+        correlator = str(self.next_correlator)
+        self.next_correlator += 1
+
+        content = {'_object_id'   : {'_object_name' : addr},
+                   '_method_name' : method,
+                   '_arguments'   : arguments}
+
+        message = Message(content, reply_to=self.reply_to, correlation_id=correlator,
+                          properties=props, subject="broker")
+        self.tx.send(message)
+        response = self.reply_rx.fetch(timeout)
+        self.sess.acknowledge()
+        if response.properties['qmf.opcode'] == '_exception':
+          raise Exception("Exception from Agent: %r" % response.content['_values'])
+        if response.properties['qmf.opcode'] != '_method_response':
+          raise Exception("bad response: %r" % response.properties)
+        return response.content['_arguments']
+
+    def create(self, _type, name, properties={}):
+        return self.do_qmf_method('create', {'type': _type, 'name': name, 'properties': properties})
+
+    def delete(self, _type, name):
+        return self.do_qmf_method('delete', {'type': _type, 'name': name})
+
+
+class PoliciesTests (VersionTest):
+    """
+    Tests for node policies with qpidd
+    """
+
+    def do_simple_queue_test(self, pattern, name, properties={}, autodeleted=True):
+        mgmt = self.create_connection("amqp0-10", True)
+        agent = Mgmt(mgmt)
+        agent.create('QueuePolicy', pattern, properties)
+        try:
+            snd = self.ssn.sender(name)
+            msgs = [Message(content=s, subject = s) for s in ['a','b','c','d']]
+            for m in msgs: snd.send(m)
+            snd.close()
+
+            for expected in msgs:
+                rcv = self.ssn.receiver(name)
+                msg = rcv.fetch(0)
+                assert msg.content == expected.content, (msg.content, expected.content)
+                self.ssn.acknowledge()
+                rcv.close() #close after each message to ensure queue isn't deleted with messages in it
+            self.ssn.close()
+            self.conn.close()
+
+            matched = [q for q in agent.list("Queue") if q['name'] == name]
+            if autodeleted:
+                # ensure that queue is no longer there (as empty and unused)
+                assert len(matched) == 0, (matched)
+            else:
+                # ensure that queue is still there though empty and unused
+                assert len(matched) == 1, (matched)
+        finally:
+            agent.delete('QueuePolicy', pattern)
+            mgmt.close()
+
+    def test_queue(self):
+        self.do_simple_queue_test("queue-*", "queue-1")
+
+    def test_queue_not_autodeleted(self):
+        self.do_simple_queue_test("permanent-queue-*", "permanent-queue-1", {'auto-delete':False}, False)
+
+    def test_queue_manual_delete(self):
+        self.do_simple_queue_test("permanent-queue-*", "permanent-queue-1", {'qpid.lifetime-policy':'manual'}, False)
+
+    def test_queue_delete_if_unused_and_empty(self):
+        self.do_simple_queue_test("queue-*", "queue-1", {'qpid.lifetime-policy':'delete-if-unused-and-empty'}, True)
+
+    def do_simple_topic_test(self, pattern, name, properties={}, autodeleted=True):
+        mgmt = self.create_connection("amqp0-10", True)
+        agent = Mgmt(mgmt)
+        agent.create('TopicPolicy', pattern, properties)
+        try:
+            snd = self.ssn.sender(name)
+            rcv1 = self.ssn.receiver(name)
+            rcv2 = self.ssn.receiver(name)
+
+            msgs = [Message(content=s, subject = s) for s in ['a','b','c','d']]
+            for m in msgs: snd.send(m)
+
+            for rcv in [rcv1, rcv2]:
+                for expected in msgs:
+                    msg = rcv.fetch(0)
+                    assert msg.content == expected.content, (msg.content, expected.content)
+            self.ssn.acknowledge()
+            rcv1.close()
+            rcv2.close()
+            snd.close()
+
+            matched = [e for e in agent.list("Exchange") if e['name'] == name]
+            if autodeleted:
+                # ensure that exchange is no longer there (as it is now unused)
+                assert len(matched) == 0, (matched)
+            else:
+                # ensure that exchange has not been autodeleted in spite of being unused
+                assert len(matched) == 1, (matched)
+        finally:
+            agent.delete('TopicPolicy', pattern)
+            mgmt.close()
+
+    def test_topic(self):
+        self.do_simple_topic_test('fanout-*', 'fanout-1', {'exchange-type':'fanout'})
+
+    def test_topic_not_autodelete(self):
+        self.do_simple_topic_test('permanent-fanout-*', 'permanent-fanout-1', {'exchange-type':'fanout', 'auto-delete':False}, False)
+
+    def test_topic_manual_delete(self):
+        self.do_simple_topic_test('permanent-fanout-*', 'permanent-fanout-1', {'exchange-type':'fanout', 'qpid.lifetime-policy':'manual'}, False)
+
+    def test_topic_delete_if_unused(self):
+        self.do_simple_topic_test('fanout-*', 'fanout-1', {'exchange-type':'fanout', 'qpid.lifetime-policy':'delete-if-unused'}, True)
+
+    def test_mgmt(self):
+        mgmt = self.create_connection("amqp0-10", True)
+        agent = Mgmt(mgmt)
+        agent.create('QueuePolicy', 'queue-*')
+        agent.create('QueuePolicy', 'alt.queue.*')
+        agent.create('TopicPolicy', 'topic-*')
+        try:
+            queues = [q['name'] for q in agent.list("QueuePolicy")]
+            topics = [t['name'] for t in agent.list("TopicPolicy")]
+            assert 'queue-*' in queues, (queues)
+            assert 'alt.queue.*' in queues, (queues)
+
+            try:
+                agent.delete('TopicPolicy', 'queue-*')
+                assert False, ('Deletion of policy using wrong type should fail')
+            except: None
+
+        finally:
+            agent.delete('QueuePolicy', 'queue-*')
+            agent.delete('QueuePolicy', 'alt.queue.*')
+            agent.delete('TopicPolicy', 'topic-*')
+            mgmt.close()

Modified: qpid/trunk/qpid/cpp/src/tests/swig_python_tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/swig_python_tests?rev=1541059&r1=1541058&r2=1541059&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/swig_python_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/swig_python_tests Tue Nov 12 13:42:50 2013
@@ -53,7 +53,7 @@ export PYTHONPATH=$PYTHONPATH:$PYTHONPAT
 export QPID_USE_SWIG_CLIENT=1
 $QPID_PYTHON_TEST -m qpid.tests.messaging.message -m qpid_tests.broker_0_10.priority -m qpid_tests.broker_0_10.lvq -m qpid_tests.broker_0_10.new_api -b localhost:$QPID_PORT -I $srcdir/failing-amqp0-10-python-tests || FAILED=1
 if [[ -a $AMQP_LIB ]] ; then
-    $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -m assertions -m reject_release -m misc -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests || FAILED=1
+    $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -m assertions -m reject_release -m misc -m policies -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests || FAILED=1
 fi
 stop_broker
 if [[ $FAILED -eq 1 ]]; then



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org