You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/09/03 19:35:36 UTC

svn commit: r572394 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/src/qpid/framing/ cpp/src/tests/ python/ python/qpid/ python/tests_0-10/

Author: gsim
Date: Mon Sep  3 10:35:35 2007
New Revision: 572394

URL: http://svn.apache.org/viewvc?rev=572394&view=rev
Log:
Initial implementation (plus very simple tests) of message.acquire, message.release, message.reject and message.flush.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.h
    incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp
    incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
    incubator/qpid/trunk/qpid/python/qpid/client.py
    incubator/qpid/trunk/qpid/python/qpid/peer.py
    incubator/qpid/trunk/qpid/python/tests_0-10/message.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h Mon Sep  3 10:35:35 2007
@@ -57,7 +57,7 @@
              */
             std::list<Range> ranges;
 
-            AccumulatedAck(DeliveryId r) : mark(r) {}
+            explicit AccumulatedAck(DeliveryId r) : mark(r) {}
             void update(DeliveryId firstTag, DeliveryId lastTag);
             void consolidate();
             void clear();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Mon Sep  3 10:35:35 2007
@@ -306,7 +306,7 @@
     //also version specific behaviour now)
     if (newTag.empty()) newTag = tagGenerator.generate();
     DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag));
-    session.consume(token, newTag, queue, noLocal, !noAck, exclusive, &fields);
+    session.consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields);
 
     if(!nowait) client.consumeOk(newTag);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Mon Sep  3 10:35:35 2007
@@ -112,31 +112,50 @@
    
 }
 
-
-void Queue::requestDispatch(){
-    serializer.execute(dispatchCallback);
+bool Queue::acquire(const QueuedMessage& msg) {
+    Mutex::ScopedLock locker(messageLock);
+    for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
+        if (i->position == msg.position) {
+            messages.erase(i);
+            return true;
+        }
+    }
+    return false;
 }
 
+void Queue::requestDispatch(Consumer* c, bool sync){
+    if (!c || c->preAcquires()) {
+        if (sync) {
+            serializer.dispatch();
+        } else {
+            serializer.execute(dispatchCallback);
+        }
+    } else {
+        //note: this is always done on the caller's thread, regardless
+        //      of sync; browsers of large queues should use flow control!
+        serviceBrowser(c);
+    }
+}
 
 bool Queue::dispatch(QueuedMessage& msg){
 
  
     RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide....
  
-    if(consumers.empty()){
+    if(acquirers.empty()){
         return false;
     }else if(exclusive){
         return exclusive->deliver(msg);
     }else{
         //deliver to next consumer
-        next = next % consumers.size();
-        Consumer* c = consumers[next];
+        next = next % acquirers.size();
+        Consumer* c = acquirers[next];
         int start = next;
         while(c){
             next++;
             if(c->deliver(msg)) return true;            
-            next = next % consumers.size();
-            c = next == start ? 0 : consumers[next];            
+            next = next % acquirers.size();
+            c = next == start ? 0 : acquirers[next];            
         }
         return false;
     }
@@ -153,34 +172,79 @@
 	}
         if( msg.payload->isEnqueueComplete() && dispatch(msg) ) {
             pop();
-        } else {
+        } else {            
             break;
         }	
-    }    
+     }    
+     RWlock::ScopedRlock locker(consumerLock);
+     for (Consumers::iterator i = browsers.begin(); i != browsers.end(); i++) {         
+         serviceBrowser(*i);
+     }
+}
+
+void Queue::serviceBrowser(Consumer* browser)
+{
+    //This is a poorly performing implementation:
+    //
+    //  * bad concurrency where browsers exist
+    //  * inefficient for largish queues
+    //
+    //The queue needs to be based on a concurrent data structure that
+    //does not invalidate iterators when modified. Subscribers could
+    //then use an iterator to continue from where they left off
+
+    Mutex::ScopedLock locker(messageLock);
+    if (!messages.empty() && messages.back().position > browser->position) {
+        for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
+            if (i->position > browser->position) {
+                if (browser->deliver(*i)) {
+                    browser->position = i->position;
+                } else {
+                    break;
+                }
+            }
+        }
+    }
 }
 
 void Queue::consume(Consumer* c, bool requestExclusive){
     RWlock::ScopedWlock locker(consumerLock);
-    if(exclusive) 
+    if(exclusive) {
         throw ChannelException(
             403, format("Queue '%s' has an exclusive consumer."
                         " No more consumers allowed.") % getName());
+    }
     if(requestExclusive) {
-        if(!consumers.empty())
+        if(acquirers.empty() && browsers.empty()) {
+            exclusive = c;
+        } else {
             throw ChannelException(
-                403, format("Queue '%s' already has conumers."
-                            "Exclusive access denied.") %getName());
-        exclusive = c;
+                403, format("Queue '%s' already has consumers."
+                            "Exclusive access denied.") % getName());
+        }
+    }
+    if (c->preAcquires()) {
+        acquirers.push_back(c);
+    } else {
+        browsers.push_back(c);
     }
-    consumers.push_back(c);
 }
 
 void Queue::cancel(Consumer* c){
     RWlock::ScopedWlock locker(consumerLock);
+    if (c->preAcquires()) {
+        cancel(c, acquirers);
+    } else {
+        cancel(c, browsers);
+    }
+    if(exclusive == c) exclusive = 0;
+}
+
+void Queue::cancel(Consumer* c, Consumers& consumers)
+{
     Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c);
     if (i != consumers.end()) 
         consumers.erase(i);
-    if(exclusive == c) exclusive = 0;
 }
 
 QueuedMessage Queue::dequeue(){
@@ -233,12 +297,12 @@
 
 uint32_t Queue::getConsumerCount() const{
     RWlock::ScopedRlock locker(consumerLock);
-    return consumers.size();
+    return acquirers.size() + browsers.size();
 }
 
 bool Queue::canAutoDelete() const{
     RWlock::ScopedRlock locker(consumerLock);
-    return autodelete && consumers.size() == 0;
+    return autodelete && acquirers.empty() && browsers.empty();
 }
 
 // return true if store exists, 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h Mon Sep  3 10:35:35 2007
@@ -68,7 +68,8 @@
             const bool autodelete;
             MessageStore* const store;
             const ConnectionToken* const owner;
-            Consumers consumers;
+            Consumers acquirers;
+            Consumers browsers;
             Messages messages;
             int next;
             mutable qpid::sys::RWlock consumerLock;
@@ -91,6 +92,8 @@
              * only called by serilizer
 	     */
             void dispatch();
+            void cancel(Consumer* c, Consumers& set);
+            void serviceBrowser(Consumer* c);
  
         protected:
 	   /**
@@ -114,6 +117,9 @@
             void destroy();
             void bound(const string& exchange, const string& key, const qpid::framing::FieldTable& args);
             void unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref);
+
+            bool acquire(const QueuedMessage& msg);
+
             /**
              * Delivers a message to the queue. Will record it as
              * enqueued if persistent then process it.
@@ -141,7 +147,7 @@
              * at any time, so this call schedules the despatch based on
 	     * the serilizer policy.
              */
-            void requestDispatch();
+            void requestDispatch(Consumer* c = 0, bool sync = false);
             void consume(Consumer* c, bool exclusive = false);
             void cancel(Consumer* c);
             uint32_t purge();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h Mon Sep  3 10:35:35 2007
@@ -36,8 +36,13 @@
         };
         
 
-        class Consumer{
+        class Consumer {
+            const bool acquires;
         public:
+            framing::SequenceNumber position;
+
+            Consumer(bool preAcquires = true) : acquires(preAcquires) {}
+            bool preAcquires() const { return acquires; }
             virtual bool deliver(QueuedMessage& msg) = 0;
             virtual ~Consumer(){}
         };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Mon Sep  3 10:35:35 2007
@@ -19,7 +19,9 @@
  *
  */
 #include "DeliveryRecord.h"
+#include "DeliverableMessage.h"
 #include "Session.h"
+#include "qpid/log/Statement.h"
 
 using namespace qpid::broker;
 using std::string;
@@ -27,29 +29,32 @@
 DeliveryRecord::DeliveryRecord(QueuedMessage& _msg, 
                                Queue::shared_ptr _queue, 
                                const string _consumerTag, 
-                               const DeliveryId _deliveryTag) : msg(_msg), 
+                               const DeliveryId _id,
+                               bool _acquired) : msg(_msg), 
                                                                 queue(_queue), 
                                                                 consumerTag(_consumerTag),
-                                                                deliveryTag(_deliveryTag),
-                                                                acquired(false),
+                                                                id(_id),
+                                                                acquired(_acquired),
                                                                 pull(false){}
 
 DeliveryRecord::DeliveryRecord(QueuedMessage& _msg, 
                                Queue::shared_ptr _queue, 
-                               const DeliveryId _deliveryTag) : msg(_msg), 
+                               const DeliveryId _id) : msg(_msg), 
                                                                 queue(_queue), 
                                                                 consumerTag(""),
-                                                                deliveryTag(_deliveryTag),
-                                                                acquired(false),
+                                                                id(_id),
+                                                                acquired(true),
                                                                 pull(true){}
 
 
 void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
-    queue->dequeue(ctxt, msg.payload);
+    if (acquired) {
+        queue->dequeue(ctxt, msg.payload);
+    }
 }
 
 bool DeliveryRecord::matches(DeliveryId tag) const{
-    return deliveryTag == tag;
+    return id == tag;
 }
 
 bool DeliveryRecord::matchOrAfter(DeliveryId tag) const{
@@ -57,11 +62,11 @@
 }
 
 bool DeliveryRecord::after(DeliveryId tag) const{
-    return deliveryTag > tag;
+    return id > tag;
 }
 
 bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{
-    return range->covers(deliveryTag);
+    return range->covers(id);
 }
 
 void DeliveryRecord::redeliver(Session* const session) const{
@@ -69,15 +74,36 @@
         //if message was originally sent as response to get, we must requeue it
         requeue();
     }else{
-        session->deliver(msg.payload, consumerTag, deliveryTag);
+        session->deliver(msg.payload, consumerTag, id);
     }
 }
 
-void DeliveryRecord::requeue() const{
+void DeliveryRecord::requeue() const
+{
     msg.payload->redeliver();
     queue->requeue(msg);
 }
 
+void DeliveryRecord::release() 
+{
+    queue->requeue(msg);
+    acquired = false;
+}
+
+void DeliveryRecord::reject() 
+{    
+    Exchange::shared_ptr alternate = queue->getAlternateExchange();
+    if (alternate) {
+        DeliverableMessage delivery(msg.payload);
+        alternate->route(delivery, msg.payload->getRoutingKey(), &(msg.payload->getApplicationHeaders()));
+        QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to " 
+                 << alternate->getName());
+    } else {
+        //just drop it
+        QPID_LOG(info, "Dropping rejected message from " << queue->getName());
+    }
+}
+
 void DeliveryRecord::updateByteCredit(uint32_t& credit) const
 {
     credit += msg.payload->getRequiredCredit();
@@ -102,11 +128,18 @@
     }
 }
 
+void DeliveryRecord::acquire(std::vector<DeliveryId>& results) {
+    if (queue->acquire(msg)) {
+        acquired = true;
+        results.push_back(id);
+    }
+}
+
 namespace qpid {
 namespace broker {
 
 std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r) {
-    out << "{" << "id=" << r.deliveryTag.getValue();
+    out << "{" << "id=" << r.id.getValue();
     out << ", consumer=" << r.consumerTag;
     out << ", queue=" << r.queue->getName() << "}";
     return out;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Mon Sep  3 10:35:35 2007
@@ -23,6 +23,7 @@
 
 #include <algorithm>
 #include <list>
+#include <vector>
 #include <ostream>
 #include "AccumulatedAck.h"
 #include "BrokerQueue.h"
@@ -42,13 +43,14 @@
     mutable QueuedMessage msg;
     mutable Queue::shared_ptr queue;
     const std::string consumerTag;
-    const DeliveryId deliveryTag;
+    const DeliveryId id;
     bool acquired;
     const bool pull;
 
   public:
-    DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag, const DeliveryId deliveryTag);
-    DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId deliveryTag);
+    DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag, 
+                   const DeliveryId id, bool acquired);
+    DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id);
             
     void dequeue(TransactionContext* ctxt = 0) const;
     bool matches(DeliveryId tag) const;
@@ -56,6 +58,8 @@
     bool after(DeliveryId tag) const;
     bool coveredBy(const AccumulatedAck* const range) const;
     void requeue() const;
+    void release();
+    void reject();
     void redeliver(Session* const) const;
     void updateByteCredit(uint32_t& credit) const;
     void addTo(Prefetch&) const;
@@ -63,12 +67,33 @@
     const std::string& getConsumerTag() const { return consumerTag; } 
     bool isPull() const { return pull; }
     bool isAcquired() const { return acquired; }
-    void setAcquired(bool isAcquired) { acquired = isAcquired; }
+    //void setAcquired(bool isAcquired) { acquired = isAcquired; }
+    void acquire(std::vector<DeliveryId>& results);
             
   friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
 };
 
 typedef std::list<DeliveryRecord>::iterator ack_iterator; 
+
+struct AckRange
+{
+    ack_iterator start;
+    ack_iterator end;    
+    AckRange(ack_iterator _start, ack_iterator _end) : start(_start), end(_end) {}
+};
+
+struct AcquireFunctor
+{
+    std::vector<DeliveryId>& results;
+
+    AcquireFunctor(std::vector<DeliveryId>& _results) : results(_results) {}
+
+    void operator()(DeliveryRecord& record)
+    {
+        record.acquire(results);
+    }
+};
+
 }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Mon Sep  3 10:35:35 2007
@@ -25,6 +25,7 @@
 #include "MessageDelivery.h"
 #include "qpid/framing/MessageAppendBody.h"
 #include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/reply_exceptions.h"
 #include "BrokerAdapter.h"
 
 #include <boost/format.hpp>
@@ -92,7 +93,7 @@
                             const string& destination,
                             bool noLocal,
                             u_int8_t confirmMode,
-                            u_int8_t acquireMode,//TODO: implement acquire modes
+                            u_int8_t acquireMode,
                             bool exclusive,
                             const framing::FieldTable& filter )
 {
@@ -101,8 +102,10 @@
         throw ConnectionException(530, "Consumer tags must be unique");
 
     string tag = destination;
+    //NB: am assuming pre-acquired = 0 as discussed on SIG list as is
+    //the previously expected behaviour
     session.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), 
-                    tag, queue, noLocal, confirmMode == 1, exclusive, &filter);
+                    tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
     // Dispatch messages as there is now a consumer.
     queue->requestDispatch();
 }
@@ -156,9 +159,15 @@
 }
 
 void
-MessageHandlerImpl::reject(const SequenceNumberSet& /*transfers*/, uint16_t /*code*/, const string& /*text*/ )
+MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/, const string& /*text*/ )
 {
-    //TODO: implement
+    if (transfers.size() % 2) { //must be even number        
+        throw InvalidArgumentException("Received odd number of elements in list of transfers");
+    }
+    
+    for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
+        session.reject(i->getValue(), (++i)->getValue());
+    }
 }
 
 void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value)
@@ -200,14 +209,31 @@
     session.stop(destination);        
 }
 
-void MessageHandlerImpl::acquire(const SequenceNumberSet& /*transfers*/, u_int8_t /*mode*/)
+void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/)
 {
-    throw ConnectionException(540, "Not yet implemented");
+    SequenceNumberSet results;
+
+    if (transfers.size() % 2) { //must be even number        
+        throw InvalidArgumentException("Received odd number of elements in list of transfers");
+    }
+    
+    for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
+        session.acquire(i->getValue(), (++i)->getValue(), results);
+    }
+
+    results = results.condense();
+    client.acquired(results);
 }
 
-void MessageHandlerImpl::release(const SequenceNumberSet& /*transfers*/)
+void MessageHandlerImpl::release(const SequenceNumberSet& transfers)
 {
-    throw ConnectionException(540, "Not yet implemented");
+    if (transfers.size() % 2) { //must be even number        
+        throw InvalidArgumentException("Received odd number of elements in list of transfers");
+    }
+    
+    for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
+        session.release(i->getValue(), (++i)->getValue());
+    }
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp Mon Sep  3 10:35:35 2007
@@ -34,6 +34,7 @@
 #include "TxPublish.h"
 #include "qpid/QpidError.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
 
 #include <boost/bind.hpp>
 #include <boost/format.hpp>
@@ -91,12 +92,12 @@
 }
 
 void Session::consume(DeliveryToken::shared_ptr token, string& tagInOut, 
-                      Queue::shared_ptr queue, bool nolocal, bool acks,
+                      Queue::shared_ptr queue, bool nolocal, bool acks, bool acquire,
                       bool exclusive, const FieldTable*)
 {
     if(tagInOut.empty())
         tagInOut = tagGenerator.generate();
-    std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal));
+    std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire));
     queue->consume(c.get(), exclusive);//may throw exception
     consumers.insert(tagInOut, c.release());
 }
@@ -239,7 +240,9 @@
                                     bool ack,
                                     bool _nolocal,
                                     bool _acquire
-) : parent(_parent), 
+                                    ) : 
+    Consumer(_acquire),
+    parent(_parent), 
     token(_token), 
     name(_name), 
     queue(_queue), 
@@ -266,7 +269,7 @@
             DeliveryId deliveryTag =
                 parent->deliveryAdapter->deliver(msg.payload, token);
             if (ackExpected) {
-                parent->record(DeliveryRecord(msg, queue, name, deliveryTag));
+                parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire));
             }
         }
         return !blocked;
@@ -312,7 +315,7 @@
 void Session::ConsumerImpl::requestDispatch()
 {
     if(blocked)
-        queue->requestDispatch();
+        queue->requestDispatch(this);
 }
 
 void Session::handle(Message::shared_ptr msg) {
@@ -532,9 +535,7 @@
 
 void Session::ConsumerImpl::flush()
 {
-    //TODO: need to wait until any messages that are available for
-    //this consumer have been delivered... i.e. some sort of flush on
-    //the queue...
+    queue->requestDispatch(this, true);
 }
 
 void Session::ConsumerImpl::stop()
@@ -557,6 +558,44 @@
             throw NotFoundException(QPID_MSG("Queue not found: "<<name));
     }
     return queue;
+}
+
+AckRange Session::findRange(DeliveryId first, DeliveryId last)
+{    
+    ack_iterator start = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
+    ack_iterator end = start;
+     
+    if (first == last) {
+        //just acked single element (move end past it)
+        ++end;
+    } else {
+        //need to find end (position it just after the last record in range)
+        end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+    }
+    return AckRange(start, end);
+}
+
+void Session::acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired)
+{
+    Mutex::ScopedLock locker(deliveryLock);
+    AckRange range = findRange(first, last);
+    for_each(range.start, range.end, AcquireFunctor(acquired));
+}
+
+void Session::release(DeliveryId first, DeliveryId last)
+{
+    Mutex::ScopedLock locker(deliveryLock);
+    AckRange range = findRange(first, last);
+    for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::release));
+}
+
+void Session::reject(DeliveryId first, DeliveryId last)
+{
+    Mutex::ScopedLock locker(deliveryLock);
+    AckRange range = findRange(first, last);
+    for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject));
+    //need to remove the delivery records as well
+    unacked.erase(range.start, range.end);
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h Mon Sep  3 10:35:35 2007
@@ -40,6 +40,7 @@
 #include <boost/ptr_container/ptr_vector.hpp>
 
 #include <list>
+#include <vector>
 
 namespace qpid {
 
@@ -80,7 +81,7 @@
       public:
         ConsumerImpl(Session* parent, DeliveryToken::shared_ptr token, 
                      const string& name, Queue::shared_ptr queue,
-                     bool ack, bool nolocal, bool acquire=true);
+                     bool ack, bool nolocal, bool acquire);
         ~ConsumerImpl();
         bool deliver(QueuedMessage& msg);            
         void redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag);
@@ -131,6 +132,8 @@
 
     // FIXME aconway 2007-08-31: remove, temporary hack.
     SemanticHandler* semanticHandler;
+
+    AckRange findRange(DeliveryId first, DeliveryId last);
     
 
   public:
@@ -166,7 +169,7 @@
      *@param tagInOut - if empty it is updated with the generated token.
      */
     void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, 
-                 bool nolocal, bool acks, bool exclusive, const framing::FieldTable* = 0);
+                 bool nolocal, bool acks, bool acquire, bool exclusive, const framing::FieldTable* = 0);
 
     void cancel(const string& tag);
 
@@ -192,6 +195,9 @@
     void recover(bool requeue);
     void flow(bool active);
     void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag);            
+    void acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired);
+    void release(DeliveryId first, DeliveryId last);
+    void reject(DeliveryId first, DeliveryId last);
 
     void handle(Message::shared_ptr msg);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp Mon Sep  3 10:35:35 2007
@@ -44,6 +44,22 @@
     return 2 /*count*/ + (size() * 4);
 }
 
+SequenceNumberSet SequenceNumberSet::condense() const
+{
+    SequenceNumberSet result;
+    const_iterator last = end();
+    for (const_iterator i = begin(); i != end(); i++) {
+        if (last == end()) {
+            last = i;
+        } else if (*i - *last > 1) {
+            result.push_back(*last);
+            result.push_back(*i);            
+            last = end();
+        }
+    }
+    return result;
+}
+
 namespace qpid{
 namespace framing{
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.h?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.h Mon Sep  3 10:35:35 2007
@@ -39,6 +39,7 @@
     void encode(Buffer& buffer) const;
     void decode(Buffer& buffer);
     uint32_t encodedSize() const;   
+    SequenceNumberSet condense() const;
 
     friend std::ostream& operator<<(std::ostream&, const SequenceNumberSet&);
 };    

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp Mon Sep  3 10:35:35 2007
@@ -78,7 +78,7 @@
             messages.push_back(msg);
             QueuedMessage qm;
             qm.payload = msg;
-            deliveries.push_back(DeliveryRecord(qm, queue, "xyz", (i+1)));
+            deliveries.push_back(DeliveryRecord(qm, queue, "xyz", (i+1), true));
         }
 
         //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not)

Modified: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (original)
+++ incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt Mon Sep  3 10:35:35 2007
@@ -1,4 +1,3 @@
 tests_0-10.alternate-exchange.AlternateExchangeTests.test_immediate
-tests_0-10.message.MessageTests.test_reject
 tests_0-10.basic.BasicTests.test_get
 

Modified: incubator/qpid/trunk/qpid/python/qpid/client.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/client.py?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/client.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/client.py Mon Sep  3 10:35:35 2007
@@ -126,6 +126,9 @@
   def message_append(self, ch, msg):
     ch.references.get(msg.reference).append(msg.bytes)
 
+  def message_acquired(self, ch, msg):
+    ch.control_queue.put(msg)
+
   def basic_deliver(self, ch, msg):
     self.client.queue(msg.consumer_tag).put(msg)
 

Modified: incubator/qpid/trunk/qpid/python/qpid/peer.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/peer.py?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/peer.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/peer.py Mon Sep  3 10:35:35 2007
@@ -190,6 +190,7 @@
     self.completion = OutgoingCompletion()
     self.incoming_completion = IncomingCompletion(self)
     self.futures = {}
+    self.control_queue = Queue(0)#used for incoming methods that apps may want to handle themselves
 
     # Use reliable framing if version == 0-9.
     if spec.major == 0 and spec.minor == 9:

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Mon Sep  3 10:35:35 2007
@@ -339,20 +339,19 @@
         msg = queue.get(timeout=1)
         self.assertEqual(large, msg.content.body)
 
-
-
     def test_reject(self):
         channel = self.channel
-        channel.queue_declare(queue = "q", exclusive=True)
+        channel.queue_declare(queue = "q", exclusive=True, alternate_exchange="amq.fanout")
+        channel.queue_declare(queue = "r", exclusive=True)
+        channel.queue_bind(queue = "r", exchange = "amq.fanout")
 
-        channel.message_subscribe(queue = "q", destination = "consumer")
+        channel.message_subscribe(queue = "q", destination = "consumer", confirm_mode = 1)
         channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah"))
         msg = self.client.queue("consumer").get(timeout = 1)
         self.assertEquals(msg.content.body, "blah, blah")
-        channel.message_cancel(destination = "consumer")
-        msg.reject()
+        channel.message_reject([msg.command_id, msg.command_id])
 
-        channel.message_subscribe(queue = "q", destination = "checker")
+        channel.message_subscribe(queue = "r", destination = "checker")
         msg = self.client.queue("checker").get(timeout = 1)
         self.assertEquals(msg.content.body, "blah, blah")
 
@@ -492,6 +491,71 @@
             msg.complete(cumulative=False)
             self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
             self.assertEmpty(q)
+
+    def test_subscribe_not_acquired(self):
+        """
+        Test that the not-acquired mode works as expected for a simple case
+        """
+        #NOTE: I'm using not-acquired == 1 and pre-acquired == 0 as
+        #that keeps the default behaviour as expected. This was
+        #discussed by the SIG, and I'd rather not change all the
+        #existing tests twice.
+        
+        channel = self.channel
+        channel.queue_declare(queue = "q", exclusive=True)
+        for i in range(1, 6):
+            channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+
+        channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
+        channel.message_subscribe(queue = "q", destination = "b", acquire_mode = 1)
+
+        for i in range(6, 11):
+            channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+
+        #both subscribers should see all messages
+        qA = self.client.queue("a")
+        qB = self.client.queue("b")
+        for i in range(1, 11):
+            for q in [qA, qB]:
+                msg = q.get(timeout = 1)
+                self.assertEquals("Message %s" % i, msg.content.body)
+                msg.complete()
+
+        #messages should still be on the queue:
+        self.assertEquals(10, channel.queue_query(queue = "q").message_count)
+
+    def test_acquire(self):
+        """
+        Test explicit acquire function
+        """
+        channel = self.channel
+        channel.queue_declare(queue = "q", exclusive=True)
+        channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me"))
+
+        channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1)
+        msg = self.client.queue("a").get(timeout = 1)
+        channel.message_acquire([msg.command_id, msg.command_id])
+        msg.complete()
+
+        #message should have been removed from the queue:
+        self.assertEquals(0, channel.queue_query(queue = "q").message_count)
+
+    def test_release(self):
+        """
+        Test explicit release function
+        """
+        channel = self.channel
+        channel.queue_declare(queue = "q", exclusive=True)
+        channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "release me"))
+
+        channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1)
+        msg = self.client.queue("a").get(timeout = 1)
+        channel.message_cancel(destination = "a")
+        channel.message_release([msg.command_id, msg.command_id])
+        msg.complete()
+
+        #message should not have been removed from the queue:
+        self.assertEquals(1, channel.queue_query(queue = "q").message_count)
 
     def assertDataEquals(self, channel, msg, expected):
         self.assertEquals(expected, msg.content.body)