You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/07/18 21:50:09 UTC
svn commit: r1504622 - in /qpid/trunk/qpid/cpp/src/qpid:
broker/QueueSettings.cpp broker/amqp/Session.cpp
client/amqp0_10/AddressResolution.cpp messaging/amqp/AddressHelper.cpp
messaging/amqp/AddressHelper.h
Author: gsim
Date: Thu Jul 18 19:50:09 2013
New Revision: 1504622
URL: http://svn.apache.org/r1504622
Log:
QPID-5003: set finite lifetime by default for durable subscription queues that are not currently in use
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp?rev=1504622&r1=1504621&r2=1504622&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp Thu Jul 18 19:50:09 2013
@@ -173,6 +173,7 @@ bool QueueSettings::handle(const std::st
return true;
} else if (key == AUTO_DELETE_TIMEOUT) {
autoDeleteDelay = value;
+ if (autoDeleteDelay) autodelete = true;
return true;
} else if (key == QueueFlowLimit::flowStopCountKey) {
flowStop.setCount(value);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1504622&r1=1504621&r2=1504622&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Thu Jul 18 19:50:09 2013
@@ -333,8 +333,12 @@ void Session::setupOutgoing(pn_link_t* l
settings.durable = durable;
settings.autodelete = !durable;
}
+ settings.autoDeleteDelay = pn_terminus_get_timeout(source);
+ if (settings.autoDeleteDelay) {
+ settings.autodelete = true;
+ settings.original["qpid.auto_delete_timeout"] = settings.autoDeleteDelay;
+ }
filter.configure(settings);
- //TODO: populate settings from source details when available from engine
std::stringstream queueName;
if (shared) {
//just use link name (TODO: could allow this to be
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=1504622&r1=1504621&r2=1504622&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Thu Jul 18 19:50:09 2013
@@ -89,6 +89,7 @@ const std::string NODE("node");
const std::string LINK("link");
const std::string MODE("mode");
const std::string RELIABILITY("reliability");
+const std::string TIMEOUT("timeout");
const std::string NAME("name");
const std::string DURABLE("durable");
const std::string X_DECLARE("x-declare");
@@ -517,14 +518,24 @@ Subscription::Subscription(const Address
: Exchange(address),
queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
durable(Opt(address)/LINK/DURABLE),
- //if the link is durable, then assume it is also reliable unless expclitly stated otherwise
- //if not assume it is unreliable unless expclitly stated otherwise
+ //if the link is durable, then assume it is also reliable unless explicitly stated otherwise
+ //if not assume it is unreliable unless explicitly stated otherwise
reliable(durable ? !AddressResolution::is_unreliable(address) : AddressResolution::is_reliable(address)),
actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type),
exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)),
exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)),
alternateExchange((Opt(address)/LINK/X_DECLARE/ALTERNATE_EXCHANGE).str())
{
+ const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value;
+ if (timeout) {
+ if (timeout->asUint32()) queueOptions.setInt("qpid.auto_delete_timeout", timeout->asUint32());
+ } else if (durable && !(Opt(address)/LINK/RELIABILITY).value) {
+ //if durable but not explicitly reliable, then set a non-zero
+ //default for the autodelete timeout (previously this would
+ //have defaulted to autodelete immediately anyway, so the risk
+ //of the change causing problems is mitigated)
+ queueOptions.setInt("qpid.auto_delete_delay", 15*60);
+ }
(Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
(Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
std::string selector = Opt(address)/LINK/SELECTOR;
@@ -999,6 +1010,7 @@ Verifier::Verifier()
link[NAME] = true;
link[DURABLE] = true;
link[RELIABILITY] = true;
+ link[TIMEOUT] = true;
link[X_SUBSCRIBE] = true;
link[X_DECLARE] = true;
link[X_BINDINGS] = true;
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1504622&r1=1504621&r2=1504622&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp Thu Jul 18 19:50:09 2013
@@ -57,6 +57,7 @@ const std::string PROPERTIES("properties
const std::string MODE("mode");
const std::string BROWSE("browse");
const std::string CONSUME("consume");
+const std::string TIMEOUT("timeout");
const std::string TYPE("type");
const std::string TOPIC("topic");
@@ -144,6 +145,16 @@ bool test(const Variant::Map& options, c
}
}
+template <typename T> T get(const Variant::Map& options, const std::string& name, T defaultValue)
+{
+ Variant::Map::const_iterator j = options.find(name);
+ if (j == options.end()) {
+ return defaultValue;
+ } else {
+ return j->second;
+ }
+}
+
bool bind(const Variant::Map& options, const std::string& name, std::string& variable)
{
Variant::Map::const_iterator j = options.find(name);
@@ -260,6 +271,8 @@ void write(pn_data_t* data, const Varian
break;
}
}
+const uint32_t DEFAULT_DURABLE_TIMEOUT(15*60);//15 minutes
+const uint32_t DEFAULT_TIMEOUT(0);
}
AddressHelper::AddressHelper(const Address& address) :
@@ -268,6 +281,7 @@ AddressHelper::AddressHelper(const Addre
type(address.getType()),
durableNode(false),
durableLink(false),
+ timeout(0),
browse(false)
{
verifier.verify(address);
@@ -281,6 +295,7 @@ AddressHelper::AddressHelper(const Addre
bind(node, CAPABILITIES, capabilities);
durableNode = test(node, DURABLE);
durableLink = test(link, DURABLE);
+ timeout = get(link, TIMEOUT, durableLink ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT);
std::string mode;
if (bind(address, MODE, mode)) {
if (mode == BROWSE) {
@@ -521,27 +536,30 @@ void AddressHelper::configure(pn_terminu
if (durableLink) {
pn_terminus_set_durability(terminus, PN_DELIVERIES);
}
- if (mode == FOR_RECEIVER && browse) {
- //when PROTON-139 is resolved, set the required delivery-mode
- }
- //set filter(s):
- if (mode == FOR_RECEIVER && !filters.empty()) {
- pn_data_t* filter = pn_terminus_filter(terminus);
- pn_data_put_map(filter);
- pn_data_enter(filter);
- for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end(); ++i) {
- pn_data_put_symbol(filter, convert(i->name));
- pn_data_put_described(filter);
+ if (mode == FOR_RECEIVER) {
+ if (timeout) pn_terminus_set_timeout(terminus, timeout);
+ if (browse) {
+ //when PROTON-139 is resolved, set the required delivery-mode
+ }
+ //set filter(s):
+ if (!filters.empty()) {
+ pn_data_t* filter = pn_terminus_filter(terminus);
+ pn_data_put_map(filter);
pn_data_enter(filter);
- if (i->descriptorSymbol.size()) {
- pn_data_put_symbol(filter, convert(i->descriptorSymbol));
- } else {
- pn_data_put_ulong(filter, i->descriptorCode);
+ for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end(); ++i) {
+ pn_data_put_symbol(filter, convert(i->name));
+ pn_data_put_described(filter);
+ pn_data_enter(filter);
+ if (i->descriptorSymbol.size()) {
+ pn_data_put_symbol(filter, convert(i->descriptorSymbol));
+ } else {
+ pn_data_put_ulong(filter, i->descriptorCode);
+ }
+ write(filter, i->value);
+ pn_data_exit(filter);
}
- write(filter, i->value);
pn_data_exit(filter);
}
- pn_data_exit(filter);
}
}
@@ -632,6 +650,7 @@ Verifier::Verifier()
link[NAME] = true;
link[DURABLE] = true;
link[RELIABILITY] = true;
+ link[TIMEOUT] = true;
link[X_SUBSCRIBE] = true;
link[X_DECLARE] = true;
link[X_BINDINGS] = true;
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h?rev=1504622&r1=1504621&r2=1504622&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h Thu Jul 18 19:50:09 2013
@@ -68,6 +68,7 @@ class AddressHelper
std::string type;
bool durableNode;
bool durableLink;
+ uint32_t timeout;
bool browse;
std::vector<Filter> filters;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org