You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/12/20 19:28:59 UTC
svn commit: r1424617 - in /qpid/trunk/qpid/cpp/src: qpid/broker/Broker.cpp
qpid/broker/Exchange.cpp qpid/broker/ExchangeRegistry.cpp
qpid/broker/Queue.cpp qpid/broker/QueueRegistry.cpp tests/ha_tests.py
Author: aconway
Date: Thu Dec 20 18:28:58 2012
New Revision: 1424617
URL: http://svn.apache.org/viewvc?rev=1424617&view=rev
Log:
Bug 886656 - HA backup broker does not properly increment the alternate exchange user count
Set alternate exchange in-use counters correctly on backup brokers.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
qpid/trunk/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1424617&r1=1424616&r2=1424617&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Dec 20 18:28:58 2012
@@ -1112,6 +1112,10 @@ std::pair<boost::shared_ptr<Queue>, bool
void Broker::deleteQueue(const std::string& name, const std::string& userId,
const std::string& connectionId, QueueFunctor check)
{
+ QPID_LOG_CAT(debug, model, "Deleting queue. name:" << name
+ << " user:" << userId
+ << " rhost:" << connectionId
+ );
if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) {
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId));
}
@@ -1126,11 +1130,6 @@ void Broker::deleteQueue(const std::stri
} else {
throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name));
}
- QPID_LOG_CAT(debug, model, "Delete queue. name:" << name
- << " user:" << userId
- << " rhost:" << connectionId
- );
-
}
std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
@@ -1177,6 +1176,9 @@ std::pair<Exchange::shared_ptr, bool> Br
void Broker::deleteExchange(const std::string& name, const std::string& userId,
const std::string& connectionId)
{
+ QPID_LOG_CAT(debug, model, "Deleting exchange. name:" << name
+ << " user:" << userId
+ << " rhost:" << connectionId);
if (acl) {
if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId));
@@ -1187,13 +1189,10 @@ void Broker::deleteExchange(const std::s
}
Exchange::shared_ptr exchange(exchanges.get(name));
if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name));
- if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
+ if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Cannot delete " << name <<", in use as alternate-exchange."));
if (exchange->isDurable()) store->destroy(*exchange);
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
exchanges.destroy(name, connectionId, userId);
- QPID_LOG_CAT(debug, model, "Delete exchange. name:" << name
- << " user:" << userId
- << " rhost:" << connectionId);
}
void Broker::bind(const std::string& queueName,
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1424617&r1=1424616&r2=1424617&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Thu Dec 20 18:28:58 2012
@@ -223,6 +223,7 @@ Exchange::~Exchange ()
void Exchange::setAlternate(Exchange::shared_ptr _alternate)
{
alternate = _alternate;
+ alternate->incAlternateUsers();
if (mgmtExchange != 0) {
if (alternate.get() != 0)
mgmtExchange->set_altExchange(alternate->GetManagementObject()->getObjectId());
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=1424617&r1=1424616&r2=1424617&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Thu Dec 20 18:28:58 2012
@@ -79,10 +79,7 @@ pair<Exchange::shared_ptr, bool> Exchang
}
exchanges[name] = exchange;
result = std::pair<Exchange::shared_ptr, bool>(exchange, true);
- if (alternate) {
- exchange->setAlternate(alternate);
- alternate->incAlternateUsers();
- }
+ if (alternate) exchange->setAlternate(alternate);
// Call exchangeCreate inside the lock to ensure correct ordering.
if (broker) broker->getConfigurationObservers().exchangeCreate(exchange);
} else {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1424617&r1=1424616&r2=1424617&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Dec 20 18:28:58 2012
@@ -1149,6 +1149,7 @@ Queue::shared_ptr Queue::restore( QueueR
void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange)
{
alternateExchange = exchange;
+ alternateExchange->incAlternateUsers();
if (mgmtObject) {
if (exchange.get() != 0)
mgmtObject->set_altExchange(exchange->GetManagementObject()->getObjectId());
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1424617&r1=1424616&r2=1424617&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Thu Dec 20 18:28:58 2012
@@ -60,10 +60,8 @@ QueueRegistry::declare(const string& nam
if (i == queues.end()) {
Queue::shared_ptr queue = create(name, settings);
//Move this to factory also?
- if (alternate) {
+ if (alternate)
queue->setAlternateExchange(alternate);//need to do this *before* create
- alternate->incAlternateUsers();
- }
if (!recovering) {
//create persistent record if required
queue->create();
Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1424617&r1=1424616&r2=1424617&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Thu Dec 20 18:28:58 2012
@@ -20,7 +20,7 @@
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
import traceback
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
+from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
from qpid.datatypes import uuid4
from brokertest import *
from ha_test import *
@@ -613,22 +613,24 @@ acl deny all all
to new members of a cluster. """
cluster = HaCluster(self, 2)
s = cluster[0].connect().session()
+ cluster[0].wait_status("active")
+ cluster[1].wait_status("ready")
# altex exchange: acts as alternate exchange
s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}")
# altq queue bound to altex, collect re-routed messages.
s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}")
- # 0ex exchange with alternate-exchange altex and no queues bound
- s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}")
+ # ex exchange with alternate-exchange altex and no queues bound
+ s.sender("ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}")
# create queue q with alternate-exchange altex
s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}")
# create a bunch of exchanges to ensure we don't clean up prematurely if the
# response comes in multiple fragments.
- for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i)
+ for i in xrange(200): s.sender("ex.%s;{create:always,node:{type:topic}}"%i)
def verify(broker):
s = broker.connect().session()
# Verify unmatched message goes to ex's alternate.
- s.sender("0ex").send("foo")
+ s.sender("ex").send("foo")
altq = s.receiver("altq")
self.assertEqual("foo", altq.fetch(timeout=0).content)
s.acknowledge()
@@ -640,20 +642,39 @@ acl deny all all
self.assertEqual("bar", altq.fetch(timeout=0).content)
s.acknowledge()
+ def ss(n): return cluster[n].connect().session()
+
# Sanity check: alternate exchanges on original broker
verify(cluster[0])
+ # Altex is in use as an alternate exchange.
+ self.assertRaises(SessionError,
+ lambda:ss(0).sender("altex;{delete:always}").close())
# Check backup that was connected during setup.
- cluster[1].wait_backup("0ex")
+ cluster[1].wait_status("ready")
+ cluster[1].wait_backup("ex")
cluster[1].wait_backup("q")
cluster.bounce(0)
verify(cluster[1])
+
# Check a newly started backup.
cluster.start()
- cluster[2].wait_backup("0ex")
+ cluster[2].wait_status("ready")
+ cluster[2].wait_backup("ex")
cluster[2].wait_backup("q")
cluster.bounce(1)
verify(cluster[2])
+ # Check that alt-exchange in-use count is replicated
+ s = cluster[2].connect().session();
+
+ self.assertRaises(SessionError,
+ lambda:ss(2).sender("altex;{delete:always}").close())
+ s.sender("q;{delete:always}").close()
+ self.assertRaises(SessionError,
+ lambda:ss(2).sender("altex;{delete:always}").close())
+ s.sender("ex;{delete:always}").close()
+ s.sender("altex;{delete:always}").close()
+
def test_priority_reroute(self):
"""Regression test for QPID-4262, rerouting messages from a priority queue
to itself causes a crash"""
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org