You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2014/01/10 23:22:55 UTC

svn commit: r1557272 - in /qpid/trunk/qpid/cpp/src/qpid/broker/amqp: Outgoing.cpp Outgoing.h Session.cpp Session.h

Author: gsim
Date: Fri Jan 10 22:22:54 2014
New Revision: 1557272

URL: http://svn.apache.org/r1557272
Log:
QPID-5467: fix handling of delete-on-close

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1557272&r1=1557271&r2=1557272&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Fri Jan 10 22:22:54 2014
@@ -66,7 +66,8 @@ OutgoingFromQueue::OutgoingFromQueue(Bro
       current(0), outstanding(0),
       buffer(1024)/*used only for header at present*/,
       //for exclusive queues, assume unreliable unless reliable is explicitly requested; otherwise assume reliable unless unreliable requested
-      unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link))
+      unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link)),
+      cancelled(false)
 {
     for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
         deliveries[i].init(i);
@@ -181,6 +182,13 @@ void OutgoingFromQueue::detached()
     }
     if (exclusive) queue->releaseExclusiveOwnership();
     else if (isControllingUser) queue->releaseFromUse(true);
+    cancelled = true;
+}
+
+
+OutgoingFromQueue::~OutgoingFromQueue()
+{
+    if (!cancelled && isControllingUser) queue->releaseFromUse(true);
 }
 
 //Consumer interface:

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h?rev=1557272&r1=1557271&r2=1557272&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h Fri Jan 10 22:22:54 2014
@@ -90,6 +90,7 @@ class OutgoingFromQueue : public Outgoin
   public:
     OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&,
                       qpid::sys::OutputControl& o, SubscriptionType type, bool exclusive, bool isControllingUser);
+    ~OutgoingFromQueue();
     void setSubjectFilter(const std::string&);
     void setSelectorFilter(const std::string&);
     void init();
@@ -144,6 +145,7 @@ class OutgoingFromQueue : public Outgoin
     std::string subjectFilter;
     boost::scoped_ptr<Selector> selector;
     bool unreliable;
+    bool cancelled;
 };
 }}} // namespace qpid::broker::amqp
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1557272&r1=1557271&r2=1557272&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Fri Jan 10 22:22:54 2014
@@ -251,14 +251,20 @@ Session::ResolvedNode Session::resolve(c
                 }
                 qpid::framing::FieldTable args;
                 qpid::amqp_0_10::translate(node.properties.getProperties(), args);
-                node.exchange = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.isAutodelete(),
-                                                                      node.properties.getAlternateExchange(),
-                                                                      args, connection.getUserId(), connection.getId()).first;
+                std::pair<boost::shared_ptr<Exchange>, bool> result
+                    = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.isAutodelete(),
+                                                            node.properties.getAlternateExchange(),
+                                                            args, connection.getUserId(), connection.getId());
+                node.exchange = result.first;
+                node.created = result.second;
             } else {
                 if (node.exchange) {
                     QPID_LOG_CAT(warning, model, "Node name will be ambiguous, creation of queue named " << name << " requested when exchange of the same name already exists");
                 }
-                node.queue = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first;
+                std::pair<boost::shared_ptr<Queue>, bool> result
+                    = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId());
+                node.queue = result.first;
+                node.created = result.second;
             }
         } else {
             boost::shared_ptr<NodePolicy> nodePolicy = connection.getNodePolicies().match(name);
@@ -415,7 +421,7 @@ void Session::setupIncoming(pn_link_t* l
         source = sourceAddress;
     }
     if (node.queue) {
-        boost::shared_ptr<Incoming> q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source, node.properties.trackControllingLink()));
+        boost::shared_ptr<Incoming> q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source, node.created && node.properties.trackControllingLink()));
         incoming[link] = q;
     } else if (node.exchange) {
         boost::shared_ptr<Incoming> e(new IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source));
@@ -460,7 +466,7 @@ void Session::setupOutgoing(pn_link_t* l
         if (type == CONSUMER && node.queue->hasExclusiveOwner() && !node.queue->isExclusiveOwner(this)) {
             throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, std::string("Cannot consume from exclusive queue ") + node.queue->getName());
         }
-        boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, type, false, node.properties.trackControllingLink()));
+        boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, type, false, node.created && node.properties.trackControllingLink()));
         q->init();
         filter.apply(q);
         outgoing[link] = q;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h?rev=1557272&r1=1557271&r2=1557272&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.h Fri Jan 10 22:22:54 2014
@@ -100,7 +100,8 @@ class Session : public ManagedSession, p
         boost::shared_ptr<qpid::broker::amqp::Topic> topic;
         boost::shared_ptr<Relay> relay;
         NodeProperties properties;
-        ResolvedNode(bool isDynamic) : properties(isDynamic) {}
+        bool created;
+        ResolvedNode(bool isDynamic) : properties(isDynamic), created(false) {}
     };
 
     ResolvedNode resolve(const std::string name, pn_terminus_t* terminus, bool incoming);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org