You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/04/18 15:49:28 UTC

svn commit: r1588471 - in /qpid/branches/0.28/qpid/cpp/src: qpid/ha/Primary.cpp qpid/ha/PrimaryQueueLimits.h qpid/ha/RemoteBackup.cpp qpid/ha/ReplicationTest.cpp qpid/ha/ReplicationTest.h tests/ha_test.py tests/ha_tests.py

Author: aconway
Date: Fri Apr 18 13:49:28 2014
New Revision: 1588471

URL: http://svn.apache.org/r1588471
Log:
QPID-5666: HA fails with resource-limit-exceeded: Exceeded replicated queue limit

This is a regression introduced in r1561206: CommitDate: Fri Jan 24 21:54:59 2014 +0000
  QPID-5513: HA backup fails if number of replicated queues exceeds number of channels.

Fixed by the current commit. PrimaryQueueLimits was not taking account of queues already
on the broker prior to promotion.

Modified:
    qpid/branches/0.28/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/branches/0.28/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h
    qpid/branches/0.28/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
    qpid/branches/0.28/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
    qpid/branches/0.28/qpid/cpp/src/qpid/ha/ReplicationTest.h
    qpid/branches/0.28/qpid/cpp/src/tests/ha_test.py
    qpid/branches/0.28/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/0.28/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.28/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1588471&r1=1588470&r2=1588471&view=diff
==============================================================================
--- qpid/branches/0.28/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/branches/0.28/qpid/cpp/src/qpid/ha/Primary.cpp Fri Apr 18 13:49:28 2014
@@ -136,7 +136,7 @@ Primary::Primary(HaBroker& hb, const Bro
     logPrefix("Primary: "), active(false),
     replicationTest(hb.getSettings().replicateDefault.get()),
     sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)),
-    queueLimits(logPrefix)
+    queueLimits(logPrefix, hb.getBroker().getQueues(), replicationTest)
 {
     // Note that at this point, we are still rejecting client connections.
     // So we are safe from client interference while we set up the primary.

Modified: qpid/branches/0.28/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.28/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h?rev=1588471&r1=1588470&r2=1588471&view=diff
==============================================================================
--- qpid/branches/0.28/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h (original)
+++ qpid/branches/0.28/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h Fri Apr 18 13:49:28 2014
@@ -22,9 +22,12 @@
  *
  */
 
+#include "ReplicationTest.h"
 #include <qpid/broker/Queue.h>
+#include <qpid/broker/QueueRegistry.h>
 #include <qpid/framing/amqp_types.h>
 #include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
 #include <string>
 
 namespace qpid {
@@ -45,8 +48,15 @@ class PrimaryQueueLimits
 {
   public:
     // FIXME aconway 2014-01-24: hardcoded maxQueues, use negotiated channel-max
-    PrimaryQueueLimits(const std::string& lp) :
-        logPrefix(lp), maxQueues(framing::CHANNEL_MAX-100), queues(0) {}
+    PrimaryQueueLimits(const std::string& lp,
+                       broker::QueueRegistry& qr,
+                       const ReplicationTest& rt
+    ) :
+        logPrefix(lp), maxQueues(framing::CHANNEL_MAX-100), queues(0)
+    {
+        // Get initial count of replicated queues
+        qr.eachQueue(boost::bind(&PrimaryQueueLimits::addQueueIfReplicated, this, _1, rt)); 
+    }
 
     /** Add a replicated queue
      *@exception ResourceLimitExceededException if this would exceed the limit.
@@ -57,15 +67,22 @@ class PrimaryQueueLimits
                      << " exceeds limit of " << maxQueues
                      << " replicated queues.");
             throw framing::ResourceLimitExceededException(
-                "Exceeded replicated queue limit.");
+                Msg() << "Exceeded replicated queue limit " << queues << " >= " << maxQueues);
         }
         else ++queues;
     }
 
+    void addQueueIfReplicated(const boost::shared_ptr<broker::Queue>& q, const ReplicationTest& rt) {
+        if(rt.useLevel(*q)) addQueue(q);
+    }
+
     /** Remove a replicated queue.
      * @pre Was previously added with addQueue
      */
-    void removeQueue(const boost::shared_ptr<broker::Queue>&) { --queues; }
+    void removeQueue(const boost::shared_ptr<broker::Queue>&) {
+        assert(queues != 0);
+        --queues;
+    }
 
     // TODO aconway 2014-01-24: Currently replication links always use the
     // hard-coded framing::CHANNEL_MAX. In future (e.g. when we support AMQP1.0
@@ -83,7 +100,7 @@ class PrimaryQueueLimits
     std::string logPrefix;
     uint64_t maxQueues;
     uint64_t queues;
-};
+}; 
 
 }} // namespace qpid::ha
 

Modified: qpid/branches/0.28/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.28/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1588471&r1=1588470&r2=1588471&view=diff
==============================================================================
--- qpid/branches/0.28/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/branches/0.28/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Fri Apr 18 13:49:28 2014
@@ -41,7 +41,7 @@ RemoteBackup::RemoteBackup(
     std::ostringstream oss;
     oss << "Remote backup at " << info << ": ";
     logPrefix = oss.str();
-    QPID_LOG(debug, logPrefix << "Connected");
+    QPID_LOG(debug, logPrefix << (c? "Connected" : "Expected"));
 }
 
 RemoteBackup::~RemoteBackup() {

Modified: qpid/branches/0.28/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.28/qpid/cpp/src/qpid/ha/ReplicationTest.cpp?rev=1588471&r1=1588470&r2=1588471&view=diff
==============================================================================
--- qpid/branches/0.28/qpid/cpp/src/qpid/ha/ReplicationTest.cpp (original)
+++ qpid/branches/0.28/qpid/cpp/src/qpid/ha/ReplicationTest.cpp Fri Apr 18 13:49:28 2014
@@ -29,20 +29,20 @@ namespace ha {
 
 using types::Variant;
 
-ReplicateLevel ReplicationTest::getLevel(const std::string& str) {
+ReplicateLevel ReplicationTest::getLevel(const std::string& str) const {
     Enum<ReplicateLevel> rl(replicateDefault);
     if (!str.empty()) rl.parse(str);
     return rl.get();
 }
 
-ReplicateLevel ReplicationTest::getLevel(const framing::FieldTable& f) {
+ReplicateLevel ReplicationTest::getLevel(const framing::FieldTable& f) const {
     if (f.isSet(QPID_REPLICATE))
         return getLevel(f.getAsString(QPID_REPLICATE));
     else
         return replicateDefault;
 }
 
-ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) {
+ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) const {
     Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
     if (i != m.end())
         return getLevel(i->second.asString());
@@ -50,7 +50,7 @@ ReplicateLevel ReplicationTest::getLevel
         return replicateDefault;
 }
 
-ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) {
+ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) const {
     const Variant::Map& qmap(q.getSettings().original);
     Variant::Map::const_iterator i = qmap.find(QPID_REPLICATE);
     if (i != qmap.end())
@@ -59,16 +59,15 @@ ReplicateLevel ReplicationTest::getLevel
         return getLevel(q.getSettings().storeSettings);
 }
 
-ReplicateLevel ReplicationTest::getLevel(const broker::Exchange& ex) {
+ReplicateLevel ReplicationTest::getLevel(const broker::Exchange& ex) const {
     return getLevel(ex.getArgs());
 }
 
-ReplicateLevel ReplicationTest::useLevel(const broker::Queue& q)
-{
+ReplicateLevel ReplicationTest::useLevel(const broker::Queue& q) const {
     return q.getSettings().isTemporary ? ReplicationTest(NONE).getLevel(q) : getLevel(q);
 }
 
-ReplicateLevel ReplicationTest::useLevel(const broker::Exchange& ex) {
+ReplicateLevel ReplicationTest::useLevel(const broker::Exchange& ex) const {
     return ReplicationTest::getLevel(ex);
 }
 

Modified: qpid/branches/0.28/qpid/cpp/src/qpid/ha/ReplicationTest.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.28/qpid/cpp/src/qpid/ha/ReplicationTest.h?rev=1588471&r1=1588470&r2=1588471&view=diff
==============================================================================
--- qpid/branches/0.28/qpid/cpp/src/qpid/ha/ReplicationTest.h (original)
+++ qpid/branches/0.28/qpid/cpp/src/qpid/ha/ReplicationTest.h Fri Apr 18 13:49:28 2014
@@ -56,18 +56,18 @@ class ReplicationTest
         replicateDefault(replicateDefault_) {}
 
     // Get the replication level set on an object, or default if not set.
-    ReplicateLevel getLevel(const std::string& str);
-    ReplicateLevel getLevel(const framing::FieldTable& f);
-    ReplicateLevel getLevel(const types::Variant::Map& m);
-    ReplicateLevel getLevel(const broker::Queue&);
-    ReplicateLevel getLevel(const broker::Exchange&);
+    ReplicateLevel getLevel(const std::string& str) const;
+    ReplicateLevel getLevel(const framing::FieldTable& f) const;
+    ReplicateLevel getLevel(const types::Variant::Map& m) const;
+    ReplicateLevel getLevel(const broker::Queue&) const;
+    ReplicateLevel getLevel(const broker::Exchange&) const;
 
     // Calculate level for objects that may not have replication set,
     // including auto-delete/exclusive settings.
-    ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete, bool exclusive);
-    ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete, bool exclusive);
-    ReplicateLevel useLevel(const broker::Queue&);
-    ReplicateLevel useLevel(const broker::Exchange&);
+    ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete, bool exclusive) const;
+    ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete, bool exclusive) const;
+    ReplicateLevel useLevel(const broker::Queue&) const;
+    ReplicateLevel useLevel(const broker::Exchange&) const;
 
   private:
     ReplicateLevel replicateDefault;

Modified: qpid/branches/0.28/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/branches/0.28/qpid/cpp/src/tests/ha_test.py?rev=1588471&r1=1588470&r2=1588471&view=diff
==============================================================================
--- qpid/branches/0.28/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/branches/0.28/qpid/cpp/src/tests/ha_test.py Fri Apr 18 13:49:28 2014
@@ -196,7 +196,7 @@ acl allow all all
 
     def ha_status(self): return self.qmf().status
 
-    def wait_status(self, status):
+    def wait_status(self, status, timeout=5):
         def try_get_status():
             self._status = "<unknown>"
             # Ignore ConnectionError, the broker may not be up yet.
@@ -204,7 +204,7 @@ acl allow all all
                 self._status = self.ha_status()
                 return self._status == status;
             except ConnectionError: return False
-        assert retry(try_get_status, timeout=5), "%s expected=%r, actual=%r"%(
+        assert retry(try_get_status, timeout=timeout), "%s expected=%r, actual=%r"%(
             self, status, self._status)
 
     def wait_queue(self, queue, timeout=1):

Modified: qpid/branches/0.28/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/0.28/qpid/cpp/src/tests/ha_tests.py?rev=1588471&r1=1588470&r2=1588471&view=diff
==============================================================================
--- qpid/branches/0.28/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/0.28/qpid/cpp/src/tests/ha_tests.py Fri Apr 18 13:49:28 2014
@@ -885,6 +885,19 @@ acl deny all all
         old_sess.exchange_declare(exchange='ex1', type='fanout')
         cluster[1].wait_backup("ex1")
 
+    def test_resource_limit_bug(self):
+        """QPID-5666 Regression test: Incorrect resource limit exception for queue creation."""
+        cluster = HaCluster(self, 3)
+        qs = ["q%s"%i for i in xrange(10)]
+        s = cluster[0].connect().session()
+        s.sender("q;{create:always}").close()
+        cluster.kill(0)
+        cluster[1].promote()
+        cluster[1].wait_status("active")
+        s = cluster[1].connect().session()
+        s.receiver("q;{delete:always}").close()
+        s.sender("qq;{create:always}").close()
+                
 def fairshare(msgs, limit, levels):
     """
     Generator to return prioritised messages in expected order for a given fairshare limit



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org