You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/23 22:58:12 UTC

qpid-cpp git commit: QPID-7677: C++ broker AMQP 1.0 subscription uses more memory than 0-10

Repository: qpid-cpp
Updated Branches:
  refs/heads/master f0eaf298d -> 4e393bdfa


QPID-7677: C++ broker AMQP 1.0 subscription uses more memory than 0-10

AMQP 1.0 allocates a per-session buffer for unacknowledged message
deliveries. By default it holds up to 5000 deliveries. This commit adds the
qpidd option --session-max-unacked so the buffer can be made smaller if the
broker has a large number of sessions and the memory overhead is a problem.

The following values (in Kb) give an idea of the overhead per session, the
actual values will vary by platform and build type:

RSS per client for AMQP 0.10: 142
RSS per client for AMQP 1.0
qpidd --session-max-unacked=100: 	227 (diff 85)
qpidd --session-max-unacked=1000: 	349 (diff 207)
qpidd --session-max-unacked=5000: 	846 (diff 704)

The broker sending messages on a session without waiting for acknowledgement up
to the session-max-unacked limit (it may stop sending before the limit if the
aggregate link credit for the session is lower.) Once it reaches the limit it
will wait for acknowledgement from the client before sending more messages.


Project: http://git-wip-us.apache.org/repos/asf/qpid-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-cpp/commit/4e393bdf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-cpp/tree/4e393bdf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-cpp/diff/4e393bdf

Branch: refs/heads/master
Commit: 4e393bdfaba0b52cf5d9ee9b254b7814ee43e4ba
Parents: f0eaf29
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Feb 23 12:49:38 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Feb 23 13:38:07 2017 -0500

----------------------------------------------------------------------
 src/qpid/broker/Broker.cpp        | 9 ++++++++-
 src/qpid/broker/Broker.h          | 1 +
 src/qpid/broker/BrokerOptions.h   | 1 +
 src/qpid/broker/amqp/Outgoing.cpp | 3 ++-
 4 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/4e393bdf/src/qpid/broker/Broker.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/broker/Broker.cpp b/src/qpid/broker/Broker.cpp
index 7767469..272219c 100644
--- a/src/qpid/broker/Broker.cpp
+++ b/src/qpid/broker/Broker.cpp
@@ -150,7 +150,8 @@ BrokerOptions::BrokerOptions(const std::string& name) :
     linkHeartbeatInterval(120*sys::TIME_SEC),
     dtxDefaultTimeout(60),      // 60s
     dtxMaxTimeout(3600),        // 3600s
-    maxNegotiateTime(10000)     // 10s
+    maxNegotiateTime(10000),    // 10s
+    sessionMaxUnacked(5000) 
 {
     int c = sys::SystemInfo::concurrency();
     workerThreads=c+1;
@@ -200,6 +201,7 @@ BrokerOptions::BrokerOptions(const std::string& name) :
         ("dtx-max-timeout", optValue(dtxMaxTimeout, "SECONDS"), "Maximum allowed timeout for DTX transaction. A value of zero disables maximum timeout limit checks and allows arbitrarily large timeout settings.")
         ("max-negotiate-time", optValue(maxNegotiateTime, "MILLISECONDS"), "Maximum time a connection can take to send the initial protocol negotiation")
         ("federation-tag", optValue(fedTag, "NAME"), "Override the federation tag")
+        ("session-max-unacked", optValue(sessionMaxUnacked, "DELIVERIES"), "Maximum number of un-acknowledged outoing messages per sesssion")
         ;
 }
 
@@ -443,6 +445,11 @@ uint32_t Broker::getMaxNegotiateTime() const
     return config.maxNegotiateTime;
 }
 
+size_t Broker::getSessionMaxUnacked() const
+{
+    return config.sessionMaxUnacked;
+}
+
 uint16_t Broker::getPortOption() const
 {
     return config.port;

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/4e393bdf/src/qpid/broker/Broker.h
----------------------------------------------------------------------
diff --git a/src/qpid/broker/Broker.h b/src/qpid/broker/Broker.h
index af1144e..b612635 100644
--- a/src/qpid/broker/Broker.h
+++ b/src/qpid/broker/Broker.h
@@ -335,6 +335,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
     QPID_BROKER_EXTERN const std::vector<std::string>& getListenInterfaces() const;
     QPID_BROKER_EXTERN int getConnectionBacklog() const;
     uint32_t getMaxNegotiateTime() const;
+    size_t getSessionMaxUnacked() const;
     sys::Duration getLinkMaintenanceInterval() const;
     QPID_BROKER_EXTERN sys::Duration getLinkHeartbeatInterval() const;
     uint32_t getDtxMaxTimeout() const;

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/4e393bdf/src/qpid/broker/BrokerOptions.h
----------------------------------------------------------------------
diff --git a/src/qpid/broker/BrokerOptions.h b/src/qpid/broker/BrokerOptions.h
index 7207c17..0ef25fb 100644
--- a/src/qpid/broker/BrokerOptions.h
+++ b/src/qpid/broker/BrokerOptions.h
@@ -77,6 +77,7 @@ struct BrokerOptions : public qpid::Options
     uint32_t dtxDefaultTimeout; // Default timeout of a DTX transaction
     uint32_t dtxMaxTimeout;     // Maximal timeout of a DTX transaction
     uint32_t maxNegotiateTime;  // Max time in ms for connection with no negotiation
+    size_t sessionMaxUnacked;   // Max un-acknowledged outgoing messages per session
     std::string fedTag;
 
 private:

http://git-wip-us.apache.org/repos/asf/qpid-cpp/blob/4e393bdf/src/qpid/broker/amqp/Outgoing.cpp
----------------------------------------------------------------------
diff --git a/src/qpid/broker/amqp/Outgoing.cpp b/src/qpid/broker/amqp/Outgoing.cpp
index abd96a6..1ef62b9 100644
--- a/src/qpid/broker/amqp/Outgoing.cpp
+++ b/src/qpid/broker/amqp/Outgoing.cpp
@@ -24,6 +24,7 @@
 #include "qpid/broker/amqp/Header.h"
 #include "qpid/broker/amqp/Session.h"
 #include "qpid/broker/amqp/Translation.h"
+#include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/Selector.h"
 #include "qpid/broker/TopicKeyNode.h"
@@ -65,7 +66,7 @@ OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source,
       Consumer(pn_link_name(l), type, target),
       exclusive(e),
       isControllingUser(p),
-      queue(q), deliveries(5000), link(l), out(o),
+      queue(q), deliveries(broker.getSessionMaxUnacked()), link(l), out(o),
       current(0),
       buffer(1024)/*used only for header at present*/,
       //for exclusive queues, assume unreliable unless reliable is explicitly requested; otherwise assume reliable unless unreliable requested


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org