You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/12/20 19:28:59 UTC

svn commit: r1424617 - in /qpid/trunk/qpid/cpp/src: qpid/broker/Broker.cpp qpid/broker/Exchange.cpp qpid/broker/ExchangeRegistry.cpp qpid/broker/Queue.cpp qpid/broker/QueueRegistry.cpp tests/ha_tests.py

Author: aconway
Date: Thu Dec 20 18:28:58 2012
New Revision: 1424617

URL: http://svn.apache.org/viewvc?rev=1424617&view=rev
Log:
Bug 886656 - HA backup broker does not properly increment the alternate exchange user count

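Set alternate exchange in-use counters correctly on backup brokers: the
increment now happens inside Exchange::setAlternate() and
Queue::setAlternateExchange() instead of in ExchangeRegistry and
QueueRegistry, so every caller of those setters updates the count.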

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1424617&r1=1424616&r2=1424617&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Dec 20 18:28:58 2012
@@ -1112,6 +1112,10 @@ std::pair<boost::shared_ptr<Queue>, bool
 void Broker::deleteQueue(const std::string& name, const std::string& userId,
                          const std::string& connectionId, QueueFunctor check)
 {
+    QPID_LOG_CAT(debug, model, "Deleting queue. name:" << name
+                 << " user:" << userId
+                 << " rhost:" << connectionId
+    );
     if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) {
         throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId));
     }
@@ -1126,11 +1130,6 @@ void Broker::deleteQueue(const std::stri
     } else {
         throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name));
     }
-    QPID_LOG_CAT(debug, model, "Delete queue. name:" << name
-                 << " user:" << userId
-                 << " rhost:" << connectionId
-    );
-
 }
 
 std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
@@ -1177,6 +1176,9 @@ std::pair<Exchange::shared_ptr, bool> Br
 void Broker::deleteExchange(const std::string& name, const std::string& userId,
                            const std::string& connectionId)
 {
+    QPID_LOG_CAT(debug, model, "Deleting exchange. name:" << name
+        << " user:" << userId
+        << " rhost:" << connectionId);
     if (acl) {
         if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
             throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId));
@@ -1187,13 +1189,10 @@ void Broker::deleteExchange(const std::s
     }
     Exchange::shared_ptr exchange(exchanges.get(name));
     if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name));
-    if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
+    if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Cannot delete " << name <<", in use as alternate-exchange."));
     if (exchange->isDurable()) store->destroy(*exchange);
     if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
     exchanges.destroy(name, connectionId,  userId);
-    QPID_LOG_CAT(debug, model, "Delete exchange. name:" << name
-        << " user:" << userId
-        << " rhost:" << connectionId);
 }
 
 void Broker::bind(const std::string& queueName,

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1424617&r1=1424616&r2=1424617&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Thu Dec 20 18:28:58 2012
@@ -223,6 +223,7 @@ Exchange::~Exchange ()
 void Exchange::setAlternate(Exchange::shared_ptr _alternate)
 {
     alternate = _alternate;
+    alternate->incAlternateUsers();
     if (mgmtExchange != 0) {
         if (alternate.get() != 0)
             mgmtExchange->set_altExchange(alternate->GetManagementObject()->getObjectId());

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=1424617&r1=1424616&r2=1424617&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Thu Dec 20 18:28:58 2012
@@ -79,10 +79,7 @@ pair<Exchange::shared_ptr, bool> Exchang
             }
             exchanges[name] = exchange;
             result = std::pair<Exchange::shared_ptr, bool>(exchange, true);
-            if (alternate) {
-                exchange->setAlternate(alternate);
-                alternate->incAlternateUsers();
-            }
+            if (alternate) exchange->setAlternate(alternate);
             // Call exchangeCreate inside the lock to ensure correct ordering.
             if (broker) broker->getConfigurationObservers().exchangeCreate(exchange);
         } else {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1424617&r1=1424616&r2=1424617&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Dec 20 18:28:58 2012
@@ -1149,6 +1149,7 @@ Queue::shared_ptr Queue::restore( QueueR
 void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange)
 {
     alternateExchange = exchange;
+    alternateExchange->incAlternateUsers();
     if (mgmtObject) {
         if (exchange.get() != 0)
             mgmtObject->set_altExchange(exchange->GetManagementObject()->getObjectId());

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1424617&r1=1424616&r2=1424617&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Thu Dec 20 18:28:58 2012
@@ -60,10 +60,8 @@ QueueRegistry::declare(const string& nam
         if (i == queues.end()) {
             Queue::shared_ptr queue = create(name, settings);
             //Move this to factory also?
-            if (alternate) {
+            if (alternate)
                 queue->setAlternateExchange(alternate);//need to do this *before* create
-                alternate->incAlternateUsers();
-            }
             if (!recovering) {
                 //create persistent record if required
                 queue->create();

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1424617&r1=1424616&r2=1424617&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Thu Dec 20 18:28:58 2012
@@ -20,7 +20,7 @@
 
 import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
 import traceback
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
+from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
 from qpid.datatypes import uuid4
 from brokertest import *
 from ha_test import *
@@ -613,22 +613,24 @@ acl deny all all
         to new members of a cluster. """
         cluster = HaCluster(self, 2)
         s = cluster[0].connect().session()
+        cluster[0].wait_status("active")
+        cluster[1].wait_status("ready")
         # altex exchange: acts as alternate exchange
         s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}")
         # altq queue bound to altex, collect re-routed messages.
         s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}")
-        # 0ex exchange with alternate-exchange altex and no queues bound
-        s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}")
+        # ex exchange with alternate-exchange altex and no queues bound
+        s.sender("ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}")
         # create queue q with alternate-exchange altex
         s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}")
         # create a bunch of exchanges to ensure we don't clean up prematurely if the
         # response comes in multiple fragments.
-        for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i)
+        for i in xrange(200): s.sender("ex.%s;{create:always,node:{type:topic}}"%i)
 
         def verify(broker):
             s = broker.connect().session()
             # Verify unmatched message goes to ex's alternate.
-            s.sender("0ex").send("foo")
+            s.sender("ex").send("foo")
             altq = s.receiver("altq")
             self.assertEqual("foo", altq.fetch(timeout=0).content)
             s.acknowledge()
@@ -640,20 +642,39 @@ acl deny all all
             self.assertEqual("bar", altq.fetch(timeout=0).content)
             s.acknowledge()
 
+        def ss(n): return cluster[n].connect().session()
+
         # Sanity check: alternate exchanges on original broker
         verify(cluster[0])
+        # Altex is in use as an alternate exchange.
+        self.assertRaises(SessionError,
+                          lambda:ss(0).sender("altex;{delete:always}").close())
         # Check backup that was connected during setup.
-        cluster[1].wait_backup("0ex")
+        cluster[1].wait_status("ready")
+        cluster[1].wait_backup("ex")
         cluster[1].wait_backup("q")
         cluster.bounce(0)
         verify(cluster[1])
+
         # Check a newly started backup.
         cluster.start()
-        cluster[2].wait_backup("0ex")
+        cluster[2].wait_status("ready")
+        cluster[2].wait_backup("ex")
         cluster[2].wait_backup("q")
         cluster.bounce(1)
         verify(cluster[2])
 
+        # Check that alt-exchange in-use count is replicated
+        s = cluster[2].connect().session();
+
+        self.assertRaises(SessionError,
+                          lambda:ss(2).sender("altex;{delete:always}").close())
+        s.sender("q;{delete:always}").close()
+        self.assertRaises(SessionError,
+                          lambda:ss(2).sender("altex;{delete:always}").close())
+        s.sender("ex;{delete:always}").close()
+        s.sender("altex;{delete:always}").close()
+
     def test_priority_reroute(self):
         """Regression test for QPID-4262, rerouting messages from a priority queue
         to itself causes a crash"""



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org