You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2022/05/31 09:04:45 UTC

[qpid-cpp] branch main updated: If cannot deliver incoming message due to queue limit then RELEASE message

This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 462e9568f If cannot deliver incoming message due to queue limit then RELEASE message
462e9568f is described below

commit 462e9568f81241ac4924d75bf7e551bea185e82d
Author: Pete Fawcett <pe...@fawcett.co.uk>
AuthorDate: Fri May 27 13:47:29 2022 +0100

    If cannot deliver incoming message due to queue limit then RELEASE message
---
 src/qpid/broker/Queue.cpp         |  6 ++++-
 src/qpid/broker/amqp/Incoming.cpp | 16 +++++++++----
 src/qpid/broker/amqp/Session.cpp  | 50 +++++++++++++++++----------------------
 3 files changed, 39 insertions(+), 33 deletions(-)

diff --git a/src/qpid/broker/Queue.cpp b/src/qpid/broker/Queue.cpp
index 16ac65946..7baaaa254 100644
--- a/src/qpid/broker/Queue.cpp
+++ b/src/qpid/broker/Queue.cpp
@@ -1661,7 +1661,11 @@ bool Queue::checkDepth(const QueueDepth& increment, const Message&)
             if (brokerMgmtObject)
                 brokerMgmtObject->inc_discardsOverflow();
         }
-        throw ResourceLimitExceededException(QPID_MSG("Maximum depth exceeded on " << name << ": current=[" << current << "], max=[" << settings.maxDepth << "]"));
+        throw ResourceLimitExceededException(QPID_MSG("Maximum depth exceeded on " << name <<
+                    ": current=[" << current <<
+                    "], max=[" << settings.maxDepth <<
+                    "], increment=[" << increment <<
+                    "]"));
     } else {
         current += increment;
         return true;
diff --git a/src/qpid/broker/amqp/Incoming.cpp b/src/qpid/broker/amqp/Incoming.cpp
index 0507aade4..5e851a8bc 100644
--- a/src/qpid/broker/amqp/Incoming.cpp
+++ b/src/qpid/broker/amqp/Incoming.cpp
@@ -154,9 +154,17 @@ void DecodingIncoming::deliver(boost::intrusive_ptr<qpid::broker::amqp::Message>
     }	
     userid.verify(message.getUserId());
     received->begin();
-    handle(message, session.getTransaction(delivery));
-    Transfer t(delivery, sessionPtr);
-    sessionPtr->pending_accept(delivery);
-    received->end(t);
+
+    try {
+        handle(message, session.getTransaction(delivery));
+        Transfer t(delivery, sessionPtr);
+        sessionPtr->pending_accept(delivery);
+        received->end(t);
+    } catch (const qpid::framing::ResourceLimitExceededException& e) {
+        pn_delivery_update(delivery, PN_RELEASED);
+        pn_delivery_settle(delivery);
+    } catch (const qpid::SessionException& e) {
+        throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what());
+    }
 }
 }}} // namespace qpid::broker::amqp
diff --git a/src/qpid/broker/amqp/Session.cpp b/src/qpid/broker/amqp/Session.cpp
index d1a0f38bb..684235cf0 100644
--- a/src/qpid/broker/amqp/Session.cpp
+++ b/src/qpid/broker/amqp/Session.cpp
@@ -948,28 +948,27 @@ void IncomingToQueue::handle(qpid::broker::Message& message, qpid::broker::TxBuf
         msg << " Queue " << queue->getName() << " has been deleted";
         throw Exception(qpid::amqp::error_conditions::RESOURCE_DELETED, msg.str());
     }
+
     try {
         queue->deliver(message, transaction);
-    } catch (const qpid::SessionException& e) {
-        throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what());
     }
+    catch (const qpid::Exception& e) {
+        QPID_LOG(warning, "Cannot deliver to  queue " <<  queue->getName() << ": " << e.what());
+        throw;
+    }
+
 }
 
 void IncomingToExchange::handle(qpid::broker::Message& message, qpid::broker::TxBuffer* transaction)
 {
-    if (exchange->isDestroyed())
+    if (exchange->isDestroyed()) {
         throw qpid::framing::ResourceDeletedException(QPID_MSG("Exchange " << exchange->getName() << " has been deleted."));
-    try {
-        authorise.route(exchange, message);
-        DeliverableMessage deliverable(message, transaction);
-        exchange->route(deliverable);
-        if (!deliverable.delivered) {
-            if (exchange->getAlternate()) {
-                exchange->getAlternate()->route(deliverable);
-            }
-        }
-    } catch (const qpid::SessionException& e) {
-        throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what());
+    }
+    authorise.route(exchange, message);
+    DeliverableMessage deliverable(message, transaction);
+    exchange->route(deliverable);
+    if (!deliverable.delivered && exchange->getAlternate()) {
+        exchange->getAlternate()->route(deliverable);
     }
 }
 
@@ -993,21 +992,16 @@ void AnonymousRelay::handle(qpid::broker::Message& message, qpid::broker::TxBuff
         }
     }
 
-    try {
-        if (queue) {
-            authorise.incoming(queue);
-            queue->deliver(message, transaction);
-        } else if (exchange) {
-            authorise.route(exchange, message);
-            DeliverableMessage deliverable(message, transaction);
-            exchange->route(deliverable);
-        } else {
-            QPID_LOG(info, "AnonymousRelay dropping message for " << dest);
-        }
-    } catch (const qpid::SessionException& e) {
-        throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what());
+    if (queue) {
+        authorise.incoming(queue);
+        queue->deliver(message, transaction);
+    } else if (exchange) {
+        authorise.route(exchange, message);
+        DeliverableMessage deliverable(message, transaction);
+        exchange->route(deliverable);
+    } else {
+        QPID_LOG(info, "AnonymousRelay dropping message for " << dest);
     }
-
 }
 
 void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Message> message, pn_delivery_t* delivery)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org