You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/09/03 19:35:36 UTC
svn commit: r572394 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/broker/
cpp/src/qpid/framing/ cpp/src/tests/ python/ python/qpid/ python/tests_0-10/
Author: gsim
Date: Mon Sep 3 10:35:35 2007
New Revision: 572394
URL: http://svn.apache.org/viewvc?rev=572394&view=rev
Log:
Initial implementation (plus very simple tests) of message.acquire, message.release, message.reject and message.flush.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.h
incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp
incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
incubator/qpid/trunk/qpid/python/qpid/client.py
incubator/qpid/trunk/qpid/python/qpid/peer.py
incubator/qpid/trunk/qpid/python/tests_0-10/message.py
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h Mon Sep 3 10:35:35 2007
@@ -57,7 +57,7 @@
*/
std::list<Range> ranges;
- AccumulatedAck(DeliveryId r) : mark(r) {}
+ explicit AccumulatedAck(DeliveryId r) : mark(r) {}
void update(DeliveryId firstTag, DeliveryId lastTag);
void consolidate();
void clear();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Mon Sep 3 10:35:35 2007
@@ -306,7 +306,7 @@
//also version specific behaviour now)
if (newTag.empty()) newTag = tagGenerator.generate();
DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag));
- session.consume(token, newTag, queue, noLocal, !noAck, exclusive, &fields);
+ session.consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields);
if(!nowait) client.consumeOk(newTag);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Mon Sep 3 10:35:35 2007
@@ -112,31 +112,50 @@
}
-
-void Queue::requestDispatch(){
- serializer.execute(dispatchCallback);
+bool Queue::acquire(const QueuedMessage& msg) {
+ Mutex::ScopedLock locker(messageLock);
+ for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
+ if (i->position == msg.position) {
+ messages.erase(i);
+ return true;
+ }
+ }
+ return false;
}
+void Queue::requestDispatch(Consumer* c, bool sync){
+ if (!c || c->preAcquires()) {
+ if (sync) {
+ serializer.dispatch();
+ } else {
+ serializer.execute(dispatchCallback);
+ }
+ } else {
+ //note: this is always done on the caller's thread, regardless
+ // of sync; browsers of large queues should use flow control!
+ serviceBrowser(c);
+ }
+}
bool Queue::dispatch(QueuedMessage& msg){
RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide....
- if(consumers.empty()){
+ if(acquirers.empty()){
return false;
}else if(exclusive){
return exclusive->deliver(msg);
}else{
//deliver to next consumer
- next = next % consumers.size();
- Consumer* c = consumers[next];
+ next = next % acquirers.size();
+ Consumer* c = acquirers[next];
int start = next;
while(c){
next++;
if(c->deliver(msg)) return true;
- next = next % consumers.size();
- c = next == start ? 0 : consumers[next];
+ next = next % acquirers.size();
+ c = next == start ? 0 : acquirers[next];
}
return false;
}
@@ -153,34 +172,79 @@
}
if( msg.payload->isEnqueueComplete() && dispatch(msg) ) {
pop();
- } else {
+ } else {
break;
}
- }
+ }
+ RWlock::ScopedRlock locker(consumerLock);
+ for (Consumers::iterator i = browsers.begin(); i != browsers.end(); i++) {
+ serviceBrowser(*i);
+ }
+}
+
+void Queue::serviceBrowser(Consumer* browser)
+{
+ //This is a poorly performing implementation:
+ //
+ // * bad concurrency where browsers exist
+ // * inefficient for largish queues
+ //
+ //The queue needs to be based on a concurrent data structure that
+ //does not invalidate iterators when modified. Subscribers could
+ //then use an iterator to continue from where they left off
+
+ Mutex::ScopedLock locker(messageLock);
+ if (!messages.empty() && messages.back().position > browser->position) {
+ for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
+ if (i->position > browser->position) {
+ if (browser->deliver(*i)) {
+ browser->position = i->position;
+ } else {
+ break;
+ }
+ }
+ }
+ }
}
void Queue::consume(Consumer* c, bool requestExclusive){
RWlock::ScopedWlock locker(consumerLock);
- if(exclusive)
+ if(exclusive) {
throw ChannelException(
403, format("Queue '%s' has an exclusive consumer."
" No more consumers allowed.") % getName());
+ }
if(requestExclusive) {
- if(!consumers.empty())
+ if(acquirers.empty() && browsers.empty()) {
+ exclusive = c;
+ } else {
throw ChannelException(
- 403, format("Queue '%s' already has conumers."
- "Exclusive access denied.") %getName());
- exclusive = c;
+ 403, format("Queue '%s' already has consumers."
+ "Exclusive access denied.") % getName());
+ }
+ }
+ if (c->preAcquires()) {
+ acquirers.push_back(c);
+ } else {
+ browsers.push_back(c);
}
- consumers.push_back(c);
}
void Queue::cancel(Consumer* c){
RWlock::ScopedWlock locker(consumerLock);
+ if (c->preAcquires()) {
+ cancel(c, acquirers);
+ } else {
+ cancel(c, browsers);
+ }
+ if(exclusive == c) exclusive = 0;
+}
+
+void Queue::cancel(Consumer* c, Consumers& consumers)
+{
Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c);
if (i != consumers.end())
consumers.erase(i);
- if(exclusive == c) exclusive = 0;
}
QueuedMessage Queue::dequeue(){
@@ -233,12 +297,12 @@
uint32_t Queue::getConsumerCount() const{
RWlock::ScopedRlock locker(consumerLock);
- return consumers.size();
+ return acquirers.size() + browsers.size();
}
bool Queue::canAutoDelete() const{
RWlock::ScopedRlock locker(consumerLock);
- return autodelete && consumers.size() == 0;
+ return autodelete && acquirers.empty() && browsers.empty();
}
// return true if store exists,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h Mon Sep 3 10:35:35 2007
@@ -68,7 +68,8 @@
const bool autodelete;
MessageStore* const store;
const ConnectionToken* const owner;
- Consumers consumers;
+ Consumers acquirers;
+ Consumers browsers;
Messages messages;
int next;
mutable qpid::sys::RWlock consumerLock;
@@ -91,6 +92,8 @@
* only called by serializer
*/
void dispatch();
+ void cancel(Consumer* c, Consumers& set);
+ void serviceBrowser(Consumer* c);
protected:
/**
@@ -114,6 +117,9 @@
void destroy();
void bound(const string& exchange, const string& key, const qpid::framing::FieldTable& args);
void unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref);
+
+ bool acquire(const QueuedMessage& msg);
+
/**
* Delivers a message to the queue. Will record it as
* enqueued if persistent then process it.
@@ -141,7 +147,7 @@
* at any time, so this call schedules the despatch based on
* the serializer policy.
*/
- void requestDispatch();
+ void requestDispatch(Consumer* c = 0, bool sync = false);
void consume(Consumer* c, bool exclusive = false);
void cancel(Consumer* c);
uint32_t purge();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h Mon Sep 3 10:35:35 2007
@@ -36,8 +36,13 @@
};
- class Consumer{
+ class Consumer {
+ const bool acquires;
public:
+ framing::SequenceNumber position;
+
+ Consumer(bool preAcquires = true) : acquires(preAcquires) {}
+ bool preAcquires() const { return acquires; }
virtual bool deliver(QueuedMessage& msg) = 0;
virtual ~Consumer(){}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Mon Sep 3 10:35:35 2007
@@ -19,7 +19,9 @@
*
*/
#include "DeliveryRecord.h"
+#include "DeliverableMessage.h"
#include "Session.h"
+#include "qpid/log/Statement.h"
using namespace qpid::broker;
using std::string;
@@ -27,29 +29,32 @@
DeliveryRecord::DeliveryRecord(QueuedMessage& _msg,
Queue::shared_ptr _queue,
const string _consumerTag,
- const DeliveryId _deliveryTag) : msg(_msg),
+ const DeliveryId _id,
+ bool _acquired) : msg(_msg),
queue(_queue),
consumerTag(_consumerTag),
- deliveryTag(_deliveryTag),
- acquired(false),
+ id(_id),
+ acquired(_acquired),
pull(false){}
DeliveryRecord::DeliveryRecord(QueuedMessage& _msg,
Queue::shared_ptr _queue,
- const DeliveryId _deliveryTag) : msg(_msg),
+ const DeliveryId _id) : msg(_msg),
queue(_queue),
consumerTag(""),
- deliveryTag(_deliveryTag),
- acquired(false),
+ id(_id),
+ acquired(true),
pull(true){}
void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
- queue->dequeue(ctxt, msg.payload);
+ if (acquired) {
+ queue->dequeue(ctxt, msg.payload);
+ }
}
bool DeliveryRecord::matches(DeliveryId tag) const{
- return deliveryTag == tag;
+ return id == tag;
}
bool DeliveryRecord::matchOrAfter(DeliveryId tag) const{
@@ -57,11 +62,11 @@
}
bool DeliveryRecord::after(DeliveryId tag) const{
- return deliveryTag > tag;
+ return id > tag;
}
bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{
- return range->covers(deliveryTag);
+ return range->covers(id);
}
void DeliveryRecord::redeliver(Session* const session) const{
@@ -69,15 +74,36 @@
//if message was originally sent as response to get, we must requeue it
requeue();
}else{
- session->deliver(msg.payload, consumerTag, deliveryTag);
+ session->deliver(msg.payload, consumerTag, id);
}
}
-void DeliveryRecord::requeue() const{
+void DeliveryRecord::requeue() const
+{
msg.payload->redeliver();
queue->requeue(msg);
}
+void DeliveryRecord::release()
+{
+ queue->requeue(msg);
+ acquired = false;
+}
+
+void DeliveryRecord::reject()
+{
+ Exchange::shared_ptr alternate = queue->getAlternateExchange();
+ if (alternate) {
+ DeliverableMessage delivery(msg.payload);
+ alternate->route(delivery, msg.payload->getRoutingKey(), &(msg.payload->getApplicationHeaders()));
+ QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to "
+ << alternate->getName());
+ } else {
+ //just drop it
+ QPID_LOG(info, "Dropping rejected message from " << queue->getName());
+ }
+}
+
void DeliveryRecord::updateByteCredit(uint32_t& credit) const
{
credit += msg.payload->getRequiredCredit();
@@ -102,11 +128,18 @@
}
}
+void DeliveryRecord::acquire(std::vector<DeliveryId>& results) {
+ if (queue->acquire(msg)) {
+ acquired = true;
+ results.push_back(id);
+ }
+}
+
namespace qpid {
namespace broker {
std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r) {
- out << "{" << "id=" << r.deliveryTag.getValue();
+ out << "{" << "id=" << r.id.getValue();
out << ", consumer=" << r.consumerTag;
out << ", queue=" << r.queue->getName() << "}";
return out;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Mon Sep 3 10:35:35 2007
@@ -23,6 +23,7 @@
#include <algorithm>
#include <list>
+#include <vector>
#include <ostream>
#include "AccumulatedAck.h"
#include "BrokerQueue.h"
@@ -42,13 +43,14 @@
mutable QueuedMessage msg;
mutable Queue::shared_ptr queue;
const std::string consumerTag;
- const DeliveryId deliveryTag;
+ const DeliveryId id;
bool acquired;
const bool pull;
public:
- DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag, const DeliveryId deliveryTag);
- DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId deliveryTag);
+ DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag,
+ const DeliveryId id, bool acquired);
+ DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id);
void dequeue(TransactionContext* ctxt = 0) const;
bool matches(DeliveryId tag) const;
@@ -56,6 +58,8 @@
bool after(DeliveryId tag) const;
bool coveredBy(const AccumulatedAck* const range) const;
void requeue() const;
+ void release();
+ void reject();
void redeliver(Session* const) const;
void updateByteCredit(uint32_t& credit) const;
void addTo(Prefetch&) const;
@@ -63,12 +67,33 @@
const std::string& getConsumerTag() const { return consumerTag; }
bool isPull() const { return pull; }
bool isAcquired() const { return acquired; }
- void setAcquired(bool isAcquired) { acquired = isAcquired; }
+ //void setAcquired(bool isAcquired) { acquired = isAcquired; }
+ void acquire(std::vector<DeliveryId>& results);
friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
};
typedef std::list<DeliveryRecord>::iterator ack_iterator;
+
+struct AckRange
+{
+ ack_iterator start;
+ ack_iterator end;
+ AckRange(ack_iterator _start, ack_iterator _end) : start(_start), end(_end) {}
+};
+
+struct AcquireFunctor
+{
+ std::vector<DeliveryId>& results;
+
+ AcquireFunctor(std::vector<DeliveryId>& _results) : results(_results) {}
+
+ void operator()(DeliveryRecord& record)
+ {
+ record.acquire(results);
+ }
+};
+
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Mon Sep 3 10:35:35 2007
@@ -25,6 +25,7 @@
#include "MessageDelivery.h"
#include "qpid/framing/MessageAppendBody.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/reply_exceptions.h"
#include "BrokerAdapter.h"
#include <boost/format.hpp>
@@ -92,7 +93,7 @@
const string& destination,
bool noLocal,
u_int8_t confirmMode,
- u_int8_t acquireMode,//TODO: implement acquire modes
+ u_int8_t acquireMode,
bool exclusive,
const framing::FieldTable& filter )
{
@@ -101,8 +102,10 @@
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
+ //NB: am assuming pre-acquired = 0 as discussed on SIG list as is
+ //the previously expected behaviour
session.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
- tag, queue, noLocal, confirmMode == 1, exclusive, &filter);
+ tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
}
@@ -156,9 +159,15 @@
}
void
-MessageHandlerImpl::reject(const SequenceNumberSet& /*transfers*/, uint16_t /*code*/, const string& /*text*/ )
+MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/, const string& /*text*/ )
{
- //TODO: implement
+ if (transfers.size() % 2) { //must be even number
+ throw InvalidArgumentException("Received odd number of elements in list of transfers");
+ }
+
+ for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
+ session.reject(i->getValue(), (++i)->getValue());
+ }
}
void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value)
@@ -200,14 +209,31 @@
session.stop(destination);
}
-void MessageHandlerImpl::acquire(const SequenceNumberSet& /*transfers*/, u_int8_t /*mode*/)
+void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/)
{
- throw ConnectionException(540, "Not yet implemented");
+ SequenceNumberSet results;
+
+ if (transfers.size() % 2) { //must be even number
+ throw InvalidArgumentException("Received odd number of elements in list of transfers");
+ }
+
+ for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
+ session.acquire(i->getValue(), (++i)->getValue(), results);
+ }
+
+ results = results.condense();
+ client.acquired(results);
}
-void MessageHandlerImpl::release(const SequenceNumberSet& /*transfers*/)
+void MessageHandlerImpl::release(const SequenceNumberSet& transfers)
{
- throw ConnectionException(540, "Not yet implemented");
+ if (transfers.size() % 2) { //must be even number
+ throw InvalidArgumentException("Received odd number of elements in list of transfers");
+ }
+
+ for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
+ session.release(i->getValue(), (++i)->getValue());
+ }
}
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp Mon Sep 3 10:35:35 2007
@@ -34,6 +34,7 @@
#include "TxPublish.h"
#include "qpid/QpidError.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
#include <boost/format.hpp>
@@ -91,12 +92,12 @@
}
void Session::consume(DeliveryToken::shared_ptr token, string& tagInOut,
- Queue::shared_ptr queue, bool nolocal, bool acks,
+ Queue::shared_ptr queue, bool nolocal, bool acks, bool acquire,
bool exclusive, const FieldTable*)
{
if(tagInOut.empty())
tagInOut = tagGenerator.generate();
- std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal));
+ std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire));
queue->consume(c.get(), exclusive);//may throw exception
consumers.insert(tagInOut, c.release());
}
@@ -239,7 +240,9 @@
bool ack,
bool _nolocal,
bool _acquire
-) : parent(_parent),
+ ) :
+ Consumer(_acquire),
+ parent(_parent),
token(_token),
name(_name),
queue(_queue),
@@ -266,7 +269,7 @@
DeliveryId deliveryTag =
parent->deliveryAdapter->deliver(msg.payload, token);
if (ackExpected) {
- parent->record(DeliveryRecord(msg, queue, name, deliveryTag));
+ parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire));
}
}
return !blocked;
@@ -312,7 +315,7 @@
void Session::ConsumerImpl::requestDispatch()
{
if(blocked)
- queue->requestDispatch();
+ queue->requestDispatch(this);
}
void Session::handle(Message::shared_ptr msg) {
@@ -532,9 +535,7 @@
void Session::ConsumerImpl::flush()
{
- //TODO: need to wait until any messages that are available for
- //this consumer have been delivered... i.e. some sort of flush on
- //the queue...
+ queue->requestDispatch(this, true);
}
void Session::ConsumerImpl::stop()
@@ -557,6 +558,44 @@
throw NotFoundException(QPID_MSG("Queue not found: "<<name));
}
return queue;
+}
+
+AckRange Session::findRange(DeliveryId first, DeliveryId last)
+{
+ ack_iterator start = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
+ ack_iterator end = start;
+
+ if (first == last) {
+ //just acked single element (move end past it)
+ ++end;
+ } else {
+ //need to find end (position it just after the last record in range)
+ end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+ }
+ return AckRange(start, end);
+}
+
+void Session::acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired)
+{
+ Mutex::ScopedLock locker(deliveryLock);
+ AckRange range = findRange(first, last);
+ for_each(range.start, range.end, AcquireFunctor(acquired));
+}
+
+void Session::release(DeliveryId first, DeliveryId last)
+{
+ Mutex::ScopedLock locker(deliveryLock);
+ AckRange range = findRange(first, last);
+ for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::release));
+}
+
+void Session::reject(DeliveryId first, DeliveryId last)
+{
+ Mutex::ScopedLock locker(deliveryLock);
+ AckRange range = findRange(first, last);
+ for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject));
+ //need to remove the delivery records as well
+ unacked.erase(range.start, range.end);
}
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h Mon Sep 3 10:35:35 2007
@@ -40,6 +40,7 @@
#include <boost/ptr_container/ptr_vector.hpp>
#include <list>
+#include <vector>
namespace qpid {
@@ -80,7 +81,7 @@
public:
ConsumerImpl(Session* parent, DeliveryToken::shared_ptr token,
const string& name, Queue::shared_ptr queue,
- bool ack, bool nolocal, bool acquire=true);
+ bool ack, bool nolocal, bool acquire);
~ConsumerImpl();
bool deliver(QueuedMessage& msg);
void redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag);
@@ -131,6 +132,8 @@
// FIXME aconway 2007-08-31: remove, temporary hack.
SemanticHandler* semanticHandler;
+
+ AckRange findRange(DeliveryId first, DeliveryId last);
public:
@@ -166,7 +169,7 @@
*@param tagInOut - if empty it is updated with the generated token.
*/
void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue,
- bool nolocal, bool acks, bool exclusive, const framing::FieldTable* = 0);
+ bool nolocal, bool acks, bool acquire, bool exclusive, const framing::FieldTable* = 0);
void cancel(const string& tag);
@@ -192,6 +195,9 @@
void recover(bool requeue);
void flow(bool active);
void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag);
+ void acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired);
+ void release(DeliveryId first, DeliveryId last);
+ void reject(DeliveryId first, DeliveryId last);
void handle(Message::shared_ptr msg);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp Mon Sep 3 10:35:35 2007
@@ -44,6 +44,22 @@
return 2 /*count*/ + (size() * 4);
}
+SequenceNumberSet SequenceNumberSet::condense() const
+{
+ SequenceNumberSet result;
+ const_iterator last = end();
+ for (const_iterator i = begin(); i != end(); i++) {
+ if (last == end()) {
+ last = i;
+ } else if (*i - *last > 1) {
+ result.push_back(*last);
+ result.push_back(*i);
+ last = end();
+ }
+ }
+ return result;
+}
+
namespace qpid{
namespace framing{
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.h?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumberSet.h Mon Sep 3 10:35:35 2007
@@ -39,6 +39,7 @@
void encode(Buffer& buffer) const;
void decode(Buffer& buffer);
uint32_t encodedSize() const;
+ SequenceNumberSet condense() const;
friend std::ostream& operator<<(std::ostream&, const SequenceNumberSet&);
};
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp Mon Sep 3 10:35:35 2007
@@ -78,7 +78,7 @@
messages.push_back(msg);
QueuedMessage qm;
qm.payload = msg;
- deliveries.push_back(DeliveryRecord(qm, queue, "xyz", (i+1)));
+ deliveries.push_back(DeliveryRecord(qm, queue, "xyz", (i+1), true));
}
//assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not)
Modified: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (original)
+++ incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt Mon Sep 3 10:35:35 2007
@@ -1,4 +1,3 @@
tests_0-10.alternate-exchange.AlternateExchangeTests.test_immediate
-tests_0-10.message.MessageTests.test_reject
tests_0-10.basic.BasicTests.test_get
Modified: incubator/qpid/trunk/qpid/python/qpid/client.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/client.py?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/client.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/client.py Mon Sep 3 10:35:35 2007
@@ -126,6 +126,9 @@
def message_append(self, ch, msg):
ch.references.get(msg.reference).append(msg.bytes)
+ def message_acquired(self, ch, msg):
+ ch.control_queue.put(msg)
+
def basic_deliver(self, ch, msg):
self.client.queue(msg.consumer_tag).put(msg)
Modified: incubator/qpid/trunk/qpid/python/qpid/peer.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/peer.py?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/peer.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/peer.py Mon Sep 3 10:35:35 2007
@@ -190,6 +190,7 @@
self.completion = OutgoingCompletion()
self.incoming_completion = IncomingCompletion(self)
self.futures = {}
+ self.control_queue = Queue(0)#used for incoming methods that appas may want to handle themselves
# Use reliable framing if version == 0-9.
if spec.major == 0 and spec.minor == 9:
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=572394&r1=572393&r2=572394&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Mon Sep 3 10:35:35 2007
@@ -339,20 +339,19 @@
msg = queue.get(timeout=1)
self.assertEqual(large, msg.content.body)
-
-
def test_reject(self):
channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True)
+ channel.queue_declare(queue = "q", exclusive=True, alternate_exchange="amq.fanout")
+ channel.queue_declare(queue = "r", exclusive=True)
+ channel.queue_bind(queue = "r", exchange = "amq.fanout")
- channel.message_subscribe(queue = "q", destination = "consumer")
+ channel.message_subscribe(queue = "q", destination = "consumer", confirm_mode = 1)
channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah"))
msg = self.client.queue("consumer").get(timeout = 1)
self.assertEquals(msg.content.body, "blah, blah")
- channel.message_cancel(destination = "consumer")
- msg.reject()
+ channel.message_reject([msg.command_id, msg.command_id])
- channel.message_subscribe(queue = "q", destination = "checker")
+ channel.message_subscribe(queue = "r", destination = "checker")
msg = self.client.queue("checker").get(timeout = 1)
self.assertEquals(msg.content.body, "blah, blah")
@@ -492,6 +491,71 @@
msg.complete(cumulative=False)
self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
self.assertEmpty(q)
+
+ def test_subscribe_not_acquired(self):
+ """
+ Test that the not-acquired mode works as expected for a simple case
+ """
+ #NOTE: I'm using not-acquired == 1 and pre-acquired == 0 as
+ #that keeps the default behaviour as expected. This was
+ #discussed by the SIG, and I'd rather not change all the
+ #existing tests twice.
+
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+ for i in range(1, 6):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+
+ channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
+ channel.message_subscribe(queue = "q", destination = "b", acquire_mode = 1)
+
+ for i in range(6, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+
+ #both subscribers should see all messages
+ qA = self.client.queue("a")
+ qB = self.client.queue("b")
+ for i in range(1, 11):
+ for q in [qA, qB]:
+ msg = q.get(timeout = 1)
+ self.assertEquals("Message %s" % i, msg.content.body)
+ msg.complete()
+
+ #messages should still be on the queue:
+ self.assertEquals(10, channel.queue_query(queue = "q").message_count)
+
+ def test_acquire(self):
+ """
+ Test explicit acquire function
+ """
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me"))
+
+ channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1)
+ msg = self.client.queue("a").get(timeout = 1)
+ channel.message_acquire([msg.command_id, msg.command_id])
+ msg.complete()
+
+ #message should have been removed from the queue:
+ self.assertEquals(0, channel.queue_query(queue = "q").message_count)
+
+ def test_release(self):
+ """
+ Test explicit release function
+ """
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "release me"))
+
+ channel.message_subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1)
+ msg = self.client.queue("a").get(timeout = 1)
+ channel.message_cancel(destination = "a")
+ channel.message_release([msg.command_id, msg.command_id])
+ msg.complete()
+
+ #message should not have been removed from the queue:
+ self.assertEquals(1, channel.queue_query(queue = "q").message_count)
def assertDataEquals(self, channel, msg, expected):
self.assertEquals(expected, msg.content.body)