You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/07/18 21:50:09 UTC

svn commit: r1504622 - in /qpid/trunk/qpid/cpp/src/qpid: broker/QueueSettings.cpp broker/amqp/Session.cpp client/amqp0_10/AddressResolution.cpp messaging/amqp/AddressHelper.cpp messaging/amqp/AddressHelper.h

Author: gsim
Date: Thu Jul 18 19:50:09 2013
New Revision: 1504622

URL: http://svn.apache.org/r1504622
Log:
QPID-5003: set finite lifetime by default for durable subscription queues that are not currently in use

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp?rev=1504622&r1=1504621&r2=1504622&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp Thu Jul 18 19:50:09 2013
@@ -173,6 +173,7 @@ bool QueueSettings::handle(const std::st
         return true;
     } else if (key == AUTO_DELETE_TIMEOUT) {
         autoDeleteDelay = value;
+        if (autoDeleteDelay) autodelete = true;
         return true;
     } else if (key == QueueFlowLimit::flowStopCountKey) {
         flowStop.setCount(value);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1504622&r1=1504621&r2=1504622&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Thu Jul 18 19:50:09 2013
@@ -333,8 +333,12 @@ void Session::setupOutgoing(pn_link_t* l
             settings.durable = durable;
             settings.autodelete = !durable;
         }
+        settings.autoDeleteDelay = pn_terminus_get_timeout(source);
+        if (settings.autoDeleteDelay) {
+            settings.autodelete = true;
+            settings.original["qpid.auto_delete_timeout"] = settings.autoDeleteDelay;
+        }
         filter.configure(settings);
-        //TODO: populate settings from source details when available from engine
         std::stringstream queueName;
         if (shared) {
             //just use link name (TODO: could allow this to be

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=1504622&r1=1504621&r2=1504622&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Thu Jul 18 19:50:09 2013
@@ -89,6 +89,7 @@ const std::string NODE("node");
 const std::string LINK("link");
 const std::string MODE("mode");
 const std::string RELIABILITY("reliability");
+const std::string TIMEOUT("timeout");
 const std::string NAME("name");
 const std::string DURABLE("durable");
 const std::string X_DECLARE("x-declare");
@@ -517,14 +518,24 @@ Subscription::Subscription(const Address
     : Exchange(address),
       queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
       durable(Opt(address)/LINK/DURABLE),
-      //if the link is durable, then assume it is also reliable unless expclitly stated otherwise
-      //if not assume it is unreliable unless expclitly stated otherwise
+      //if the link is durable, then assume it is also reliable unless explicitly stated otherwise
+      //if not assume it is unreliable unless explicitly stated otherwise
       reliable(durable ? !AddressResolution::is_unreliable(address) : AddressResolution::is_reliable(address)),
       actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type),
       exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)),
       exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)),
       alternateExchange((Opt(address)/LINK/X_DECLARE/ALTERNATE_EXCHANGE).str())
 {
+    const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value;
+    if (timeout) {
+        if (timeout->asUint32()) queueOptions.setInt("qpid.auto_delete_timeout", timeout->asUint32());
+    } else if (durable && !(Opt(address)/LINK/RELIABILITY).value) {
+        //durable link with neither a timeout nor a reliability option:
+        //set a non-zero default (15*60) under the same key that an
+        //explicit link timeout uses (previously this would have
+        //defaulted to autodelete immediately anyway, so risk is low)
+        queueOptions.setInt("qpid.auto_delete_timeout", 15*60);
+    }
     (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
     (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
     std::string selector = Opt(address)/LINK/SELECTOR;
@@ -999,6 +1010,7 @@ Verifier::Verifier()
     link[NAME] = true;
     link[DURABLE] = true;
     link[RELIABILITY] = true;
+    link[TIMEOUT] = true;
     link[X_SUBSCRIBE] = true;
     link[X_DECLARE] = true;
     link[X_BINDINGS] = true;

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1504622&r1=1504621&r2=1504622&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp Thu Jul 18 19:50:09 2013
@@ -57,6 +57,7 @@ const std::string PROPERTIES("properties
 const std::string MODE("mode");
 const std::string BROWSE("browse");
 const std::string CONSUME("consume");
+const std::string TIMEOUT("timeout");
 
 const std::string TYPE("type");
 const std::string TOPIC("topic");
@@ -144,6 +145,16 @@ bool test(const Variant::Map& options, c
     }
 }
 
+template <typename T> T get(const Variant::Map& options, const std::string& name, T defaultValue)
+{
+    //return the entry stored under name, or defaultValue if there is none
+    const Variant::Map::const_iterator entry = options.find(name);
+    if (entry != options.end()) {
+        return entry->second;
+    }
+    return defaultValue;
+}
+
 bool bind(const Variant::Map& options, const std::string& name, std::string& variable)
 {
     Variant::Map::const_iterator j = options.find(name);
@@ -260,6 +271,8 @@ void write(pn_data_t* data, const Varian
         break;
     }
 }
+const uint32_t DEFAULT_DURABLE_TIMEOUT(15*60);//15 minutes
+const uint32_t DEFAULT_TIMEOUT(0);
 }
 
 AddressHelper::AddressHelper(const Address& address) :
@@ -268,6 +281,7 @@ AddressHelper::AddressHelper(const Addre
     type(address.getType()),
     durableNode(false),
     durableLink(false),
+    timeout(0),
     browse(false)
 {
     verifier.verify(address);
@@ -281,6 +295,7 @@ AddressHelper::AddressHelper(const Addre
     bind(node, CAPABILITIES, capabilities);
     durableNode = test(node, DURABLE);
     durableLink = test(link, DURABLE);
+    timeout = get(link, TIMEOUT, durableLink ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT);
     std::string mode;
     if (bind(address, MODE, mode)) {
         if (mode == BROWSE) {
@@ -521,27 +536,30 @@ void AddressHelper::configure(pn_terminu
     if (durableLink) {
         pn_terminus_set_durability(terminus, PN_DELIVERIES);
     }
-    if (mode == FOR_RECEIVER && browse) {
-        //when PROTON-139 is resolved, set the required delivery-mode
-    }
-    //set filter(s):
-    if (mode == FOR_RECEIVER && !filters.empty()) {
-        pn_data_t* filter = pn_terminus_filter(terminus);
-        pn_data_put_map(filter);
-        pn_data_enter(filter);
-        for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end(); ++i) {
-            pn_data_put_symbol(filter, convert(i->name));
-            pn_data_put_described(filter);
+    if (mode == FOR_RECEIVER) {
+        if (timeout) pn_terminus_set_timeout(terminus, timeout);
+        if (browse) {
+            //when PROTON-139 is resolved, set the required delivery-mode
+        }
+        //set filter(s):
+        if (!filters.empty()) {
+            pn_data_t* filter = pn_terminus_filter(terminus);
+            pn_data_put_map(filter);
             pn_data_enter(filter);
-            if (i->descriptorSymbol.size()) {
-                pn_data_put_symbol(filter, convert(i->descriptorSymbol));
-            } else {
-                pn_data_put_ulong(filter, i->descriptorCode);
+            for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end(); ++i) {
+                pn_data_put_symbol(filter, convert(i->name));
+                pn_data_put_described(filter);
+                pn_data_enter(filter);
+                if (i->descriptorSymbol.size()) {
+                    pn_data_put_symbol(filter, convert(i->descriptorSymbol));
+                } else {
+                    pn_data_put_ulong(filter, i->descriptorCode);
+                }
+                write(filter, i->value);
+                pn_data_exit(filter);
             }
-            write(filter, i->value);
             pn_data_exit(filter);
         }
-        pn_data_exit(filter);
     }
 
 }
@@ -632,6 +650,7 @@ Verifier::Verifier()
     link[NAME] = true;
     link[DURABLE] = true;
     link[RELIABILITY] = true;
+    link[TIMEOUT] = true;
     link[X_SUBSCRIBE] = true;
     link[X_DECLARE] = true;
     link[X_BINDINGS] = true;

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h?rev=1504622&r1=1504621&r2=1504622&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h Thu Jul 18 19:50:09 2013
@@ -68,6 +68,7 @@ class AddressHelper
     std::string type;
     bool durableNode;
     bool durableLink;
+    uint32_t timeout;
     bool browse;
     std::vector<Filter> filters;
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org