You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC

svn commit: r1187150 [10/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.cpp Fri Oct 21 01:19:00 2011
@@ -70,12 +70,14 @@ SemanticState::SemanticState(DeliveryAda
       deliveryAdapter(da),
       tagGenerator("sgen"),
       dtxSelected(false),
-      authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()),
+      authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()),
       userID(getSession().getConnection().getUserId()),
       userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))),
       isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())),
       closeComplete(false)
-{}
+{
+    acl = getSession().getBroker().getAcl();
+}
 
 SemanticState::~SemanticState() {
     closed();
@@ -86,7 +88,7 @@ void SemanticState::closed() {
         //prevent requeued messages being redelivered to consumers
         for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
             disable(i->second);
-        }
+        }        
         if (dtxBuffer.get()) {
             dtxBuffer->fail();
         }
@@ -105,24 +107,16 @@ bool SemanticState::exists(const string&
     return consumers.find(consumerTag) != consumers.end();
 }
 
-namespace {
-    const std::string SEPARATOR("::");
-}
-    
-void SemanticState::consume(const string& tag,
+void SemanticState::consume(const string& tag, 
                             Queue::shared_ptr queue, bool ackRequired, bool acquire,
                             bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
 {
-    // "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination).
-    // Create a globally unique name so the broker can identify individual consumers
-    std::string name = session.getSessionId().str() + SEPARATOR + tag;
-    ConsumerImpl::shared_ptr c(new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments));
+    ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments));
     queue->consume(c, exclusive);//may throw exception
     consumers[tag] = c;
 }
 
-bool SemanticState::cancel(const string& tag)
-{
+void SemanticState::cancel(const string& tag){
     ConsumerImplMap::iterator i = consumers.find(tag);
     if (i != consumers.end()) {
         cancel(i->second);
@@ -130,13 +124,7 @@ bool SemanticState::cancel(const string&
         //should cancel all unacked messages for this consumer so that
         //they are not redelivered on recovery
         for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag));
-        //can also remove any records that are now redundant
-        DeliveryRecords::iterator removed =
-            remove_if(unacked.begin(), unacked.end(), bind(&DeliveryRecord::isRedundant, _1));
-        unacked.erase(removed, unacked.end());
-        return true;
-    } else {
-        return false;
+        
     }
 }
 
@@ -179,8 +167,8 @@ void SemanticState::startDtx(const std::
     if (!dtxSelected) {
         throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx"));
     }
-    dtxBuffer.reset(new DtxBuffer(xid));
-    txBuffer = dtxBuffer;
+    dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
+    txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer);
     if (join) {
         mgr.join(xid, dtxBuffer);
     } else {
@@ -206,7 +194,7 @@ void SemanticState::endDtx(const std::st
         dtxBuffer->fail();
     } else {
         dtxBuffer->markEnded();
-    }
+    }    
     dtxBuffer.reset();
 }
 
@@ -248,7 +236,7 @@ void SemanticState::resumeDtx(const std:
 
     checkDtxTimeout();
     dtxBuffer->setSuspended(false);
-    txBuffer = dtxBuffer;
+    txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer);
 }
 
 void SemanticState::checkDtxTimeout()
@@ -266,33 +254,31 @@ void SemanticState::record(const Deliver
 
 const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
 
-SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
-                                          const string& _name,
-                                          Queue::shared_ptr _queue,
+SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, 
+                                          const string& _name, 
+                                          Queue::shared_ptr _queue, 
                                           bool ack,
                                           bool _acquire,
                                           bool _exclusive,
-                                          const string& _tag,
                                           const string& _resumeId,
                                           uint64_t _resumeTtl,
                                           const framing::FieldTable& _arguments
 
 
-) :
-    Consumer(_name, _acquire),
-    parent(_parent),
-    queue(_queue),
-    ackExpected(ack),
+) : 
+    Consumer(_acquire),
+    parent(_parent), 
+    name(_name), 
+    queue(_queue), 
+    ackExpected(ack), 
     acquire(_acquire),
-    blocked(true),
+    blocked(true), 
     windowing(true),
-    windowActive(false),
     exclusive(_exclusive),
     resumeId(_resumeId),
-    tag(_tag),
     resumeTtl(_resumeTtl),
     arguments(_arguments),
-    msgCredit(0),
+    msgCredit(0), 
     byteCredit(0),
     notifyEnabled(true),
     syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
@@ -303,10 +289,10 @@ SemanticState::ConsumerImpl::ConsumerImp
     {
         ManagementAgent* agent = parent->session.getBroker().getManagementAgent();
         qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session));
-
+        
         if (agent != 0)
         {
-            mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(),
+            mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name,
                                                 !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments));
             agent->addObject (mgmtObject);
             mgmtObject->set_creditMode("WINDOW");
@@ -338,16 +324,16 @@ bool SemanticState::ConsumerImpl::delive
 {
     assertClusterSafe();
     allocateCredit(msg.payload);
-    DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, windowing);
+    DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing);
     bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
     if (sync) deliveryCount = 0;//reset
     parent->deliver(record, sync);
+    if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered
     if (windowing || ackExpected || !acquire) {
         parent->record(record);
-    }
-    if (acquire && !ackExpected) {  // auto acquire && auto accept
-        queue->dequeue(0 /*ctxt*/, msg);
-        record.setEnded();
+    } 
+    if (acquire && !ackExpected) {
+        queue->dequeue(0, msg);
     }
     if (mgmtObject) { mgmtObject->inc_delivered(); }
     return true;
@@ -365,7 +351,7 @@ bool SemanticState::ConsumerImpl::accept
     // checkCredit fails because the message is to big, we should
     // remain on queue's listener list for possible smaller messages
     // in future.
-    //
+    // 
     blocked = !(filter(msg) && checkCredit(msg));
     return !blocked;
 }
@@ -377,7 +363,7 @@ struct ConsumerName {
 };
 
 ostream& operator<<(ostream& o, const ConsumerName& pc) {
-    return o << pc.consumer.getTag() << " on "
+    return o << pc.consumer.getName() << " on "
              << pc.consumer.getParent().getSession().getSessionId();
 }
 }
@@ -386,7 +372,7 @@ void SemanticState::ConsumerImpl::alloca
 {
     assertClusterSafe();
     uint32_t originalMsgCredit = msgCredit;
-    uint32_t originalByteCredit = byteCredit;
+    uint32_t originalByteCredit = byteCredit;        
     if (msgCredit != 0xFFFFFFFF) {
         msgCredit--;
     }
@@ -396,7 +382,7 @@ void SemanticState::ConsumerImpl::alloca
     QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
              << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
              << " now bytes: " << byteCredit << " msgs: " << msgCredit);
-
+    
 }
 
 bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
@@ -410,7 +396,7 @@ bool SemanticState::ConsumerImpl::checkC
     return enoughCredit;
 }
 
-SemanticState::ConsumerImpl::~ConsumerImpl()
+SemanticState::ConsumerImpl::~ConsumerImpl() 
 {
     if (mgmtObject != 0)
         mgmtObject->resourceDestroy ();
@@ -428,7 +414,7 @@ void SemanticState::unsubscribe(Consumer
     Queue::shared_ptr queue = c->getQueue();
     if(queue) {
         queue->cancel(c);
-        if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
+        if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {            
             Queue::tryAutoDelete(session.getBroker(), queue);
         }
     }
@@ -470,23 +456,23 @@ const std::string nullstring;
 }
 
 void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
-    msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
-
+    msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
+    
     std::string exchangeName = msg->getExchangeName();
-    if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())
+    if (!cacheExchange || cacheExchange->getName() != exchangeName)
         cacheExchange = session.getBroker().getExchanges().get(exchangeName);
     cacheExchange->setProperties(msg);
 
     /* verify the userid if specified: */
     std::string id =
     	msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring;
+    
     if (authMsg &&  !id.empty() && !(id == userID || (isDefaultRealm && id == userName)))
     {
         QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id);
         throw UnauthorizedAccessException(QPID_MSG("authorised user id : " << userID << " but user id in message declared as " << id));
     }
 
-    AclModule* acl = getSession().getBroker().getAcl();
     if (acl && acl->doTransferAcl())
     {
         if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg->getRoutingKey() ))
@@ -498,7 +484,7 @@ void SemanticState::route(intrusive_ptr<
 
     if (!strategy.delivered) {
         //TODO:if discard-unroutable, just drop it
-        //TODO:else if accept-mode is explicit, reject it
+        //TODO:else if accept-mode is explicit, reject it 
         //else route it to alternate exchange
         if (cacheExchange->getAlternate()) {
             cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
@@ -527,7 +513,7 @@ void SemanticState::ConsumerImpl::reques
 }
 
 bool SemanticState::complete(DeliveryRecord& delivery)
-{
+{    
     ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
     if (i != consumers.end()) {
         i->second->complete(delivery);
@@ -539,7 +525,7 @@ void SemanticState::ConsumerImpl::comple
 {
     if (!delivery.isComplete()) {
         delivery.complete();
-        if (windowing && windowActive) {
+        if (windowing) {
             if (msgCredit != 0xFFFFFFFF) msgCredit++;
             if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit();
         }
@@ -555,7 +541,7 @@ void SemanticState::recover(bool requeue
         unacked.clear();
         for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
     }else{
-        for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));
+        for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));        
         //unconfirmed messages re redelivered and therefore have their
         //id adjusted, confirmed messages are not and so the ordering
         //w.r.t id is lost
@@ -568,61 +554,50 @@ void SemanticState::deliver(DeliveryReco
     return deliveryAdapter.deliver(msg, sync);
 }
 
-const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
-{
-    ConsumerImpl::shared_ptr consumer;
-    if (!find(destination, consumer)) {
-        throw NotFoundException(QPID_MSG("Unknown destination " << destination << " session=" << session.getSessionId()));
-    } else {
-        return consumer;
-    }
-}
-
-bool SemanticState::find(const std::string& destination, ConsumerImpl::shared_ptr& consumer) const
+SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
 {
-    // @todo KAG gsim: shouldn't the consumers map be locked????
-    ConsumerImplMap::const_iterator i = consumers.find(destination);
+    ConsumerImplMap::iterator i = consumers.find(destination);
     if (i == consumers.end()) {
-        return false;
+        throw NotFoundException(QPID_MSG("Unknown destination " << destination));
+    } else {
+        return *(i->second);
     }
-    consumer = i->second;
-    return true;
 }
 
 void SemanticState::setWindowMode(const std::string& destination)
 {
-    find(destination)->setWindowMode();
+    find(destination).setWindowMode();
 }
 
 void SemanticState::setCreditMode(const std::string& destination)
 {
-    find(destination)->setCreditMode();
+    find(destination).setCreditMode();
 }
 
 void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
 {
-    ConsumerImpl::shared_ptr c = find(destination);
-    c->addByteCredit(value);
-    c->requestDispatch();
+    ConsumerImpl& c = find(destination);
+    c.addByteCredit(value);
+    c.requestDispatch();
 }
 
 
 void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
 {
-    ConsumerImpl::shared_ptr c = find(destination);
-    c->addMessageCredit(value);
-    c->requestDispatch();
+    ConsumerImpl& c = find(destination);
+    c.addMessageCredit(value);
+    c.requestDispatch();
 }
 
 void SemanticState::flush(const std::string& destination)
 {
-    find(destination)->flush();
+    find(destination).flush();
 }
 
 
 void SemanticState::stop(const std::string& destination)
 {
-    find(destination)->stop();
+    find(destination).stop();
 }
 
 void SemanticState::ConsumerImpl::setWindowMode()
@@ -646,7 +621,6 @@ void SemanticState::ConsumerImpl::setCre
 void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
 {
     assertClusterSafe();
-    if (windowing) windowActive = true;
     if (byteCredit != 0xFFFFFFFF) {
         if (value == 0xFFFFFFFF) byteCredit = value;
         else byteCredit += value;
@@ -656,7 +630,6 @@ void SemanticState::ConsumerImpl::addByt
 void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
 {
     assertClusterSafe();
-    if (windowing) windowActive = true;
     if (msgCredit != 0xFFFFFFFF) {
         if (value == 0xFFFFFFFF) msgCredit = value;
         else msgCredit += value;
@@ -677,8 +650,7 @@ void SemanticState::ConsumerImpl::flush(
 {
     while(haveCredit() && queue->dispatch(shared_from_this()))
         ;
-    msgCredit = 0;
-    byteCredit = 0;
+    stop();
 }
 
 void SemanticState::ConsumerImpl::stop()
@@ -686,7 +658,6 @@ void SemanticState::ConsumerImpl::stop()
     assertClusterSafe();
     msgCredit = 0;
     byteCredit = 0;
-    windowActive = false;
 }
 
 Queue::shared_ptr SemanticState::getQueue(const string& name) const {
@@ -702,7 +673,7 @@ Queue::shared_ptr SemanticState::getQueu
 }
 
 AckRange SemanticState::findRange(DeliveryId first, DeliveryId last)
-{
+{   
     return DeliveryRecord::findRange(unacked, first, last);
 }
 
@@ -720,21 +691,14 @@ void SemanticState::release(DeliveryId f
     DeliveryRecords::reverse_iterator start(range.end);
     DeliveryRecords::reverse_iterator end(range.start);
     for_each(start, end, boost::bind(&DeliveryRecord::release, _1, setRedelivered));
-
-    DeliveryRecords::iterator removed =
-        remove_if(range.start, range.end, bind(&DeliveryRecord::isRedundant, _1));
-    unacked.erase(removed, range.end);
 }
 
 void SemanticState::reject(DeliveryId first, DeliveryId last)
 {
     AckRange range = findRange(first, last);
     for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject));
-    //may need to remove the delivery records as well
-    for (DeliveryRecords::iterator i = range.start; i != unacked.end() && i->getId() <= last; ) {
-        if (i->isRedundant()) i = unacked.erase(i);
-        else i++;
-    }
+    //need to remove the delivery records as well
+    unacked.erase(range.start, range.end);
 }
 
 bool SemanticState::ConsumerImpl::doOutput()
@@ -797,13 +761,13 @@ void SemanticState::accepted(const Seque
         //in transactional mode, don't dequeue or remove, just
         //maintain set of acknowledged messages:
         accumulatedAck.add(commands);
-
+        
         if (dtxBuffer.get()) {
             //if enlisted in a dtx, copy the relevant slice from
             //unacked and record it against that transaction
             TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
             accumulatedAck.clear();
-            dtxBuffer->enlist(txAck);
+            dtxBuffer->enlist(txAck);    
 
             //mark the relevant messages as 'ended' in unacked
             //if the messages are already completed, they can be
@@ -825,6 +789,7 @@ void SemanticState::accepted(const Seque
 }
 
 void SemanticState::completed(const SequenceSet& commands) {
+    assertClusterSafe();
     DeliveryRecords::iterator removed =
         remove_if(unacked.begin(), unacked.end(),
                   isInSequenceSetAnd(commands,
@@ -835,6 +800,7 @@ void SemanticState::completed(const Sequ
 
 void SemanticState::attached()
 {
+    assertClusterSafe();
     for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
         i->second->enableNotify();
         session.getConnection().outputTasks.addOutputTask(i->second.get());
@@ -844,6 +810,7 @@ void SemanticState::attached()
 
 void SemanticState::detached()
 {
+    assertClusterSafe();
     for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
         i->second->disableNotify();
         session.getConnection().outputTasks.removeOutputTask(i->second.get());

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SemanticState.h Fri Oct 21 01:19:00 2011
@@ -65,7 +65,7 @@ class SessionContext;
  *
  * Message delivery is driven by ConsumerImpl::doOutput(), which is
  * called when a client's socket is ready to write data.
- *
+ * 
  */
 class SemanticState : private boost::noncopyable {
   public:
@@ -75,15 +75,14 @@ class SemanticState : private boost::non
     {
         mutable qpid::sys::Mutex lock;
         SemanticState* const parent;
+        const std::string name;
         const boost::shared_ptr<Queue> queue;
         const bool ackExpected;
         const bool acquire;
         bool blocked;
         bool windowing;
-        bool windowActive;
         bool exclusive;
         std::string resumeId;
-        const std::string tag;  // <destination> from AMQP 0-10 Message.subscribe command
         uint64_t resumeTtl;
         framing::FieldTable arguments;
         uint32_t msgCredit;
@@ -100,16 +99,15 @@ class SemanticState : private boost::non
       public:
         typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
 
-        ConsumerImpl(SemanticState* parent,
+        ConsumerImpl(SemanticState* parent, 
                      const std::string& name, boost::shared_ptr<Queue> queue,
                      bool ack, bool acquire, bool exclusive,
-                     const std::string& tag, const std::string& resumeId,
-                     uint64_t resumeTtl, const framing::FieldTable& arguments);
+                     const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
         ~ConsumerImpl();
         OwnershipToken* getSession();
-        bool deliver(QueuedMessage& msg);
-        bool filter(boost::intrusive_ptr<Message> msg);
-        bool accept(boost::intrusive_ptr<Message> msg);
+        bool deliver(QueuedMessage& msg);            
+        bool filter(boost::intrusive_ptr<Message> msg);            
+        bool accept(boost::intrusive_ptr<Message> msg);            
 
         void disableNotify();
         void enableNotify();
@@ -124,13 +122,15 @@ class SemanticState : private boost::non
         void addMessageCredit(uint32_t value);
         void flush();
         void stop();
-        void complete(DeliveryRecord&);
+        void complete(DeliveryRecord&);    
         boost::shared_ptr<Queue> getQueue() const { return queue; }
         bool isBlocked() const { return blocked; }
         bool setBlocked(bool set) { std::swap(set, blocked); return set; }
 
         bool doOutput();
 
+        std::string getName() const { return name; }
+
         bool isAckExpected() const { return ackExpected; }
         bool isAcquire() const { return acquire; }
         bool isWindowing() const { return windowing; }
@@ -138,7 +138,6 @@ class SemanticState : private boost::non
         uint32_t getMsgCredit() const { return msgCredit; }
         uint32_t getByteCredit() const { return byteCredit; }
         std::string getResumeId() const { return resumeId; };
-        const std::string& getTag() const { return tag; }
         uint64_t getResumeTtl() const { return resumeTtl; }
         const framing::FieldTable& getArguments() const { return arguments; }
 
@@ -149,10 +148,9 @@ class SemanticState : private boost::non
         management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
     };
 
-    typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
-
   private:
     typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
+    typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
 
     SessionContext& session;
     DeliveryAdapter& deliveryAdapter;
@@ -165,6 +163,7 @@ class SemanticState : private boost::non
     DtxBufferMap suspendedXids;
     framing::SequenceSet accumulatedAck;
     boost::shared_ptr<Exchange> cacheExchange;
+    AclModule* acl;
     const bool authMsg;
     const std::string userID;
     const std::string userName;
@@ -182,16 +181,14 @@ class SemanticState : private boost::non
     void disable(ConsumerImpl::shared_ptr);
 
   public:
-
     SemanticState(DeliveryAdapter&, SessionContext&);
     ~SemanticState();
 
     SessionContext& getSession() { return session; }
     const SessionContext& getSession() const { return session; }
 
-    const ConsumerImpl::shared_ptr find(const std::string& destination) const;
-    bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const;
-
+    ConsumerImpl& find(const std::string& destination);
+    
     /**
      * Get named queue, never returns 0.
      * @return: named queue
@@ -199,16 +196,16 @@ class SemanticState : private boost::non
      * @exception: ConnectionException if name="" and session has no default.
      */
     boost::shared_ptr<Queue> getQueue(const std::string& name) const;
-
+    
     bool exists(const std::string& consumerTag);
 
-    void consume(const std::string& destination,
-                 boost::shared_ptr<Queue> queue,
+    void consume(const std::string& destination, 
+                 boost::shared_ptr<Queue> queue, 
                  bool ackRequired, bool acquire, bool exclusive,
                  const std::string& resumeId=std::string(), uint64_t resumeTtl=0,
                  const framing::FieldTable& = framing::FieldTable());
 
-    bool cancel(const std::string& tag);
+    void cancel(const std::string& tag);
 
     void setWindowMode(const std::string& destination);
     void setCreditMode(const std::string& destination);
@@ -221,13 +218,12 @@ class SemanticState : private boost::non
     void commit(MessageStore* const store);
     void rollback();
     void selectDtx();
-    bool getDtxSelected() const { return dtxSelected; }
     void startDtx(const std::string& xid, DtxManager& mgr, bool join);
     void endDtx(const std::string& xid, bool fail);
     void suspendDtx(const std::string& xid);
     void resumeDtx(const std::string& xid);
     void recover(bool requeue);
-    void deliver(DeliveryRecord& message, bool sync);
+    void deliver(DeliveryRecord& message, bool sync);            
     void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
     void release(DeliveryId first, DeliveryId last, bool setRedelivered);
     void reject(DeliveryId first, DeliveryId last);
@@ -248,12 +244,9 @@ class SemanticState : private boost::non
     DeliveryRecords& getUnacked() { return unacked; }
     framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; }
     TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; }
-    DtxBuffer::shared_ptr getDtxBuffer() const { return dtxBuffer; }
     void setTxBuffer(const TxBuffer::shared_ptr& txb) { txBuffer = txb; }
-    void setDtxBuffer(const DtxBuffer::shared_ptr& dtxb) { dtxBuffer = dtxb; txBuffer = dtxb; }
     void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; }
     void record(const DeliveryRecord& delivery);
-    DtxBufferMap& getSuspendedXids() { return suspendedXids; }
 };
 
 }} // namespace qpid::broker

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.cpp Fri Oct 21 01:19:00 2011
@@ -24,7 +24,6 @@
 #include "qpid/log/Statement.h"
 #include "qpid/framing/SequenceSet.h"
 #include "qpid/management/ManagementAgent.h"
-#include "qpid/broker/SessionState.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
 #include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
@@ -65,56 +64,53 @@ void SessionAdapter::ExchangeHandlerImpl
                                                   const string& alternateExchange, 
                                                   bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
 
+    AclModule* acl = getBroker().getAcl();
+    if (acl) {
+        std::map<acl::Property, std::string> params;
+        params.insert(make_pair(acl::PROP_TYPE, type));
+        params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+        params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) ));
+        params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
+        if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
+            throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange declare request from " << getConnection().getUserId()));
+    }
+    
     //TODO: implement autoDelete
     Exchange::shared_ptr alternate;
     if (!alternateExchange.empty()) {
         alternate = getBroker().getExchanges().get(alternateExchange);
     }
     if(passive){
-        AclModule* acl = getBroker().getAcl();
-        if (acl) {
-            //TODO: why does a passive declare require create
-            //permission? The purpose of the passive flag is to state
-            //that the exchange should *not* created. For
-            //authorisation a passive declare is similar to
-            //exchange-query.
-            std::map<acl::Property, std::string> params;
-            params.insert(make_pair(acl::PROP_TYPE, type));
-            params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
-            params.insert(make_pair(acl::PROP_PASSIVE, _TRUE));
-            params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE));
-            if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,&params) )
-                throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << getConnection().getUserId()));
-        }
         Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
         checkType(actual, type);
         checkAlternate(actual, alternate);
-    }else{
+    }else{        
         if(exchange.find("amq.") == 0 || exchange.find("qpid.") == 0) {
             throw framing::NotAllowedException(QPID_MSG("Exchange names beginning with \"amq.\" or \"qpid.\" are reserved. (exchange=\"" << exchange << "\")"));
         }
         try{
-            std::pair<Exchange::shared_ptr, bool> response =
-                getBroker().createExchange(exchange, type, durable, alternateExchange, args,
-                                           getConnection().getUserId(), getConnection().getUrl());
-            if (!response.second) {
-                //exchange already there, not created
+            std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args);
+            if (response.second) {
+                if (alternate) {
+                    response.first->setAlternate(alternate);
+                    alternate->incAlternateUsers();
+                }
+                if (durable) {
+                    getBroker().getStore().create(*response.first, args);
+                }
+            } else {
                 checkType(response.first, type);
                 checkAlternate(response.first, alternate);
-                ManagementAgent* agent = getBroker().getManagementAgent();
-                if (agent)
-                    agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(),
-                                                                 getConnection().getUserId(),
-                                                                 exchange,
-                                                                 type,
-                                                                 alternateExchange,
-                                                                 durable,
-                                                                 false,
-                                                                 ManagementAgent::toMap(args),
-                                                                 "existing"));
             }
+
+            ManagementAgent* agent = getBroker().getManagementAgent();
+            if (agent)
+                agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type,
+                                                             alternateExchange, durable, false, ManagementAgent::toMap(args),
+                                                             response.second ? "created" : "existing"));
+
         }catch(UnknownExchangeTypeException& /*e*/){
-            throw NotFoundException(QPID_MSG("Exchange type not implemented: " << type));
+            throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type));
         }
     }
 }
@@ -138,8 +134,22 @@ void SessionAdapter::ExchangeHandlerImpl
                 
 void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/)
 {
-    //TODO: implement if-unused
-    getBroker().deleteExchange(name, getConnection().getUserId(), getConnection().getUrl());
+    AclModule* acl = getBroker().getAcl();
+    if (acl) {
+        if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
+            throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << getConnection().getUserId()));
+    }
+
+    //TODO: implement unused
+    Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
+    if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
+    if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
+    if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
+    getBroker().getExchanges().destroy(name);
+
+    ManagementAgent* agent = getBroker().getManagementAgent();
+    if (agent)
+        agent->raiseEvent(_qmf::EventExchangeDelete(getConnection().getUrl(), getConnection().getUserId(), name));
 }
 
 ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
@@ -159,19 +169,67 @@ ExchangeQueryResult SessionAdapter::Exch
 }
 
 void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, 
-                                               const string& exchangeName, const string& routingKey, 
-                                               const FieldTable& arguments)
+                                           const string& exchangeName, const string& routingKey, 
+                                           const FieldTable& arguments)
 {
-    getBroker().bind(queueName, exchangeName, routingKey, arguments,
-                     getConnection().getUserId(), getConnection().getUrl());
+    AclModule* acl = getBroker().getAcl();
+    if (acl) {
+        std::map<acl::Property, std::string> params;
+        params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
+        params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
+
+        if (!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,&params))
+            throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << getConnection().getUserId()));
+    }
+
+    Queue::shared_ptr queue = getQueue(queueName);
+    Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
+    if(exchange){
+        string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
+        if (exchange->bind(queue, exchangeRoutingKey, &arguments)) {
+            queue->bound(exchangeName, routingKey, arguments);
+            if (exchange->isDurable() && queue->isDurable()) {
+                getBroker().getStore().bind(*exchange, *queue, routingKey, arguments);
+            }
+
+            ManagementAgent* agent = getBroker().getManagementAgent();
+            if (agent)
+                agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName,
+                                                  queueName, exchangeRoutingKey, ManagementAgent::toMap(arguments)));
+        }
+    }else{
+        throw NotFoundException("Bind failed. No such exchange: " + exchangeName);
+    }
 }
  
 void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
                                                  const string& exchangeName,
                                                  const string& routingKey)
 {
-    getBroker().unbind(queueName, exchangeName, routingKey,
-                       getConnection().getUserId(), getConnection().getUrl());
+    AclModule* acl = getBroker().getAcl();
+    if (acl) {
+        std::map<acl::Property, std::string> params;
+        params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
+        params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey));
+        if (!acl->authorise(getConnection().getUserId(),acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,&params) )
+            throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << getConnection().getUserId()));
+    }
+
+    Queue::shared_ptr queue = getQueue(queueName);
+    if (!queue.get()) throw NotFoundException("Unbind failed. No such queue: " + queueName); // the missing entity on this branch is the queue, not the exchange
+
+    Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
+    if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
+
+    //TODO: revise unbind to rely solely on binding key (not args)
+    if (exchange->unbind(queue, routingKey, 0)) {
+        if (exchange->isDurable() && queue->isDurable())
+            getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable());
+
+        ManagementAgent* agent = getBroker().getManagementAgent();
+        if (agent)
+            agent->raiseEvent(_qmf::EventUnbind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, routingKey));
+    }
 }
 
 ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
@@ -274,42 +332,52 @@ QueueQueryResult SessionAdapter::QueueHa
 void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange,
                                                bool passive, bool durable, bool exclusive, 
                                                bool autoDelete, const qpid::framing::FieldTable& arguments)
-{
+{ 
+    AclModule* acl = getBroker().getAcl();
+    if (acl) {
+        std::map<acl::Property, std::string> params;
+        params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
+        params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) ));
+        params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
+        params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE)));
+        params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE)));
+        params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
+        params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
+        params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
+
+        if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
+            throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
+    }
+
+    Exchange::shared_ptr alternate;
+    if (!alternateExchange.empty()) {
+        alternate = getBroker().getExchanges().get(alternateExchange);
+    }
     Queue::shared_ptr queue;
     if (passive && !name.empty()) {
-        AclModule* acl = getBroker().getAcl();
-        if (acl) {
-            //TODO: why does a passive declare require create
-            //permission? The purpose of the passive flag is to state
-            //that the queue should *not* created. For
-            //authorisation a passive declare is similar to
-            //queue-query (or indeed a qmf query).
-            std::map<acl::Property, std::string> params;
-            params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
-            params.insert(make_pair(acl::PROP_PASSIVE, _TRUE));
-            params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE)));
-            params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE)));
-            params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE)));
-            params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type")));
-            params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count"))));
-            params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size"))));
-            if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
-                throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId()));
-        }
-        queue = getQueue(name);
+    queue = getQueue(name);
         //TODO: check alternate-exchange is as expected
     } else {
-        std::pair<Queue::shared_ptr, bool> queue_created =
-            getBroker().createQueue(name, durable,
-                                    autoDelete,
-                                    exclusive ? &session : 0,
-                                    alternateExchange,
-                                    arguments,
-                                    getConnection().getUserId(),
-                                    getConnection().getUrl());
+        std::pair<Queue::shared_ptr, bool> queue_created =  
+            getBroker().getQueues().declare(name, durable,
+                                            autoDelete,
+                                            exclusive ? &session : 0);
         queue = queue_created.first;
         assert(queue);
         if (queue_created.second) { // This is a new queue
+            if (alternate) {
+                queue->setAlternateExchange(alternate);
+                alternate->incAlternateUsers();
+            }
+
+            //apply settings & create persistent record if required
+            try { queue_created.first->create(arguments); } // may throw while applying settings / creating the persistent record
+            catch (...) { if (alternate) alternate->decAlternateUsers(); getBroker().getQueues().destroy(name); throw; } // roll back the alternate use count taken above, then the registration
+
+            //add default binding:
+            getBroker().getExchanges().getDefault()->bind(queue, name, 0);
+            queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
+
             //handle automatic cleanup:
             if (exclusive) {
                 exclusiveQueues.push_back(queue);
@@ -318,20 +386,21 @@ void SessionAdapter::QueueHandlerImpl::d
             if (exclusive && queue->setExclusiveOwner(&session)) {
                 exclusiveQueues.push_back(queue);
             }
-            ManagementAgent* agent = getBroker().getManagementAgent();
-            if (agent)
-                agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
-                                                      name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
-                                                      "existing"));
         }
 
+        ManagementAgent* agent = getBroker().getManagementAgent();
+        if (agent)
+            agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
+                                                      name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
+                                                      queue_created.second ? "created" : "existing"));
     }
 
-    if (exclusive && !queue->isExclusiveOwner(&session))
+    if (exclusive && !queue->isExclusiveOwner(&session)) 
         throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access to queue "
                                                << queue->getName()));
-}
-
+} 
+        
+        
 void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
     AclModule* acl = getBroker().getAcl();
     if (acl)
@@ -340,32 +409,40 @@ void SessionAdapter::QueueHandlerImpl::p
              throw UnauthorizedAccessException(QPID_MSG("ACL denied queue purge request from " << getConnection().getUserId()));
     }
     getQueue(queue)->purge();
-}
+} 
+        
+void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){
 
-void SessionAdapter::QueueHandlerImpl::checkDelete(Queue::shared_ptr queue, bool ifUnused, bool ifEmpty)
-{
-    if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session)) {
+    AclModule* acl = getBroker().getAcl();
+    if (acl)
+    {
+         if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_QUEUE,queue,NULL) )
+             throw UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << getConnection().getUserId()));
+    }
+
+    Queue::shared_ptr q = getQueue(queue);
+    if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session)) 
         throw ResourceLockedException(QPID_MSG("Cannot delete queue "
-                                               << queue->getName() << "; it is exclusive to another session"));
-    } else if(ifEmpty && queue->getMessageCount() > 0) {
-        throw PreconditionFailedException(QPID_MSG("Cannot delete queue "
-                                                   << queue->getName() << "; queue not empty"));
-    } else if(ifUnused && queue->getConsumerCount() > 0) {
-        throw PreconditionFailedException(QPID_MSG("Cannot delete queue "
-                                                   << queue->getName() << "; queue in use"));
-    } else if (queue->isExclusiveOwner(&session)) {
+                                               << queue << "; it is exclusive to another session"));
+    if(ifEmpty && q->getMessageCount() > 0){
+        throw PreconditionFailedException("Queue not empty.");
+    }else if(ifUnused && q->getConsumerCount() > 0){
+        throw PreconditionFailedException("Queue in use.");
+    }else{
         //remove the queue from the list of exclusive queues if necessary
-        QueueVector::iterator i = std::find(exclusiveQueues.begin(),
-                                            exclusiveQueues.end(),
-                                            queue);
-        if (i < exclusiveQueues.end()) exclusiveQueues.erase(i);
-    }    
-}
-        
-void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty)
-{
-    getBroker().deleteQueue(queue, getConnection().getUserId(), getConnection().getUrl(),
-                            boost::bind(&SessionAdapter::QueueHandlerImpl::checkDelete, this, _1, ifUnused, ifEmpty));
+        if(q->isExclusiveOwner(&session)){ // exclusive ownership is granted to the session (see declare), so test against the session token
+            QueueVector::iterator i = std::find(exclusiveQueues.begin(), exclusiveQueues.end(), q); // this handler's own list, the one declare() pushes onto
+            if(i != exclusiveQueues.end()) exclusiveQueues.erase(i);
+        }
+        q->destroy();
+        getBroker().getQueues().destroy(queue);
+        q->unbind(getBroker().getExchanges(), q);
+
+        ManagementAgent* agent = getBroker().getManagementAgent();
+        if (agent)
+            agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue));
+        q->notifyDeleted();
+    }
 } 
 
 SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : 
@@ -431,9 +508,7 @@ SessionAdapter::MessageHandlerImpl::subs
 void
 SessionAdapter::MessageHandlerImpl::cancel(const string& destination )
 {
-    if (!state.cancel(destination)) {
-        throw NotFoundException(QPID_MSG("No such subscription: " << destination));
-    }
+    state.cancel(destination);
 
     ManagementAgent* agent = getBroker().getManagementAgent();
     if (agent)
@@ -512,12 +587,7 @@ framing::MessageResumeResult SessionAdap
     
 
 
-void SessionAdapter::ExecutionHandlerImpl::sync()
-{
-    session.addPendingExecutionSync();
-    /** @todo KAG - need a generic mechanism to allow a command to returning "not completed" status back to SessionState */
-
-}
+void SessionAdapter::ExecutionHandlerImpl::sync() {} //essentially a no-op
 
 void SessionAdapter::ExecutionHandlerImpl::result(const SequenceNumber& /*commandId*/, const string& /*value*/)
 {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionAdapter.h Fri Oct 21 01:19:00 2011
@@ -138,7 +138,6 @@ class Queue;
         bool isLocal(const ConnectionToken* t) const; 
 
         void destroyExclusiveQueues();
-        void checkDelete(boost::shared_ptr<Queue> queue, bool ifUnused, bool ifEmpty);
         template <class F> void eachExclusiveQueue(F f) 
         { 
             std::for_each(exclusiveQueues.begin(), exclusiveQueues.end(), f);

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionContext.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionContext.h Fri Oct 21 01:19:00 2011
@@ -46,7 +46,6 @@ class SessionContext : public OwnershipT
     virtual Broker& getBroker() = 0;
     virtual uint16_t getChannel() const = 0;
     virtual const SessionId& getSessionId() const = 0;
-    virtual void addPendingExecutionSync() = 0;
 };
 
 }} // namespace qpid::broker

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionHandler.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionHandler.cpp Fri Oct 21 01:19:00 2011
@@ -40,6 +40,11 @@ SessionHandler::SessionHandler(Connectio
 
 SessionHandler::~SessionHandler() {}
 
+namespace { // file-local helpers; both tolerate a null method body and return 0 in that case
+ClassId classId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }    // AMQP class id of the method
+MethodId methodId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; } // AMQP method id of the method
+} // namespace
+
 void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) {
     // NOTE: must tell the error listener _before_ calling connection.close()
     if (connection.getErrorListener()) connection.getErrorListener()->connectionError(msg);

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.cpp Fri Oct 21 01:19:00 2011
@@ -25,7 +25,6 @@
 #include "qpid/broker/SessionManager.h"
 #include "qpid/broker/SessionHandler.h"
 #include "qpid/broker/RateFlowcontrol.h"
-#include "qpid/sys/ClusterSafe.h"
 #include "qpid/sys/Timer.h"
 #include "qpid/framing/AMQContentBody.h"
 #include "qpid/framing/AMQHeaderBody.h"
@@ -61,9 +60,9 @@ SessionState::SessionState(
       semanticState(*this, *this),
       adapter(semanticState),
       msgBuilder(&broker.getStore()),
+      enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)),
       mgmtObject(0),
-      rateFlowcontrol(0),
-      asyncCommandCompleter(new AsyncCommandCompleter(this))
+      rateFlowcontrol(0)
 {
     uint32_t maxRate = broker.getOptions().maxSessionRate;
     if (maxRate) {
@@ -96,7 +95,6 @@ void SessionState::addManagementObject()
 }
 
 SessionState::~SessionState() {
-    asyncCommandCompleter->cancel();
     semanticState.closed();
     if (mgmtObject != 0)
         mgmtObject->resourceDestroy ();
@@ -127,7 +125,6 @@ bool SessionState::isLocal(const Connect
 
 void SessionState::detach() {
     QPID_LOG(debug, getId() << ": detached on broker.");
-    asyncCommandCompleter->detached();
     disableOutput();
     handler = 0;
     if (mgmtObject != 0)
@@ -148,7 +145,6 @@ void SessionState::attach(SessionHandler
         mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
         mgmtObject->set_channelId (h.getChannel());
     }
-    asyncCommandCompleter->attached();
 }
 
 void SessionState::abort() {
@@ -206,17 +202,15 @@ Manageable::status_t SessionState::Manag
 }
 
 void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {
-    currentCommandComplete = true;      // assumed, can be overridden by invoker method (this sucks).
     Invoker::Result invocation = invoke(adapter, *method);
-    if (currentCommandComplete) receiverCompleted(id);
-
+    receiverCompleted(id);
     if (!invocation.wasHandled()) {
         throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
     } else if (invocation.hasResult()) {
         getProxy().getExecution().result(id, invocation.getResult());
     }
-
-    if (method->isSync() && currentCommandComplete) {
+    if (method->isSync()) {
+        incomplete.process(enqueuedOp, true);
         sendAcceptAndCompletion();
     }
 }
@@ -259,14 +253,23 @@ void SessionState::handleContent(AMQFram
             header.setEof(false);
             msg->getFrames().append(header);
         }
-        if (broker.isTimestamping())
-            msg->setTimestamp();
         msg->setPublisher(&getConnection());
-        msg->getIngressCompletion().begin();
         semanticState.handle(msg);
         msgBuilder.end();
-        IncompleteIngressMsgXfer xfer(this, msg);
-        msg->getIngressCompletion().end(xfer);  // allows msg to complete xfer
+
+        if (msg->isEnqueueComplete()) {
+            enqueued(msg);
+        } else {
+            incomplete.add(msg);
+        }
+
+        //hold up execution until async enqueue is complete
+        if (msg->getFrames().getMethod()->isSync()) {
+            incomplete.process(enqueuedOp, true);
+            sendAcceptAndCompletion();
+        } else {
+            incomplete.process(enqueuedOp, false);
+        }
     }
 
     // Handle producer session flow control
@@ -316,41 +319,11 @@ void SessionState::sendAcceptAndCompleti
     sendCompletion();
 }
 
-/** Invoked when the given inbound message is finished being processed
- * by all interested parties (eg. it is done being enqueued to all queues,
- * its credit has been accounted for, etc).  At this point, msg is considered
- * by this receiver as 'completed' (as defined by AMQP 0_10)
- */
-void SessionState::completeRcvMsg(SequenceNumber id,
-                                  bool requiresAccept,
-                                  bool requiresSync)
+void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
 {
-    // Mark this as a cluster-unsafe scope since it can be called in
-    // journal threads or connection threads as part of asynchronous
-    // command completion.
-    sys::ClusterUnsafeScope cus;
-
-    bool callSendCompletion = false;
-    receiverCompleted(id);
-    if (requiresAccept)
-        // will cause msg's seq to appear in the next message.accept we send.
-        accepted.add(id);
-
-    // Are there any outstanding Execution.Sync commands pending the
-    // completion of this msg?  If so, complete them.
-    while (!pendingExecutionSyncs.empty() &&
-           receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) {
-        const SequenceNumber id = pendingExecutionSyncs.front();
-        pendingExecutionSyncs.pop();
-        QPID_LOG(debug, getId() << ": delayed execution.sync " << id << " is completed.");
-        receiverCompleted(id);
-        callSendCompletion = true;   // likely peer is pending for this completion.
-    }
-
-    // if the sender has requested immediate notification of the completion...
-    if (requiresSync || callSendCompletion) {
-        sendAcceptAndCompletion();
-    }
+    receiverCompleted(msg->getCommandId());
+    if (msg->requiresAccept())
+        accepted.add(msg->getCommandId());
 }
 
 void SessionState::handleIn(AMQFrame& frame) {
@@ -423,176 +396,4 @@ framing::AMQP_ClientProxy& SessionState:
     return handler->getClusterOrderProxy();
 }
 
-
-// Current received command is an execution.sync command.
-// Complete this command only when all preceding commands have completed.
-// (called via the invoker() in handleCommand() above)
-void SessionState::addPendingExecutionSync()
-{
-    SequenceNumber syncCommandId = receiverGetCurrent();
-    if (receiverGetIncomplete().front() < syncCommandId) {
-        currentCommandComplete = false;
-        pendingExecutionSyncs.push(syncCommandId);
-        asyncCommandCompleter->flushPendingMessages();
-        QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
-    }
-}
-
-
-/** factory for creating a reference-counted IncompleteIngressMsgXfer object
- * which will be attached to a message that will be completed asynchronously.
- */
-boost::intrusive_ptr<AsyncCompletion::Callback>
-SessionState::IncompleteIngressMsgXfer::clone()
-{
-    // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed.
-    // If the client is pending the message.transfer completion, flush now to force immediate write to journal.
-    if (requiresSync)
-        msg->flush();
-    else {
-        // otherwise, we need to track this message in order to flush it if an execution.sync arrives
-        // before it has been completed (see flushPendingMessages())
-        pending = true;
-        completerContext->addPendingMessage(msg);
-    }
-
-    return boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer>(new SessionState::IncompleteIngressMsgXfer(*this));
-}
-
-
-/** Invoked by the asynchronous completer associated with a received
- * msg that is pending Completion.  May be invoked by the IO thread
- * (sync == true), or some external thread (!sync).
- */
-void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
-{
-    if (pending) completerContext->deletePendingMessage(id);
-    if (!sync) {
-        /** note well: this path may execute in any thread.  It is safe to access
-         * the scheduledCompleterContext, since *this has a shared pointer to it.
-         * but not session!
-         */
-        session = 0;
-        QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
-        completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync);
-    } else {
-        // this path runs directly from the ac->end() call in handleContent() above,
-        // so *session is definately valid.
-        if (session->isAttached()) {
-            QPID_LOG(debug, ": receive completed for msg seq=" << id);
-            session->completeRcvMsg(id, requiresAccept, requiresSync);
-        }
-    }
-    completerContext = boost::intrusive_ptr<AsyncCommandCompleter>();
-}
-
-
-/** Scheduled from an asynchronous command's completed callback to run on
- * the IO thread.
- */
-void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt)
-{
-    ctxt->completeCommands();
-}
-
-
-/** Track an ingress message that is pending completion */
-void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg)
-{
-    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
-    std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg);
-    bool unique = pendingMsgs.insert(item).second;
-    if (!unique) {
-      assert(false);
-    }
-}
-
-
-/** pending message has completed */
-void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id)
-{
-    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
-    pendingMsgs.erase(id);
-}
-
-
-/** done when an execution.sync arrives */
-void SessionState::AsyncCommandCompleter::flushPendingMessages()
-{
-    std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy;
-    {
-        qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
-        pendingMsgs.swap(copy);    // we've only tracked these in case a flush is needed, so nuke 'em now.
-    }
-    // drop lock, so it is safe to call "flush()"
-    for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin();
-         i != copy.end(); ++i) {
-        i->second->flush();
-    }
-}
-
-
-/** mark an ingress Message.Transfer command as completed.
- * This method must be thread safe - it may run on any thread.
- */
-void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd,
-                                                                bool requiresAccept,
-                                                                bool requiresSync)
-{
-    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
-
-    if (session && isAttached) {
-        MessageInfo msg(cmd, requiresAccept, requiresSync);
-        completedMsgs.push_back(msg);
-        if (completedMsgs.size() == 1) {
-            session->getConnection().requestIOProcessing(boost::bind(&schedule,
-                                                                     session->asyncCommandCompleter));
-        }
-    }
-}
-
-
-/** Cause the session to complete all completed commands.
- * Executes on the IO thread.
- */
-void SessionState::AsyncCommandCompleter::completeCommands()
-{
-    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
-
-    // when session is destroyed, it clears the session pointer via cancel().
-    if (session && session->isAttached()) {
-        for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin();
-             msg != completedMsgs.end(); ++msg) {
-            session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync);
-        }
-    }
-    completedMsgs.clear();
-}
-
-
-/** cancel any pending calls to scheduleComplete */
-void SessionState::AsyncCommandCompleter::cancel()
-{
-    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
-    session = 0;
-}
-
-
-/** inform the completer that the session has attached,
- * allows command completion scheduling from any thread */
-void SessionState::AsyncCommandCompleter::attached()
-{
-    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
-    isAttached = true;
-}
-
-
-/** inform the completer that the session has detached,
- * disables command completion scheduling from any thread */
-void SessionState::AsyncCommandCompleter::detached()
-{
-    qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
-    isAttached = false;
-}
-
 }} // namespace qpid::broker

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/SessionState.h Fri Oct 21 01:19:00 2011
@@ -30,15 +30,13 @@
 #include "qmf/org/apache/qpid/broker/Session.h"
 #include "qpid/broker/SessionAdapter.h"
 #include "qpid/broker/DeliveryAdapter.h"
-#include "qpid/broker/AsyncCompletion.h"
+#include "qpid/broker/IncompleteMessageList.h"
 #include "qpid/broker/MessageBuilder.h"
 #include "qpid/broker/SessionContext.h"
 #include "qpid/broker/SemanticState.h"
-#include "qpid/sys/Monitor.h"
 
 #include <boost/noncopyable.hpp>
 #include <boost/scoped_ptr.hpp>
-#include <boost/intrusive_ptr.hpp>
 
 #include <set>
 #include <vector>
@@ -125,10 +123,6 @@ class SessionState : public qpid::Sessio
 
     const SessionId& getSessionId() const { return getId(); }
 
-    // Used by ExecutionHandler sync command processing.  Notifies
-    // the SessionState of a received Execution.Sync command.
-    void addPendingExecutionSync();
-
     // Used to delay creation of management object for sessions
     // belonging to inter-broker bridges
     void addManagementObject();
@@ -136,10 +130,7 @@ class SessionState : public qpid::Sessio
   private:
     void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
     void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
-
-    // indicate that the given ingress msg has been completely received by the
-    // broker, and the msg's message.transfer command can be considered completed.
-    void completeRcvMsg(SequenceNumber id, bool requiresAccept, bool requiresSync);
+    void enqueued(boost::intrusive_ptr<Message> msg);
 
     void handleIn(framing::AMQFrame& frame);
     void handleOut(framing::AMQFrame& frame);
@@ -165,6 +156,8 @@ class SessionState : public qpid::Sessio
     SemanticState semanticState;
     SessionAdapter adapter;
     MessageBuilder msgBuilder;
+    IncompleteMessageList incomplete;
+    IncompleteMessageList::CompletionListener enqueuedOp;
     qmf::org::apache::qpid::broker::Session* mgmtObject;
     qpid::framing::SequenceSet accepted;
 
@@ -173,110 +166,6 @@ class SessionState : public qpid::Sessio
     boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
     boost::intrusive_ptr<sys::TimerTask> flowControlTimer;
 
-    // sequence numbers for pending received Execution.Sync commands
-    std::queue<SequenceNumber> pendingExecutionSyncs;
-    bool currentCommandComplete;
-
-    /** This class provides a context for completing asynchronous commands in a thread
-     * safe manner.  Asynchronous commands save their completion state in this class.
-     * This class then schedules the completeCommands() method in the IO thread.
-     * While running in the IO thread, completeCommands() may safely complete all
-     * saved commands without the risk of colliding with other operations on this
-     * SessionState.
-     */
-    class AsyncCommandCompleter : public RefCounted {
-    private:
-        SessionState *session;
-        bool isAttached;
-        qpid::sys::Mutex completerLock;
-
-        // special-case message.transfer commands for optimization
-        struct MessageInfo {
-            SequenceNumber cmd; // message.transfer command id
-            bool requiresAccept;
-            bool requiresSync;
-        MessageInfo(SequenceNumber c, bool a, bool s)
-        : cmd(c), requiresAccept(a), requiresSync(s) {}
-        };
-        std::vector<MessageInfo> completedMsgs;
-        // If an ingress message does not require a Sync, we need to
-        // hold a reference to it in case an Execution.Sync command is received and we
-        // have to manually flush the message.
-        std::map<SequenceNumber, boost::intrusive_ptr<Message> > pendingMsgs;
-
-        /** complete all pending commands, runs in IO thread */
-        void completeCommands();
-
-        /** for scheduling a run of "completeCommands()" on the IO thread */
-        static void schedule(boost::intrusive_ptr<AsyncCommandCompleter>);
-
-    public:
-        AsyncCommandCompleter(SessionState *s) : session(s), isAttached(s->isAttached()) {};
-        ~AsyncCommandCompleter() {};
-
-        /** track a message pending ingress completion */
-        void addPendingMessage(boost::intrusive_ptr<Message> m);
-        void deletePendingMessage(SequenceNumber id);
-        void flushPendingMessages();
-        /** schedule the processing of a completed ingress message.transfer command */
-        void scheduleMsgCompletion(SequenceNumber cmd,
-                                   bool requiresAccept,
-                                   bool requiresSync);
-        void cancel();  // called by SessionState destructor.
-        void attached();  // called by SessionState on attach()
-        void detached();  // called by SessionState on detach()
-    };
-    boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter;
-
-    /** Abstract class that represents a single asynchronous command that is
-     * pending completion.
-     */
-    class AsyncCommandContext : public AsyncCompletion::Callback
-    {
-     public:
-        AsyncCommandContext( SessionState *ss, SequenceNumber _id )
-          : id(_id), completerContext(ss->asyncCommandCompleter) {}
-        virtual ~AsyncCommandContext() {}
-
-     protected:
-        SequenceNumber id;
-        boost::intrusive_ptr<AsyncCommandCompleter> completerContext;
-    };
-
-    /** incomplete Message.transfer commands - inbound to broker from client
-     */
-    class IncompleteIngressMsgXfer : public SessionState::AsyncCommandContext
-    {
-     public:
-        IncompleteIngressMsgXfer( SessionState *ss,
-                                  boost::intrusive_ptr<Message> m )
-          : AsyncCommandContext(ss, m->getCommandId()),
-          session(ss),
-          msg(m),
-          requiresAccept(m->requiresAccept()),
-          requiresSync(m->getFrames().getMethod()->isSync()),
-          pending(false) {}
-        IncompleteIngressMsgXfer( const IncompleteIngressMsgXfer& x )
-          : AsyncCommandContext(x.session, x.msg->getCommandId()),
-          session(x.session),
-          msg(x.msg),
-          requiresAccept(x.requiresAccept),
-          requiresSync(x.requiresSync),
-          pending(x.pending) {}
-
-  virtual ~IncompleteIngressMsgXfer() {};
-
-        virtual void completed(bool);
-        virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
-
-     private:
-        SessionState *session;  // only valid if sync flag in callback is true
-        boost::intrusive_ptr<Message> msg;
-        bool requiresAccept;
-        bool requiresSync;
-        bool pending;   // true if msg saved on pending list...
-    };
-
     friend class SessionManager;
 };
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.cpp Fri Oct 21 01:19:00 2011
@@ -28,52 +28,6 @@
 
 namespace qpid {
 namespace broker {
-namespace {
-const qmf::org::apache::qpid::broker::EventQueueThresholdExceeded EVENT("dummy", 0, 0);
-bool isQMFv2(const boost::intrusive_ptr<Message> message)
-{
-    const qpid::framing::MessageProperties* props = message->getProperties<qpid::framing::MessageProperties>();
-    return props && props->getAppId() == "qmf2";
-}
-
-bool isThresholdEvent(const boost::intrusive_ptr<Message> message)
-{
-    if (message->getIsManagementMessage()) {
-        //is this a qmf event? if so is it a threshold event?
-        if (isQMFv2(message)) {
-            const qpid::framing::FieldTable* headers = message->getApplicationHeaders();
-            if (headers && headers->getAsString("qmf.content") == "_event") {
-                //decode as list
-                std::string content = message->getFrames().getContent();
-                qpid::types::Variant::List list;
-                qpid::amqp_0_10::ListCodec::decode(content, list);
-                if (list.empty() || list.front().getType() != qpid::types::VAR_MAP) return false;
-                qpid::types::Variant::Map map = list.front().asMap();
-                try {
-                    std::string eventName = map["_schema_id"].asMap()["_class_name"].asString();
-                    return eventName == EVENT.getEventName();
-                } catch (const std::exception& e) {
-                    QPID_LOG(error, "Error checking for recursive threshold alert: " << e.what());
-                }
-            }
-        } else {
-            std::string content = message->getFrames().getContent();
-            qpid::framing::Buffer buffer(const_cast<char*>(content.data()), content.size());
-            if (buffer.getOctet() == 'A' && buffer.getOctet() == 'M' && buffer.getOctet() == '2' && buffer.getOctet() == 'e') {
-                buffer.getLong();//sequence
-                std::string packageName;
-                buffer.getShortString(packageName);
-                if (packageName != EVENT.getPackageName()) return false;
-                std::string eventName;
-                buffer.getShortString(eventName);
-                return eventName == EVENT.getEventName();
-            }
-        }
-    }
-    return false;
-}
-}
-
 ThresholdAlerts::ThresholdAlerts(const std::string& n,
                                  qpid::management::ManagementAgent& a,
                                  const uint32_t ct,
@@ -90,14 +44,8 @@ void ThresholdAlerts::enqueued(const Que
     if ((countThreshold && count >= countThreshold) || (sizeThreshold && size >= sizeThreshold)) {
         if ((repeatInterval == 0 && lastAlert == qpid::sys::EPOCH)
             || qpid::sys::Duration(lastAlert, qpid::sys::now()) > repeatInterval) {
-            //Note: Raising an event may result in messages being
-            //enqueued on queues; it may even be that this event
-            //causes a message to be enqueued on the queue we are
-            //tracking, and so we need to avoid recursing
-            if (isThresholdEvent(m.payload)) return;
-            lastAlert = qpid::sys::now();
             agent.raiseEvent(qmf::org::apache::qpid::broker::EventQueueThresholdExceeded(name, count, size));
-            QPID_LOG(info, "Threshold event triggered for " << name << ", count=" << count << ", size=" << size);
+            lastAlert = qpid::sys::now();
         }
     }
 }
@@ -127,12 +75,12 @@ void ThresholdAlerts::observe(Queue& que
 }
 
 void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
-                              const qpid::framing::FieldTable& settings, uint16_t limitRatio)
+                              const qpid::framing::FieldTable& settings)
 
 {
     qpid::types::Variant::Map map;
     qpid::amqp_0_10::translate(settings, map);
-    observe(queue, agent, map, limitRatio);
+    observe(queue, agent, map);
 }
 
 template <class T>
@@ -170,19 +118,19 @@ class Option
 };
 
 void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
-                              const qpid::types::Variant::Map& settings, uint16_t limitRatio)
+                              const qpid::types::Variant::Map& settings)
 
 {
     //Note: aliases are keys defined by java broker
     Option<int64_t> repeatInterval("qpid.alert_repeat_gap", 60);
     repeatInterval.addAlias("x-qpid-minimum-alert-repeat-gap");
 
-    //If no explicit threshold settings were given use specified
-    //percentage of any limit from the policy.
+    //If no explicit threshold settings were given use 80% of any
+    //limit from the policy.
     const QueuePolicy* policy = queue.getPolicy();
-    Option<uint32_t> countThreshold("qpid.alert_count", (uint32_t) (policy && limitRatio ? (policy->getMaxCount()*limitRatio/100) : 0));
+    Option<uint32_t> countThreshold("qpid.alert_count", (uint32_t) (policy ? policy->getMaxCount()*0.8 : 0));
     countThreshold.addAlias("x-qpid-maximum-message-count");
-    Option<uint64_t> sizeThreshold("qpid.alert_size", (uint64_t) (policy && limitRatio ? (policy->getMaxSize()*limitRatio/100) : 0));
+    Option<uint64_t> sizeThreshold("qpid.alert_size", (uint64_t) (policy ? policy->getMaxSize()*0.8 : 0));
     sizeThreshold.addAlias("x-qpid-maximum-message-size");
 
     observe(queue, agent, countThreshold.get(settings), sizeThreshold.get(settings), repeatInterval.get(settings));

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/ThresholdAlerts.h Fri Oct 21 01:19:00 2011
@@ -50,17 +50,14 @@ class ThresholdAlerts : public QueueObse
                     const long repeatInterval);
     void enqueued(const QueuedMessage&);
     void dequeued(const QueuedMessage&);
-    void acquired(const QueuedMessage&) {};
-    void requeued(const QueuedMessage&) {};
-
     static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
                         const uint64_t countThreshold,
                         const uint64_t sizeThreshold,
                         const long repeatInterval);
     static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
-                        const qpid::framing::FieldTable& settings, uint16_t limitRatio);
+                        const qpid::framing::FieldTable& settings);
     static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
-                        const qpid::types::Variant::Map& settings, uint16_t limitRatio);
+                        const qpid::types::Variant::Map& settings);
   private:
     const std::string name;
     qpid::management::ManagementAgent& agent;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.cpp Fri Oct 21 01:19:00 2011
@@ -221,7 +221,6 @@ TopicExchange::TopicExchange(const std::
 
 bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
 {
-    ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit.
     string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
     string fedTags(args ? args->getAsString(qpidFedTags) : "");
     string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
@@ -250,21 +249,21 @@ bool TopicExchange::bind(Queue::shared_p
             if (mgmtExchange != 0) {
                 mgmtExchange->inc_bindingCount();
             }
-            QPID_LOG(debug, "Binding key [" << routingPattern << "] to queue " << queue->getName()
-                     << " on exchange " << getName() << " (origin=" << fedOrigin << ")");
+            QPID_LOG(debug, "Bound key [" << routingPattern << "] to queue " << queue->getName()
+                     << " (origin=" << fedOrigin << ")");
         }
     } else if (fedOp == fedOpUnbind) {
-        RWlock::ScopedWlock l(lock);
-        BindingKey* bk = getQueueBinding(queue, routingPattern);
-        if (bk) {
-            QPID_LOG(debug, "FedOpUnbind [" << routingPattern << "] from exchange " << getName()
-                     << " on queue=" << queue->getName() << " origin=" << fedOrigin);
-            propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin);
-            // if this was the last binding for the queue, delete the binding
-            if (bk->fedBinding.countFedBindings(queue->getName()) == 0) {
-                deleteBinding(queue, routingPattern, bk);
+        bool reallyUnbind = false;
+        {
+            RWlock::ScopedWlock l(lock);
+            BindingKey* bk = bindingTree.getBindingKey(routingPattern);
+            if (bk) {
+                propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin);
+                reallyUnbind = bk->fedBinding.countFedBindings(queue->getName()) == 0;
             }
         }
+        if (reallyUnbind)
+            unbind(queue, routingPattern, 0);
     } else if (fedOp == fedOpReorigin) {
         /** gather up all the keys that need rebinding in a local vector
          * while holding the lock.  Then propagate once the lock is
@@ -282,38 +281,20 @@ bool TopicExchange::bind(Queue::shared_p
         }
     }
 
-    cc.clearCache(); // clear the cache before we IVE route.
     routeIVE();
     if (propagate)
         propagateFedOp(routingKey, fedTags, fedOp, fedOrigin);
     return true;
 }
 
-bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* args)
-{
-    string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
-    QPID_LOG(debug, "Unbinding key [" << constRoutingKey << "] from queue " << queue->getName()
-             << " on exchange " << getName() << " origin=" << fedOrigin << ")" );
-
-    ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit.
+bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* /*args*/){
     RWlock::ScopedWlock l(lock);
     string routingKey = normalize(constRoutingKey);
-    BindingKey* bk = getQueueBinding(queue, routingKey);
+    BindingKey* bk = bindingTree.getBindingKey(routingKey);
     if (!bk) return false;
-    bool propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin);
-    deleteBinding(queue, routingKey, bk);
-    if (propagate)
-        propagateFedOp(routingKey, string(), fedOpUnbind, string());
-    return true;
-}
-
-
-bool TopicExchange::deleteBinding(Queue::shared_ptr queue,
-                                  const std::string& routingKey,
-                                  BindingKey *bk)
-{
-    // Note well: write lock held by caller
     Binding::vector& qv(bk->bindingVector);
+    bool propagate = false;
+
     Binding::vector::iterator q;
     for (q = qv.begin(); q != qv.end(); q++)
         if ((*q)->queue == queue)
@@ -322,55 +303,42 @@ bool TopicExchange::deleteBinding(Queue:
     qv.erase(q);
     assert(nBindings > 0);
     nBindings--;
-
+    propagate = bk->fedBinding.delOrigin();
     if(qv.empty()) {
         bindingTree.removeBindingKey(routingKey);
     }
     if (mgmtExchange != 0) {
         mgmtExchange->dec_bindingCount();
     }
-    QPID_LOG(debug, "Unbound key [" << routingKey << "] from queue " << queue->getName()
-             << " on exchange " << getName());
+    QPID_LOG(debug, "Unbound [" << routingKey << "] from queue " << queue->getName());
+
+    if (propagate)
+        propagateFedOp(routingKey, string(), fedOpUnbind, string());
     return true;
 }
 
-/** returns a pointer to the BindingKey if the given queue is bound to this
- * exchange using the routing pattern. 0 if queue binding does not exist.
- */
-TopicExchange::BindingKey *TopicExchange::getQueueBinding(Queue::shared_ptr queue, const string& pattern)
+bool TopicExchange::isBound(Queue::shared_ptr queue, const string& pattern)
 {
     // Note well: lock held by caller....
     BindingKey *bk = bindingTree.getBindingKey(pattern);  // Exact match against binding pattern
-    if (!bk) return 0;
+    if (!bk) return false;
     Binding::vector& qv(bk->bindingVector);
     Binding::vector::iterator q;
     for (q = qv.begin(); q != qv.end(); q++)
         if ((*q)->queue == queue)
             break;
-    return (q != qv.end()) ? bk : 0;
+    return q != qv.end();
 }
 
 void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
 {
     // Note: PERFORMANCE CRITICAL!!!
-    BindingList b;
-    std::map<std::string, BindingList>::iterator it;
-    {  // only lock the cache for read
-       RWlock::ScopedRlock cl(cacheLock);
-       it = bindingCache.find(routingKey);
-       if (it != bindingCache.end()) {
-           b = it->second;
-       }
-    }
+    BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
     PreRoute pr(msg, this);
-    if (!b.get())  // no cache hit
+    BindingsFinderIter bindingsFinder(b);
     {
         RWlock::ScopedRlock l(lock);
-    	b = BindingList(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
-        BindingsFinderIter bindingsFinder(b);
         bindingTree.iterateMatch(routingKey, bindingsFinder);
-        RWlock::ScopedWlock cwl(cacheLock);
-        bindingCache[routingKey] = b; // update cache
     }
     doRoute(msg, b);
 }
@@ -380,7 +348,7 @@ bool TopicExchange::isBound(Queue::share
     RWlock::ScopedRlock l(lock);
     if (routingKey && queue) {
         string key(normalize(*routingKey));
-        return getQueueBinding(queue, key) != 0;
+        return isBound(queue, key);
     } else if (!routingKey && !queue) {
         return nBindings > 0;
     } else if (routingKey) {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/TopicExchange.h Fri Oct 21 01:19:00 2011
@@ -56,7 +56,7 @@ class TopicExchange : public virtual Exc
     //    |   +-->d-->...
     //    +-->x-->y-->...
     //
-    class QPID_BROKER_CLASS_EXTERN BindingNode {
+    class BindingNode {
     public:
 
         typedef boost::shared_ptr<BindingNode> shared_ptr;
@@ -135,31 +135,8 @@ class TopicExchange : public virtual Exc
     BindingNode bindingTree;
     unsigned long nBindings;
     qpid::sys::RWlock lock;     // protects bindingTree and nBindings
-    qpid::sys::RWlock cacheLock;     // protects cache
-    std::map<std::string, BindingList> bindingCache; // cache of matched routes.
-    class ClearCache {
-    private:
-        qpid::sys::RWlock* cacheLock;
-        std::map<std::string, BindingList>* bindingCache;
-	bool cleared; 
-    public:
-        ClearCache(qpid::sys::RWlock* l, std::map<std::string, BindingList>* bc): cacheLock(l),
-             bindingCache(bc),cleared(false) {};
-        void clearCache() {
-             qpid::sys::RWlock::ScopedWlock l(*cacheLock);
-             if (!cleared) {
-                 bindingCache->clear();
-                 cleared =true;
-             }
-        };
-        ~ClearCache(){ 
-	     clearCache();
-        };
-    };
-    BindingKey *getQueueBinding(Queue::shared_ptr queue, const std::string& pattern);
-    bool deleteBinding(Queue::shared_ptr queue,
-                       const std::string& routingKey,
-                       BindingKey *bk);
+
+    bool isBound(Queue::shared_ptr queue, const std::string& pattern);
 
     class ReOriginIter;
     class BindingsFinderIter;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/TxBuffer.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/TxBuffer.cpp Fri Oct 21 01:19:00 2011
@@ -76,5 +76,5 @@ bool TxBuffer::commitLocal(Transactional
 }
 
 void TxBuffer::accept(TxOpConstVisitor& v) const {
-    std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v)));
+    std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v))); 
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/TxPublish.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/TxPublish.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/TxPublish.cpp Fri Oct 21 01:19:00 2011
@@ -90,7 +90,14 @@ void TxPublish::deliverTo(const boost::s
 
 void TxPublish::prepare(TransactionContext* ctxt, const boost::shared_ptr<Queue> queue)
 {
-    queue->enqueue(ctxt, msg);
+    if (!queue->enqueue(ctxt, msg)){
+        /**
+         * If there is no store, mark the message for ack and delivery once
+         * the commit happens, as async IO will never set it when no store
+         * exists.
+         */
+	msg->enqueueComplete();
+    }
 }
 
 TxPublish::Commit::Commit(intrusive_ptr<Message>& _msg) : msg(_msg){}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org