You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2022/05/31 09:04:45 UTC
[qpid-cpp] branch main updated: If cannot deliver incoming message due to queue limit then RELEASE message
This is an automated email from the ASF dual-hosted git repository.
gsim pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 462e9568f If cannot deliver incoming message due to queue limit then RELEASE message
462e9568f is described below
commit 462e9568f81241ac4924d75bf7e551bea185e82d
Author: Pete Fawcett <pe...@fawcett.co.uk>
AuthorDate: Fri May 27 13:47:29 2022 +0100
If cannot deliver incoming message due to queue limit then RELEASE message
---
src/qpid/broker/Queue.cpp | 6 ++++-
src/qpid/broker/amqp/Incoming.cpp | 16 +++++++++----
src/qpid/broker/amqp/Session.cpp | 50 +++++++++++++++++----------------------
3 files changed, 39 insertions(+), 33 deletions(-)
diff --git a/src/qpid/broker/Queue.cpp b/src/qpid/broker/Queue.cpp
index 16ac65946..7baaaa254 100644
--- a/src/qpid/broker/Queue.cpp
+++ b/src/qpid/broker/Queue.cpp
@@ -1661,7 +1661,11 @@ bool Queue::checkDepth(const QueueDepth& increment, const Message&)
if (brokerMgmtObject)
brokerMgmtObject->inc_discardsOverflow();
}
- throw ResourceLimitExceededException(QPID_MSG("Maximum depth exceeded on " << name << ": current=[" << current << "], max=[" << settings.maxDepth << "]"));
+ throw ResourceLimitExceededException(QPID_MSG("Maximum depth exceeded on " << name <<
+ ": current=[" << current <<
+ "], max=[" << settings.maxDepth <<
+ "], increment=[" << increment <<
+ "]"));
} else {
current += increment;
return true;
diff --git a/src/qpid/broker/amqp/Incoming.cpp b/src/qpid/broker/amqp/Incoming.cpp
index 0507aade4..5e851a8bc 100644
--- a/src/qpid/broker/amqp/Incoming.cpp
+++ b/src/qpid/broker/amqp/Incoming.cpp
@@ -154,9 +154,17 @@ void DecodingIncoming::deliver(boost::intrusive_ptr<qpid::broker::amqp::Message>
}
userid.verify(message.getUserId());
received->begin();
- handle(message, session.getTransaction(delivery));
- Transfer t(delivery, sessionPtr);
- sessionPtr->pending_accept(delivery);
- received->end(t);
+
+ try {
+ handle(message, session.getTransaction(delivery));
+ Transfer t(delivery, sessionPtr);
+ sessionPtr->pending_accept(delivery);
+ received->end(t);
+ } catch (const qpid::framing::ResourceLimitExceededException& e) {
+ pn_delivery_update(delivery, PN_RELEASED);
+ pn_delivery_settle(delivery);
+ } catch (const qpid::SessionException& e) {
+ throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what());
+ }
}
}}} // namespace qpid::broker::amqp
diff --git a/src/qpid/broker/amqp/Session.cpp b/src/qpid/broker/amqp/Session.cpp
index d1a0f38bb..684235cf0 100644
--- a/src/qpid/broker/amqp/Session.cpp
+++ b/src/qpid/broker/amqp/Session.cpp
@@ -948,28 +948,27 @@ void IncomingToQueue::handle(qpid::broker::Message& message, qpid::broker::TxBuf
msg << " Queue " << queue->getName() << " has been deleted";
throw Exception(qpid::amqp::error_conditions::RESOURCE_DELETED, msg.str());
}
+
try {
queue->deliver(message, transaction);
- } catch (const qpid::SessionException& e) {
- throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what());
}
+ catch (const qpid::Exception& e) {
+ QPID_LOG(warning, "Cannot deliver to queue " << queue->getName() << ": " << e.what());
+ throw;
+ }
+
}
void IncomingToExchange::handle(qpid::broker::Message& message, qpid::broker::TxBuffer* transaction)
{
- if (exchange->isDestroyed())
+ if (exchange->isDestroyed()) {
throw qpid::framing::ResourceDeletedException(QPID_MSG("Exchange " << exchange->getName() << " has been deleted."));
- try {
- authorise.route(exchange, message);
- DeliverableMessage deliverable(message, transaction);
- exchange->route(deliverable);
- if (!deliverable.delivered) {
- if (exchange->getAlternate()) {
- exchange->getAlternate()->route(deliverable);
- }
- }
- } catch (const qpid::SessionException& e) {
- throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what());
+ }
+ authorise.route(exchange, message);
+ DeliverableMessage deliverable(message, transaction);
+ exchange->route(deliverable);
+ if (!deliverable.delivered && exchange->getAlternate()) {
+ exchange->getAlternate()->route(deliverable);
}
}
@@ -993,21 +992,16 @@ void AnonymousRelay::handle(qpid::broker::Message& message, qpid::broker::TxBuff
}
}
- try {
- if (queue) {
- authorise.incoming(queue);
- queue->deliver(message, transaction);
- } else if (exchange) {
- authorise.route(exchange, message);
- DeliverableMessage deliverable(message, transaction);
- exchange->route(deliverable);
- } else {
- QPID_LOG(info, "AnonymousRelay dropping message for " << dest);
- }
- } catch (const qpid::SessionException& e) {
- throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what());
+ if (queue) {
+ authorise.incoming(queue);
+ queue->deliver(message, transaction);
+ } else if (exchange) {
+ authorise.route(exchange, message);
+ DeliverableMessage deliverable(message, transaction);
+ exchange->route(deliverable);
+ } else {
+ QPID_LOG(info, "AnonymousRelay dropping message for " << dest);
}
-
}
void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Message> message, pn_delivery_t* delivery)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org