You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/10/31 15:09:30 UTC
svn commit: r1537498 - in /qpid/trunk/qpid: cpp/src/qpid/broker/Broker.cpp
cpp/src/qpid/broker/Broker.h cpp/src/qpid/broker/SemanticState.cpp
cpp/src/qpid/broker/SessionAdapter.cpp
tests/src/py/qpid_tests/broker_0_10/queue.py
Author: gsim
Date: Thu Oct 31 14:09:29 2013
New Revision: 1537498
URL: http://svn.apache.org/r1537498
Log:
QPID-5280: prevent bind/unbind of someone elses exclusive queue
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1537498&r1=1537497&r2=1537498&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Oct 31 14:09:29 2013
@@ -795,7 +795,7 @@ void Broker::createObject(const std::str
framing::FieldTable arguments;
qpid::amqp_0_10::translate(extensions, arguments);
- bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId);
+ bind(binding.queue, binding.exchange, binding.key, arguments, 0, userId, connectionId);
} else if (type == TYPE_LINK) {
@@ -935,7 +935,7 @@ void Broker::deleteObject(const std::str
deleteExchange(name, userId, connectionId);
} else if (type == TYPE_BINDING) {
BindingIdentifier binding(name);
- unbind(binding.queue, binding.exchange, binding.key, userId, connectionId);
+ unbind(binding.queue, binding.exchange, binding.key, 0, userId, connectionId);
} else if (type == TYPE_LINK) {
boost::shared_ptr<Link> link = links.getLink(name);
if (link) {
@@ -1432,6 +1432,7 @@ void Broker::bind(const std::string& que
const std::string& exchangeName,
const std::string& key,
const qpid::framing::FieldTable& arguments,
+ const OwnershipToken* owner,
const std::string& userId,
const std::string& connectionId)
{
@@ -1453,6 +1454,9 @@ void Broker::bind(const std::string& que
throw framing::NotFoundException(QPID_MSG("Bind failed. No such queue: " << queueName));
} else if (!exchange) {
throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName));
+ } else if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(owner)) {
+ throw framing::ResourceLockedException(QPID_MSG("Cannot bind queue "
+ << queue->getName() << "; it is exclusive to another session"));
} else {
if (queue->bind(exchange, key, arguments)) {
getBrokerObservers().bind(exchange, queue, key, arguments);
@@ -1473,6 +1477,7 @@ void Broker::bind(const std::string& que
void Broker::unbind(const std::string& queueName,
const std::string& exchangeName,
const std::string& key,
+ const OwnershipToken* owner,
const std::string& userId,
const std::string& connectionId)
{
@@ -1492,6 +1497,9 @@ void Broker::unbind(const std::string& q
throw framing::NotFoundException(QPID_MSG("Unbind failed. No such queue: " << queueName));
} else if (!exchange) {
throw framing::NotFoundException(QPID_MSG("Unbind failed. No such exchange: " << exchangeName));
+ } else if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(owner)) {
+ throw framing::ResourceLockedException(QPID_MSG("Cannot unbind queue "
+ << queue->getName() << "; it is exclusive to another session"));
} else {
if (exchange->unbind(queue, key, 0)) {
if (exchange->isDurable() && queue->isDurable()) {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1537498&r1=1537497&r2=1537498&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Thu Oct 31 14:09:29 2013
@@ -342,6 +342,7 @@ class Broker : public sys::Runnable, pub
const std::string& exchange,
const std::string& key,
const qpid::framing::FieldTable& arguments,
+ const OwnershipToken* owner,
const std::string& userId,
const std::string& connectionId);
@@ -349,6 +350,7 @@ class Broker : public sys::Runnable, pub
const std::string& queue,
const std::string& exchange,
const std::string& key,
+ const OwnershipToken* owner,
const std::string& userId,
const std::string& connectionId);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1537498&r1=1537497&r2=1537498&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Oct 31 14:09:29 2013
@@ -883,10 +883,10 @@ void SemanticState::unbindSessionBinding
fedArguments.setString(qpidFedOp, fedOpUnbind);
fedArguments.setString(qpidFedOrigin, fedOrigin);
session.getBroker().bind(i->get<0>(), i->get<1>(), i->get<2>(), fedArguments,
- userID, connectionId);
+ &session, userID, connectionId);
} else {
session.getBroker().unbind(i->get<0>(), i->get<1>(), i->get<2>(),
- userID, connectionId);
+ &session, userID, connectionId);
}
}
catch (...) {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1537498&r1=1537497&r2=1537498&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Thu Oct 31 14:09:29 2013
@@ -155,7 +155,7 @@ void SessionAdapter::ExchangeHandlerImpl
const string& exchangeName, const string& routingKey,
const FieldTable& arguments)
{
- getBroker().bind(queueName, exchangeName, routingKey, arguments,
+ getBroker().bind(queueName, exchangeName, routingKey, arguments, &session,
getConnection().getUserId(), getConnection().getMgmtId());
state.addBinding(queueName, exchangeName, routingKey, arguments);
}
@@ -165,7 +165,7 @@ void SessionAdapter::ExchangeHandlerImpl
const string& routingKey)
{
state.removeBinding(queueName, exchangeName, routingKey);
- getBroker().unbind(queueName, exchangeName, routingKey,
+ getBroker().unbind(queueName, exchangeName, routingKey, &session,
getConnection().getUserId(), getConnection().getMgmtId());
}
Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py?rev=1537498&r1=1537497&r2=1537498&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py Thu Oct 31 14:09:29 2013
@@ -92,6 +92,7 @@ class QueueTests(TestBase010):
#declare an exclusive queue:
s1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
+ s1.exchange_bind(exchange="amq.fanout", queue="exclusive-queue")
try:
#other connection should not be allowed to declare this:
s2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
@@ -115,6 +116,22 @@ class QueueTests(TestBase010):
except SessionException, e:
self.assertEquals(405, e.args[0].error_code)
+ s5 = self.conn.session("binder")
+ try:
+ #other connection should not be allowed to declare this:
+ s5.exchange_bind(exchange="amq.direct", queue="exclusive-queue", binding_key="abc")
+ self.fail("Expected exchange_bind on an exclusive queue to raise an exception")
+ except SessionException, e:
+ self.assertEquals(405, e.args[0].error_code)
+
+ s6 = self.conn.session("unbinder")
+ try:
+ #other connection should not be allowed to declare this:
+ s6.exchange_unbind(exchange="amq.fanout", queue="exclusive-queue")
+ self.fail("Expected exchange_unbind on an exclusive queue to raise an exception")
+ except SessionException, e:
+ self.assertEquals(405, e.args[0].error_code)
+
def test_declare_passive(self):
"""
Test that the passive field is honoured in queue.declare
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org