You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/08/10 16:51:14 UTC

svn commit: r564611 - in /incubator/qpid/trunk/qpid: cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/framing/ cpp/src/tests/ python/tests_0-10/ specs/

Author: gsim
Date: Fri Aug 10 07:51:08 2007
New Revision: 564611

URL: http://svn.apache.org/viewvc?view=rev&rev=564611
Log:
Broker management of message acknowledgements now runs entirely off the execution layer.
Flow control support.


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryId.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageBase.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h
    incubator/qpid/trunk/qpid/cpp/src/tests/AccumulatedAckTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp
    incubator/qpid/trunk/qpid/python/tests_0-10/message.py
    incubator/qpid/trunk/qpid/specs/amqp-transitional.0-10.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Aug 10 07:51:08 2007
@@ -280,6 +280,7 @@
   qpid/broker/Deliverable.h \
   qpid/broker/DeliverableMessage.h \
   qpid/broker/DeliveryAdapter.h \
+  qpid/broker/DeliveryId.h \
   qpid/broker/DeliveryToken.h \
   qpid/broker/DirectExchange.h \
   qpid/broker/DtxAck.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp Fri Aug 10 07:51:08 2007
@@ -21,37 +21,125 @@
 #include "AccumulatedAck.h"
 
 #include <assert.h>
+#include <iostream>
 
-using std::less_equal;
-using std::bind2nd;
+using std::list;
+using std::max;
+using std::min;
 using namespace qpid::broker;
 
-void AccumulatedAck::update(uint64_t firstTag, uint64_t lastTag){
-    assert(firstTag<=lastTag);
-    if (firstTag <= range + 1) {
-        if (lastTag > range) range = lastTag;
+void AccumulatedAck::update(DeliveryId first, DeliveryId last){
+    assert(first <= last);
+    if (last < mark) return;
+
+
+    Range r(first, last);
+    bool handled = false;
+    list<Range>::iterator merged = ranges.end();
+    if (r.mergeable(mark)) {
+        mark = r.end;
+        merged = ranges.begin();
+        handled = true;
+    } else {
+        for (list<Range>::iterator i = ranges.begin(); i != ranges.end() && !handled; i++) {
+            if (i->merge(r)) {
+                merged = i;
+                handled = true;
+            } else if (r.start < i->start) {
+                ranges.insert(i, r);
+                handled = true;
+            }
+        }
+    }
+    if (!handled) {
+        ranges.push_back(r);
     } else {
-    	for (uint64_t tag = firstTag; tag<=lastTag; tag++)
-            individual.push_back(tag);
+        while (!ranges.empty() && ranges.front().end <= mark) { 
+            ranges.pop_front(); 
+        }
+        //new range is incorporated, but may be possible to consolidate
+        if (merged == ranges.begin()) {
+            //consolidate mark
+            while (merged != ranges.end() && merged->mergeable(mark)) {
+                mark = merged->end;
+                merged = ranges.erase(merged);
+            }
+        }
+        if (merged != ranges.end()) {
+            //consolidate ranges
+            list<Range>::iterator i = merged;
+            list<Range>::iterator j = i++;
+            while (i != ranges.end() && j->merge(*i)) {
+                j = i++;
+            }
+        }
     }
 }
 
-void AccumulatedAck::consolidate(){
-    individual.sort();
-    //remove any individual tags that are covered by range
-    individual.remove_if(bind2nd(less_equal<uint64_t>(), range));
-    //update range if possible (using <= allows for duplicates from overlapping ranges)
-    while (individual.front() <= range + 1) {
-        range = individual.front();
-        individual.pop_front();
+void AccumulatedAck::consolidate(){}
+
+void AccumulatedAck::clear(){
+    mark = 0;//not sure that this is valid when wraparound is a possibility
+    ranges.clear();
+}
+
+bool AccumulatedAck::covers(DeliveryId tag) const{
+    if (tag <= mark) return true;
+    for (list<Range>::const_iterator i = ranges.begin(); i != ranges.end(); i++) {
+        if (i->contains(tag)) return true;
     }
+    return false;
 }
 
-void AccumulatedAck::clear(){
-    range = 0;
-    individual.clear();
+bool Range::contains(DeliveryId i) const 
+{ 
+    return i >= start && i <= end; 
 }
 
-bool AccumulatedAck::covers(uint64_t tag) const{
-    return tag <= range || find(individual.begin(), individual.end(), tag) != individual.end();
+bool Range::intersect(const Range& r) const 
+{ 
+    return r.contains(start) || r.contains(end) || contains(r.start) || contains(r.end); 
 }
+
+bool Range::merge(const Range& r) 
+{ 
+    if (intersect(r) || mergeable(r.end) || r.mergeable(end)) {
+        start = min(start, r.start); 
+        end = max(end, r.end); 
+        return true;
+    } else {
+        return false;
+    }
+}
+
+bool Range::mergeable(const DeliveryId& s) const
+{ 
+    if (contains(s) || start - s == 1) {
+        return true;
+    } else {
+        return false;
+    }
+}
+
+Range::Range(DeliveryId s, DeliveryId e) : start(s), end(e) {}
+
+
+namespace qpid{
+namespace broker{
+    std::ostream& operator<<(std::ostream& out, const Range& r)
+    {
+        out << "[" << r.start.getValue() << "-" << r.end.getValue() << "]";
+        return out;
+    }
+
+    std::ostream& operator<<(std::ostream& out, const AccumulatedAck& a)
+    { 
+        out << "{mark: " << a.mark.getValue() << ", ranges: (";
+        for (list<Range>::const_iterator i = a.ranges.begin(); i != a.ranges.end(); i++) {        
+            if (i != a.ranges.begin()) out << ", ";
+            out << *i;
+        }
+        out << ")]";
+        return out;
+    }
+}}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h Fri Aug 10 07:51:08 2007
@@ -24,9 +24,23 @@
 #include <algorithm>
 #include <functional>
 #include <list>
+#include <ostream>
+#include "DeliveryId.h"
 
 namespace qpid {
     namespace broker {
+
+        struct Range
+        {
+            DeliveryId start;
+            DeliveryId end;
+
+            Range(DeliveryId s, DeliveryId e);
+            bool contains(DeliveryId i) const;
+            bool intersect(const Range& r) const;
+            bool merge(const Range& r);
+            bool mergeable(const DeliveryId& r) const;
+        };
         /**
          * Keeps an accumulated record of acked messages (by delivery
          * tag).
@@ -37,19 +51,21 @@
              * If not zero, then everything up to this value has been
              * acked.
              */
-            uint64_t range;
+            DeliveryId mark;
             /**
              * List of individually acked messages that are not
              * included in the range marked by 'range'.
              */
-            std::list<uint64_t> individual;
+            std::list<Range> ranges;
 
-            AccumulatedAck(uint64_t r) : range(r) {}
-            void update(uint64_t firstTag, uint64_t lastTag);
+            AccumulatedAck(DeliveryId r) : mark(r) {}
+            void update(DeliveryId firstTag, DeliveryId lastTag);
             void consolidate();
             void clear();
-            bool covers(uint64_t tag) const;
+            bool covers(DeliveryId tag) const;
         };
+        std::ostream& operator<<(std::ostream&, const Range&);
+        std::ostream& operator<<(std::ostream&, const AccumulatedAck&);
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Fri Aug 10 07:51:08 2007
@@ -325,7 +325,7 @@
     //also version specific behaviour now)
     if (newTag.empty()) newTag = tagGenerator.generate();
     DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken(newTag));
-    channel.consume(token, newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
+    channel.consume(token, newTag, queue, noLocal, !noAck, exclusive, &fields);
 
     if(!nowait) client.consumeOk(newTag);
 
@@ -365,7 +365,11 @@
 } 
         
 void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){
-	channel.ack(deliveryTag, multiple);
+    if (multiple) {
+        channel.ackCumulative(deliveryTag);
+    } else {
+        channel.ackRange(deliveryTag, deliveryTag);
+    }
 } 
         
 void BrokerAdapter::BasicHandlerImpl::reject(uint64_t /*deliveryTag*/, bool /*requeue*/){} 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Fri Aug 10 07:51:08 2007
@@ -53,7 +53,6 @@
     id(_id),
     connection(con),
     out(_out),
-    currentDeliveryTag(1),
     prefetchSize(0),
     prefetchCount(0),
     tagGenerator("sgen"),
@@ -75,17 +74,13 @@
     return consumers.find(consumerTag) != consumers.end();
 }
 
-// TODO aconway 2007-02-12: Why is connection token passed in instead
-// of using the channel's parent connection?
 void Channel::consume(DeliveryToken::shared_ptr token, string& tagInOut, 
-                      Queue::shared_ptr queue, bool acks,
-                      bool exclusive, ConnectionToken* const connection,
-                      const FieldTable*)
+                      Queue::shared_ptr queue, bool nolocal, bool acks,
+                      bool exclusive, const FieldTable*)
 {
     if(tagInOut.empty())
         tagInOut = tagGenerator.generate();
-    std::auto_ptr<ConsumerImpl> c(
-    new ConsumerImpl(this, token, tagInOut, queue, connection, acks));
+    std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal));
     queue->consume(c.get(), exclusive);//may throw exception
     consumers.insert(tagInOut, c.release());
 }
@@ -210,7 +205,7 @@
 void Channel::record(const DeliveryRecord& delivery)
 {
     unacked.push_back(delivery);
-    delivery.addTo(&outstanding);
+    delivery.addTo(outstanding);
 }
 
 bool Channel::checkPrefetch(Message::shared_ptr& msg)
@@ -221,33 +216,61 @@
     return countOk && sizeOk;
 }
 
-Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, DeliveryToken::shared_ptr _token,
-                                    const string& _tag, Queue::shared_ptr _queue, 
-                                    ConnectionToken* const _connection, bool ack
-                                    ) : parent(_parent), token(_token), tag(_tag), queue(_queue), connection(_connection),
-                                        ackExpected(ack), blocked(false) {}
+Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, 
+                                    DeliveryToken::shared_ptr _token,
+                                    const string& _name, 
+                                    Queue::shared_ptr _queue, 
+                                    bool ack,
+                                    bool _nolocal 
+                                    ) : parent(_parent), 
+                                        token(_token), 
+                                        name(_name), 
+                                        queue(_queue), 
+                                        ackExpected(ack), 
+                                        nolocal(_nolocal), 
+                                        blocked(false), 
+                                        windowing(true), 
+                                        msgCredit(0xFFFFFFFF), 
+                                        byteCredit(0xFFFFFFFF) {}
 
 bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg)
 {
-    if(!connection || connection != msg->getPublisher()){//check for no_local
-        if(!parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))){
+    if (nolocal && &(parent->connection) == msg->getPublisher()) {
+        return false;
+    } else {
+        if (!checkCredit(msg) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))) {
             blocked = true;
-        }else{
+        } else {
             blocked = false;
+
             Mutex::ScopedLock locker(parent->deliveryLock);
 
-            uint64_t deliveryTag = parent->out.deliver(msg, token);
-            if(ackExpected){
-                parent->record(DeliveryRecord(msg, queue, tag, deliveryTag));
+            DeliveryId deliveryTag = parent->out.deliver(msg, token);
+            if (ackExpected) {
+                parent->record(DeliveryRecord(msg, queue, name, deliveryTag));
             }
+        }
+        return !blocked;
+    }
+}
 
-            return true;
+bool Channel::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
+{
+    Mutex::ScopedLock l(lock);
+    if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
+        return false;
+    } else {
+        if (msgCredit != 0xFFFFFFFF) {
+            msgCredit--;
+        }
+        if (byteCredit != 0xFFFFFFFF) {
+            byteCredit -= msg->getRequiredCredit();
         }
+        return true;
     }
-    return false;
 }
 
-void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, uint64_t deliveryTag) {
+void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) {
     Mutex::ScopedLock locker(parent->deliveryLock);
     parent->out.redeliver(msg, token, deliveryTag);
 }
@@ -326,55 +349,71 @@
 
 }
 
-// Used by Basic
-void Channel::ack(uint64_t deliveryTag, bool multiple)
+void Channel::ackCumulative(DeliveryId id)
 {
-    if (multiple)
-        ack(0, deliveryTag);
-    else
-        ack(deliveryTag, deliveryTag);
+    ack(id, id, true);
 }
 
-void Channel::ack(uint64_t firstTag, uint64_t lastTag)
+void Channel::ackRange(DeliveryId first, DeliveryId last)
 {
+    ack(first, last, false);
+}
+
+void Channel::ack(DeliveryId first, DeliveryId last, bool cumulative)
+{
+    Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+    
+    ack_iterator start = cumulative ? unacked.begin() : 
+        find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
+    ack_iterator end = start;
+     
+    if (cumulative || first != last) {
+        //need to find end (position it just after the last record in range)
+        end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+    } else {
+        //just acked single element (move end past it)
+        ++end;
+    }
+    
+    for_each(start, end, boost::bind(&Channel::acknowledged, this, _1));
+    
     if (txBuffer.get()) {
-        accumulatedAck.update(firstTag, lastTag);
-        //TODO: I think the outstanding prefetch size & count should be updated at this point...
-        //TODO: ...this may then necessitate dispatching to consumers
+        //in transactional mode, don't dequeue or remove, just
+        //maintain set of acknowledged messages:
+        accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
+        
         if (dtxBuffer.get()) {
+            //if enlisted in a dtx, remove the relevant slice from
+            //unacked and record it against that transaction
             TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
             accumulatedAck.clear();
             dtxBuffer->enlist(txAck);    
         }
-
     } else {
-        Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+        for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
+        unacked.erase(start, end);
+    }
     
-        ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag));
-        ack_iterator j = (firstTag == 0) ?
-            unacked.begin() :
-            find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag));
-        	
-        if(i == unacked.end()){
-            throw ConnectionException(530, "Received ack for unrecognised delivery tag");
-        }else if(i!=j){
-            ack_iterator end = ++i;
-            for_each(j, end, bind2nd(mem_fun_ref(&DeliveryRecord::discard), 0));
-            unacked.erase(unacked.begin(), end);
-
-            //recalculate the prefetch:
-            outstanding.reset();
-            for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding));
-        }else{
-            i->discard();
-            i->subtractFrom(&outstanding);
-            unacked.erase(i);        
-        }
+    //if the prefetch limit had previously been reached, or credit
+    //had expired in windowing mode, there may be messages that can
+    //now be delivered
+    for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
+}
+
+void Channel::acknowledged(const DeliveryRecord& delivery)
+{
+    delivery.subtractFrom(outstanding);
+    ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag());
+    if (i != consumers.end()) {
+        i->acknowledged(delivery);
+    }
+}
 
-        //if the prefetch limit had previously been reached, there may
-        //be messages that can be now be delivered
-        std::for_each(consumers.begin(), consumers.end(),
-                      boost::bind(&ConsumerImpl::requestDispatch, _1));
+void Channel::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
+{
+    if (windowing) {
+        if (msgCredit != 0xFFFFFFFF) msgCredit++;
+        if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit);
     }
 }
 
@@ -384,6 +423,8 @@
 
     if(requeue){
         outstanding.reset();
+        //take copy and clear unacked as requeue may result in redelivery to this channel
+        //which will in turn result in additions to unacked
         std::list<DeliveryRecord> copy = unacked;
         unacked.clear();
         for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
@@ -397,7 +438,7 @@
     Message::shared_ptr msg = queue->dequeue();
     if(msg){
         Mutex::ScopedLock locker(deliveryLock);
-        uint64_t myDeliveryTag = out.deliver(msg, token);
+        DeliveryId myDeliveryTag = out.deliver(msg, token);
         if(ackExpected){
             unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
         }
@@ -408,7 +449,7 @@
 }
 
 void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag,
-                      uint64_t deliveryTag)
+                      DeliveryId deliveryTag)
 {
     ConsumerImplMap::iterator i = consumers.find(consumerTag);
     if (i != consumers.end()){
@@ -425,4 +466,83 @@
         //there may be messages that can be now be delivered
         std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
     }
+}
+
+
+Channel::ConsumerImpl& Channel::find(const std::string& destination)
+{
+    ConsumerImplMap::iterator i = consumers.find(destination);
+    if (i == consumers.end()) {
+        throw ChannelException(404, boost::format("Unknown destination %1%") % destination);
+    } else {
+        return *i;
+    }
+}
+
+void Channel::setWindowMode(const std::string& destination)
+{
+    find(destination).setWindowMode();
+}
+
+void Channel::setCreditMode(const std::string& destination)
+{
+    find(destination).setCreditMode();
+}
+
+void Channel::addByteCredit(const std::string& destination, uint32_t value)
+{
+    find(destination).addByteCredit(value);
+}
+
+
+void Channel::addMessageCredit(const std::string& destination, uint32_t value)
+{
+    find(destination).addMessageCredit(value);
+}
+
+void Channel::flush(const std::string& destination)
+{
+    ConsumerImpl& c = find(destination);
+    c.flush();
+}
+
+
+void Channel::stop(const std::string& destination)
+{
+    find(destination).stop();
+}
+
+void Channel::ConsumerImpl::setWindowMode()
+{
+    windowing = true;
+}
+
+void Channel::ConsumerImpl::setCreditMode()
+{
+    windowing = false;
+}
+
+void Channel::ConsumerImpl::addByteCredit(uint32_t value)
+{
+    byteCredit += value;
+    requestDispatch();
+}
+
+void Channel::ConsumerImpl::addMessageCredit(uint32_t value)
+{
+    msgCredit += value;
+    requestDispatch();
+}
+
+void Channel::ConsumerImpl::flush()
+{
+    //TODO: need to wait until any messages that are available for
+    //this consumer have been delivered... i.e. some sort of flush on
+    //the queue...
+}
+
+void Channel::ConsumerImpl::stop()
+{
+    msgCredit = 0;
+    byteCredit = 0;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h Fri Aug 10 07:51:08 2007
@@ -64,23 +64,35 @@
 {
     class ConsumerImpl : public Consumer
     {
-        Channel* parent;
-        DeliveryToken::shared_ptr token;
-        const string tag;
-        Queue::shared_ptr queue;
-        ConnectionToken* const connection;
+        sys::Mutex lock;
+        Channel* const parent;
+        const DeliveryToken::shared_ptr token;
+        const string name;
+        const Queue::shared_ptr queue;
         const bool ackExpected;
+        const bool nolocal;
         bool blocked;
+        bool windowing;
+        uint32_t msgCredit;
+        uint32_t byteCredit;
 
       public:
         ConsumerImpl(Channel* parent, DeliveryToken::shared_ptr token, 
-                     const string& tag, Queue::shared_ptr queue,
-                     ConnectionToken* const connection, bool ack);
+                     const string& name, Queue::shared_ptr queue, bool ack, bool nolocal);
         ~ConsumerImpl();
         bool deliver(Message::shared_ptr& msg);            
-        void redeliver(Message::shared_ptr& msg, uint64_t deliveryTag);
+        void redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag);
         void cancel();
         void requestDispatch();
+
+        void setWindowMode();
+        void setCreditMode();
+        void addByteCredit(uint32_t value);
+        void addMessageCredit(uint32_t value);
+        void flush();
+        void stop();
+        bool checkCredit(Message::shared_ptr& msg);
+        void acknowledged(const DeliveryRecord&);    
     };
 
     typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
@@ -88,7 +100,6 @@
     framing::ChannelId id;
     Connection& connection;
     DeliveryAdapter& out;
-    uint64_t currentDeliveryTag;
     Queue::shared_ptr defaultQueue;
     ConsumerImplMap consumers;
     uint32_t prefetchSize;    
@@ -110,6 +121,9 @@
     void record(const DeliveryRecord& delivery);
     bool checkPrefetch(Message::shared_ptr& msg);
     void checkDtxTimeout();
+    ConsumerImpl& find(const std::string& destination);
+    void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
+    void acknowledged(const DeliveryRecord&);
         
   public:
     Channel(Connection& parent, DeliveryAdapter& out, framing::ChannelId id, MessageStore* const store = 0);    
@@ -129,10 +143,17 @@
     /**
      *@param tagInOut - if empty it is updated with the generated token.
      */
-    void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, bool acks,
-                 bool exclusive, ConnectionToken* const connection = 0,
-                 const framing::FieldTable* = 0);
+    void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, 
+                 bool nolocal, bool acks, bool exclusive, const framing::FieldTable* = 0);
     void cancel(const string& tag);
+
+    void setWindowMode(const std::string& destination);
+    void setCreditMode(const std::string& destination);
+    void addByteCredit(const std::string& destination, uint32_t value);
+    void addMessageCredit(const std::string& destination, uint32_t value);
+    void flush(const std::string& destination);
+    void stop(const std::string& destination);
+
     bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected);
     void close();
     void startTx();
@@ -143,11 +164,11 @@
     void endDtx(const std::string& xid, bool fail);
     void suspendDtx(const std::string& xid);
     void resumeDtx(const std::string& xid);
-    void ack(uint64_t deliveryTag, bool multiple);
-    void ack(uint64_t deliveryTag, uint64_t endTag);
+    void ackCumulative(DeliveryId deliveryTag);
+    void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
     void recover(bool requeue);
     void flow(bool active);
-    void deliver(Message::shared_ptr& msg, const string& consumerTag, uint64_t deliveryTag);            
+    void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag);            
     void handlePublish(Message* msg);
     void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
     void handleContent(boost::shared_ptr<framing::AMQContentBody>);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.cpp Fri Aug 10 07:51:08 2007
@@ -110,45 +110,43 @@
 }
 
 void BasicMessage::deliver(ChannelAdapter& channel, 
-                           const string& consumerTag, uint64_t deliveryTag, 
+                           const string& consumerTag, DeliveryId id, 
                            uint32_t framesize)
 {
     channel.send(make_shared_ptr(
     	new BasicDeliverBody(
-            channel.getVersion(), consumerTag, deliveryTag,
+            channel.getVersion(), consumerTag, id.getValue(),
             getRedelivered(), getExchange(), getRoutingKey())));
     sendContent(channel, framesize);
 }
 
 void BasicMessage::sendGetOk(ChannelAdapter& channel,
                              uint32_t messageCount,
-                             uint64_t /*responseTo*/, 
-                             uint64_t deliveryTag, 
+                             DeliveryId id, 
                              uint32_t framesize)
 {
     channel.send(make_shared_ptr(
         new BasicGetOkBody(
             channel.getVersion(),
-            //responseTo,
-            deliveryTag, getRedelivered(), getExchange(),
+            id.getValue(), getRedelivered(), getExchange(),
             getRoutingKey(), messageCount))); 
     sendContent(channel, framesize);
 }
 
-void BasicMessage::deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize)
+void BasicMessage::deliver(framing::ChannelAdapter& channel, DeliveryId id, DeliveryToken::shared_ptr token, uint32_t framesize)
 {
     BasicConsumeToken::shared_ptr consume = dynamic_pointer_cast<BasicConsumeToken>(token);
     if (consume) {
-        deliver(channel, consume->consumer, deliveryTag, framesize);
+        deliver(channel, consume->consumer, id, framesize);
     } else {
         BasicGetToken::shared_ptr get = dynamic_pointer_cast<BasicGetToken>(token);
         if (get) {
-            uint64_t request(1/*actual value doesn't affect anything at present*/);
-            sendGetOk(channel, get->queue->getMessageCount(), request, deliveryTag, framesize);
+            sendGetOk(channel, get->queue->getMessageCount(), id.getValue(), framesize);
         } else {
             //TODO:
             //either need to be able to convert to a message transfer or
             //throw error of some kind to allow this to be handled higher up
+            throw Exception("Conversion to BasicMessage not defined!");
         }
     }
 }
@@ -291,4 +289,10 @@
 {
     Mutex::ScopedLock locker(contentLock);
     content = _content;
+}
+
+
+uint32_t BasicMessage::getRequiredCredit() const
+{
+    return header->size() + contentSize();
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.h?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.h Fri Aug 10 07:51:08 2007
@@ -73,17 +73,16 @@
 
     static DeliveryToken::shared_ptr createGetToken(boost::shared_ptr<Queue> queue);
     static DeliveryToken::shared_ptr createConsumeToken(const string& consumer);
-    void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize);
+    void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize);
 
     void deliver(framing::ChannelAdapter&, 
                  const string& consumerTag, 
-                 uint64_t deliveryTag, 
+                 DeliveryId deliveryTag, 
                  uint32_t framesize);
     
     void sendGetOk(framing::ChannelAdapter& channel, 
                    uint32_t messageCount,
-                   uint64_t responseTo, 
-                   uint64_t deliveryTag, 
+                   DeliveryId deliveryTag, 
                    uint32_t framesize);
 
     framing::BasicHeaderProperties* getHeaderProperties();
@@ -132,6 +131,11 @@
      * it uses).
      */
     void setContent(std::auto_ptr<Content>& content);
+
+    /**
+     * Returns the byte credits required to transfer this message.
+     */
+    uint32_t getRequiredCredit() const;
 };
 
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageBase.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageBase.h?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageBase.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageBase.h Fri Aug 10 07:51:08 2007
@@ -25,6 +25,7 @@
 #include <string>
 #include <boost/shared_ptr.hpp>
 #include "Content.h"
+#include "DeliveryId.h"
 #include "DeliveryToken.h"
 #include "PersistableMessage.h"
 #include "qpid/framing/amqp_types.h"
@@ -92,7 +93,7 @@
     void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
     void redeliver() { redelivered = true; }
 
-    virtual void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag/*only needed for basic class*/, 
+    virtual void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag/*only needed for basic class*/, 
                          DeliveryToken::shared_ptr token, uint32_t framesize) = 0;
 
     virtual bool isComplete() = 0;
@@ -104,6 +105,8 @@
     virtual const ConnectionToken* getPublisher() const {
         return publisher;
     }
+
+    virtual uint32_t getRequiredCredit() const = 0;
 
     virtual void encode(framing::Buffer& buffer) const = 0;
     virtual void encodeHeader(framing::Buffer& buffer) const = 0;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp Fri Aug 10 07:51:08 2007
@@ -188,7 +188,7 @@
 }
 
 
-void MessageMessage::deliver(ChannelAdapter& channel, uint64_t, DeliveryToken::shared_ptr token, uint32_t framesize)
+void MessageMessage::deliver(ChannelAdapter& channel, DeliveryId, DeliveryToken::shared_ptr token, uint32_t framesize)
 {
     transferMessage(channel, shared_polymorphic_cast<MessageDeliveryToken>(token)->destination, framesize);
 }
@@ -319,6 +319,14 @@
 MessageMessage::ReferencePtr MessageMessage::getReference() const {
     return reference;
 }
+
+uint32_t MessageMessage::getRequiredCredit() const
+{
+    //TODO: change when encoding changes. Should be the total payload
+    //size of any header & body frames.
+    return transfer->size();
+}
+
 
 DeliveryToken::shared_ptr MessageMessage::getToken(const std::string& destination)
 {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h Fri Aug 10 07:51:08 2007
@@ -53,7 +53,7 @@
     TransferPtr getTransfer() const { return transfer; }
     ReferencePtr getReference() const ;
     
-    void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize);
+    void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize);
     void deliver(framing::ChannelAdapter&, const std::string& destination, uint32_t framesize);
 
     bool isComplete();
@@ -71,6 +71,7 @@
     uint64_t expectedContentSize();
     void decodeHeader(framing::Buffer& buffer);
     void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0);
+    uint32_t getRequiredCredit() const;
 
     static DeliveryToken::shared_ptr getToken(const std::string& destination);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h Fri Aug 10 07:51:08 2007
@@ -22,13 +22,13 @@
 #define _DeliveryAdapter_
 
 #include "BrokerMessageBase.h"
+#include "DeliveryId.h"
 #include "DeliveryToken.h"
 #include "qpid/framing/amqp_types.h"
 
 namespace qpid {
 namespace broker {
 
-    typedef framing::RequestId DeliveryId;
     /**
      * The intention behind this interface is to separate the generic
      * handling of some form of message delivery to clients that is

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryId.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryId.h?view=auto&rev=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryId.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryId.h Fri Aug 10 07:51:08 2007
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DeliveryId_
+#define _DeliveryId_
+
+#include "qpid/framing/SequenceNumber.h"
+
+namespace qpid {
+namespace broker {
+
+    typedef framing::SequenceNumber DeliveryId;
+
+}}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryId.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryId.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Aug 10 07:51:08 2007
@@ -27,7 +27,7 @@
 DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, 
                                Queue::shared_ptr _queue, 
                                const string _consumerTag, 
-                               const uint64_t _deliveryTag) : msg(_msg), 
+                               const DeliveryId _deliveryTag) : msg(_msg), 
                                                                queue(_queue), 
                                                                consumerTag(_consumerTag),
                                                                deliveryTag(_deliveryTag),
@@ -35,21 +35,29 @@
 
 DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, 
                                Queue::shared_ptr _queue, 
-                               const uint64_t _deliveryTag) : msg(_msg), 
+                               const DeliveryId _deliveryTag) : msg(_msg), 
                                                                queue(_queue), 
                                                                consumerTag(""),
                                                                deliveryTag(_deliveryTag),
                                                                pull(true){}
 
 
-void DeliveryRecord::discard(TransactionContext* ctxt) const{
+void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
     queue->dequeue(ctxt, msg);
 }
 
-bool DeliveryRecord::matches(uint64_t tag) const{
+bool DeliveryRecord::matches(DeliveryId tag) const{
     return deliveryTag == tag;
 }
 
+bool DeliveryRecord::matchOrAfter(DeliveryId tag) const{
+    return matches(tag) || after(tag);
+}
+
+bool DeliveryRecord::after(DeliveryId tag) const{
+    return deliveryTag > tag;
+}
+
 bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{
     return range->covers(deliveryTag);
 }
@@ -68,20 +76,38 @@
     queue->requeue(msg);
 }
 
-void DeliveryRecord::addTo(Prefetch* const prefetch) const{
+void DeliveryRecord::updateByteCredit(uint32_t& credit) const
+{
+    credit += msg->getRequiredCredit();
+}
+
+
+void DeliveryRecord::addTo(Prefetch& prefetch) const{
     if(!pull){
         //ignore 'pulled' messages (i.e. those that were sent in
         //response to get) when calculating prefetch
-        prefetch->size += msg->contentSize();
-        prefetch->count++;
+        prefetch.size += msg->contentSize();
+        prefetch.count++;
     }    
 }
 
-void DeliveryRecord::subtractFrom(Prefetch* const prefetch) const{
+void DeliveryRecord::subtractFrom(Prefetch& prefetch) const{
     if(!pull){
         //ignore 'pulled' messages (i.e. those that were sent in
         //response to get) when calculating prefetch
-        prefetch->size -= msg->contentSize();
-        prefetch->count--;
+        prefetch.size -= msg->contentSize();
+        prefetch.count--;
     }
 }
+
+namespace qpid {
+namespace broker {
+
+std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r) {
+    out << "{" << "id=" << r.deliveryTag.getValue();
+    out << ", consumer=" << r.consumerTag;
+    out << ", queue=" << r.queue->getName() << "}";
+    return out;
+}
+
+}}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Fri Aug 10 07:51:08 2007
@@ -23,10 +23,12 @@
 
 #include <algorithm>
 #include <list>
+#include <ostream>
 #include "AccumulatedAck.h"
 #include "BrokerMessage.h"
 #include "Prefetch.h"
 #include "BrokerQueue.h"
+#include "DeliveryId.h"
 
 namespace qpid {
     namespace broker {
@@ -39,20 +41,27 @@
             mutable Message::shared_ptr msg;
             mutable Queue::shared_ptr queue;
             const std::string consumerTag;
-            const uint64_t deliveryTag;
+            const DeliveryId deliveryTag;
             bool pull;
 
         public:
-            DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const std::string consumerTag, const uint64_t deliveryTag);
-            DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const uint64_t deliveryTag);
+            DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const std::string consumerTag, const DeliveryId deliveryTag);
+            DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const DeliveryId deliveryTag);
             
-            void discard(TransactionContext* ctxt = 0) const;
-            bool matches(uint64_t tag) const;
+            void dequeue(TransactionContext* ctxt = 0) const;
+            bool matches(DeliveryId tag) const;
+            bool matchOrAfter(DeliveryId tag) const;
+            bool after(DeliveryId tag) const;
             bool coveredBy(const AccumulatedAck* const range) const;
             void requeue() const;
             void redeliver(Channel* const) const;
-            void addTo(Prefetch* const prefetch) const;
-            void subtractFrom(Prefetch* const prefetch) const;
+            void updateByteCredit(uint32_t& credit) const;
+            void addTo(Prefetch&) const;
+            void subtractFrom(Prefetch&) const;
+            const std::string& getConsumerTag() const { return consumerTag; } 
+            bool isPull() const { return pull; }
+            
+            friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
         };
 
         typedef std::list<DeliveryRecord>::iterator ack_iterator; 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp Fri Aug 10 07:51:08 2007
@@ -26,7 +26,7 @@
 using std::mem_fun_ref;
 using namespace qpid::broker;
 
-DtxAck::DtxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked)
+DtxAck::DtxAck(const AccumulatedAck& acked, std::list<DeliveryRecord>& unacked)
 {
     remove_copy_if(unacked.begin(), unacked.end(), inserter(pending, pending.end()), 
                    not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)));
@@ -38,7 +38,7 @@
     try{
         //record dequeue in the store
         for (ack_iterator i = pending.begin(); i != pending.end(); i++) {
-            i->discard(ctxt);
+            i->dequeue(ctxt);
         }
         return true;
     }catch(...){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h Fri Aug 10 07:51:08 2007
@@ -34,7 +34,7 @@
             std::list<DeliveryRecord> pending;
 
         public:
-            DtxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
+            DtxAck(const AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
             virtual bool prepare(TransactionContext* ctxt) throw();
             virtual void commit() throw();
             virtual void rollback() throw();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Fri Aug 10 07:51:08 2007
@@ -27,6 +27,8 @@
 #include "qpid/framing/MessageTransferBody.h"
 #include "BrokerAdapter.h"
 
+#include <boost/format.hpp>
+
 namespace qpid {
 namespace broker {
 
@@ -96,7 +98,7 @@
     if(!destination.empty() && channel.exists(destination))
         throw ConnectionException(530, "Consumer tags must be unique");
     string tag = destination;
-    channel.consume(MessageMessage::getToken(destination), tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
+    channel.consume(MessageMessage::getToken(destination), tag, queue, noLocal, !noAck, exclusive, &filter);
     // Dispatch messages as there is now a consumer.
     queue->requestDispatch();
 }
@@ -130,7 +132,7 @@
 void
 MessageHandlerImpl::ok()
 {
-    channel.ack(adapter.getFirstAckRequest(), adapter.getLastAckRequest());
+    throw ConnectionException(540, "Message.Ok no longer supported");    
 }
 
 void
@@ -168,6 +170,47 @@
     } else { 
         throw ConnectionException(540, "References no longer supported");
     }
+}
+
+
+
+void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value)
+{
+    
+    if (unit == 0) {
+        //message
+        channel.addMessageCredit(destination, value);
+    } else if (unit == 1) {
+        //bytes
+        channel.addByteCredit(destination, value);
+    } else {
+        //unknown
+        throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit);
+    }
+    
+}
+    
+void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode)
+{
+    if (mode == 0) {
+        //credit
+        channel.setCreditMode(destination);
+    } else if (mode == 1) {
+        //window
+        channel.setWindowMode(destination);
+    } else{
+        throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode);        
+    }
+}
+    
+void MessageHandlerImpl::flush(const std::string& destination)
+{
+    channel.flush(destination);        
+}
+
+void MessageHandlerImpl::stop(const std::string& destination)
+{
+    channel.stop(destination);        
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h Fri Aug 10 07:51:08 2007
@@ -83,6 +83,15 @@
                  const std::string& identifier );
 
     void transfer(const framing::MethodContext& context);
+
+    void flow(const std::string& destination, u_int8_t unit, u_int32_t value);
+    
+    void flowMode(const std::string& destination, u_int8_t mode);
+    
+    void flush(const std::string& destination);
+
+    void stop(const std::string& destination);
+
   private:
     ReferenceRegistry references;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Fri Aug 10 07:51:08 2007
@@ -63,20 +63,24 @@
 void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method, 
                                             const qpid::framing::MethodContext& context)
 {
-    if (!method->invoke(this)) {
-        //else do the usual:
-        handleL4(method, context);
-        //(if the frameset is complete) we can move the execution-mark
-        //forward 
-
-        //temporary hack until channel management is moved to its own handler:
-        if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
-            ++(incoming.hwm);
+    try {
+        if (!method->invoke(this)) {
+            //else do the usual:
+            handleL4(method, context);
+            //(if the frameset is complete) we can move the execution-mark
+            //forward 
+            
+            //temporary hack until channel management is moved to its own handler:
+            if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
+                ++(incoming.hwm);
+            }
+            
+            //note: need to be more sophisticated than this if we execute
+            //commands that arrive within an active message frameset (that
+            //can't happen until 0-10 framing is implemented)
         }
-
-        //note: need to be more sophisticated than this if we execute
-        //commands that arrive within an active message frameset (that
-        //can't happen until 0-10 framing is implemented)
+    }catch(const std::exception& e){
+        connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
     }
 }
 
@@ -87,15 +91,14 @@
     if (outgoing.lwm < mark) {
         outgoing.lwm = mark;
         //ack messages:
-        channel.ack(mark.getValue(), true);
+        channel.ackCumulative(mark.getValue());
         //std::cout << "[" << this << "] acknowledged: " << mark << std::endl;
     }
     if (range.size() % 2) { //must be even number        
         throw ConnectionException(530, "Received odd number of elements in ranged mark");
     } else {
-        //TODO: need to keep a record of the full range previously acked
         for (SequenceNumberSet::iterator i = range.begin(); i != range.end(); i++) {
-            channel.ack((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
+            channel.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
         }
     }
 }
@@ -121,22 +124,16 @@
                 throw ConnectionException(504, out.str());
             }
         } else {
-            adapter->setResponseTo(context.getRequestId());
             method->invoke(*adapter, context);
-            adapter->setResponseTo(0);
         }
-    }catch(ChannelException& e){
-        adapter->setResponseTo(0);
+    }catch(const ChannelException& e){
         adapter->getProxy().getChannel().close(
             e.code, e.toString(),
             method->amqpClassId(), method->amqpMethodId());
         connection.closeChannel(getId());
-    }catch(ConnectionException& e){
+    }catch(const ConnectionException& e){
         connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
-    }catch(std::exception& e){
-        connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
     }
-
 }
 
 bool SemanticHandler::isOpen() const 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp Fri Aug 10 07:51:08 2007
@@ -36,7 +36,7 @@
         //dequeue all acked messages from their queues
         for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
             if (i->coveredBy(&acked)) {
-                i->discard(ctxt);
+                i->dequeue(ctxt);
             }
         }
         return true;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp Fri Aug 10 07:51:08 2007
@@ -51,6 +51,11 @@
     return old;
 }
 
+SequenceNumber SequenceNumber::operator+(uint32_t i) const
+{
+    return SequenceNumber(value + i);
+}
+
 bool SequenceNumber::operator<(const SequenceNumber& other) const
 {
     return (value - other.value) < 0;
@@ -59,6 +64,16 @@
 bool SequenceNumber::operator>(const SequenceNumber& other) const
 {
     return other < *this;
+}
+
+bool SequenceNumber::operator<=(const SequenceNumber& other) const
+{
+    return *this == other || *this < other; 
+}
+
+bool SequenceNumber::operator>=(const SequenceNumber& other) const
+{
+    return *this == other || *this > other; 
 }
 
 namespace qpid {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h Fri Aug 10 07:51:08 2007
@@ -39,10 +39,13 @@
 
     SequenceNumber& operator++();//prefix ++
     const SequenceNumber operator++(int);//postfix ++
+    SequenceNumber operator+(uint32_t) const;
     bool operator==(const SequenceNumber& other) const;
     bool operator!=(const SequenceNumber& other) const;
     bool operator<(const SequenceNumber& other) const;
     bool operator>(const SequenceNumber& other) const;
+    bool operator<=(const SequenceNumber& other) const;
+    bool operator>=(const SequenceNumber& other) const;
     uint32_t getValue() const { return (uint32_t) value; }
 
     friend int32_t operator-(const SequenceNumber& a, const SequenceNumber& b);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/AccumulatedAckTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/AccumulatedAckTest.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/AccumulatedAckTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/AccumulatedAckTest.cpp Fri Aug 10 07:51:08 2007
@@ -29,77 +29,182 @@
 
 class AccumulatedAckTest : public CppUnit::TestCase  
 {
-        CPPUNIT_TEST_SUITE(AccumulatedAckTest);
-        CPPUNIT_TEST(testGeneral);
-        CPPUNIT_TEST(testCovers);
-        CPPUNIT_TEST(testUpdateAndConsolidate);
-        CPPUNIT_TEST_SUITE_END();
-
-    public:
-        void testGeneral()
-        {
-            AccumulatedAck ack(0);
-            ack.clear();
-            ack.update(3,3);
-            ack.update(7,7);
-            ack.update(9,9);
-            ack.update(1,2);
-            ack.update(4,5);
-            ack.update(6,6);
-
-            for(int i = 1; i <= 7; i++) CPPUNIT_ASSERT(ack.covers(i));
-            CPPUNIT_ASSERT(ack.covers(9));
-
-            CPPUNIT_ASSERT(!ack.covers(8));
-            CPPUNIT_ASSERT(!ack.covers(10));
-
-            ack.consolidate();
-
-            for(int i = 1; i <= 7; i++) CPPUNIT_ASSERT(ack.covers(i));
-            CPPUNIT_ASSERT(ack.covers(9));
-
-            CPPUNIT_ASSERT(!ack.covers(8));
-            CPPUNIT_ASSERT(!ack.covers(10));
+    CPPUNIT_TEST_SUITE(AccumulatedAckTest);
+    CPPUNIT_TEST(testGeneral);
+    CPPUNIT_TEST(testCovers);
+    CPPUNIT_TEST(testCase1);
+    CPPUNIT_TEST(testCase2);
+    CPPUNIT_TEST(testCase3);
+    CPPUNIT_TEST(testCase4);
+    CPPUNIT_TEST(testConsolidation1);
+    CPPUNIT_TEST(testConsolidation2);
+    CPPUNIT_TEST(testConsolidation3);
+    CPPUNIT_TEST_SUITE_END();
+
+public:
+    bool covers(const AccumulatedAck& ack, int i)
+    {
+        return ack.covers(DeliveryId(i));
+    }
+
+    void update(AccumulatedAck& ack, int start, int end)
+    {
+        ack.update(DeliveryId(start), DeliveryId(end));
+    }
+
+    void testGeneral()
+    {
+        AccumulatedAck ack(0);
+        ack.clear();
+        update(ack, 3,3);
+        update(ack, 7,7);
+        update(ack, 9,9);
+        update(ack, 1,2);
+        update(ack, 4,5);
+        update(ack, 6,6);
+
+        for(int i = 1; i <= 7; i++) CPPUNIT_ASSERT(covers(ack, i));
+        CPPUNIT_ASSERT(covers(ack, 9));
+
+        CPPUNIT_ASSERT(!covers(ack, 8));
+        CPPUNIT_ASSERT(!covers(ack, 10));
+
+        ack.consolidate();
+
+        for(int i = 1; i <= 7; i++) CPPUNIT_ASSERT(covers(ack, i));
+        CPPUNIT_ASSERT(covers(ack, 9));
+
+        CPPUNIT_ASSERT(!covers(ack, 8));
+        CPPUNIT_ASSERT(!covers(ack, 10));
+    }
+
+    void testCovers()
+    {
+        AccumulatedAck ack(5);
+        update(ack, 7, 7);
+        update(ack, 9, 9);
+
+        CPPUNIT_ASSERT(covers(ack, 1));
+        CPPUNIT_ASSERT(covers(ack, 2));
+        CPPUNIT_ASSERT(covers(ack, 3));
+        CPPUNIT_ASSERT(covers(ack, 4));
+        CPPUNIT_ASSERT(covers(ack, 5));
+        CPPUNIT_ASSERT(covers(ack, 7));
+        CPPUNIT_ASSERT(covers(ack, 9));
+
+        CPPUNIT_ASSERT(!covers(ack, 6));
+        CPPUNIT_ASSERT(!covers(ack, 8));
+        CPPUNIT_ASSERT(!covers(ack, 10));
+    }
+
+    void testCase1()
+    {
+        AccumulatedAck ack(3);
+        update(ack, 1,2);
+        for(int i = 1; i <= 3; i++) CPPUNIT_ASSERT(covers(ack, i));
+        CPPUNIT_ASSERT(!covers(ack, 4));
+    }
+
+    void testCase2()
+    {
+        AccumulatedAck ack(3);
+        update(ack, 3,6);
+        for(int i = 1; i <= 6; i++) CPPUNIT_ASSERT(covers(ack, i));
+        CPPUNIT_ASSERT(!covers(ack, 7));
+    }
+
+    void testCase3()
+    {
+        AccumulatedAck ack(3);
+        update(ack, 4,6);
+        for(int i = 1; i <= 6; i++) {
+            CPPUNIT_ASSERT(covers(ack, i));
         }
+        CPPUNIT_ASSERT(!covers(ack, 7));
+    }
 
-        void testCovers()
-        {
-            AccumulatedAck ack(5);
-            ack.individual.push_back(7);
-            ack.individual.push_back(9);
-            
-            CPPUNIT_ASSERT(ack.covers(1));
-            CPPUNIT_ASSERT(ack.covers(2));
-            CPPUNIT_ASSERT(ack.covers(3));
-            CPPUNIT_ASSERT(ack.covers(4));
-            CPPUNIT_ASSERT(ack.covers(5));
-            CPPUNIT_ASSERT(ack.covers(7));
-            CPPUNIT_ASSERT(ack.covers(9));
-
-            CPPUNIT_ASSERT(!ack.covers(6));
-            CPPUNIT_ASSERT(!ack.covers(8));
-            CPPUNIT_ASSERT(!ack.covers(10));
+    void testCase4()
+    {
+        AccumulatedAck ack(3);
+        update(ack, 5,6);
+        for(int i = 1; i <= 6; i++) {
+            if (i == 4) CPPUNIT_ASSERT(!covers(ack, i));
+            else CPPUNIT_ASSERT(covers(ack, i));
         }
+        CPPUNIT_ASSERT(!covers(ack, 7));
+    }
+
+    void testConsolidation1()
+    {
+        AccumulatedAck ack(3);
+        update(ack, 7,7);
+        CPPUNIT_ASSERT_EQUAL((uint32_t) 3, ack.mark.getValue());
+        CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size());
+
+        update(ack, 8,9);
+        CPPUNIT_ASSERT_EQUAL((uint32_t) 3, ack.mark.getValue());
+        CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size());
+
+        update(ack, 1,2);
+        CPPUNIT_ASSERT_EQUAL((uint32_t) 3, ack.mark.getValue());
+        CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size());
+
+        update(ack, 4,5);
+        CPPUNIT_ASSERT_EQUAL((uint32_t) 5, ack.mark.getValue());
+        CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size());
+
+        update(ack, 6,6);
+        CPPUNIT_ASSERT_EQUAL((uint32_t) 9, ack.mark.getValue());
+        CPPUNIT_ASSERT_EQUAL((size_t) 0, ack.ranges.size());
+
+        for(int i = 1; i <= 9; i++) CPPUNIT_ASSERT(covers(ack, i));
+        CPPUNIT_ASSERT(!covers(ack, 10));
+    }
+
+    void testConsolidation2()
+    {
+        AccumulatedAck ack(0);
+        update(ack, 10,12);
+        CPPUNIT_ASSERT_EQUAL((uint32_t) 0, ack.mark.getValue());
+        CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size());
+
+        update(ack, 7,9);
+        CPPUNIT_ASSERT_EQUAL((uint32_t) 0, ack.mark.getValue());
+        CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size());
+        CPPUNIT_ASSERT_EQUAL((uint32_t) 7, ack.ranges.front().start.getValue());
+        CPPUNIT_ASSERT_EQUAL((uint32_t) 12, ack.ranges.front().end.getValue());
+
+        update(ack, 5,7);
+        CPPUNIT_ASSERT_EQUAL((uint32_t) 0, ack.mark.getValue());
+        CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size());
+        CPPUNIT_ASSERT_EQUAL((uint32_t) 5, ack.ranges.front().start.getValue());
+        CPPUNIT_ASSERT_EQUAL((uint32_t) 12, ack.ranges.front().end.getValue());
+
+        update(ack, 3,4);
+        CPPUNIT_ASSERT_EQUAL((uint32_t) 0, ack.mark.getValue());
+        CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size());
+        CPPUNIT_ASSERT_EQUAL((uint32_t) 3, ack.ranges.front().start.getValue());
+        CPPUNIT_ASSERT_EQUAL((uint32_t) 12, ack.ranges.front().end.getValue());
+
+        update(ack, 1,2);
+        CPPUNIT_ASSERT_EQUAL((uint32_t) 12, ack.mark.getValue());
+        CPPUNIT_ASSERT_EQUAL((size_t) 0, ack.ranges.size());
+
+        for(int i = 1; i <= 12; i++) CPPUNIT_ASSERT(covers(ack, i));
+        CPPUNIT_ASSERT(!covers(ack, 13));
+    }
+
+    void testConsolidation3()
+    {
+        AccumulatedAck ack(0);
+        update(ack, 10,12);
+        update(ack, 6,7);
+        update(ack, 3,4);
+        update(ack, 1,15);
+        CPPUNIT_ASSERT_EQUAL((uint32_t) 15, ack.mark.getValue());
+        CPPUNIT_ASSERT_EQUAL((size_t) 0, ack.ranges.size());
+    }
 
-        void testUpdateAndConsolidate()
-        {
-            AccumulatedAck ack(0);
-            ack.update(1, 1);
-            ack.update(3, 3);
-            ack.update(10, 10);
-            ack.update(8, 8);
-            ack.update(6, 6);
-            ack.update(3, 3);
-            ack.update(2, 2);
-            ack.update(0, 5);
-            ack.consolidate();
-            CPPUNIT_ASSERT_EQUAL((uint64_t) 6, ack.range);
-            CPPUNIT_ASSERT_EQUAL((size_t) 2, ack.individual.size());
-            list<uint64_t>::iterator i = ack.individual.begin();
-            CPPUNIT_ASSERT_EQUAL((uint64_t) 8, *i);
-            i++;
-            CPPUNIT_ASSERT_EQUAL((uint64_t) 10, *i);
-        }
 };
 
 // Make this test suite a plugin.

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp Fri Aug 10 07:51:08 2007
@@ -77,9 +77,9 @@
         }
 
         //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not)
-        acked.range = 5;
-        acked.individual.push_back(7);
-        acked.individual.push_back(9);
+        acked.mark = 5;
+        acked.update(7, 7);
+        acked.update(9, 9);
     }      
 
     void testPrepare()

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Fri Aug 10 07:51:08 2007
@@ -645,7 +645,145 @@
 
         self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz")
         self.assertEmpty(self.client.queue("consumer"))
+
+    def test_credit_flow_messages(self):
+        """
+        Test basic credit based flow control with unit = message
+        """
+        #declare an exclusive queue
+        channel = self.channel
+        channel.queue_declare(queue = "q", exclusive=True)
+        #create consumer (for now that defaults to infinite credit)
+        channel.message_consume(queue = "q", destination = "c")
+        channel.message_flow_mode(mode = 0, destination = "c")
+        #set credit to zero (can be removed once we move to a proper default for the subscribe method)
+        channel.message_stop(destination = "c")
+        #send batch of messages to queue
+        for i in range(1, 11):
+            channel.message_transfer(routing_key = "q", body = "Message %d" % i)
         
+        #set message credit to finite amount (less than enough for all messages)
+        channel.message_flow(unit = 0, value = 5, destination = "c")
+        #set infinite byte credit
+        channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+        #check that expected number were received
+        q = self.client.queue("c")
+        for i in range(1, 6):
+            self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+        self.assertEmpty(q)
+        
+        #increase credit again and check more are received
+        for i in range(6, 11):
+            channel.message_flow(unit = 0, value = 1, destination = "c")
+            self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+            self.assertEmpty(q)
+
+    def test_credit_flow_bytes(self):
+        """
+        Test basic credit based flow control with unit = bytes
+        """
+        #declare an exclusive queue
+        channel = self.channel
+        channel.queue_declare(queue = "q", exclusive=True)
+        #create consumer (for now that defaults to infinite credit)
+        channel.message_consume(queue = "q", destination = "c")
+        channel.message_flow_mode(mode = 0, destination = "c")
+        #set credit to zero (can be removed once we move to a proper default for the subscribe method)
+        channel.message_stop(destination = "c")
+        #send batch of messages to queue
+        for i in range(1, 11):
+            channel.message_transfer(routing_key = "q", body = "abcdefgh")
+
+        #each message is currently interpreted as requiring 75 bytes of credit
+        #set byte credit to finite amount (less than enough for all messages)
+        channel.message_flow(unit = 1, value = 75*5, destination = "c")
+        #set infinite message credit
+        channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
+        #check that expected number were received
+        q = self.client.queue("c")
+        for i in range(1, 6):
+            self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+        self.assertEmpty(q)
+        
+        #increase credit again and check more are received
+        for i in range(6, 11):
+            channel.message_flow(unit = 1, value = 75, destination = "c")
+            self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+            self.assertEmpty(q)
+
+
+    def test_window_flow_messages(self):
+        """
+        Test basic window based flow control with unit = message
+        """
+        #declare an exclusive queue
+        channel = self.channel
+        channel.queue_declare(queue = "q", exclusive=True)
+        #create consumer (for now that defaults to infinite credit)
+        channel.message_consume(queue = "q", destination = "c")
+        channel.message_flow_mode(mode = 1, destination = "c")
+        #set credit to zero (can be removed once we move to a proper default for the subscribe method)
+        channel.message_stop(destination = "c")
+        #send batch of messages to queue
+        for i in range(1, 11):
+            channel.message_transfer(routing_key = "q", body = "Message %d" % i)
+        
+        #set message credit to finite amount (less than enough for all messages)
+        channel.message_flow(unit = 0, value = 5, destination = "c")
+        #set infinite byte credit
+        channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+        #check that expected number were received
+        q = self.client.queue("c")
+        for i in range(1, 6):
+            msg = q.get(timeout = 1)
+            self.assertDataEquals(channel, msg, "Message %d" % i)
+        self.assertEmpty(q)
+        
+        #acknowledge messages and check more are received
+        msg.complete(cumulative=True)
+        for i in range(6, 11):
+            self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+        self.assertEmpty(q)
+
+
+    def test_window_flow_bytes(self):
+        """
+        Test basic window based flow control with unit = bytes
+        """
+        #declare an exclusive queue
+        channel = self.channel
+        channel.queue_declare(queue = "q", exclusive=True)
+        #create consumer (for now that defaults to infinite credit)
+        channel.message_consume(queue = "q", destination = "c")
+        channel.message_flow_mode(mode = 1, destination = "c")
+        #set credit to zero (can be removed once we move to a proper default for the subscribe method)
+        channel.message_stop(destination = "c")
+        #send batch of messages to queue
+        for i in range(1, 11):
+            channel.message_transfer(routing_key = "q", body = "abcdefgh")
+
+        #each message is currently interpreted as requiring 75 bytes of credit
+        #set byte credit to finite amount (less than enough for all messages)
+        channel.message_flow(unit = 1, value = 75*5, destination = "c")
+        #set infinite message credit
+        channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
+        #check that expected number were received
+        q = self.client.queue("c")
+        msgs = []
+        for i in range(1, 6):
+            msg = q.get(timeout = 1)
+            msgs.append(msg)
+            self.assertDataEquals(channel, msg, "abcdefgh")
+        self.assertEmpty(q)
+        
+        #ack each message individually and check more are received
+        for i in range(6, 11):
+            msg = msgs.pop()
+            msg.complete(cumulative=False)
+            self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+            self.assertEmpty(q)
+
+
         
     def assertDataEquals(self, channel, msg, expected):
         if isinstance(msg.body, ReferenceId):

Modified: incubator/qpid/trunk/qpid/specs/amqp-transitional.0-10.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/amqp-transitional.0-10.xml?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/specs/amqp-transitional.0-10.xml (original)
+++ incubator/qpid/trunk/qpid/specs/amqp-transitional.0-10.xml Fri Aug 10 07:51:08 2007
@@ -6900,6 +6900,128 @@
         </doc>
       </field>
     </method>
+    <!-- - Method: message.flow-mode - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+
+    <method name="flow-mode" index="120" label="set the flow control mode">
+      <doc>
+        Sets the mode of flow control used for a given destination.
+
+        With credit based flow control, the sender of messages continually maintains its current
+        credit balance with the recipient. The credit balance consists of two values, a message
+        count, and a byte count. Whenever message data is sent, both counts must be decremented.
+        If either value reaches zero, the flow of message data must stop. Additional credit is
+        received via the message.flow method.
+
+        The sender MUST NOT send partial framesets. This means that if there is not enough byte
+        credit available to send a complete message, the sender must either wait or use chunked
+        transfer to send the first part of the message data in a complete frameset.
+
+        Window based flow control is identical to credit based flow control, however message
+        acknowledgment implicitly grants a single unit of message credit, and the size of the
+        message in byte credits for each acknowledged message.
+      </doc>
+
+      <rule name="byte-accounting">
+        <doc>
+          The byte count is decremented by the payload size of each transmitted frame with
+          segment type header or body appearing within a message.transfer command. Note that
+          the payload size is the frame size less the frame header size (frame-size - 12).
+        </doc>
+      </rule>
+
+      <rule name="mode-switching">
+        <doc>
+          Mode switching may only occur if both outstanding credit balances are zero. There are
+          three ways for a recipient of messages to be sure that the sender's credit balance is
+          zero:
+
+            1) The recipient may send a message.stop command to the sender. When the recipient
+               receives confirmation of completion for the message.stop command, it knows that the
+               sender's credit is zero.
+
+            2) The recipient may perform the same steps described in (1) with the message.flush
+               command substituted for the message.stop command.
+
+            3) Immediately after receiving a message.consume, the credit for that destination
+               defaults to zero.
+        </doc>
+      </rule>
+
+      <chassis name="server" implement="MUST" />
+      <chassis name="client" implement="MUST" />
+
+      <field name="destination" domain="destination" />
+      <field name="mode" domain="octet">
+        <doc>
+          One of:
+            - credit (0): choose credit based flow control
+            - window (1): choose window based flow control
+        </doc>
+      </field>
+    </method>
+
+    <!-- - Method: message.flow  - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+
+    <method name="flow" index="130" label="control message flow">
+      <doc>
+        This method controls the flow of message data to a given destination. It is used by the
+        recipient of messages to dynamically match the incoming rate of message flow to its
+        processing or forwarding capacity. Upon receipt of this method, the sender must add "value"
+        number of the specified unit to the available credit balance for the specified destination.
+        A value of (0xFFFFFFFF) indicates an infinite amount of credit. This disables any limit for
+        the given unit until the credit balance is zeroed with message.stop or message.flush.
+      </doc>
+
+      <!-- throws no-such-destination -->
+
+      <chassis name="server" implement="MUST" />
+      <chassis name="client" implement="MUST" />
+
+      <field name="destination" domain="destination"/>
+      <field name="unit" domain="octet">
+        <doc>
+          Specifies the unit of credit balance.
+
+          One of:
+            - message (0)
+            - byte    (1)
+        </doc>
+      </field>
+      <field name="value" domain="long">
+        <doc>
+          A value of (0xFFFFFFFF) indicates an infinite amount of credit.
+        </doc>
+      </field>
+    </method>
+
+    <!-- - Method: message.flush - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+
+    <method name="flush" index="140">
+      <doc>
+        Forces the sender to exhaust his credit supply. The sender's credit will always be zero when
+        this method completes. The method does not complete until all the message transfers occur.
+      </doc>
+
+      <chassis name="server" implement="MUST" />
+
+      <field name="destination" domain="destination" />
+    </method>
+
+    <!-- - Method: message.stop  - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+
+    <method name="stop" index="150">
+      <doc>
+        On receipt of this method, a producer of messages MUST set his credit to zero for the given
+        destination. This obeys the generic semantics of command completion, i.e. when confirmation
+        is issued credit MUST be zero and no further messages will be sent until such a time as
+        further credit is received.
+      </doc>
+
+      <chassis name="server" implement="MUST" />
+      <chassis name="client" implement="MUST" />
+
+      <field name="destination" domain="destination" />
+    </method>
 
   </class>