You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by jo...@apache.org on 2011/02/07 19:21:25 UTC

svn commit: r1068042 - in /qpid/trunk/qpid/cpp/src: qpid/broker/SessionAdapter.cpp tests/MessagingSessionTests.cpp

Author: jonathan
Date: Mon Feb  7 18:21:24 2011
New Revision: 1068042

URL: http://svn.apache.org/viewvc?rev=1068042&view=rev
Log:
Ensures that messages acquired, but not acked, are released before a queue is deleted.

Otherwise, these messages are not routed to an alternate exchange, and the queue is not actually deleted.

Resolves QPID-3040 / Red Hat BZ 674678.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1068042&r1=1068041&r2=1068042&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Feb  7 18:21:24 2011
@@ -60,8 +60,8 @@ SessionAdapter::SessionAdapter(SemanticS
 static const std::string _TRUE("true");
 static const std::string _FALSE("false");
 
-void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type, 
-                                                  const string& alternateExchange, 
+void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type,
+                                                  const string& alternateExchange,
                                                   bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
 
     AclModule* acl = getBroker().getAcl();
@@ -74,7 +74,7 @@ void SessionAdapter::ExchangeHandlerImpl
         if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
             throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange declare request from " << getConnection().getUserId()));
     }
-    
+
     //TODO: implement autoDelete
     Exchange::shared_ptr alternate;
     if (!alternateExchange.empty()) {
@@ -84,7 +84,7 @@ void SessionAdapter::ExchangeHandlerImpl
         Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
         checkType(actual, type);
         checkAlternate(actual, alternate);
-    }else{        
+    }else{
         if(exchange.find("amq.") == 0 || exchange.find("qpid.") == 0) {
             throw framing::NotAllowedException(QPID_MSG("Exchange names beginning with \"amq.\" or \"qpid.\" are reserved. (exchange=\"" << exchange << "\")"));
         }
@@ -128,10 +128,10 @@ void SessionAdapter::ExchangeHandlerImpl
                       || !exchange->getAlternate()))
         throw NotAllowedException(QPID_MSG("Exchange declared with alternate-exchange "
                                            << (exchange->getAlternate() ? exchange->getAlternate()->getName() : "<nonexistent>")
-                                           << ", requested " 
+                                           << ", requested "
                                            << alternate->getName()));
 }
-                
+
 void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/)
 {
     AclModule* acl = getBroker().getAcl();
@@ -164,12 +164,12 @@ ExchangeQueryResult SessionAdapter::Exch
         Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
         return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
     } catch (const NotFoundException& /*e*/) {
-        return ExchangeQueryResult("", false, true, FieldTable());        
+        return ExchangeQueryResult("", false, true, FieldTable());
     }
 }
 
-void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, 
-                                           const string& exchangeName, const string& routingKey, 
+void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
+                                           const string& exchangeName, const string& routingKey,
                                            const FieldTable& arguments)
 {
     AclModule* acl = getBroker().getAcl();
@@ -201,7 +201,7 @@ void SessionAdapter::ExchangeHandlerImpl
         throw NotFoundException("Bind failed. No such exchange: " + exchangeName);
     }
 }
- 
+
 void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
                                                  const string& exchangeName,
                                                  const string& routingKey)
@@ -245,7 +245,7 @@ ExchangeBoundResult SessionAdapter::Exch
         if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,exchangeName,&params) )
             throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange bound request from " << getConnection().getUserId()));
     }
-    
+
     Exchange::shared_ptr exchange;
     try {
         exchange = getBroker().getExchanges().get(exchangeName);
@@ -296,10 +296,10 @@ void SessionAdapter::QueueHandlerImpl::d
         exclusiveQueues.erase(exclusiveQueues.begin());
     }
 }
-    
-bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const 
-{ 
-    return session.isLocal(t); 
+
+bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
+{
+    return session.isLocal(t);
 }
 
 
@@ -310,15 +310,15 @@ QueueQueryResult SessionAdapter::QueueHa
         if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_QUEUE,name,NULL) )
             throw UnauthorizedAccessException(QPID_MSG("ACL denied queue query request from " << getConnection().getUserId()));
     }
-    
+
     Queue::shared_ptr queue = session.getBroker().getQueues().find(name);
     if (queue) {
 
         Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
-        
-        return QueueQueryResult(queue->getName(), 
-                                alternateExchange ? alternateExchange->getName() : "", 
-                                queue->isDurable(), 
+
+        return QueueQueryResult(queue->getName(),
+                                alternateExchange ? alternateExchange->getName() : "",
+                                queue->isDurable(),
                                 queue->hasExclusiveOwner(),
                                 queue->isAutoDelete(),
                                 queue->getSettings(),
@@ -330,9 +330,9 @@ QueueQueryResult SessionAdapter::QueueHa
 }
 
 void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange,
-                                               bool passive, bool durable, bool exclusive, 
+                                               bool passive, bool durable, bool exclusive,
                                                bool autoDelete, const qpid::framing::FieldTable& arguments)
-{ 
+{
     AclModule* acl = getBroker().getAcl();
     if (acl) {
         std::map<acl::Property, std::string> params;
@@ -358,7 +358,7 @@ void SessionAdapter::QueueHandlerImpl::d
     queue = getQueue(name);
         //TODO: check alternate-exchange is as expected
     } else {
-        std::pair<Queue::shared_ptr, bool> queue_created =  
+        std::pair<Queue::shared_ptr, bool> queue_created =
             getBroker().getQueues().declare(name, durable,
                                             autoDelete,
                                             exclusive ? &session : 0);
@@ -395,12 +395,12 @@ void SessionAdapter::QueueHandlerImpl::d
                                                       queue_created.second ? "created" : "existing"));
     }
 
-    if (exclusive && !queue->isExclusiveOwner(&session)) 
+    if (exclusive && !queue->isExclusiveOwner(&session))
         throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access to queue "
                                                << queue->getName()));
-} 
-        
-        
+}
+
+
 void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
     AclModule* acl = getBroker().getAcl();
     if (acl)
@@ -409,8 +409,8 @@ void SessionAdapter::QueueHandlerImpl::p
              throw UnauthorizedAccessException(QPID_MSG("ACL denied queue purge request from " << getConnection().getUserId()));
     }
     getQueue(queue)->purge();
-} 
-        
+}
+
 void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){
 
     AclModule* acl = getBroker().getAcl();
@@ -421,7 +421,7 @@ void SessionAdapter::QueueHandlerImpl::d
     }
 
     Queue::shared_ptr q = getQueue(queue);
-    if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session)) 
+    if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session))
         throw ResourceLockedException(QPID_MSG("Cannot delete queue "
                                                << queue << "; it is exclusive to another session"));
     if(ifEmpty && q->getMessageCount() > 0){
@@ -434,6 +434,14 @@ void SessionAdapter::QueueHandlerImpl::d
             QueueVector::iterator i = std::find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q);
             if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
         }
+
+        DeliveryRecords::iterator i;
+        for (i=state.getUnacked().begin(); i != state.getUnacked().end(); ++i) {
+            if (i->getQueue() == q) {
+                i->release(true);
+            }
+        }
+
         q->destroy();
         getBroker().getQueues().destroy(queue);
         q->unbind(getBroker().getExchanges(), q);
@@ -443,9 +451,9 @@ void SessionAdapter::QueueHandlerImpl::d
             agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue));
         q->notifyDeleted();
     }
-} 
+}
 
-SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : 
+SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
     HandlerHelper(s),
     releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, true)),
     releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
@@ -482,7 +490,7 @@ SessionAdapter::MessageHandlerImpl::subs
 
     AclModule* acl = getBroker().getAcl();
     if (acl)
-    {        
+    {
          if (!acl->authorise(getConnection().getUserId(),acl::ACT_CONSUME,acl::OBJ_QUEUE,queueName,NULL) )
              throw UnauthorizedAccessException(QPID_MSG("ACL denied Queue subscribe request from " << getConnection().getUserId()));
     }
@@ -495,8 +503,8 @@ SessionAdapter::MessageHandlerImpl::subs
         throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive queue "
                                                << queue->getName()));
 
-    state.consume(destination, queue, 
-                  acceptMode == 0, acquireMode == 0, exclusive, 
+    state.consume(destination, queue,
+                  acceptMode == 0, acquireMode == 0, exclusive,
                   resumeId, resumeTtl, arguments);
 
     ManagementAgent* agent = getBroker().getManagementAgent();
@@ -533,9 +541,9 @@ void SessionAdapter::MessageHandlerImpl:
         //unknown
         throw InvalidArgumentException(QPID_MSG("Invalid value for unit " << unit));
     }
-    
+
 }
-    
+
 void SessionAdapter::MessageHandlerImpl::setFlowMode(const std::string& destination, uint8_t mode)
 {
     if (mode == 0) {
@@ -545,18 +553,18 @@ void SessionAdapter::MessageHandlerImpl:
         //window
         state.setWindowMode(destination);
     } else{
-        throw InvalidArgumentException(QPID_MSG("Invalid value for mode " << mode));        
+        throw InvalidArgumentException(QPID_MSG("Invalid value for mode " << mode));
     }
 }
-    
+
 void SessionAdapter::MessageHandlerImpl::flush(const std::string& destination)
 {
-    state.flush(destination);        
+    state.flush(destination);
 }
 
 void SessionAdapter::MessageHandlerImpl::stop(const std::string& destination)
 {
-    state.stop(destination);        
+    state.stop(destination);
 }
 
 void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& commands)
@@ -584,7 +592,7 @@ framing::MessageResumeResult SessionAdap
 {
     throw NotImplementedException("resuming transfers not yet supported");
 }
-    
+
 
 
 void SessionAdapter::ExecutionHandlerImpl::sync() {} //essentially a no-op
@@ -619,7 +627,7 @@ void SessionAdapter::TxHandlerImpl::comm
 }
 
 void SessionAdapter::TxHandlerImpl::rollback()
-{    
+{
     state.rollback();
 }
 
@@ -656,7 +664,7 @@ XaResult SessionAdapter::DtxHandlerImpl:
             return XaResult(XA_STATUS_XA_OK);
         }
     } catch (const DtxTimeoutException& /*e*/) {
-        return XaResult(XA_STATUS_XA_RBTIMEOUT);        
+        return XaResult(XA_STATUS_XA_RBTIMEOUT);
     }
 }
 
@@ -675,7 +683,7 @@ XaResult SessionAdapter::DtxHandlerImpl:
         }
         return XaResult(XA_STATUS_XA_OK);
     } catch (const DtxTimeoutException& /*e*/) {
-        return XaResult(XA_STATUS_XA_RBTIMEOUT);        
+        return XaResult(XA_STATUS_XA_RBTIMEOUT);
     }
 }
 
@@ -685,7 +693,7 @@ XaResult SessionAdapter::DtxHandlerImpl:
         bool ok = getBroker().getDtxManager().prepare(convert(xid));
         return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK);
     } catch (const DtxTimeoutException& /*e*/) {
-        return XaResult(XA_STATUS_XA_RBTIMEOUT);        
+        return XaResult(XA_STATUS_XA_RBTIMEOUT);
     }
 }
 
@@ -696,7 +704,7 @@ XaResult SessionAdapter::DtxHandlerImpl:
         bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase);
         return XaResult(ok ? XA_STATUS_XA_OK : XA_STATUS_XA_RBROLLBACK);
     } catch (const DtxTimeoutException& /*e*/) {
-        return XaResult(XA_STATUS_XA_RBTIMEOUT);        
+        return XaResult(XA_STATUS_XA_RBTIMEOUT);
     }
 }
 
@@ -707,14 +715,14 @@ XaResult SessionAdapter::DtxHandlerImpl:
         getBroker().getDtxManager().rollback(convert(xid));
         return XaResult(XA_STATUS_XA_OK);
     } catch (const DtxTimeoutException& /*e*/) {
-        return XaResult(XA_STATUS_XA_RBTIMEOUT);        
+        return XaResult(XA_STATUS_XA_RBTIMEOUT);
     }
 }
 
 DtxRecoverResult SessionAdapter::DtxHandlerImpl::recover()
 {
     std::set<std::string> xids;
-    getBroker().getStore().collectPreparedXids(xids);        
+    getBroker().getStore().collectPreparedXids(xids);
     /*
      * create array of long structs
      */
@@ -735,7 +743,7 @@ void SessionAdapter::DtxHandlerImpl::for
 DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid)
 {
     uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid));
-    return DtxGetTimeoutResult(timeout);    
+    return DtxGetTimeoutResult(timeout);
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1068042&r1=1068041&r2=1068042&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Mon Feb  7 18:21:24 2011
@@ -404,7 +404,7 @@ struct QueueCreatePolicyFixture : public
 
     ~QueueCreatePolicyFixture()
     {
-        admin.deleteQueue(address.getName());    
+        admin.deleteQueue(address.getName());
     }
 };
 
@@ -448,7 +448,7 @@ struct ExchangeCreatePolicyFixture : pub
 
     ~ExchangeCreatePolicyFixture()
     {
-        admin.deleteExchange(address.getName());    
+        admin.deleteExchange(address.getName());
     }
 };
 
@@ -597,7 +597,7 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueu
     s1.close();
     Receiver r1 = fix.session.createReceiver(a1);
     r1.close();
-    
+
     std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}";
     Sender s2 = fix.session.createSender(a2);
     s2.close();
@@ -711,7 +711,7 @@ QPID_AUTO_TEST_CASE(testOptionVerificati
 {
     MessagingFixture fix;
     fix.session.createReceiver("my-queue; {create: always, assert: always, delete: always, node: {type: queue, durable: false, x-declare: {arguments: {a: b}}, x-bindings: [{exchange: amq.fanout}]}, link: {name: abc, durable: false, reliability: exactly-once, x-subscribe: {arguments:{a:b}}, x-bindings:[{exchange: amq.fanout}]}, mode: browse}");
-    BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError);    
+    BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError);
 }
 
 QPID_AUTO_TEST_CASE(testReceiveSpecialProperties)
@@ -775,21 +775,57 @@ QPID_AUTO_TEST_CASE(testExclusiveSubscri
 QPID_AUTO_TEST_CASE(testExclusiveQueueSubscriberAndBrowser)
 {
     MessagingFixture fix;
-    
+
     std::string address =       "exclusive-queue; { create: receiver, node : { x-declare : { auto-delete: true, exclusive: true } } }";
     std::string browseAddress = "exclusive-queue; { mode: browse }";
 
     Receiver receiver = fix.session.createReceiver(address);
     fix.session.sync();
 
-    Connection c2 = fix.newConnection();    
+    Connection c2 = fix.newConnection();
     c2.open();
     Session s2 = c2.createSession();
-   
+
     BOOST_CHECK_NO_THROW(Receiver browser = s2.createReceiver(browseAddress));
-    c2.close();    
+    c2.close();
 }
 
+
+QPID_AUTO_TEST_CASE(testDeleteQueueWithUnackedMessages)
+{
+    MessagingFixture fix;
+    const uint capacity = 5;
+
+    Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}");
+	Receiver receiver2 = fix.session.createReceiver("alternate.ex;{create:always,node:{type:topic}}");
+	Receiver receiver1 = fix.session.createReceiver("test.q;{create:always, delete:always,node:{type:queue, x-declare:{alternate-exchange:alternate.ex}},link:{x-bindings:[{exchange:test.ex,queue:test.q,key:#}]}}");
+
+	receiver1.setCapacity(capacity);
+	receiver2.setCapacity(2*capacity);
+
+    Message out("test-message");
+    for (uint i = 0; i < capacity*2; ++i) {
+        sender.send(out);
+    }
+
+    // Read half the messages, do not acknowledge
+    Message in;
+    for (uint i = 0; i < capacity; ++i) {
+        in = receiver1.fetch(Duration::SECOND * 5);
+        BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
+    }
+
+	receiver1.close();
+
+    // Make sure all unacked messages were sent to the alternate
+    // exchange when the queue was deleted.
+    for (uint i = 0; i < capacity*2; ++i) {
+        in = receiver2.fetch(Duration::SECOND * 5);
+        BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
+    }
+}
+
+
 QPID_AUTO_TEST_CASE(testAuthenticatedUsername)
 {
     MessagingFixture fix;
@@ -828,7 +864,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
         messages.push_back(msg);
     }
     const uint batch(10); //acknowledge first 10 messages only
-    for (uint i = 0; i < batch; ++i) {    
+    for (uint i = 0; i < batch; ++i) {
         other.acknowledge(messages[i]);
     }
     messages.clear();
@@ -836,7 +872,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
     other.close();
 
     other = fix.connection.createSession();
-    receiver = other.createReceiver(fix.queue);    
+    receiver = other.createReceiver(fix.queue);
     for (uint i = 0; i < (count-batch); ++i) {
         Message msg = receiver.fetch();
         BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str());
@@ -847,7 +883,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
 
     //check unacknowledged messages are still enqueued
     other = fix.connection.createSession();
-    receiver = other.createReceiver(fix.queue);    
+    receiver = other.createReceiver(fix.queue);
     for (uint i = 0; i < ((count-batch)/2); ++i) {
         Message msg = receiver.fetch();
         BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % ((i*2)+1+batch)).str());



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org