You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2019/04/01 11:33:37 UTC

[qpid-cpp] branch master updated (aca4aa3 -> ea8e59d)

This is an automated email from the ASF dual-hosted git repository.

gsim pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-cpp.git.


    from aca4aa3  NO-JIRA: fix test to not prompt for password
     new 05de766  QPID-8292: ensure bridge is marked detached on error
     new ea8e59d  QPID-8293: limit the number of messages that can be purged in a single sweep

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/qpid/broker/Bridge.cpp      |  4 ++++
 src/qpid/broker/Broker.cpp      | 11 ++++++++++-
 src/qpid/broker/Broker.h        |  2 ++
 src/qpid/broker/BrokerOptions.h |  1 +
 src/qpid/broker/Queue.cpp       | 26 +++++++++++++++-----------
 src/qpid/broker/Queue.h         |  1 +
 6 files changed, 33 insertions(+), 12 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-cpp] 02/02: QPID-8293: limit the number of messages that can be purged in a single sweep

Posted by gs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-cpp.git

commit ea8e59de0e4f6dd3513549befee362c9af1f98ca
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Mon Apr 1 12:32:56 2019 +0100

    QPID-8293: limit the number of messages that can be purged in a single sweep
---
 src/qpid/broker/Broker.cpp      | 11 ++++++++++-
 src/qpid/broker/Broker.h        |  2 ++
 src/qpid/broker/BrokerOptions.h |  1 +
 src/qpid/broker/Queue.cpp       | 26 +++++++++++++++-----------
 src/qpid/broker/Queue.h         |  1 +
 5 files changed, 29 insertions(+), 12 deletions(-)

diff --git a/src/qpid/broker/Broker.cpp b/src/qpid/broker/Broker.cpp
index d214994..48f9675 100644
--- a/src/qpid/broker/Broker.cpp
+++ b/src/qpid/broker/Broker.cpp
@@ -151,7 +151,8 @@ BrokerOptions::BrokerOptions(const std::string& name) :
     dtxDefaultTimeout(60),      // 60s
     dtxMaxTimeout(3600),        // 3600s
     maxNegotiateTime(10000),    // 10s
-    sessionMaxUnacked(5000) 
+    sessionMaxUnacked(5000),
+    maxPurgeBatch(1000)
 {
     int c = sys::SystemInfo::concurrency();
     workerThreads=c+1;
@@ -180,6 +181,7 @@ BrokerOptions::BrokerOptions(const std::string& name) :
         ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
         ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"),
          "Interval between attempts to purge any expired messages from queues")
+        ("max-purge-batch", optValue(maxPurgeBatch, "MESSAGES"), "maximum number of expired messages queue clenear will purge in one batch (controls impact on other threads)")
         ("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted")
         ("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication")
         ("sasl-service-name", optValue(saslServiceName, "NAME"), "The service name to specify for SASL")
@@ -248,6 +250,7 @@ Broker::Broker(const BrokerOptions& conf) :
     recoveryInProgress(false),
     protocolRegistry(std::set<std::string>(conf.protocols.begin(), conf.protocols.end()), this),
     timestampRcvMsgs(conf.timestampRcvMsgs),
+    maxPurgeBatch(conf.maxPurgeBatch),
     getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
 {
     if (!dataDir.isEnabled()) {
@@ -1694,5 +1697,11 @@ void Broker::setLinkClientProperties(const framing::FieldTable& ft) {
     linkClientProperties = ft;
 }
 
+uint32_t Broker::getMaxPurgeBatch() const
+{
+    return maxPurgeBatch;
+}
+
+
 }} // namespace qpid::broker
 
diff --git a/src/qpid/broker/Broker.h b/src/qpid/broker/Broker.h
index 6d5514a..629fc0f 100644
--- a/src/qpid/broker/Broker.h
+++ b/src/qpid/broker/Broker.h
@@ -168,6 +168,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
     mutable sys::Mutex linkClientPropertiesLock;
     framing::FieldTable linkClientProperties;
     bool timestampRcvMsgs;
+    const uint32_t maxPurgeBatch;
 
   public:
     QPID_BROKER_EXTERN virtual ~Broker();
@@ -347,6 +348,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
     uint32_t getDtxMaxTimeout() const;
     uint16_t getQueueThresholdEventRatio() const;
     uint getQueueLimit() const;
+    uint32_t getMaxPurgeBatch() const;
 
     /** Information identifying this system */
     boost::shared_ptr<const System> getSystem() const { return systemObject; }
diff --git a/src/qpid/broker/BrokerOptions.h b/src/qpid/broker/BrokerOptions.h
index 0ef25fb..f4f71d8 100644
--- a/src/qpid/broker/BrokerOptions.h
+++ b/src/qpid/broker/BrokerOptions.h
@@ -79,6 +79,7 @@ struct BrokerOptions : public qpid::Options
     uint32_t maxNegotiateTime;  // Max time in ms for connection with no negotiation
     size_t sessionMaxUnacked;   // Max un-acknowledged outgoing messages per session
     std::string fedTag;
+    uint32_t maxPurgeBatch;
 
 private:
     std::string getHome();
diff --git a/src/qpid/broker/Queue.cpp b/src/qpid/broker/Queue.cpp
index 3b54661..bda8fce 100644
--- a/src/qpid/broker/Queue.cpp
+++ b/src/qpid/broker/Queue.cpp
@@ -205,7 +205,8 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
     deleted(false),
     barrier(*this),
     allocator(new FifoDistributor( *messages )),
-    redirectSource(false)
+    redirectSource(false),
+    maxPurgeBatch(broker ? broker->getMaxPurgeBatch() : 1000)
 {
     current.setCount(0);//always track depth in messages
     if (settings.maxDepth.getSize()) current.setSize(0);//track depth in bytes only if policy requires it
@@ -664,17 +665,20 @@ void Queue::purgeExpired(sys::Duration lapse) {
     int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
     if (seconds == 0 || count / seconds < 1) {
         sys::AbsTime time = sys::AbsTime::now();
-        uint32_t count = remove(0, boost::bind(&isExpired, name, _1, time), 0, CONSUMER, settings.autodelete);
-        QPID_LOG(debug, "Purged " << count << " expired messages from " << getName());
-        //
-        // Report the count of discarded-by-ttl messages
-        //
-        if (mgmtObject && count) {
-            mgmtObject->inc_discardsTtl(count);
-            if (brokerMgmtObject) {
-                brokerMgmtObject->inc_discardsTtl(count);
+        uint32_t removed;
+        do {
+            removed = remove(maxPurgeBatch, boost::bind(&isExpired, name, _1, time), 0, CONSUMER, settings.autodelete);
+            QPID_LOG(debug, "Purged " << removed << " expired messages from " << getName());
+            //
+            // Report the count of discarded-by-ttl messages
+            //
+            if (mgmtObject && removed) {
+                mgmtObject->inc_discardsTtl(removed);
+                if (brokerMgmtObject) {
+                    brokerMgmtObject->inc_discardsTtl(removed);
+                }
             }
-        }
+        } while(maxPurgeBatch && removed == maxPurgeBatch);//if we hit the limit, there may be more to purge
     }
 }
 
diff --git a/src/qpid/broker/Queue.h b/src/qpid/broker/Queue.h
index 941af4d..e78b336 100644
--- a/src/qpid/broker/Queue.h
+++ b/src/qpid/broker/Queue.h
@@ -226,6 +226,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
     // Redirect source and target refer to each other. Only one is source.
     Queue::shared_ptr redirectPeer;
     bool redirectSource;
+    const uint32_t maxPurgeBatch;
 
     bool checkAutoDelete(const qpid::sys::Mutex::ScopedLock&) const;
     bool isUnused(const qpid::sys::Mutex::ScopedLock&) const;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-cpp] 01/02: QPID-8292: ensure bridge is marked detached on error

Posted by gs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-cpp.git

commit 05de76684862914880f3bdf58e73518502499903
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Mon Apr 1 10:09:20 2019 +0100

    QPID-8292: ensure bridge is marked detached on error
---
 src/qpid/broker/Bridge.cpp | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/qpid/broker/Bridge.cpp b/src/qpid/broker/Bridge.cpp
index d6cd3e2..e9b0b7c 100644
--- a/src/qpid/broker/Bridge.cpp
+++ b/src/qpid/broker/Bridge.cpp
@@ -435,24 +435,28 @@ const string& Bridge::getLocalTag() const
 void Bridge::connectionException(
     framing::connection::CloseCode code, const std::string& msg)
 {
+    detached = true;
     if (errorListener) errorListener->connectionException(code, msg);
 }
 
 void Bridge::channelException(
     framing::session::DetachCode code, const std::string& msg)
 {
+    detached = true;
     if (errorListener) errorListener->channelException(code, msg);
 }
 
 void Bridge::executionException(
     framing::execution::ErrorCode code, const std::string& msg)
 {
+    detached = true;
     if (errorListener) errorListener->executionException(code, msg);
 }
 
 void Bridge::incomingExecutionException(
     framing::execution::ErrorCode code, const std::string& msg)
 {
+    detached = true;
     if (errorListener) errorListener->incomingExecutionException(code, msg);
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org