You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/08/10 16:51:14 UTC
svn commit: r564611 - in /incubator/qpid/trunk/qpid: cpp/src/
cpp/src/qpid/broker/ cpp/src/qpid/framing/ cpp/src/tests/
python/tests_0-10/ specs/
Author: gsim
Date: Fri Aug 10 07:51:08 2007
New Revision: 564611
URL: http://svn.apache.org/viewvc?view=rev&rev=564611
Log:
Broker management of message acknowledgements now runs entirely off execution layer.
Flow control support.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryId.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageBase.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h
incubator/qpid/trunk/qpid/cpp/src/tests/AccumulatedAckTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp
incubator/qpid/trunk/qpid/python/tests_0-10/message.py
incubator/qpid/trunk/qpid/specs/amqp-transitional.0-10.xml
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Aug 10 07:51:08 2007
@@ -280,6 +280,7 @@
qpid/broker/Deliverable.h \
qpid/broker/DeliverableMessage.h \
qpid/broker/DeliveryAdapter.h \
+ qpid/broker/DeliveryId.h \
qpid/broker/DeliveryToken.h \
qpid/broker/DirectExchange.h \
qpid/broker/DtxAck.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp Fri Aug 10 07:51:08 2007
@@ -21,37 +21,125 @@
#include "AccumulatedAck.h"
#include <assert.h>
+#include <iostream>
-using std::less_equal;
-using std::bind2nd;
+using std::list;
+using std::max;
+using std::min;
using namespace qpid::broker;
-void AccumulatedAck::update(uint64_t firstTag, uint64_t lastTag){
- assert(firstTag<=lastTag);
- if (firstTag <= range + 1) {
- if (lastTag > range) range = lastTag;
+void AccumulatedAck::update(DeliveryId first, DeliveryId last){
+ assert(first <= last);
+ if (last < mark) return;
+
+
+ Range r(first, last);
+ bool handled = false;
+ list<Range>::iterator merged = ranges.end();
+ if (r.mergeable(mark)) {
+ mark = r.end;
+ merged = ranges.begin();
+ handled = true;
+ } else {
+ for (list<Range>::iterator i = ranges.begin(); i != ranges.end() && !handled; i++) {
+ if (i->merge(r)) {
+ merged = i;
+ handled = true;
+ } else if (r.start < i->start) {
+ ranges.insert(i, r);
+ handled = true;
+ }
+ }
+ }
+ if (!handled) {
+ ranges.push_back(r);
} else {
- for (uint64_t tag = firstTag; tag<=lastTag; tag++)
- individual.push_back(tag);
+ while (!ranges.empty() && ranges.front().end <= mark) {
+ ranges.pop_front();
+ }
+ //new range is incorporated, but may be possible to consolidate
+ if (merged == ranges.begin()) {
+ //consolidate mark
+ while (merged != ranges.end() && merged->mergeable(mark)) {
+ mark = merged->end;
+ merged = ranges.erase(merged);
+ }
+ }
+ if (merged != ranges.end()) {
+ //consolidate ranges
+ list<Range>::iterator i = merged;
+ list<Range>::iterator j = i++;
+ while (i != ranges.end() && j->merge(*i)) {
+ j = i++;
+ }
+ }
}
}
-void AccumulatedAck::consolidate(){
- individual.sort();
- //remove any individual tags that are covered by range
- individual.remove_if(bind2nd(less_equal<uint64_t>(), range));
- //update range if possible (using <= allows for duplicates from overlapping ranges)
- while (individual.front() <= range + 1) {
- range = individual.front();
- individual.pop_front();
+void AccumulatedAck::consolidate(){}
+
+void AccumulatedAck::clear(){
+ mark = 0;//not sure that this is valid when wraparound is a possibility
+ ranges.clear();
+}
+
+bool AccumulatedAck::covers(DeliveryId tag) const{
+ if (tag <= mark) return true;
+ for (list<Range>::const_iterator i = ranges.begin(); i != ranges.end(); i++) {
+ if (i->contains(tag)) return true;
}
+ return false;
}
-void AccumulatedAck::clear(){
- range = 0;
- individual.clear();
+bool Range::contains(DeliveryId i) const
+{
+ return i >= start && i <= end;
}
-bool AccumulatedAck::covers(uint64_t tag) const{
- return tag <= range || find(individual.begin(), individual.end(), tag) != individual.end();
+bool Range::intersect(const Range& r) const
+{
+ return r.contains(start) || r.contains(end) || contains(r.start) || contains(r.end);
}
+
+bool Range::merge(const Range& r)
+{
+ if (intersect(r) || mergeable(r.end) || r.mergeable(end)) {
+ start = min(start, r.start);
+ end = max(end, r.end);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool Range::mergeable(const DeliveryId& s) const
+{
+ if (contains(s) || start - s == 1) {
+ return true;
+ } else {
+ return false;
+ }
+}
+
+Range::Range(DeliveryId s, DeliveryId e) : start(s), end(e) {}
+
+
+namespace qpid{
+namespace broker{
+ std::ostream& operator<<(std::ostream& out, const Range& r)
+ {
+ out << "[" << r.start.getValue() << "-" << r.end.getValue() << "]";
+ return out;
+ }
+
+ std::ostream& operator<<(std::ostream& out, const AccumulatedAck& a)
+ {
+ out << "{mark: " << a.mark.getValue() << ", ranges: (";
+ for (list<Range>::const_iterator i = a.ranges.begin(); i != a.ranges.end(); i++) {
+ if (i != a.ranges.begin()) out << ", ";
+ out << *i;
+ }
+ out << ")]";
+ return out;
+ }
+}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h Fri Aug 10 07:51:08 2007
@@ -24,9 +24,23 @@
#include <algorithm>
#include <functional>
#include <list>
+#include <ostream>
+#include "DeliveryId.h"
namespace qpid {
namespace broker {
+
+ struct Range
+ {
+ DeliveryId start;
+ DeliveryId end;
+
+ Range(DeliveryId s, DeliveryId e);
+ bool contains(DeliveryId i) const;
+ bool intersect(const Range& r) const;
+ bool merge(const Range& r);
+ bool mergeable(const DeliveryId& r) const;
+ };
/**
* Keeps an accumulated record of acked messages (by delivery
* tag).
@@ -37,19 +51,21 @@
* If not zero, then everything up to this value has been
* acked.
*/
- uint64_t range;
+ DeliveryId mark;
/**
* List of individually acked messages that are not
* included in the range marked by 'range'.
*/
- std::list<uint64_t> individual;
+ std::list<Range> ranges;
- AccumulatedAck(uint64_t r) : range(r) {}
- void update(uint64_t firstTag, uint64_t lastTag);
+ AccumulatedAck(DeliveryId r) : mark(r) {}
+ void update(DeliveryId firstTag, DeliveryId lastTag);
void consolidate();
void clear();
- bool covers(uint64_t tag) const;
+ bool covers(DeliveryId tag) const;
};
+ std::ostream& operator<<(std::ostream&, const Range&);
+ std::ostream& operator<<(std::ostream&, const AccumulatedAck&);
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Fri Aug 10 07:51:08 2007
@@ -325,7 +325,7 @@
//also version specific behaviour now)
if (newTag.empty()) newTag = tagGenerator.generate();
DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken(newTag));
- channel.consume(token, newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
+ channel.consume(token, newTag, queue, noLocal, !noAck, exclusive, &fields);
if(!nowait) client.consumeOk(newTag);
@@ -365,7 +365,11 @@
}
void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){
- channel.ack(deliveryTag, multiple);
+ if (multiple) {
+ channel.ackCumulative(deliveryTag);
+ } else {
+ channel.ackRange(deliveryTag, deliveryTag);
+ }
}
void BrokerAdapter::BasicHandlerImpl::reject(uint64_t /*deliveryTag*/, bool /*requeue*/){}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Fri Aug 10 07:51:08 2007
@@ -53,7 +53,6 @@
id(_id),
connection(con),
out(_out),
- currentDeliveryTag(1),
prefetchSize(0),
prefetchCount(0),
tagGenerator("sgen"),
@@ -75,17 +74,13 @@
return consumers.find(consumerTag) != consumers.end();
}
-// TODO aconway 2007-02-12: Why is connection token passed in instead
-// of using the channel's parent connection?
void Channel::consume(DeliveryToken::shared_ptr token, string& tagInOut,
- Queue::shared_ptr queue, bool acks,
- bool exclusive, ConnectionToken* const connection,
- const FieldTable*)
+ Queue::shared_ptr queue, bool nolocal, bool acks,
+ bool exclusive, const FieldTable*)
{
if(tagInOut.empty())
tagInOut = tagGenerator.generate();
- std::auto_ptr<ConsumerImpl> c(
- new ConsumerImpl(this, token, tagInOut, queue, connection, acks));
+ std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal));
queue->consume(c.get(), exclusive);//may throw exception
consumers.insert(tagInOut, c.release());
}
@@ -210,7 +205,7 @@
void Channel::record(const DeliveryRecord& delivery)
{
unacked.push_back(delivery);
- delivery.addTo(&outstanding);
+ delivery.addTo(outstanding);
}
bool Channel::checkPrefetch(Message::shared_ptr& msg)
@@ -221,33 +216,61 @@
return countOk && sizeOk;
}
-Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, DeliveryToken::shared_ptr _token,
- const string& _tag, Queue::shared_ptr _queue,
- ConnectionToken* const _connection, bool ack
- ) : parent(_parent), token(_token), tag(_tag), queue(_queue), connection(_connection),
- ackExpected(ack), blocked(false) {}
+Channel::ConsumerImpl::ConsumerImpl(Channel* _parent,
+ DeliveryToken::shared_ptr _token,
+ const string& _name,
+ Queue::shared_ptr _queue,
+ bool ack,
+ bool _nolocal
+ ) : parent(_parent),
+ token(_token),
+ name(_name),
+ queue(_queue),
+ ackExpected(ack),
+ nolocal(_nolocal),
+ blocked(false),
+ windowing(true),
+ msgCredit(0xFFFFFFFF),
+ byteCredit(0xFFFFFFFF) {}
bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg)
{
- if(!connection || connection != msg->getPublisher()){//check for no_local
- if(!parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))){
+ if (nolocal && &(parent->connection) == msg->getPublisher()) {
+ return false;
+ } else {
+ if (!checkCredit(msg) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))) {
blocked = true;
- }else{
+ } else {
blocked = false;
+
Mutex::ScopedLock locker(parent->deliveryLock);
- uint64_t deliveryTag = parent->out.deliver(msg, token);
- if(ackExpected){
- parent->record(DeliveryRecord(msg, queue, tag, deliveryTag));
+ DeliveryId deliveryTag = parent->out.deliver(msg, token);
+ if (ackExpected) {
+ parent->record(DeliveryRecord(msg, queue, name, deliveryTag));
}
+ }
+ return !blocked;
+ }
+}
- return true;
+bool Channel::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
+{
+ Mutex::ScopedLock l(lock);
+ if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
+ return false;
+ } else {
+ if (msgCredit != 0xFFFFFFFF) {
+ msgCredit--;
+ }
+ if (byteCredit != 0xFFFFFFFF) {
+ byteCredit -= msg->getRequiredCredit();
}
+ return true;
}
- return false;
}
-void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, uint64_t deliveryTag) {
+void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) {
Mutex::ScopedLock locker(parent->deliveryLock);
parent->out.redeliver(msg, token, deliveryTag);
}
@@ -326,55 +349,71 @@
}
-// Used by Basic
-void Channel::ack(uint64_t deliveryTag, bool multiple)
+void Channel::ackCumulative(DeliveryId id)
{
- if (multiple)
- ack(0, deliveryTag);
- else
- ack(deliveryTag, deliveryTag);
+ ack(id, id, true);
}
-void Channel::ack(uint64_t firstTag, uint64_t lastTag)
+void Channel::ackRange(DeliveryId first, DeliveryId last)
{
+ ack(first, last, false);
+}
+
+void Channel::ack(DeliveryId first, DeliveryId last, bool cumulative)
+{
+ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ ack_iterator start = cumulative ? unacked.begin() :
+ find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
+ ack_iterator end = start;
+
+ if (cumulative || first != last) {
+ //need to find end (position it just after the last record in range)
+ end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+ } else {
+ //just acked single element (move end past it)
+ ++end;
+ }
+
+ for_each(start, end, boost::bind(&Channel::acknowledged, this, _1));
+
if (txBuffer.get()) {
- accumulatedAck.update(firstTag, lastTag);
- //TODO: I think the outstanding prefetch size & count should be updated at this point...
- //TODO: ...this may then necessitate dispatching to consumers
+ //in transactional mode, don't dequeue or remove, just
+ //maintain set of acknowledged messages:
+ accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
+
if (dtxBuffer.get()) {
+ //if enlisted in a dtx, remove the relevant slice from
+ //unacked and record it against that transaction
TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
accumulatedAck.clear();
dtxBuffer->enlist(txAck);
}
-
} else {
- Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+ for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
+ unacked.erase(start, end);
+ }
- ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag));
- ack_iterator j = (firstTag == 0) ?
- unacked.begin() :
- find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag));
-
- if(i == unacked.end()){
- throw ConnectionException(530, "Received ack for unrecognised delivery tag");
- }else if(i!=j){
- ack_iterator end = ++i;
- for_each(j, end, bind2nd(mem_fun_ref(&DeliveryRecord::discard), 0));
- unacked.erase(unacked.begin(), end);
-
- //recalculate the prefetch:
- outstanding.reset();
- for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding));
- }else{
- i->discard();
- i->subtractFrom(&outstanding);
- unacked.erase(i);
- }
+ //if the prefetch limit had previously been reached, or credit
+ //had expired in windowing mode there may be messages that can
+ //be now be delivered
+ for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
+}
+
+void Channel::acknowledged(const DeliveryRecord& delivery)
+{
+ delivery.subtractFrom(outstanding);
+ ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag());
+ if (i != consumers.end()) {
+ i->acknowledged(delivery);
+ }
+}
- //if the prefetch limit had previously been reached, there may
- //be messages that can be now be delivered
- std::for_each(consumers.begin(), consumers.end(),
- boost::bind(&ConsumerImpl::requestDispatch, _1));
+void Channel::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
+{
+ if (windowing) {
+ if (msgCredit != 0xFFFFFFFF) msgCredit++;
+ if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit);
}
}
@@ -384,6 +423,8 @@
if(requeue){
outstanding.reset();
+ //take copy and clear unacked as requeue may result in redelivery to this channel
+ //which will in turn result in additions to unacked
std::list<DeliveryRecord> copy = unacked;
unacked.clear();
for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
@@ -397,7 +438,7 @@
Message::shared_ptr msg = queue->dequeue();
if(msg){
Mutex::ScopedLock locker(deliveryLock);
- uint64_t myDeliveryTag = out.deliver(msg, token);
+ DeliveryId myDeliveryTag = out.deliver(msg, token);
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
@@ -408,7 +449,7 @@
}
void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag,
- uint64_t deliveryTag)
+ DeliveryId deliveryTag)
{
ConsumerImplMap::iterator i = consumers.find(consumerTag);
if (i != consumers.end()){
@@ -425,4 +466,83 @@
//there may be messages that can be now be delivered
std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
}
+}
+
+
+Channel::ConsumerImpl& Channel::find(const std::string& destination)
+{
+ ConsumerImplMap::iterator i = consumers.find(destination);
+ if (i == consumers.end()) {
+ throw ChannelException(404, boost::format("Unknown destination %1%") % destination);
+ } else {
+ return *i;
+ }
+}
+
+void Channel::setWindowMode(const std::string& destination)
+{
+ find(destination).setWindowMode();
+}
+
+void Channel::setCreditMode(const std::string& destination)
+{
+ find(destination).setCreditMode();
+}
+
+void Channel::addByteCredit(const std::string& destination, uint32_t value)
+{
+ find(destination).addByteCredit(value);
+}
+
+
+void Channel::addMessageCredit(const std::string& destination, uint32_t value)
+{
+ find(destination).addMessageCredit(value);
+}
+
+void Channel::flush(const std::string& destination)
+{
+ ConsumerImpl& c = find(destination);
+ c.flush();
+}
+
+
+void Channel::stop(const std::string& destination)
+{
+ find(destination).stop();
+}
+
+void Channel::ConsumerImpl::setWindowMode()
+{
+ windowing = true;
+}
+
+void Channel::ConsumerImpl::setCreditMode()
+{
+ windowing = false;
+}
+
+void Channel::ConsumerImpl::addByteCredit(uint32_t value)
+{
+ byteCredit += value;
+ requestDispatch();
+}
+
+void Channel::ConsumerImpl::addMessageCredit(uint32_t value)
+{
+ msgCredit += value;
+ requestDispatch();
+}
+
+void Channel::ConsumerImpl::flush()
+{
+ //TODO: need to wait until any messages that are available for
+ //this consumer have been delivered... i.e. some sort of flush on
+ //the queue...
+}
+
+void Channel::ConsumerImpl::stop()
+{
+ msgCredit = 0;
+ byteCredit = 0;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h Fri Aug 10 07:51:08 2007
@@ -64,23 +64,35 @@
{
class ConsumerImpl : public Consumer
{
- Channel* parent;
- DeliveryToken::shared_ptr token;
- const string tag;
- Queue::shared_ptr queue;
- ConnectionToken* const connection;
+ sys::Mutex lock;
+ Channel* const parent;
+ const DeliveryToken::shared_ptr token;
+ const string name;
+ const Queue::shared_ptr queue;
const bool ackExpected;
+ const bool nolocal;
bool blocked;
+ bool windowing;
+ uint32_t msgCredit;
+ uint32_t byteCredit;
public:
ConsumerImpl(Channel* parent, DeliveryToken::shared_ptr token,
- const string& tag, Queue::shared_ptr queue,
- ConnectionToken* const connection, bool ack);
+ const string& name, Queue::shared_ptr queue, bool ack, bool nolocal);
~ConsumerImpl();
bool deliver(Message::shared_ptr& msg);
- void redeliver(Message::shared_ptr& msg, uint64_t deliveryTag);
+ void redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag);
void cancel();
void requestDispatch();
+
+ void setWindowMode();
+ void setCreditMode();
+ void addByteCredit(uint32_t value);
+ void addMessageCredit(uint32_t value);
+ void flush();
+ void stop();
+ bool checkCredit(Message::shared_ptr& msg);
+ void acknowledged(const DeliveryRecord&);
};
typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
@@ -88,7 +100,6 @@
framing::ChannelId id;
Connection& connection;
DeliveryAdapter& out;
- uint64_t currentDeliveryTag;
Queue::shared_ptr defaultQueue;
ConsumerImplMap consumers;
uint32_t prefetchSize;
@@ -110,6 +121,9 @@
void record(const DeliveryRecord& delivery);
bool checkPrefetch(Message::shared_ptr& msg);
void checkDtxTimeout();
+ ConsumerImpl& find(const std::string& destination);
+ void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
+ void acknowledged(const DeliveryRecord&);
public:
Channel(Connection& parent, DeliveryAdapter& out, framing::ChannelId id, MessageStore* const store = 0);
@@ -129,10 +143,17 @@
/**
*@param tagInOut - if empty it is updated with the generated token.
*/
- void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, bool acks,
- bool exclusive, ConnectionToken* const connection = 0,
- const framing::FieldTable* = 0);
+ void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue,
+ bool nolocal, bool acks, bool exclusive, const framing::FieldTable* = 0);
void cancel(const string& tag);
+
+ void setWindowMode(const std::string& destination);
+ void setCreditMode(const std::string& destination);
+ void addByteCredit(const std::string& destination, uint32_t value);
+ void addMessageCredit(const std::string& destination, uint32_t value);
+ void flush(const std::string& destination);
+ void stop(const std::string& destination);
+
bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected);
void close();
void startTx();
@@ -143,11 +164,11 @@
void endDtx(const std::string& xid, bool fail);
void suspendDtx(const std::string& xid);
void resumeDtx(const std::string& xid);
- void ack(uint64_t deliveryTag, bool multiple);
- void ack(uint64_t deliveryTag, uint64_t endTag);
+ void ackCumulative(DeliveryId deliveryTag);
+ void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
void recover(bool requeue);
void flow(bool active);
- void deliver(Message::shared_ptr& msg, const string& consumerTag, uint64_t deliveryTag);
+ void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag);
void handlePublish(Message* msg);
void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
void handleContent(boost::shared_ptr<framing::AMQContentBody>);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.cpp Fri Aug 10 07:51:08 2007
@@ -110,45 +110,43 @@
}
void BasicMessage::deliver(ChannelAdapter& channel,
- const string& consumerTag, uint64_t deliveryTag,
+ const string& consumerTag, DeliveryId id,
uint32_t framesize)
{
channel.send(make_shared_ptr(
new BasicDeliverBody(
- channel.getVersion(), consumerTag, deliveryTag,
+ channel.getVersion(), consumerTag, id.getValue(),
getRedelivered(), getExchange(), getRoutingKey())));
sendContent(channel, framesize);
}
void BasicMessage::sendGetOk(ChannelAdapter& channel,
uint32_t messageCount,
- uint64_t /*responseTo*/,
- uint64_t deliveryTag,
+ DeliveryId id,
uint32_t framesize)
{
channel.send(make_shared_ptr(
new BasicGetOkBody(
channel.getVersion(),
- //responseTo,
- deliveryTag, getRedelivered(), getExchange(),
+ id.getValue(), getRedelivered(), getExchange(),
getRoutingKey(), messageCount)));
sendContent(channel, framesize);
}
-void BasicMessage::deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize)
+void BasicMessage::deliver(framing::ChannelAdapter& channel, DeliveryId id, DeliveryToken::shared_ptr token, uint32_t framesize)
{
BasicConsumeToken::shared_ptr consume = dynamic_pointer_cast<BasicConsumeToken>(token);
if (consume) {
- deliver(channel, consume->consumer, deliveryTag, framesize);
+ deliver(channel, consume->consumer, id, framesize);
} else {
BasicGetToken::shared_ptr get = dynamic_pointer_cast<BasicGetToken>(token);
if (get) {
- uint64_t request(1/*actual value doesn't affect anything at present*/);
- sendGetOk(channel, get->queue->getMessageCount(), request, deliveryTag, framesize);
+ sendGetOk(channel, get->queue->getMessageCount(), id.getValue(), framesize);
} else {
//TODO:
//either need to be able to convert to a message transfer or
//throw error of some kind to allow this to be handled higher up
+ throw Exception("Conversion to BasicMessage not defined!");
}
}
}
@@ -291,4 +289,10 @@
{
Mutex::ScopedLock locker(contentLock);
content = _content;
+}
+
+
+uint32_t BasicMessage::getRequiredCredit() const
+{
+ return header->size() + contentSize();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.h?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessage.h Fri Aug 10 07:51:08 2007
@@ -73,17 +73,16 @@
static DeliveryToken::shared_ptr createGetToken(boost::shared_ptr<Queue> queue);
static DeliveryToken::shared_ptr createConsumeToken(const string& consumer);
- void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize);
+ void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize);
void deliver(framing::ChannelAdapter&,
const string& consumerTag,
- uint64_t deliveryTag,
+ DeliveryId deliveryTag,
uint32_t framesize);
void sendGetOk(framing::ChannelAdapter& channel,
uint32_t messageCount,
- uint64_t responseTo,
- uint64_t deliveryTag,
+ DeliveryId deliveryTag,
uint32_t framesize);
framing::BasicHeaderProperties* getHeaderProperties();
@@ -132,6 +131,11 @@
* it uses).
*/
void setContent(std::auto_ptr<Content>& content);
+
+ /**
+ * Returns the byte credits required to transfer this message.
+ */
+ uint32_t getRequiredCredit() const;
};
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageBase.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageBase.h?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageBase.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageBase.h Fri Aug 10 07:51:08 2007
@@ -25,6 +25,7 @@
#include <string>
#include <boost/shared_ptr.hpp>
#include "Content.h"
+#include "DeliveryId.h"
#include "DeliveryToken.h"
#include "PersistableMessage.h"
#include "qpid/framing/amqp_types.h"
@@ -92,7 +93,7 @@
void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
void redeliver() { redelivered = true; }
- virtual void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag/*only needed for basic class*/,
+ virtual void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag/*only needed for basic class*/,
DeliveryToken::shared_ptr token, uint32_t framesize) = 0;
virtual bool isComplete() = 0;
@@ -104,6 +105,8 @@
virtual const ConnectionToken* getPublisher() const {
return publisher;
}
+
+ virtual uint32_t getRequiredCredit() const = 0;
virtual void encode(framing::Buffer& buffer) const = 0;
virtual void encodeHeader(framing::Buffer& buffer) const = 0;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp Fri Aug 10 07:51:08 2007
@@ -188,7 +188,7 @@
}
-void MessageMessage::deliver(ChannelAdapter& channel, uint64_t, DeliveryToken::shared_ptr token, uint32_t framesize)
+void MessageMessage::deliver(ChannelAdapter& channel, DeliveryId, DeliveryToken::shared_ptr token, uint32_t framesize)
{
transferMessage(channel, shared_polymorphic_cast<MessageDeliveryToken>(token)->destination, framesize);
}
@@ -319,6 +319,14 @@
MessageMessage::ReferencePtr MessageMessage::getReference() const {
return reference;
}
+
+uint32_t MessageMessage::getRequiredCredit() const
+{
+ //TODO: change when encoding changes. Should be the payload of any
+ //header & body frames.
+ return transfer->size();
+}
+
DeliveryToken::shared_ptr MessageMessage::getToken(const std::string& destination)
{
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h Fri Aug 10 07:51:08 2007
@@ -53,7 +53,7 @@
TransferPtr getTransfer() const { return transfer; }
ReferencePtr getReference() const ;
- void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize);
+ void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize);
void deliver(framing::ChannelAdapter&, const std::string& destination, uint32_t framesize);
bool isComplete();
@@ -71,6 +71,7 @@
uint64_t expectedContentSize();
void decodeHeader(framing::Buffer& buffer);
void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0);
+ uint32_t getRequiredCredit() const;
static DeliveryToken::shared_ptr getToken(const std::string& destination);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h Fri Aug 10 07:51:08 2007
@@ -22,13 +22,13 @@
#define _DeliveryAdapter_
#include "BrokerMessageBase.h"
+#include "DeliveryId.h"
#include "DeliveryToken.h"
#include "qpid/framing/amqp_types.h"
namespace qpid {
namespace broker {
- typedef framing::RequestId DeliveryId;
/**
* The intention behind this interface is to separate the generic
* handling of some form of message delivery to clients that is
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryId.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryId.h?view=auto&rev=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryId.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryId.h Fri Aug 10 07:51:08 2007
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DeliveryId_
+#define _DeliveryId_
+
+#include "qpid/framing/SequenceNumber.h"
+
+namespace qpid {
+namespace broker {
+
+ typedef framing::SequenceNumber DeliveryId;
+
+}}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryId.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryId.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Aug 10 07:51:08 2007
@@ -27,7 +27,7 @@
DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg,
Queue::shared_ptr _queue,
const string _consumerTag,
- const uint64_t _deliveryTag) : msg(_msg),
+ const DeliveryId _deliveryTag) : msg(_msg),
queue(_queue),
consumerTag(_consumerTag),
deliveryTag(_deliveryTag),
@@ -35,21 +35,29 @@
DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg,
Queue::shared_ptr _queue,
- const uint64_t _deliveryTag) : msg(_msg),
+ const DeliveryId _deliveryTag) : msg(_msg),
queue(_queue),
consumerTag(""),
deliveryTag(_deliveryTag),
pull(true){}
-void DeliveryRecord::discard(TransactionContext* ctxt) const{
+void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
queue->dequeue(ctxt, msg);
}
-bool DeliveryRecord::matches(uint64_t tag) const{
+bool DeliveryRecord::matches(DeliveryId tag) const{
return deliveryTag == tag;
}
+bool DeliveryRecord::matchOrAfter(DeliveryId tag) const{
+ return matches(tag) || after(tag);
+}
+
+bool DeliveryRecord::after(DeliveryId tag) const{
+ return deliveryTag > tag;
+}
+
bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{
return range->covers(deliveryTag);
}
@@ -68,20 +76,38 @@
queue->requeue(msg);
}
-void DeliveryRecord::addTo(Prefetch* const prefetch) const{
+void DeliveryRecord::updateByteCredit(uint32_t& credit) const
+{
+ credit += msg->getRequiredCredit();
+}
+
+
+void DeliveryRecord::addTo(Prefetch& prefetch) const{
if(!pull){
//ignore 'pulled' messages (i.e. those that were sent in
//response to get) when calculating prefetch
- prefetch->size += msg->contentSize();
- prefetch->count++;
+ prefetch.size += msg->contentSize();
+ prefetch.count++;
}
}
-void DeliveryRecord::subtractFrom(Prefetch* const prefetch) const{
+void DeliveryRecord::subtractFrom(Prefetch& prefetch) const{
if(!pull){
//ignore 'pulled' messages (i.e. those that were sent in
//response to get) when calculating prefetch
- prefetch->size -= msg->contentSize();
- prefetch->count--;
+ prefetch.size -= msg->contentSize();
+ prefetch.count--;
}
}
+
+namespace qpid {
+namespace broker {
+
+std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r) {
+ out << "{" << "id=" << r.deliveryTag.getValue();
+ out << ", consumer=" << r.consumerTag;
+ out << ", queue=" << r.queue->getName() << "}";
+ return out;
+}
+
+}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Fri Aug 10 07:51:08 2007
@@ -23,10 +23,12 @@
#include <algorithm>
#include <list>
+#include <ostream>
#include "AccumulatedAck.h"
#include "BrokerMessage.h"
#include "Prefetch.h"
#include "BrokerQueue.h"
+#include "DeliveryId.h"
namespace qpid {
namespace broker {
@@ -39,20 +41,27 @@
mutable Message::shared_ptr msg;
mutable Queue::shared_ptr queue;
const std::string consumerTag;
- const uint64_t deliveryTag;
+ const DeliveryId deliveryTag;
bool pull;
public:
- DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const std::string consumerTag, const uint64_t deliveryTag);
- DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const uint64_t deliveryTag);
+ DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const std::string consumerTag, const DeliveryId deliveryTag);
+ DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const DeliveryId deliveryTag);
- void discard(TransactionContext* ctxt = 0) const;
- bool matches(uint64_t tag) const;
+ void dequeue(TransactionContext* ctxt = 0) const;
+ bool matches(DeliveryId tag) const;
+ bool matchOrAfter(DeliveryId tag) const;
+ bool after(DeliveryId tag) const;
bool coveredBy(const AccumulatedAck* const range) const;
void requeue() const;
void redeliver(Channel* const) const;
- void addTo(Prefetch* const prefetch) const;
- void subtractFrom(Prefetch* const prefetch) const;
+ void updateByteCredit(uint32_t& credit) const;
+ void addTo(Prefetch&) const;
+ void subtractFrom(Prefetch&) const;
+ const std::string& getConsumerTag() const { return consumerTag; }
+ bool isPull() const { return pull; }
+
+ friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
};
typedef std::list<DeliveryRecord>::iterator ack_iterator;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp Fri Aug 10 07:51:08 2007
@@ -26,7 +26,7 @@
using std::mem_fun_ref;
using namespace qpid::broker;
-DtxAck::DtxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked)
+DtxAck::DtxAck(const AccumulatedAck& acked, std::list<DeliveryRecord>& unacked)
{
remove_copy_if(unacked.begin(), unacked.end(), inserter(pending, pending.end()),
not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)));
@@ -38,7 +38,7 @@
try{
//record dequeue in the store
for (ack_iterator i = pending.begin(); i != pending.end(); i++) {
- i->discard(ctxt);
+ i->dequeue(ctxt);
}
return true;
}catch(...){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h Fri Aug 10 07:51:08 2007
@@ -34,7 +34,7 @@
std::list<DeliveryRecord> pending;
public:
- DtxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
+ DtxAck(const AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Fri Aug 10 07:51:08 2007
@@ -27,6 +27,8 @@
#include "qpid/framing/MessageTransferBody.h"
#include "BrokerAdapter.h"
+#include <boost/format.hpp>
+
namespace qpid {
namespace broker {
@@ -96,7 +98,7 @@
if(!destination.empty() && channel.exists(destination))
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
- channel.consume(MessageMessage::getToken(destination), tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
+ channel.consume(MessageMessage::getToken(destination), tag, queue, noLocal, !noAck, exclusive, &filter);
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
}
@@ -130,7 +132,7 @@
void
MessageHandlerImpl::ok()
{
- channel.ack(adapter.getFirstAckRequest(), adapter.getLastAckRequest());
+ throw ConnectionException(540, "Message.Ok no longer supported");
}
void
@@ -168,6 +170,47 @@
} else {
throw ConnectionException(540, "References no longer supported");
}
+}
+
+
+
+void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value)
+{
+
+ if (unit == 0) {
+ //message
+ channel.addMessageCredit(destination, value);
+ } else if (unit == 1) {
+ //bytes
+ channel.addByteCredit(destination, value);
+ } else {
+ //unknown
+ throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit);
+ }
+
+}
+
+void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode)
+{
+ if (mode == 0) {
+ //credit
+ channel.setCreditMode(destination);
+ } else if (mode == 1) {
+ //window
+ channel.setWindowMode(destination);
+ } else{
+ throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode);
+ }
+}
+
+void MessageHandlerImpl::flush(const std::string& destination)
+{
+ channel.flush(destination);
+}
+
+void MessageHandlerImpl::stop(const std::string& destination)
+{
+ channel.stop(destination);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h Fri Aug 10 07:51:08 2007
@@ -83,6 +83,15 @@
const std::string& identifier );
void transfer(const framing::MethodContext& context);
+
+ void flow(const std::string& destination, u_int8_t unit, u_int32_t value);
+
+ void flowMode(const std::string& destination, u_int8_t mode);
+
+ void flush(const std::string& destination);
+
+ void stop(const std::string& destination);
+
private:
ReferenceRegistry references;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Fri Aug 10 07:51:08 2007
@@ -63,20 +63,24 @@
void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
const qpid::framing::MethodContext& context)
{
- if (!method->invoke(this)) {
- //else do the usual:
- handleL4(method, context);
- //(if the frameset is complete) we can move the execution-mark
- //forward
-
- //temporary hack until channel management is moved to its own handler:
- if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
- ++(incoming.hwm);
+ try {
+ if (!method->invoke(this)) {
+ //else do the usual:
+ handleL4(method, context);
+ //(if the frameset is complete) we can move the execution-mark
+ //forward
+
+ //temporary hack until channel management is moved to its own handler:
+ if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) {
+ ++(incoming.hwm);
+ }
+
+ //note: need to be more sophisticated than this if we execute
+ //commands that arrive within an active message frameset (that
+ //can't happen until 0-10 framing is implemented)
}
-
- //note: need to be more sophisticated than this if we execute
- //commands that arrive within an active message frameset (that
- //can't happen until 0-10 framing is implemented)
+ }catch(const std::exception& e){
+ connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
}
}
@@ -87,15 +91,14 @@
if (outgoing.lwm < mark) {
outgoing.lwm = mark;
//ack messages:
- channel.ack(mark.getValue(), true);
+ channel.ackCumulative(mark.getValue());
//std::cout << "[" << this << "] acknowledged: " << mark << std::endl;
}
if (range.size() % 2) { //must be even number
throw ConnectionException(530, "Received odd number of elements in ranged mark");
} else {
- //TODO: need to keep a record of the full range previously acked
for (SequenceNumberSet::iterator i = range.begin(); i != range.end(); i++) {
- channel.ack((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
+ channel.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
}
}
}
@@ -121,22 +124,16 @@
throw ConnectionException(504, out.str());
}
} else {
- adapter->setResponseTo(context.getRequestId());
method->invoke(*adapter, context);
- adapter->setResponseTo(0);
}
- }catch(ChannelException& e){
- adapter->setResponseTo(0);
+ }catch(const ChannelException& e){
adapter->getProxy().getChannel().close(
e.code, e.toString(),
method->amqpClassId(), method->amqpMethodId());
connection.closeChannel(getId());
- }catch(ConnectionException& e){
+ }catch(const ConnectionException& e){
connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
- }catch(std::exception& e){
- connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
}
-
}
bool SemanticHandler::isOpen() const
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp Fri Aug 10 07:51:08 2007
@@ -36,7 +36,7 @@
//dequeue all acked messages from their queues
for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
if (i->coveredBy(&acked)) {
- i->discard(ctxt);
+ i->dequeue(ctxt);
}
}
return true;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp Fri Aug 10 07:51:08 2007
@@ -51,6 +51,11 @@
return old;
}
+SequenceNumber SequenceNumber::operator+(uint32_t i) const
+{
+ return SequenceNumber(value + i);
+}
+
bool SequenceNumber::operator<(const SequenceNumber& other) const
{
return (value - other.value) < 0;
@@ -59,6 +64,16 @@
bool SequenceNumber::operator>(const SequenceNumber& other) const
{
return other < *this;
+}
+
+bool SequenceNumber::operator<=(const SequenceNumber& other) const
+{
+ return *this == other || *this < other;
+}
+
+bool SequenceNumber::operator>=(const SequenceNumber& other) const
+{
+ return *this == other || *this > other;
}
namespace qpid {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h Fri Aug 10 07:51:08 2007
@@ -39,10 +39,13 @@
SequenceNumber& operator++();//prefix ++
const SequenceNumber operator++(int);//postfix ++
+ SequenceNumber operator+(uint32_t) const;
bool operator==(const SequenceNumber& other) const;
bool operator!=(const SequenceNumber& other) const;
bool operator<(const SequenceNumber& other) const;
bool operator>(const SequenceNumber& other) const;
+ bool operator<=(const SequenceNumber& other) const;
+ bool operator>=(const SequenceNumber& other) const;
uint32_t getValue() const { return (uint32_t) value; }
friend int32_t operator-(const SequenceNumber& a, const SequenceNumber& b);
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/AccumulatedAckTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/AccumulatedAckTest.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/AccumulatedAckTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/AccumulatedAckTest.cpp Fri Aug 10 07:51:08 2007
@@ -29,77 +29,182 @@
class AccumulatedAckTest : public CppUnit::TestCase
{
- CPPUNIT_TEST_SUITE(AccumulatedAckTest);
- CPPUNIT_TEST(testGeneral);
- CPPUNIT_TEST(testCovers);
- CPPUNIT_TEST(testUpdateAndConsolidate);
- CPPUNIT_TEST_SUITE_END();
-
- public:
- void testGeneral()
- {
- AccumulatedAck ack(0);
- ack.clear();
- ack.update(3,3);
- ack.update(7,7);
- ack.update(9,9);
- ack.update(1,2);
- ack.update(4,5);
- ack.update(6,6);
-
- for(int i = 1; i <= 7; i++) CPPUNIT_ASSERT(ack.covers(i));
- CPPUNIT_ASSERT(ack.covers(9));
-
- CPPUNIT_ASSERT(!ack.covers(8));
- CPPUNIT_ASSERT(!ack.covers(10));
-
- ack.consolidate();
-
- for(int i = 1; i <= 7; i++) CPPUNIT_ASSERT(ack.covers(i));
- CPPUNIT_ASSERT(ack.covers(9));
-
- CPPUNIT_ASSERT(!ack.covers(8));
- CPPUNIT_ASSERT(!ack.covers(10));
+ CPPUNIT_TEST_SUITE(AccumulatedAckTest);
+ CPPUNIT_TEST(testGeneral);
+ CPPUNIT_TEST(testCovers);
+ CPPUNIT_TEST(testCase1);
+ CPPUNIT_TEST(testCase2);
+ CPPUNIT_TEST(testCase3);
+ CPPUNIT_TEST(testCase4);
+ CPPUNIT_TEST(testConsolidation1);
+ CPPUNIT_TEST(testConsolidation2);
+ CPPUNIT_TEST(testConsolidation3);
+ CPPUNIT_TEST_SUITE_END();
+
+public:
+ bool covers(const AccumulatedAck& ack, int i)
+ {
+ return ack.covers(DeliveryId(i));
+ }
+
+ void update(AccumulatedAck& ack, int start, int end)
+ {
+ ack.update(DeliveryId(start), DeliveryId(end));
+ }
+
+ void testGeneral()
+ {
+ AccumulatedAck ack(0);
+ ack.clear();
+ update(ack, 3,3);
+ update(ack, 7,7);
+ update(ack, 9,9);
+ update(ack, 1,2);
+ update(ack, 4,5);
+ update(ack, 6,6);
+
+ for(int i = 1; i <= 7; i++) CPPUNIT_ASSERT(covers(ack, i));
+ CPPUNIT_ASSERT(covers(ack, 9));
+
+ CPPUNIT_ASSERT(!covers(ack, 8));
+ CPPUNIT_ASSERT(!covers(ack, 10));
+
+ ack.consolidate();
+
+ for(int i = 1; i <= 7; i++) CPPUNIT_ASSERT(covers(ack, i));
+ CPPUNIT_ASSERT(covers(ack, 9));
+
+ CPPUNIT_ASSERT(!covers(ack, 8));
+ CPPUNIT_ASSERT(!covers(ack, 10));
+ }
+
+ void testCovers()
+ {
+ AccumulatedAck ack(5);
+ update(ack, 7, 7);
+ update(ack, 9, 9);
+
+ CPPUNIT_ASSERT(covers(ack, 1));
+ CPPUNIT_ASSERT(covers(ack, 2));
+ CPPUNIT_ASSERT(covers(ack, 3));
+ CPPUNIT_ASSERT(covers(ack, 4));
+ CPPUNIT_ASSERT(covers(ack, 5));
+ CPPUNIT_ASSERT(covers(ack, 7));
+ CPPUNIT_ASSERT(covers(ack, 9));
+
+ CPPUNIT_ASSERT(!covers(ack, 6));
+ CPPUNIT_ASSERT(!covers(ack, 8));
+ CPPUNIT_ASSERT(!covers(ack, 10));
+ }
+
+ void testCase1()
+ {
+ AccumulatedAck ack(3);
+ update(ack, 1,2);
+ for(int i = 1; i <= 3; i++) CPPUNIT_ASSERT(covers(ack, i));
+ CPPUNIT_ASSERT(!covers(ack, 4));
+ }
+
+ void testCase2()
+ {
+ AccumulatedAck ack(3);
+ update(ack, 3,6);
+ for(int i = 1; i <= 6; i++) CPPUNIT_ASSERT(covers(ack, i));
+ CPPUNIT_ASSERT(!covers(ack, 7));
+ }
+
+ void testCase3()
+ {
+ AccumulatedAck ack(3);
+ update(ack, 4,6);
+ for(int i = 1; i <= 6; i++) {
+ CPPUNIT_ASSERT(covers(ack, i));
}
+ CPPUNIT_ASSERT(!covers(ack, 7));
+ }
- void testCovers()
- {
- AccumulatedAck ack(5);
- ack.individual.push_back(7);
- ack.individual.push_back(9);
-
- CPPUNIT_ASSERT(ack.covers(1));
- CPPUNIT_ASSERT(ack.covers(2));
- CPPUNIT_ASSERT(ack.covers(3));
- CPPUNIT_ASSERT(ack.covers(4));
- CPPUNIT_ASSERT(ack.covers(5));
- CPPUNIT_ASSERT(ack.covers(7));
- CPPUNIT_ASSERT(ack.covers(9));
-
- CPPUNIT_ASSERT(!ack.covers(6));
- CPPUNIT_ASSERT(!ack.covers(8));
- CPPUNIT_ASSERT(!ack.covers(10));
+ void testCase4()
+ {
+ AccumulatedAck ack(3);
+ update(ack, 5,6);
+ for(int i = 1; i <= 6; i++) {
+ if (i == 4) CPPUNIT_ASSERT(!covers(ack, i));
+ else CPPUNIT_ASSERT(covers(ack, i));
}
+ CPPUNIT_ASSERT(!covers(ack, 7));
+ }
+
+ void testConsolidation1()
+ {
+ AccumulatedAck ack(3);
+ update(ack, 7,7);
+ CPPUNIT_ASSERT_EQUAL((uint32_t) 3, ack.mark.getValue());
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size());
+
+ update(ack, 8,9);
+ CPPUNIT_ASSERT_EQUAL((uint32_t) 3, ack.mark.getValue());
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size());
+
+ update(ack, 1,2);
+ CPPUNIT_ASSERT_EQUAL((uint32_t) 3, ack.mark.getValue());
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size());
+
+ update(ack, 4,5);
+ CPPUNIT_ASSERT_EQUAL((uint32_t) 5, ack.mark.getValue());
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size());
+
+ update(ack, 6,6);
+ CPPUNIT_ASSERT_EQUAL((uint32_t) 9, ack.mark.getValue());
+ CPPUNIT_ASSERT_EQUAL((size_t) 0, ack.ranges.size());
+
+ for(int i = 1; i <= 9; i++) CPPUNIT_ASSERT(covers(ack, i));
+ CPPUNIT_ASSERT(!covers(ack, 10));
+ }
+
+ void testConsolidation2()
+ {
+ AccumulatedAck ack(0);
+ update(ack, 10,12);
+ CPPUNIT_ASSERT_EQUAL((uint32_t) 0, ack.mark.getValue());
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size());
+
+ update(ack, 7,9);
+ CPPUNIT_ASSERT_EQUAL((uint32_t) 0, ack.mark.getValue());
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size());
+ CPPUNIT_ASSERT_EQUAL((uint32_t) 7, ack.ranges.front().start.getValue());
+ CPPUNIT_ASSERT_EQUAL((uint32_t) 12, ack.ranges.front().end.getValue());
+
+ update(ack, 5,7);
+ CPPUNIT_ASSERT_EQUAL((uint32_t) 0, ack.mark.getValue());
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size());
+ CPPUNIT_ASSERT_EQUAL((uint32_t) 5, ack.ranges.front().start.getValue());
+ CPPUNIT_ASSERT_EQUAL((uint32_t) 12, ack.ranges.front().end.getValue());
+
+ update(ack, 3,4);
+ CPPUNIT_ASSERT_EQUAL((uint32_t) 0, ack.mark.getValue());
+ CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size());
+ CPPUNIT_ASSERT_EQUAL((uint32_t) 3, ack.ranges.front().start.getValue());
+ CPPUNIT_ASSERT_EQUAL((uint32_t) 12, ack.ranges.front().end.getValue());
+
+ update(ack, 1,2);
+ CPPUNIT_ASSERT_EQUAL((uint32_t) 12, ack.mark.getValue());
+ CPPUNIT_ASSERT_EQUAL((size_t) 0, ack.ranges.size());
+
+ for(int i = 1; i <= 12; i++) CPPUNIT_ASSERT(covers(ack, i));
+ CPPUNIT_ASSERT(!covers(ack, 13));
+ }
+
+ void testConsolidation3()
+ {
+ AccumulatedAck ack(0);
+ update(ack, 10,12);
+ update(ack, 6,7);
+ update(ack, 3,4);
+ update(ack, 1,15);
+ CPPUNIT_ASSERT_EQUAL((uint32_t) 15, ack.mark.getValue());
+ CPPUNIT_ASSERT_EQUAL((size_t) 0, ack.ranges.size());
+ }
- void testUpdateAndConsolidate()
- {
- AccumulatedAck ack(0);
- ack.update(1, 1);
- ack.update(3, 3);
- ack.update(10, 10);
- ack.update(8, 8);
- ack.update(6, 6);
- ack.update(3, 3);
- ack.update(2, 2);
- ack.update(0, 5);
- ack.consolidate();
- CPPUNIT_ASSERT_EQUAL((uint64_t) 6, ack.range);
- CPPUNIT_ASSERT_EQUAL((size_t) 2, ack.individual.size());
- list<uint64_t>::iterator i = ack.individual.begin();
- CPPUNIT_ASSERT_EQUAL((uint64_t) 8, *i);
- i++;
- CPPUNIT_ASSERT_EQUAL((uint64_t) 10, *i);
- }
};
// Make this test suite a plugin.
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp Fri Aug 10 07:51:08 2007
@@ -77,9 +77,9 @@
}
//assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not)
- acked.range = 5;
- acked.individual.push_back(7);
- acked.individual.push_back(9);
+ acked.mark = 5;
+ acked.update(7, 7);
+ acked.update(9, 9);
}
void testPrepare()
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Fri Aug 10 07:51:08 2007
@@ -645,7 +645,145 @@
self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz")
self.assertEmpty(self.client.queue("consumer"))
+
+ def test_credit_flow_messages(self):
+ """
+ Test basic credit based flow control with unit = message
+ """
+ #declare an exclusive queue
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+ #create consumer (for now that defaults to infinite credit)
+ channel.message_consume(queue = "q", destination = "c")
+ channel.message_flow_mode(mode = 0, destination = "c")
+ #set credit to zero (can remove this once move to proper default for subscribe method)
+ channel.message_stop(destination = "c")
+ #send batch of messages to queue
+ for i in range(1, 11):
+ channel.message_transfer(routing_key = "q", body = "Message %d" % i)
+ #set message credit to finite amount (less than enough for all messages)
+ channel.message_flow(unit = 0, value = 5, destination = "c")
+ #set infinite byte credit
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+ #check that expected number were received
+ q = self.client.queue("c")
+ for i in range(1, 6):
+ self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+ self.assertEmpty(q)
+
+ #increase credit again and check more are received
+ for i in range(6, 11):
+ channel.message_flow(unit = 0, value = 1, destination = "c")
+ self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+ self.assertEmpty(q)
+
+ def test_credit_flow_bytes(self):
+ """
+ Test basic credit based flow control with unit = bytes
+ """
+ #declare an exclusive queue
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+ #create consumer (for now that defaults to infinite credit)
+ channel.message_consume(queue = "q", destination = "c")
+ channel.message_flow_mode(mode = 0, destination = "c")
+ #set credit to zero (can remove this once move to proper default for subscribe method)
+ channel.message_stop(destination = "c")
+ #send batch of messages to queue
+ for i in range(1, 11):
+ channel.message_transfer(routing_key = "q", body = "abcdefgh")
+
+ #each message is currently interpreted as requiring 75 bytes of credit
+ #set byte credit to finite amount (less than enough for all messages)
+ channel.message_flow(unit = 1, value = 75*5, destination = "c")
+ #set infinite message credit
+ channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
+ #check that expected number were received
+ q = self.client.queue("c")
+ for i in range(1, 6):
+ self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+ self.assertEmpty(q)
+
+ #increase credit again and check more are received
+ for i in range(6, 11):
+ channel.message_flow(unit = 1, value = 75, destination = "c")
+ self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+ self.assertEmpty(q)
+
+
+ def test_window_flow_messages(self):
+ """
+ Test basic window based flow control with unit = message
+ """
+ #declare an exclusive queue
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+ #create consumer (for now that defaults to infinite credit)
+ channel.message_consume(queue = "q", destination = "c")
+ channel.message_flow_mode(mode = 1, destination = "c")
+ #set credit to zero (can remove this once move to proper default for subscribe method)
+ channel.message_stop(destination = "c")
+ #send batch of messages to queue
+ for i in range(1, 11):
+ channel.message_transfer(routing_key = "q", body = "Message %d" % i)
+
+ #set message credit to finite amount (less than enough for all messages)
+ channel.message_flow(unit = 0, value = 5, destination = "c")
+ #set infinite byte credit
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+ #check that expected number were received
+ q = self.client.queue("c")
+ for i in range(1, 6):
+ msg = q.get(timeout = 1)
+ self.assertDataEquals(channel, msg, "Message %d" % i)
+ self.assertEmpty(q)
+
+ #acknowledge messages and check more are received
+ msg.complete(cumulative=True)
+ for i in range(6, 11):
+ self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+ self.assertEmpty(q)
+
+
+ def test_window_flow_bytes(self):
+ """
+ Test basic window based flow control with unit = bytes
+ """
+ #declare an exclusive queue
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+ #create consumer (for now that defaults to infinite credit)
+ channel.message_consume(queue = "q", destination = "c")
+ channel.message_flow_mode(mode = 1, destination = "c")
+ #set credit to zero (can remove this once move to proper default for subscribe method)
+ channel.message_stop(destination = "c")
+ #send batch of messages to queue
+ for i in range(1, 11):
+ channel.message_transfer(routing_key = "q", body = "abcdefgh")
+
+ #each message is currently interpreted as requiring 75 bytes of credit
+ #set byte credit to finite amount (less than enough for all messages)
+ channel.message_flow(unit = 1, value = 75*5, destination = "c")
+ #set infinite message credit
+ channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
+ #check that expected number were received
+ q = self.client.queue("c")
+ msgs = []
+ for i in range(1, 6):
+ msg = q.get(timeout = 1)
+ msgs.append(msg)
+ self.assertDataEquals(channel, msg, "abcdefgh")
+ self.assertEmpty(q)
+
+ #ack each message individually and check more are received
+ for i in range(6, 11):
+ msg = msgs.pop()
+ msg.complete(cumulative=False)
+ self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+ self.assertEmpty(q)
+
+
def assertDataEquals(self, channel, msg, expected):
if isinstance(msg.body, ReferenceId):
Modified: incubator/qpid/trunk/qpid/specs/amqp-transitional.0-10.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/amqp-transitional.0-10.xml?view=diff&rev=564611&r1=564610&r2=564611
==============================================================================
--- incubator/qpid/trunk/qpid/specs/amqp-transitional.0-10.xml (original)
+++ incubator/qpid/trunk/qpid/specs/amqp-transitional.0-10.xml Fri Aug 10 07:51:08 2007
@@ -6900,6 +6900,128 @@
</doc>
</field>
</method>
+ <!-- - Method: message.flow-mode - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+
+ <method name="flow-mode" index="120" label="set the flow control mode">
+ <doc>
+ Sets the mode of flow control used for a given destination.
+
+ With credit based flow control, the sender of messages continually maintains its current
+ credit balance with the recipient. The credit balance consists of two values, a message
+ count, and a byte count. Whenever message data is sent, both counts must be decremented.
+ If either value reaches zero, the flow of message data must stop. Additional credit is
+ received via the message.flow method.
+
+ The sender MUST NOT send partial framesets. This means that if there is not enough byte
+ credit available to send a complete message, the sender must either wait or use chunked
+ transfer to send the first part of the message data in a complete frameset.
+
+ Window based flow control is identical to credit based flow control, however message
+ acknowledgment implicitly grants a single unit of message credit, and the size of the
+ message in byte credits for each acknowledged message.
+ </doc>
+
+ <rule name="byte-accounting">
+ <doc>
+ The byte count is decremented by the payload size of each transmitted frame with
+ segment type header or body appearing within a message.transfer command. Note that
+ the payload size is the frame size less the frame header size (frame-size - 12).
+ </doc>
+ </rule>
+
+ <rule name="mode-switching">
+ <doc>
+ Mode switching may only occur if both outstanding credit balances are zero. There are
+ three ways for a recipient of messages to be sure that the sender 's credit balance is
+ zero:
+
+ 1) The recipient may send a message.stop command to the sender. When the recipient
+ receives confirmation of completion for the message.stop command, it knows that the
+ sender's credit is zero.
+
+ 2) The recipient may perform the same steps described in (1) with the message.flush
+ command substituted for the message.stop command.
+
+ 3) Immediately after receiving a message.consume, the credit for that destination
+ defaults to zero.
+ </doc>
+ </rule>
+
+ <chassis name="server" implement="MUST" />
+ <chassis name="client" implement="MUST" />
+
+ <field name="destination" domain="destination" />
+ <field name="mode" domain="octet">
+ <doc>
+ One of:
+ - credit (0): choose credit based flow control
+ - window (1): choose window based flow control
+ </doc>
+ </field>
+ </method>
+
+ <!-- - Method: message.flow - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+
+ <method name="flow" index="130" label="control message flow">
+ <doc>
+ This method controls the flow of message data to a given destination. It is used by the
+ recipient of messages to dynamically match the incoming rate of message flow to its
+ processing or forwarding capacity. Upon receipt of this method, the sender must add "value"
+ number of the specified unit to the available credit balance for the specified destination.
+ A value of (0xFFFFFFFF) indicates an infinite amount of credit. This disables any limit for
+ the given unit until the credit balance is zeroed with message.stop or message.flush.
+ </doc>
+
+ <!-- throws no-such-destination -->
+
+ <chassis name="server" implement="MUST" />
+ <chassis name="client" implement="MUST" />
+
+ <field name="destination" domain="destination"/>
+ <field name="unit" domain="octet">
+ <doc>
+ Specifies the unit of credit balance.
+
+ One of:
+ - message (0)
+ - byte (1)
+ </doc>
+ </field>
+ <field name="value" domain="long">
+ <doc>
+ A value of (0xFFFFFFFF) indicates an infinite amount of credit.
+ </doc>
+ </field>
+ </method>
+
+ <!-- - Method: message.flush - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+
+ <method name="flush" index="140">
+ <doc>
+ Forces the sender to exhaust his credit supply. The sender's credit will always be zero when
+ this method completes. The message does not complete until all the message transfers occur.
+ </doc>
+
+ <chassis name="server" implement="MUST" />
+
+ <field name="destination" domain="destination" />
+ </method>
+
+ <!-- - Method: message.stop - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+
+ <method name="stop" index="150">
+ <doc>
+ On receipt of this method, a producer of messages MUST set his credit to zero for the given
+ destination. This obeys the generic semantics of command completion, i.e. when confirmation
+ is issued credit MUST be zero and no further messages will be sent until such a time as
+ further credit is received.
+ </doc>
+
+ <chassis name="server" implement="MUST" />
+ <chassis name="client" implement="MUST" />
+
+ <field name="destination" domain="destination" />
+ </method>
</class>