You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by jo...@apache.org on 2011/02/07 19:21:25 UTC
svn commit: r1068042 - in /qpid/trunk/qpid/cpp/src:
qpid/broker/SessionAdapter.cpp tests/MessagingSessionTests.cpp
Author: jonathan
Date: Mon Feb 7 18:21:24 2011
New Revision: 1068042
URL: http://svn.apache.org/viewvc?rev=1068042&view=rev
Log:
Ensures that messages acquired, but not acked, are released before a queue is deleted.
Otherwise, these messages are not routed to an alternate exchange, and the queue is not actually deleted.
Resolves QPID-3040 / Red Hat BZ 674678.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1068042&r1=1068041&r2=1068042&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Feb 7 18:21:24 2011
@@ -60,8 +60,8 @@ SessionAdapter::SessionAdapter(SemanticS
static const std::string _TRUE("true");
static const std::string _FALSE("false");
-void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type,
- const string& alternateExchange,
+void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type,
+ const string& alternateExchange,
bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
AclModule* acl = getBroker().getAcl();
@@ -74,7 +74,7 @@ void SessionAdapter::ExchangeHandlerImpl
if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange declare request from " << getConnection().getUserId()));
}
-
+
//TODO: implement autoDelete
Exchange::shared_ptr alternate;
if (!alternateExchange.empty()) {
@@ -84,7 +84,7 @@ void SessionAdapter::ExchangeHandlerImpl
Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
checkType(actual, type);
checkAlternate(actual, alternate);
- }else{
+ }else{
if(exchange.find("amq.") == 0 || exchange.find("qpid.") == 0) {
throw framing::NotAllowedException(QPID_MSG("Exchange names beginning with \"amq.\" or \"qpid.\" are reserved. (exchange=\"" << exchange << "\")"));
}
@@ -128,10 +128,10 @@ void SessionAdapter::ExchangeHandlerImpl
|| !exchange->getAlternate()))
throw NotAllowedException(QPID_MSG("Exchange declared with alternate-exchange "
<< (exchange->getAlternate() ? exchange->getAlternate()->getName() : "<nonexistent>")
- << ", requested "
+ << ", requested "
<< alternate->getName()));
}
-
+
void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/)
{
AclModule* acl = getBroker().getAcl();
@@ -164,12 +164,12 @@ ExchangeQueryResult SessionAdapter::Exch
Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
} catch (const NotFoundException& /*e*/) {
- return ExchangeQueryResult("", false, true, FieldTable());
+ return ExchangeQueryResult("", false, true, FieldTable());
}
}
-void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
- const string& exchangeName, const string& routingKey,
+void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
+ const string& exchangeName, const string& routingKey,
const FieldTable& arguments)
{
AclModule* acl = getBroker().getAcl();
@@ -201,7 +201,7 @@ void SessionAdapter::ExchangeHandlerImpl
throw NotFoundException("Bind failed. No such exchange: " + exchangeName);
}
}
-
+
void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
const string& exchangeName,
const string& routingKey)
@@ -245,7 +245,7 @@ ExchangeBoundResult SessionAdapter::Exch
if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,exchangeName,&params) )
throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange bound request from " << getConnection().getUserId()));
}
-
+
Exchange::shared_ptr exchange;
try {
exchange = getBroker().getExchanges().get(exchangeName);
@@ -296,10 +296,10 @@ void SessionAdapter::QueueHandlerImpl::d
exclusiveQueues.erase(exclusiveQueues.begin());
}
}
-
-bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
-{
- return session.isLocal(t);
+
+bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
+{
+ return session.isLocal(t);
}
@@ -310,15 +310,15 @@ QueueQueryResult SessionAdapter::QueueHa
if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_QUEUE,name,NULL) )
throw UnauthorizedAccessException(QPID_MSG("ACL denied queue query request from " << getConnection().getUserId()));
}
-
+
Queue::shared_ptr queue = session.getBroker().getQueues().find(name);
if (queue) {
Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
-
- return QueueQueryResult(queue->getName(),
- alternateExchange ? alternateExchange->getName() : "",
- queue->isDurable(),
+
+ return QueueQueryResult(queue->getName(),
+ alternateExchange ? alternateExchange->getName() : "",
+ queue->isDurable(),
queue->hasExclusiveOwner(),
queue->isAutoDelete(),
queue->getSettings(),
@@ -330,9 +330,9 @@ QueueQueryResult SessionAdapter::QueueHa
}
void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange,
- bool passive, bool durable, bool exclusive,
+ bool passive, bool durable, bool exclusive,
bool autoDelete, const qpid::framing::FieldTable& arguments)
-{
+{
AclModule* acl = getBroker().getAcl();
if (acl) {
std::map<acl::Property, std::string> params;
@@ -358,7 +358,7 @@ void SessionAdapter::QueueHandlerImpl::d
queue = getQueue(name);
//TODO: check alternate-exchange is as expected
} else {
- std::pair<Queue::shared_ptr, bool> queue_created =
+ std::pair<Queue::shared_ptr, bool> queue_created =
getBroker().getQueues().declare(name, durable,
autoDelete,
exclusive ? &session : 0);
@@ -395,12 +395,12 @@ void SessionAdapter::QueueHandlerImpl::d
queue_created.second ? "created" : "existing"));
}
- if (exclusive && !queue->isExclusiveOwner(&session))
+ if (exclusive && !queue->isExclusiveOwner(&session))
throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access to queue "
<< queue->getName()));
-}
-
-
+}
+
+
void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
AclModule* acl = getBroker().getAcl();
if (acl)
@@ -409,8 +409,8 @@ void SessionAdapter::QueueHandlerImpl::p
throw UnauthorizedAccessException(QPID_MSG("ACL denied queue purge request from " << getConnection().getUserId()));
}
getQueue(queue)->purge();
-}
-
+}
+
void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){
AclModule* acl = getBroker().getAcl();
@@ -421,7 +421,7 @@ void SessionAdapter::QueueHandlerImpl::d
}
Queue::shared_ptr q = getQueue(queue);
- if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session))
+ if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session))
throw ResourceLockedException(QPID_MSG("Cannot delete queue "
<< queue << "; it is exclusive to another session"));
if(ifEmpty && q->getMessageCount() > 0){
@@ -434,6 +434,14 @@ void SessionAdapter::QueueHandlerImpl::d
QueueVector::iterator i = std::find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q);
if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
}
+
+ DeliveryRecords::iterator i;
+ for (i=state.getUnacked().begin(); i != state.getUnacked().end(); ++i) {
+ if (i->getQueue() == q) {
+ i->release(true);
+ }
+ }
+
q->destroy();
getBroker().getQueues().destroy(queue);
q->unbind(getBroker().getExchanges(), q);
@@ -443,9 +451,9 @@ void SessionAdapter::QueueHandlerImpl::d
agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue));
q->notifyDeleted();
}
-}
+}
-SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
+SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
HandlerHelper(s),
releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, true)),
releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
@@ -482,7 +490,7 @@ SessionAdapter::MessageHandlerImpl::subs
AclModule* acl = getBroker().getAcl();
if (acl)
- {
+ {
if (!acl->authorise(getConnection().getUserId(),acl::ACT_CONSUME,acl::OBJ_QUEUE,queueName,NULL) )
throw UnauthorizedAccessException(QPID_MSG("ACL denied Queue subscribe request from " << getConnection().getUserId()));
}
@@ -495,8 +503,8 @@ SessionAdapter::MessageHandlerImpl::subs
throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive queue "
<< queue->getName()));
- state.consume(destination, queue,
- acceptMode == 0, acquireMode == 0, exclusive,
+ state.consume(destination, queue,
+ acceptMode == 0, acquireMode == 0, exclusive,
resumeId, resumeTtl, arguments);
ManagementAgent* agent = getBroker().getManagementAgent();
@@ -533,9 +541,9 @@ void SessionAdapter::MessageHandlerImpl:
//unknown
throw InvalidArgumentException(QPID_MSG("Invalid value for unit " << unit));
}
-
+
}
-
+
void SessionAdapter::MessageHandlerImpl::setFlowMode(const std::string& destination, uint8_t mode)
{
if (mode == 0) {
@@ -545,18 +553,18 @@ void SessionAdapter::MessageHandlerImpl:
//window
state.setWindowMode(destination);
} else{
- throw InvalidArgumentException(QPID_MSG("Invalid value for mode " << mode));
+ throw InvalidArgumentException(QPID_MSG("Invalid value for mode " << mode));
}
}
-
+
void SessionAdapter::MessageHandlerImpl::flush(const std::string& destination)
{
- state.flush(destination);
+ state.flush(destination);
}
void SessionAdapter::MessageHandlerImpl::stop(const std::string& destination)
{
- state.stop(destination);
+ state.stop(destination);
}
void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& commands)
@@ -584,7 +592,7 @@ framing::MessageResumeResult SessionAdap
{
throw NotImplementedException("resuming transfers not yet supported");
}
-
+
void SessionAdapter::ExecutionHandlerImpl::sync() {} //essentially a no-op
@@ -619,7 +627,7 @@ void SessionAdapter::TxHandlerImpl::comm
}
void SessionAdapter::TxHandlerImpl::rollback()
-{
+{
state.rollback();
}
@@ -656,7 +664,7 @@ XaResult SessionAdapter::DtxHandlerImpl:
return XaResult(XA_STATUS_XA_OK);
}
} catch (const DtxTimeoutException& /*e*/) {
- return XaResult(XA_STATUS_XA_RBTIMEOUT);
+ return XaResult(XA_STATUS_XA_RBTIMEOUT);
}
}
@@ -675,7 +683,7 @@ XaResult SessionAdapter::DtxHandlerImpl:
}
return XaResult(XA_STATUS_XA_OK);
} catch (const DtxTimeoutException& /*e*/) {
- return XaResult(XA_STATUS_XA_RBTIMEOUT);
+ return XaResult(XA_STATUS_XA_RBTIMEOUT);
}
}
@@ -685,7 +693,7 @@ XaResult SessionAdapter::DtxHandlerImpl:
bool ok = getBroker().getDtxManager().prepare(convert(xid));
return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK);
} catch (const DtxTimeoutException& /*e*/) {
- return XaResult(XA_STATUS_XA_RBTIMEOUT);
+ return XaResult(XA_STATUS_XA_RBTIMEOUT);
}
}
@@ -696,7 +704,7 @@ XaResult SessionAdapter::DtxHandlerImpl:
bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase);
return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK);
} catch (const DtxTimeoutException& /*e*/) {
- return XaResult(XA_STATUS_XA_RBTIMEOUT);
+ return XaResult(XA_STATUS_XA_RBTIMEOUT);
}
}
@@ -707,14 +715,14 @@ XaResult SessionAdapter::DtxHandlerImpl:
getBroker().getDtxManager().rollback(convert(xid));
return XaResult(XA_STATUS_XA_OK);
} catch (const DtxTimeoutException& /*e*/) {
- return XaResult(XA_STATUS_XA_RBTIMEOUT);
+ return XaResult(XA_STATUS_XA_RBTIMEOUT);
}
}
DtxRecoverResult SessionAdapter::DtxHandlerImpl::recover()
{
std::set<std::string> xids;
- getBroker().getStore().collectPreparedXids(xids);
+ getBroker().getStore().collectPreparedXids(xids);
/*
* create array of long structs
*/
@@ -735,7 +743,7 @@ void SessionAdapter::DtxHandlerImpl::for
DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid)
{
uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid));
- return DtxGetTimeoutResult(timeout);
+ return DtxGetTimeoutResult(timeout);
}
Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1068042&r1=1068041&r2=1068042&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Mon Feb 7 18:21:24 2011
@@ -404,7 +404,7 @@ struct QueueCreatePolicyFixture : public
~QueueCreatePolicyFixture()
{
- admin.deleteQueue(address.getName());
+ admin.deleteQueue(address.getName());
}
};
@@ -448,7 +448,7 @@ struct ExchangeCreatePolicyFixture : pub
~ExchangeCreatePolicyFixture()
{
- admin.deleteExchange(address.getName());
+ admin.deleteExchange(address.getName());
}
};
@@ -597,7 +597,7 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueu
s1.close();
Receiver r1 = fix.session.createReceiver(a1);
r1.close();
-
+
std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}";
Sender s2 = fix.session.createSender(a2);
s2.close();
@@ -711,7 +711,7 @@ QPID_AUTO_TEST_CASE(testOptionVerificati
{
MessagingFixture fix;
fix.session.createReceiver("my-queue; {create: always, assert: always, delete: always, node: {type: queue, durable: false, x-declare: {arguments: {a: b}}, x-bindings: [{exchange: amq.fanout}]}, link: {name: abc, durable: false, reliability: exactly-once, x-subscribe: {arguments:{a:b}}, x-bindings:[{exchange: amq.fanout}]}, mode: browse}");
- BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError);
+ BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError);
}
QPID_AUTO_TEST_CASE(testReceiveSpecialProperties)
@@ -775,21 +775,57 @@ QPID_AUTO_TEST_CASE(testExclusiveSubscri
QPID_AUTO_TEST_CASE(testExclusiveQueueSubscriberAndBrowser)
{
MessagingFixture fix;
-
+
std::string address = "exclusive-queue; { create: receiver, node : { x-declare : { auto-delete: true, exclusive: true } } }";
std::string browseAddress = "exclusive-queue; { mode: browse }";
Receiver receiver = fix.session.createReceiver(address);
fix.session.sync();
- Connection c2 = fix.newConnection();
+ Connection c2 = fix.newConnection();
c2.open();
Session s2 = c2.createSession();
-
+
BOOST_CHECK_NO_THROW(Receiver browser = s2.createReceiver(browseAddress));
- c2.close();
+ c2.close();
}
+
+QPID_AUTO_TEST_CASE(testDeleteQueueWithUnackedMessages)
+{
+ MessagingFixture fix;
+ const uint capacity = 5;
+
+ Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}");
+ Receiver receiver2 = fix.session.createReceiver("alternate.ex;{create:always,node:{type:topic}}");
+ Receiver receiver1 = fix.session.createReceiver("test.q;{create:always, delete:always,node:{type:queue, x-declare:{alternate-exchange:alternate.ex}},link:{x-bindings:[{exchange:test.ex,queue:test.q,key:#}]}}");
+
+ receiver1.setCapacity(capacity);
+ receiver2.setCapacity(2*capacity);
+
+ Message out("test-message");
+ for (uint i = 0; i < capacity*2; ++i) {
+ sender.send(out);
+ }
+
+ // Read half the messages, do not acknowledge
+ Message in;
+ for (uint i = 0; i < capacity; ++i) {
+ in = receiver1.fetch(Duration::SECOND * 5);
+ BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
+ }
+
+ receiver1.close();
+
+ // Make sure all unacked messages were sent to the alternate
+ // exchange when the queue was deleted.
+ for (uint i = 0; i < capacity*2; ++i) {
+ in = receiver2.fetch(Duration::SECOND * 5);
+ BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
+ }
+}
+
+
QPID_AUTO_TEST_CASE(testAuthenticatedUsername)
{
MessagingFixture fix;
@@ -828,7 +864,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
messages.push_back(msg);
}
const uint batch(10); //acknowledge first 10 messages only
- for (uint i = 0; i < batch; ++i) {
+ for (uint i = 0; i < batch; ++i) {
other.acknowledge(messages[i]);
}
messages.clear();
@@ -836,7 +872,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
other.close();
other = fix.connection.createSession();
- receiver = other.createReceiver(fix.queue);
+ receiver = other.createReceiver(fix.queue);
for (uint i = 0; i < (count-batch); ++i) {
Message msg = receiver.fetch();
BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str());
@@ -847,7 +883,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
//check unacknowledged messages are still enqueued
other = fix.connection.createSession();
- receiver = other.createReceiver(fix.queue);
+ receiver = other.createReceiver(fix.queue);
for (uint i = 0; i < ((count-batch)/2); ++i) {
Message msg = receiver.fetch();
BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % ((i*2)+1+batch)).str());
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org