You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/02/19 14:49:45 UTC

svn commit: r1072329 - in /qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker: Broker.cpp Broker.h Queue.cpp Queue.h QueueFlowLimit.cpp SessionAdapter.cpp SessionAdapter.h

Author: kgiusti
Date: Sat Feb 19 13:49:44 2011
New Revision: 1072329

URL: http://svn.apache.org/viewvc?rev=1072329&view=rev
Log:
QPID-2935: merge latest trunk

Modified:
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.h

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1072329&r1=1072328&r2=1072329&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp Sat Feb 19 13:49:44 2011
@@ -20,6 +20,7 @@
  */
 
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/ConnectionState.h"
 #include "qpid/broker/DirectExchange.h"
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/HeadersExchange.h"
@@ -34,10 +35,19 @@
 #include "qpid/broker/QueueFlowLimit.h"
 
 #include "qmf/org/apache/qpid/broker/Package.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventBind.h"
+#include "qmf/org/apache/qpid/broker/EventUnbind.h"
+#include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/management/ManagementDirectExchange.h"
 #include "qpid/management/ManagementTopicExchange.h"
 #include "qpid/log/Logger.h"
@@ -45,7 +55,9 @@
 #include "qpid/log/Statement.h"
 #include "qpid/log/posix/SinkOptions.h"
 #include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FieldTable.h"
 #include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/sys/ProtocolFactory.h"
 #include "qpid/sys/Poller.h"
@@ -77,7 +89,10 @@ using qpid::management::ManagementAgent;
 using qpid::management::ManagementObject;
 using qpid::management::Manageable;
 using qpid::management::Args;
+using qpid::management::getManagementExecutionContext;
+using qpid::types::Variant;
 using std::string;
+using std::make_pair;
 
 namespace _qmf = qmf::org::apache::qpid::broker;
 
@@ -457,6 +472,20 @@ Manageable::status_t Broker::ManagementM
         QPID_LOG (debug, "Broker::getLogLevel()");
         status = Manageable::STATUS_OK;
         break;
+    case _qmf::Broker::METHOD_CREATE :
+      {
+          _qmf::ArgsBrokerCreate& a = dynamic_cast<_qmf::ArgsBrokerCreate&>(args);
+          createObject(a.i_type, a.i_name, a.i_properties, a.i_strict, getManagementExecutionContext());
+          status = Manageable::STATUS_OK;
+          break;
+      }
+    case _qmf::Broker::METHOD_DELETE :
+      {
+          _qmf::ArgsBrokerDelete& a = dynamic_cast<_qmf::ArgsBrokerDelete&>(args);
+          deleteObject(a.i_type, a.i_name, a.i_options, getManagementExecutionContext());
+          status = Manageable::STATUS_OK;
+          break;
+      }
    default:
         QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
         status = Manageable::STATUS_NOT_IMPLEMENTED;
@@ -466,6 +495,169 @@ Manageable::status_t Broker::ManagementM
     return status;
 }
 
+namespace
+{
+const std::string TYPE_QUEUE("queue");
+const std::string TYPE_EXCHANGE("exchange");
+const std::string TYPE_TOPIC("topic");
+const std::string TYPE_BINDING("binding");
+const std::string DURABLE("durable");
+const std::string AUTO_DELETE("auto-delete");
+const std::string ALTERNATE_EXCHANGE("alternate-exchange");
+const std::string EXCHANGE_TYPE("exchange-type");
+const std::string QUEUE_NAME("queue");
+const std::string EXCHANGE_NAME("exchange");
+
+const std::string TRUE("true");
+const std::string FALSE("false");
+}
+
+struct InvalidBindingIdentifier : public qpid::Exception
+{
+    InvalidBindingIdentifier(const std::string& name) : qpid::Exception(name) {}
+    std::string getPrefix() const { return "invalid binding"; }
+};
+
+struct BindingIdentifier
+{
+    std::string exchange;
+    std::string queue;
+    std::string key;
+
+    BindingIdentifier(const std::string& name)
+    {
+        std::vector<std::string> path;
+        split(path, name, "/");
+        switch (path.size()) {
+          case 1:
+            queue = path[0];
+            break;
+          case 2:
+            exchange = path[0];
+            queue = path[1];
+            break;
+          case 3:
+            exchange = path[0];
+            queue = path[1];
+            key = path[2];
+            break;
+          default:
+            throw InvalidBindingIdentifier(name);
+        }
+    }
+};
+
+struct ObjectAlreadyExists : public qpid::Exception
+{
+    ObjectAlreadyExists(const std::string& name) : qpid::Exception(name) {}
+    std::string getPrefix() const { return "object already exists"; }
+};
+
+struct UnknownObjectType : public qpid::Exception
+{
+    UnknownObjectType(const std::string& type) : qpid::Exception(type) {}
+    std::string getPrefix() const { return "unknown object type"; }
+};
+
+void Broker::createObject(const std::string& type, const std::string& name,
+                          const Variant::Map& properties, bool /*strict*/, const ConnectionState* context)
+{
+    std::string userId;
+    std::string connectionId;
+    if (context) {
+        userId = context->getUserId();
+        connectionId = context->getUrl();
+    }
+    //TODO: implement 'strict' option (check there are no unrecognised properties)
+    QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ")");
+    if (type == TYPE_QUEUE) {
+        bool durable(false);
+        bool autodelete(false);
+        std::string alternateExchange;
+        Variant::Map extensions;
+        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+            // extract durable, auto-delete and alternate-exchange properties
+            if (i->first == DURABLE) durable = i->second;
+            else if (i->first == AUTO_DELETE) autodelete = i->second;
+            else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
+            //treat everything else as extension properties
+            else extensions[i->first] = i->second;
+        }
+        framing::FieldTable arguments;
+        amqp_0_10::translate(extensions, arguments);
+
+        std::pair<boost::shared_ptr<Queue>, bool> result =
+            createQueue(name, durable, autodelete, 0, alternateExchange, arguments, userId, connectionId);
+        if (!result.second) {
+            throw ObjectAlreadyExists(name);
+        }
+    } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) {
+        bool durable(false);
+        std::string exchangeType;
+        std::string alternateExchange;
+        Variant::Map extensions;
+        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+            // extract durable, auto-delete and alternate-exchange properties
+            if (i->first == DURABLE) durable = i->second;
+            else if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString();
+            else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
+            //treat everything else as extension properties
+            else extensions[i->first] = i->second;
+        }
+        framing::FieldTable arguments;
+        amqp_0_10::translate(extensions, arguments);
+
+        try {
+            std::pair<boost::shared_ptr<Exchange>, bool> result =
+                createExchange(name, exchangeType, durable, alternateExchange, arguments, userId, connectionId);
+            if (!result.second) {
+                throw ObjectAlreadyExists(name);
+            }
+        } catch (const UnknownExchangeTypeException&) {
+            throw Exception(QPID_MSG("Invalid exchange type: " << exchangeType));
+        }
+    } else if (type == TYPE_BINDING) {
+        BindingIdentifier binding(name);
+        std::string exchangeType("topic");
+        Variant::Map extensions;
+        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+            // extract durable, auto-delete and alternate-exchange properties
+            if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString();
+            //treat everything else as extension properties
+            else extensions[i->first] = i->second;
+        }
+        framing::FieldTable arguments;
+        amqp_0_10::translate(extensions, arguments);
+
+        bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId);
+    } else {
+        throw UnknownObjectType(type);
+    }
+}
+
+void Broker::deleteObject(const std::string& type, const std::string& name,
+                          const Variant::Map& options, const ConnectionState* context)
+{
+    std::string userId;
+    std::string connectionId;
+    if (context) {
+        userId = context->getUserId();
+        connectionId = context->getUrl();
+    }
+    QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ")");
+    if (type == TYPE_QUEUE) {
+        deleteQueue(name, userId, connectionId);
+    } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) {
+        deleteExchange(name, userId, connectionId);
+    } else if (type == TYPE_BINDING) {
+        BindingIdentifier binding(name);
+        unbind(binding.queue, binding.exchange, binding.key, userId, connectionId);
+    } else {
+        throw UnknownObjectType(type);
+    }
+
+}
+
 void Broker::setLogLevel(const std::string& level)
 {
     QPID_LOG(notice, "Changing log level to " << level);
@@ -566,5 +758,221 @@ void Broker::setClusterTimer(std::auto_p
 
 const std::string Broker::TCP_TRANSPORT("tcp");
 
+
+std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
+    const std::string& name,
+    bool durable,
+    bool autodelete,
+    const OwnershipToken* owner,
+    const std::string& alternateExchange,
+    const qpid::framing::FieldTable& arguments,
+    const std::string& userId,
+    const std::string& connectionId)
+{
+    if (acl) {
+        std::map<acl::Property, std::string> params;
+        params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+        params.insert(make_pair(acl::PROP_PASSIVE, FALSE));
+        params.insert(make_pair(acl::PROP_DURABLE, durable ? TRUE : FALSE));
+        params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? TRUE : FALSE));
+        params.insert(make_pair(acl::PROP_AUTODELETE, autodelete ? TRUE : FALSE));
+        params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
+        params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
+        params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
+
+        if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
+            throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId));
+    }
+
+    Exchange::shared_ptr alternate;
+    if (!alternateExchange.empty()) {
+        alternate = exchanges.get(alternateExchange);
+        if (!alternate) framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
+    }
+
+    std::pair<Queue::shared_ptr, bool> result = queues.declare(name, durable, autodelete, owner);
+    if (result.second) {
+        if (alternate) {
+            result.first->setAlternateExchange(alternate);
+            alternate->incAlternateUsers();
+        }
+
+        //apply settings & create persistent record if required
+        result.first->create(arguments);
+        //add default binding:
+        result.first->bind(exchanges.getDefault(), name);
+
+        if (managementAgent.get()) {
+            //TODO: debatable whether we should raise an event here for
+            //create when this is a 'declare' event; ideally add a create
+            //event instead?
+            managementAgent->raiseEvent(
+                _qmf::EventQueueDeclare(connectionId, userId, name,
+                                        durable, owner, autodelete,
+                                        ManagementAgent::toMap(arguments),
+                                        "created"));
+        }
+    }
+    return result;
+}
+
+void Broker::deleteQueue(const std::string& name, const std::string& userId,
+                         const std::string& connectionId, QueueFunctor check)
+{
+    if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) {
+        throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId));
+    }
+
+    Queue::shared_ptr queue = queues.find(name);
+    if (queue) {
+        if (check) check(queue);
+        queues.destroy(name);
+        queue->destroyed();
+    } else {
+        throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name));
+    }
+
+    if (managementAgent.get())
+        managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name));
+
+}
+
+std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
+    const std::string& name,
+    const std::string& type,
+    bool durable,
+    const std::string& alternateExchange,
+    const qpid::framing::FieldTable& arguments,
+    const std::string& userId,
+    const std::string& connectionId)
+{
+    if (acl) {
+        std::map<acl::Property, std::string> params;
+        params.insert(make_pair(acl::PROP_TYPE, type));
+        params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+        params.insert(make_pair(acl::PROP_PASSIVE, FALSE));
+        params.insert(make_pair(acl::PROP_DURABLE, durable ? TRUE : FALSE));
+        if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_EXCHANGE,name,&params) )
+            throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << userId));
+    }
+
+    Exchange::shared_ptr alternate;
+    if (!alternateExchange.empty()) {
+        alternate = exchanges.get(alternateExchange);
+        if (!alternate) framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
+    }
+
+    std::pair<Exchange::shared_ptr, bool> result;
+    result = exchanges.declare(name, type, durable, arguments);
+    if (result.second) {
+        if (alternate) {
+            result.first->setAlternate(alternate);
+            alternate->incAlternateUsers();
+        }
+        if (durable) {
+            store->create(*result.first, arguments);
+        }
+        if (managementAgent.get()) {
+            //TODO: debatable whether we should raise an event here for
+            //create when this is a 'declare' event; ideally add a create
+            //event instead?
+            managementAgent->raiseEvent(_qmf::EventExchangeDeclare(connectionId,
+                                                         userId,
+                                                         name,
+                                                         type,
+                                                         alternateExchange,
+                                                         durable,
+                                                         false,
+                                                         ManagementAgent::toMap(arguments),
+                                                         "created"));
+        }
+    }
+    return result;
+}
+
+void Broker::deleteExchange(const std::string& name, const std::string& userId,
+                           const std::string& connectionId)
+{
+    if (acl) {
+        if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
+            throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId));
+    }
+
+    Exchange::shared_ptr exchange(exchanges.get(name));
+    if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name));
+    if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
+    if (exchange->isDurable()) store->destroy(*exchange);
+    if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
+    exchanges.destroy(name);
+
+    if (managementAgent.get())
+        managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name));
+
+}
+
+void Broker::bind(const std::string& queueName,
+                  const std::string& exchangeName,
+                  const std::string& key,
+                  const qpid::framing::FieldTable& arguments,
+                  const std::string& userId,
+                  const std::string& connectionId)
+{
+    if (acl) {
+        std::map<acl::Property, std::string> params;
+        params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
+        params.insert(make_pair(acl::PROP_ROUTINGKEY, key));
+
+        if (!acl->authorise(userId,acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,&params))
+            throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << userId));
+    }
+
+    Queue::shared_ptr queue = queues.find(queueName);
+    Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+    if (!queue) {
+        throw framing::NotFoundException(QPID_MSG("Bind failed. No such queue: " << queueName));
+    } else if (!exchange) {
+        throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName));
+    } else {
+        if (queue->bind(exchange, key, arguments)) {
+            if (managementAgent.get()) {
+                managementAgent->raiseEvent(_qmf::EventBind(connectionId, userId, exchangeName,
+                                                  queueName, key, ManagementAgent::toMap(arguments)));
+            }
+        }
+    }
+}
+
+void Broker::unbind(const std::string& queueName,
+                    const std::string& exchangeName,
+                    const std::string& key,
+                    const std::string& userId,
+                    const std::string& connectionId)
+{
+    if (acl) {
+        std::map<acl::Property, std::string> params;
+        params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
+        params.insert(make_pair(acl::PROP_ROUTINGKEY, key));
+        if (!acl->authorise(userId,acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,&params) )
+            throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << userId));
+    }
+
+    Queue::shared_ptr queue = queues.find(queueName);
+    Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+    if (!queue) {
+        throw framing::NotFoundException(QPID_MSG("Bind failed. No such queue: " << queueName));
+    } else if (!exchange) {
+        throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName));
+    } else {
+        if (exchange->unbind(queue, key, 0)) {
+            if (exchange->isDurable() && queue->isDurable()) {
+                store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
+            }
+            if (managementAgent.get()) {
+                managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key));
+            }
+        }
+    }
+}
+
 }} // namespace qpid::broker
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h?rev=1072329&r1=1072328&r2=1072329&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h Sat Feb 19 13:49:44 2011
@@ -49,6 +49,7 @@
 #include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Timer.h"
+#include "qpid/types/Variant.h"
 #include "qpid/RefCounted.h"
 #include "qpid/broker/AclModule.h"
 #include "qpid/sys/Mutex.h"
@@ -68,6 +69,7 @@ struct Url;
 
 namespace broker {
 
+class ConnectionState;
 class ExpiryPolicy;
 class Message;
 
@@ -150,6 +152,10 @@ public:
     void setStore ();
     void setLogLevel(const std::string& level);
     std::string getLogLevel();
+    void createObject(const std::string& type, const std::string& name,
+                      const qpid::types::Variant::Map& properties, bool lenient, const ConnectionState* context);
+    void deleteObject(const std::string& type, const std::string& name,
+                      const qpid::types::Variant::Map& options, const ConnectionState* context);
 
     boost::shared_ptr<sys::Poller> poller;
     sys::Timer timer;
@@ -301,6 +307,42 @@ public:
                           const boost::intrusive_ptr<Message>& msg)> deferDelivery;
 
     bool isAuthenticating ( ) { return config.auth; }
+
+    typedef boost::function1<void, boost::shared_ptr<Queue> > QueueFunctor;
+
+    std::pair<boost::shared_ptr<Queue>, bool> createQueue(
+        const std::string& name,
+        bool durable,
+        bool autodelete,
+        const OwnershipToken* owner,
+        const std::string& alternateExchange,
+        const qpid::framing::FieldTable& arguments,
+        const std::string& userId,
+        const std::string& connectionId);
+    void deleteQueue(const std::string& name,
+                     const std::string& userId,
+                     const std::string& connectionId,
+                     QueueFunctor check = QueueFunctor());
+    std::pair<Exchange::shared_ptr, bool> createExchange(
+        const std::string& name,
+        const std::string& type,
+        bool durable,
+        const std::string& alternateExchange,
+        const qpid::framing::FieldTable& args,
+        const std::string& userId, const std::string& connectionId);
+    void deleteExchange(const std::string& name, const std::string& userId,
+                        const std::string& connectionId);
+    void bind(const std::string& queue,
+              const std::string& exchange,
+              const std::string& key,
+              const qpid::framing::FieldTable& arguments,
+              const std::string& userId,
+              const std::string& connectionId);
+    void unbind(const std::string& queue,
+                const std::string& exchange,
+                const std::string& key,
+                const std::string& userId,
+                const std::string& connectionId);
 };
 
 }}

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1072329&r1=1072328&r2=1072329&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp Sat Feb 19 13:49:44 2011
@@ -440,7 +440,7 @@ void Queue::purgeExpired()
             Mutex::ScopedLock locker(messageLock);
             messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
         }
-        for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+        for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
     }
 }
 
@@ -825,8 +825,9 @@ void Queue::configure(const FieldTable& 
     QueueFlowLimit::observe(*this, _settings);
 }
 
-void Queue::destroy()
+void Queue::destroyed()
 {
+    unbind(broker->getExchanges());
     if (alternateExchange.get()) {
         Mutex::ScopedLock locker(messageLock);
         while(!messages->empty()){
@@ -845,6 +846,7 @@ void Queue::destroy()
         store = 0;//ensure we make no more calls to the store for this queue
     }
     if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
+    notifyDeleted();
 }
 
 void Queue::notifyDeleted()
@@ -864,9 +866,9 @@ void Queue::bound(const string& exchange
     bindings.add(exchange, key, args);
 }
 
-void Queue::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref)
+void Queue::unbind(ExchangeRegistry& exchanges)
 {
-    bindings.unbind(exchanges, shared_ref);
+    bindings.unbind(exchanges, shared_from_this());
 }
 
 void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
@@ -954,8 +956,7 @@ void tryAutoDeleteImpl(Broker& broker, Q
     if (broker.getQueues().destroyIf(queue->getName(), 
                                      boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
         QPID_LOG(debug, "Auto-deleting " << queue->getName());
-        queue->unbind(broker.getExchanges(), queue);
-        queue->destroy();
+        queue->destroyed();
     }
 }
 
@@ -1174,6 +1175,22 @@ void Queue::flush()
     if (u.acquired && store) store->flush(*this);
 }
 
+
+bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
+                 const qpid::framing::FieldTable& arguments)
+{
+    if (exchange->bind(shared_from_this(), key, &arguments)) {
+        bound(exchange->getName(), key, arguments);
+        if (exchange->isDurable() && isDurable()) {
+            store->bind(*exchange, *this, key, arguments);
+        }
+        return true;
+    } else {
+        return false;
+    }
+}
+
+
 const Broker* Queue::getBroker()
 {
     return broker;

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h?rev=1072329&r1=1072328&r2=1072329&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.h Sat Feb 19 13:49:44 2011
@@ -172,8 +172,9 @@ class Queue : public boost::enable_share
             }
         }
     }
-            
+
     void checkNotDeleted();
+    void notifyDeleted();
 
   public:
 
@@ -196,13 +197,17 @@ class Queue : public boost::enable_share
     // "recovering" means we are doing a MessageStore recovery.
     QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings,
                                       bool recovering = false);
-    void destroy();
-    void notifyDeleted();
+    void destroyed();
     QPID_BROKER_EXTERN void bound(const std::string& exchange,
                                   const std::string& key,
                                   const qpid::framing::FieldTable& args);
-    QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges,
-                                   Queue::shared_ptr shared_ref);
+    //TODO: get unbind out of the public interface; only there for purposes of one unit test
+    void unbind(ExchangeRegistry& exchanges);
+    /**
+     * Bind self to specified exchange, and record that binding for unbinding on delete.
+     */
+    bool bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
+              const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable());
 
     QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg);
     QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message);

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1072329&r1=1072328&r2=1072329&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Sat Feb 19 13:49:44 2011
@@ -142,7 +142,7 @@ void QueueFlowLimit::enqueued(const Queu
             queueMgmtObj->set_flowStopped(true);
     }
 
-    // KAG: test - REMOVE ONCE STABLE
+    /** @todo KAG: - REMOVE ONCE STABLE */
     if (index.find(msg.payload) != index.end()) {
         QPID_LOG(error, "Queue \"" << queueName << "\": has enqueued a msg twice: " << msg.position);
     }
@@ -150,7 +150,7 @@ void QueueFlowLimit::enqueued(const Queu
     if (flowStopped || !index.empty()) {
         // ignore flow control if we are populating the queue due to cluster replication:
         if (broker && broker->isClusterUpdatee()) {
-            QPID_LOG(error, "KAG: Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.position);
+            QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.position);
             return;
         }
         QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position);
@@ -231,8 +231,7 @@ void QueueFlowLimit::setState(const Queu
         sys::Mutex::ScopedLock l(indexLock);
         assert(index.find(msg.payload) == index.end());
 
-        QPID_LOG(error, "KAG TBD!!!: Queue \"" << queue->getName() << "\": forcing flow control for msg pos=" << msg.position << " for CLUSTER SYNC");
-        // KAG TBD!!!
+        QPID_LOG(debug, "Queue \"" << queue->getName() << "\": forcing flow control for msg pos=" << msg.position << " for CLUSTER SYNC");
         index.insert(msg.payload);
     }
 }
@@ -324,8 +323,9 @@ QueueFlowLimit *QueueFlowLimit::createLi
         if (flowStopCount == 0 && flowStopSize == 0) {   // disable flow control
             return 0;
         }
-        /** todo KAG - remove once cluster support for flow control done. */
+        /** @todo KAG - remove once cluster support for flow control done. */
         // TODO aconway 2011-02-16: is queue==0 only in tests?
+        // TODO kgiusti 2011-02-19: yes!  The unit tests test this class in isolation */
         if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) {
             QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
                      << queue->getName());

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1072329&r1=1072328&r2=1072329&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Sat Feb 19 13:49:44 2011
@@ -65,51 +65,54 @@ void SessionAdapter::ExchangeHandlerImpl
                                                   const string& alternateExchange, 
                                                   bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
 
-    AclModule* acl = getBroker().getAcl();
-    if (acl) {
-        std::map<acl::Property, std::string> params;
-        params.insert(make_pair(acl::PROP_TYPE, type));
-        params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
-        params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) ));
-        params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
-        if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
-            throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange declare request from " << getConnection().getUserId()));
-    }
-    
     //TODO: implement autoDelete
     Exchange::shared_ptr alternate;
     if (!alternateExchange.empty()) {
         alternate = getBroker().getExchanges().get(alternateExchange);
     }
     if(passive){
+        AclModule* acl = getBroker().getAcl();
+        if (acl) {
+            //TODO: why does a passive declare require create
+            //permission? The purpose of the passive flag is to state
+            //that the exchange should *not* created. For
+            //authorisation a passive declare is similar to
+            //exchange-query.
+            std::map<acl::Property, std::string> params;
+            params.insert(make_pair(acl::PROP_TYPE, type));
+            params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+            params.insert(make_pair(acl::PROP_PASSIVE, _TRUE));
+            params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE));
+            if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
+                throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << getConnection().getUserId()));
+        }
         Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
         checkType(actual, type);
         checkAlternate(actual, alternate);
-    }else{        
+    }else{
         if(exchange.find("amq.") == 0 || exchange.find("qpid.") == 0) {
             throw framing::NotAllowedException(QPID_MSG("Exchange names beginning with \"amq.\" or \"qpid.\" are reserved. (exchange=\"" << exchange << "\")"));
         }
         try{
-            std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args);
-            if (response.second) {
-                if (alternate) {
-                    response.first->setAlternate(alternate);
-                    alternate->incAlternateUsers();
-                }
-                if (durable) {
-                    getBroker().getStore().create(*response.first, args);
-                }
-            } else {
+            std::pair<Exchange::shared_ptr, bool> response =
+                getBroker().createExchange(exchange, type, durable, alternateExchange, args,
+                                           getConnection().getUserId(), getConnection().getUrl());
+            if (!response.second) {
+                //exchange already there, not created
                 checkType(response.first, type);
                 checkAlternate(response.first, alternate);
+                ManagementAgent* agent = getBroker().getManagementAgent();
+                if (agent)
+                    agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(),
+                                                                 getConnection().getUserId(),
+                                                                 exchange,
+                                                                 type,
+                                                                 alternateExchange,
+                                                                 durable,
+                                                                 false,
+                                                                 ManagementAgent::toMap(args),
+                                                                 "existing"));
             }
-
-            ManagementAgent* agent = getBroker().getManagementAgent();
-            if (agent)
-                agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type,
-                                                             alternateExchange, durable, false, ManagementAgent::toMap(args),
-                                                             response.second ? "created" : "existing"));
-
         }catch(UnknownExchangeTypeException& /*e*/){
             throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type));
         }
@@ -135,22 +138,8 @@ void SessionAdapter::ExchangeHandlerImpl
                 
 void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/)
 {
-    AclModule* acl = getBroker().getAcl();
-    if (acl) {
-        if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
-            throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << getConnection().getUserId()));
-    }
-
-    //TODO: implement unused
-    Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
-    if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
-    if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
-    if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
-    getBroker().getExchanges().destroy(name);
-
-    ManagementAgent* agent = getBroker().getManagementAgent();
-    if (agent)
-        agent->raiseEvent(_qmf::EventExchangeDelete(getConnection().getUrl(), getConnection().getUserId(), name));
+    //TODO: implement if-unused
+    getBroker().deleteExchange(name, getConnection().getUserId(), getConnection().getUrl());
 }
 
 ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
@@ -170,67 +159,19 @@ ExchangeQueryResult SessionAdapter::Exch
 }
 
 void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, 
-                                           const string& exchangeName, const string& routingKey, 
-                                           const FieldTable& arguments)
+                                               const string& exchangeName, const string& routingKey, 
+                                               const FieldTable& arguments)
 {
-    AclModule* acl = getBroker().getAcl();
-    if (acl) {
-        std::map<acl::Property, std::string> params;
-        params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
-        params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
-
-        if (!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,&params))
-            throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << getConnection().getUserId()));
-    }
-
-    Queue::shared_ptr queue = getQueue(queueName);
-    Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
-    if(exchange){
-        string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
-        if (exchange->bind(queue, exchangeRoutingKey, &arguments)) {
-            queue->bound(exchangeName, routingKey, arguments);
-            if (exchange->isDurable() && queue->isDurable()) {
-                getBroker().getStore().bind(*exchange, *queue, routingKey, arguments);
-            }
-
-            ManagementAgent* agent = getBroker().getManagementAgent();
-            if (agent)
-                agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName,
-                                                  queueName, exchangeRoutingKey, ManagementAgent::toMap(arguments)));
-        }
-    }else{
-        throw NotFoundException("Bind failed. No such exchange: " + exchangeName);
-    }
+    getBroker().bind(queueName, exchangeName, routingKey, arguments,
+                     getConnection().getUserId(), getConnection().getUrl());
 }
  
 void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
                                                  const string& exchangeName,
                                                  const string& routingKey)
 {
-    AclModule* acl = getBroker().getAcl();
-    if (acl) {
-        std::map<acl::Property, std::string> params;
-        params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
-        params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
-        if (!acl->authorise(getConnection().getUserId(),acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,&params) )
-            throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << getConnection().getUserId()));
-    }
-
-    Queue::shared_ptr queue = getQueue(queueName);
-    if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
-
-    Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
-    if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
-
-    //TODO: revise unbind to rely solely on binding key (not args)
-    if (exchange->unbind(queue, routingKey, 0)) {
-        if (exchange->isDurable() && queue->isDurable())
-            getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable());
-
-        ManagementAgent* agent = getBroker().getManagementAgent();
-        if (agent)
-            agent->raiseEvent(_qmf::EventUnbind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, routingKey));
-    }
+    getBroker().unbind(queueName, exchangeName, routingKey,
+                       getConnection().getUserId(), getConnection().getUrl());
 }
 
 ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
@@ -333,52 +274,42 @@ QueueQueryResult SessionAdapter::QueueHa
 void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange,
                                                bool passive, bool durable, bool exclusive, 
                                                bool autoDelete, const qpid::framing::FieldTable& arguments)
-{ 
-    AclModule* acl = getBroker().getAcl();
-    if (acl) {
-        std::map<acl::Property, std::string> params;
-        params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
-        params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) ));
-        params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
-        params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE)));
-        params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE)));
-        params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
-        params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
-        params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
-
-        if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
-            throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
-    }
-
-    Exchange::shared_ptr alternate;
-    if (!alternateExchange.empty()) {
-        alternate = getBroker().getExchanges().get(alternateExchange);
-    }
+{
     Queue::shared_ptr queue;
     if (passive && !name.empty()) {
-    queue = getQueue(name);
+        AclModule* acl = getBroker().getAcl();
+        if (acl) {
+            //TODO: why does a passive declare require create
+            //permission? The purpose of the passive flag is to state
+            //that the queue should *not* created. For
+            //authorisation a passive declare is similar to
+            //queue-query (or indeed a qmf query).
+            std::map<acl::Property, std::string> params;
+            params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+            params.insert(make_pair(acl::PROP_PASSIVE, _TRUE));
+            params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
+            params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE)));
+            params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE)));
+            params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
+            params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
+            params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
+            if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
+                throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
+        }
+        queue = getQueue(name);
         //TODO: check alternate-exchange is as expected
     } else {
-        std::pair<Queue::shared_ptr, bool> queue_created =  
-            getBroker().getQueues().declare(name, durable,
-                                            autoDelete,
-                                            exclusive ? &session : 0);
+        std::pair<Queue::shared_ptr, bool> queue_created =
+            getBroker().createQueue(name, durable,
+                                    autoDelete,
+                                    exclusive ? &session : 0,
+                                    alternateExchange,
+                                    arguments,
+                                    getConnection().getUserId(),
+                                    getConnection().getUrl());
         queue = queue_created.first;
         assert(queue);
         if (queue_created.second) { // This is a new queue
-            if (alternate) {
-                queue->setAlternateExchange(alternate);
-                alternate->incAlternateUsers();
-            }
-
-            //apply settings & create persistent record if required
-            try { queue_created.first->create(arguments); }
-            catch (...) { getBroker().getQueues().destroy(name); throw; }
-
-            //add default binding:
-            getBroker().getExchanges().getDefault()->bind(queue, name, 0);
-            queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
-
             //handle automatic cleanup:
             if (exclusive) {
                 exclusiveQueues.push_back(queue);
@@ -387,21 +318,20 @@ void SessionAdapter::QueueHandlerImpl::d
             if (exclusive && queue->setExclusiveOwner(&session)) {
                 exclusiveQueues.push_back(queue);
             }
+            ManagementAgent* agent = getBroker().getManagementAgent();
+            if (agent)
+                agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
+                                                      name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
+                                                      "existing"));
         }
 
-        ManagementAgent* agent = getBroker().getManagementAgent();
-        if (agent)
-            agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
-                                                      name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
-                                                      queue_created.second ? "created" : "existing"));
     }
 
-    if (exclusive && !queue->isExclusiveOwner(&session)) 
+    if (exclusive && !queue->isExclusiveOwner(&session))
         throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access to queue "
                                                << queue->getName()));
-} 
-        
-        
+}
+
 void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
     AclModule* acl = getBroker().getAcl();
     if (acl)
@@ -410,40 +340,32 @@ void SessionAdapter::QueueHandlerImpl::p
              throw UnauthorizedAccessException(QPID_MSG("ACL denied queue purge request from " << getConnection().getUserId()));
     }
     getQueue(queue)->purge();
-} 
-        
-void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){
-
-    AclModule* acl = getBroker().getAcl();
-    if (acl)
-    {
-         if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_QUEUE,queue,NULL) )
-             throw UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << getConnection().getUserId()));
-    }
+}
 
-    Queue::shared_ptr q = getQueue(queue);
-    if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session)) 
+void SessionAdapter::QueueHandlerImpl::checkDelete(Queue::shared_ptr queue, bool ifUnused, bool ifEmpty)
+{
+    if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session)) {
         throw ResourceLockedException(QPID_MSG("Cannot delete queue "
-                                               << queue << "; it is exclusive to another session"));
-    if(ifEmpty && q->getMessageCount() > 0){
-        throw PreconditionFailedException("Queue not empty.");
-    }else if(ifUnused && q->getConsumerCount() > 0){
-        throw PreconditionFailedException("Queue in use.");
-    }else{
+                                               << queue->getName() << "; it is exclusive to another session"));
+    } else if(ifEmpty && queue->getMessageCount() > 0) {
+        throw PreconditionFailedException(QPID_MSG("Cannot delete queue "
+                                                   << queue->getName() << "; queue not empty"));
+    } else if(ifUnused && queue->getConsumerCount() > 0) {
+        throw PreconditionFailedException(QPID_MSG("Cannot delete queue "
+                                                   << queue->getName() << "; queue in use"));
+    } else if (queue->isExclusiveOwner(&getConnection())) {
         //remove the queue from the list of exclusive queues if necessary
-        if(q->isExclusiveOwner(&getConnection())){
-            QueueVector::iterator i = std::find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q);
-            if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
-        }
-        q->destroy();
-        getBroker().getQueues().destroy(queue);
-        q->unbind(getBroker().getExchanges(), q);
-
-        ManagementAgent* agent = getBroker().getManagementAgent();
-        if (agent)
-            agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue));
-        q->notifyDeleted();
-    }
+        QueueVector::iterator i = std::find(getConnection().exclusiveQueues.begin(),
+                                            getConnection().exclusiveQueues.end(),
+                                            queue);
+        if (i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
+    }    
+}
+        
+void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty)
+{
+    getBroker().deleteQueue(queue, getConnection().getUserId(), getConnection().getUrl(),
+                            boost::bind(&SessionAdapter::QueueHandlerImpl::checkDelete, this, _1, ifUnused, ifEmpty));
 } 
 
 SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=1072329&r1=1072328&r2=1072329&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/SessionAdapter.h Sat Feb 19 13:49:44 2011
@@ -138,6 +138,7 @@ class Queue;
         bool isLocal(const ConnectionToken* t) const; 
 
         void destroyExclusiveQueues();
+        void checkDelete(boost::shared_ptr<Queue> queue, bool ifUnused, bool ifEmpty);
         template <class F> void eachExclusiveQueue(F f) 
         { 
             std::for_each(exclusiveQueues.begin(), exclusiveQueues.end(), f);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org