You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/05/06 19:58:50 UTC
svn commit: r772384 - in /qpid/trunk/qpid/cpp/src:
qpid/broker/DeliveryRecord.cpp qpid/broker/DeliveryRecord.h
qpid/broker/SemanticState.cpp qpid/broker/SemanticState.h
qpid/broker/TxAccept.cpp tests/DeliveryRecordTest.cpp
Author: aconway
Date: Wed May 6 17:58:50 2009
New Revision: 772384
URL: http://svn.apache.org/viewvc?rev=772384&view=rev
Log:
DeliveryRecord optimizations.
Replace linear search with binary search.
Collapse multi-pass mark-then-erase to a single pass.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp
qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=772384&r1=772383&r2=772384&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Wed May 6 17:58:50 2009
@@ -48,29 +48,13 @@
credit(msg.payload ? msg.payload->getRequiredCredit() : _credit)
{}
-void DeliveryRecord::setEnded()
+bool DeliveryRecord::setEnded()
{
ended = true;
//reset msg pointer, don't need to hold on to it anymore
msg.payload = boost::intrusive_ptr<Message>();
-
QPID_LOG(debug, "DeliveryRecord::setEnded() id=" << id);
-}
-
-bool DeliveryRecord::matches(DeliveryId tag) const{
- return id == tag;
-}
-
-bool DeliveryRecord::matchOrAfter(DeliveryId tag) const{
- return matches(tag) || after(tag);
-}
-
-bool DeliveryRecord::after(DeliveryId tag) const{
- return id > tag;
-}
-
-bool DeliveryRecord::coveredBy(const framing::SequenceSet* const range) const{
- return range->contains(id);
+ return isRedundant();
}
void DeliveryRecord::redeliver(SemanticState* const session) {
@@ -120,17 +104,17 @@
}
}
-void DeliveryRecord::complete()
-{
+void DeliveryRecord::complete() {
completed = true;
}
-void DeliveryRecord::accept(TransactionContext* ctxt) {
+bool DeliveryRecord::accept(TransactionContext* ctxt) {
if (acquired && !ended) {
queue->dequeue(ctxt, msg);
setEnded();
QPID_LOG(debug, "Accepted " << id);
}
+ return isRedundant();
}
void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
@@ -179,18 +163,10 @@
AckRange DeliveryRecord::findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last)
{
- DeliveryRecords::iterator start = find_if(records.begin(), records.end(), boost::bind(&DeliveryRecord::matchOrAfter, _1, first));
- DeliveryRecords::iterator end = start;
-
- if (start != records.end()) {
- if (first == last) {
- //just acked single element (move end past it)
- ++end;
- } else {
- //need to find end (position it just after the last record in range)
- end = find_if(start, records.end(), boost::bind(&DeliveryRecord::after, _1, last));
- }
- }
+ DeliveryRecords::iterator start = lower_bound(records.begin(), records.end(), first);
+ // Find end - position it just after the last record in range
+ DeliveryRecords::iterator end = lower_bound(records.begin(), records.end(), last);
+ if (end != records.end() && end->getId() == last) ++end;
return AckRange(start, end);
}
@@ -206,9 +182,5 @@
return out;
}
-bool operator<(const DeliveryRecord& a, const DeliveryRecord& b)
-{
- return a.id < b.id;
-}
}}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=772384&r1=772383&r2=772384&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Wed May 6 17:58:50 2009
@@ -1,3 +1,6 @@
+#ifndef QPID_BROKER_DELIVERYRECORD_H
+#define QPID_BROKER_DELIVERYRECORD_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,11 +21,9 @@
* under the License.
*
*/
-#ifndef _DeliveryRecord_
-#define _DeliveryRecord_
#include <algorithm>
-#include <list>
+#include <deque>
#include <vector>
#include <ostream>
#include "qpid/framing/SequenceSet.h"
@@ -44,15 +45,14 @@
{
QueuedMessage msg;
mutable Queue::shared_ptr queue;
- const std::string tag;
+ std::string tag;
DeliveryId id;
- bool acquired;
- bool acceptExpected;
- bool cancelled;
-
- bool completed;
- bool ended;
- const bool windowing;
+ bool acquired : 1;
+ bool acceptExpected : 1;
+ bool cancelled : 1;
+ bool completed : 1;
+ bool ended : 1;
+ bool windowing : 1;
/**
* Record required credit on construction as the pointer to the
@@ -61,7 +61,7 @@
* to reallocate credit when it is completed (which could happen
* after that).
*/
- const uint32_t credit;
+ uint32_t credit;
public:
QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg,
@@ -73,10 +73,7 @@
uint32_t credit=0 // Only used if msg is empty.
);
- QPID_BROKER_EXTERN bool matches(DeliveryId tag) const;
- bool matchOrAfter(DeliveryId tag) const;
- bool after(DeliveryId tag) const;
- bool coveredBy(const framing::SequenceSet* const range) const;
+ bool coveredBy(const framing::SequenceSet* const range) const { return range->contains(id); }
void dequeue(TransactionContext* ctxt = 0) const;
void requeue() const;
@@ -86,8 +83,8 @@
void redeliver(SemanticState* const);
void acquire(DeliveryIds& results);
void complete();
- void accept(TransactionContext* ctxt);
- void setEnded();
+ bool accept(TransactionContext* ctxt); // Returns isRedundant()
+ bool setEnded(); // Returns isRedundant()
void committed() const;
bool isAcquired() const { return acquired; }
@@ -104,15 +101,19 @@
void deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize);
void setId(DeliveryId _id) { id = _id; }
- typedef std::list<DeliveryRecord> DeliveryRecords;
+ typedef std::deque<DeliveryRecord> DeliveryRecords;
static AckRange findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last);
const QueuedMessage& getMessage() const { return msg; }
framing::SequenceNumber getId() const { return id; }
Queue::shared_ptr getQueue() const { return queue; }
- friend QPID_BROKER_EXTERN bool operator<(const DeliveryRecord&, const DeliveryRecord&);
+
friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
};
+inline bool operator<(const DeliveryRecord& a, const DeliveryRecord& b) { return a.getId() < b.getId(); }
+inline bool operator<(const framing::SequenceNumber& a, const DeliveryRecord& b) { return a < b.getId(); }
+inline bool operator<(const DeliveryRecord& a, const framing::SequenceNumber& b) { return a.getId() < b; }
+
struct AcquireFunctor
{
DeliveryIds& results;
@@ -138,4 +139,4 @@
}
-#endif
+#endif /*!QPID_BROKER_DELIVERYRECORD_H*/
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=772384&r1=772383&r2=772384&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed May 6 17:58:50 2009
@@ -408,12 +408,13 @@
outputTasks.activateOutput();
}
-void SemanticState::complete(DeliveryRecord& delivery)
+bool SemanticState::complete(DeliveryRecord& delivery)
{
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
i->second->complete(delivery);
}
+ return delivery.isRedundant();
}
void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery)
@@ -440,7 +441,7 @@
//unconfirmed messages re redelivered and therefore have their
//id adjusted, confirmed messages are not and so the ordering
//w.r.t id is lost
- unacked.sort();
+ sort(unacked.begin(), unacked.end());
}
}
@@ -638,24 +639,23 @@
dtxBuffer->enlist(txAck);
//mark the relevant messages as 'ended' in unacked
- for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded));
-
//if the messages are already completed, they can be
//removed from the record
- unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
-
+ DeliveryRecords::iterator removed = remove_if(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded));
+ unacked.erase(removed, range.end);
}
} else {
- for_each(range.start, range.end, boost::bind(&DeliveryRecord::accept, _1, (TransactionContext*) 0));
- unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
+ DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&DeliveryRecord::accept, _1, (TransactionContext*) 0));
+ unacked.erase(removed, range.end);
}
}
void SemanticState::completed(DeliveryId first, DeliveryId last)
{
AckRange range = findRange(first, last);
- for_each(range.start, range.end, boost::bind(&SemanticState::complete, this, _1));
- unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
+
+ DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&SemanticState::complete, this, _1));
+ unacked.erase(removed, range.end);
requestDispatch();
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=772384&r1=772383&r2=772384&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Wed May 6 17:58:50 2009
@@ -151,7 +151,7 @@
void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
void checkDtxTimeout();
- void complete(DeliveryRecord&);
+ bool complete(DeliveryRecord&);
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
void requestDispatch(ConsumerImpl&);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp?rev=772384&r1=772383&r2=772384&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp Wed May 6 17:58:50 2009
@@ -58,15 +58,10 @@
std::for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1));
//now remove if isRedundant():
if (!ranges.empty()) {
- DeliveryRecords::iterator i = ranges.front().range.start;
+ DeliveryRecords::iterator begin = ranges.front().range.start;
DeliveryRecords::iterator end = ranges.back().range.end;
- while (i != end) {
- if (i->isRedundant()) {
- i = unacked.erase(i);
- } else {
- i++;
- }
- }
+ DeliveryRecords::iterator removed = remove_if(begin, end, mem_fun_ref(&DeliveryRecord::isRedundant));
+ unacked.erase(removed, end);
}
}
Modified: qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp?rev=772384&r1=772383&r2=772384&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp Wed May 6 17:58:50 2009
@@ -53,7 +53,7 @@
SequenceNumber expected(0);
for (list<DeliveryRecord>::iterator i = records.begin(); i != records.end(); i++) {
- BOOST_CHECK(i->matches(++expected));
+ BOOST_CHECK(i->getId() == ++expected);
}
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org