You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/10/24 14:34:30 UTC
svn commit: r707615 - in /incubator/qpid/trunk/qpid/cpp/src:
qpid/broker/DeliveryRecord.cpp qpid/broker/DeliveryRecord.h
qpid/broker/SemanticState.cpp qpid/broker/TxAccept.cpp
qpid/broker/TxAccept.h tests/perftest.cpp
Author: gsim
Date: Fri Oct 24 05:34:29 2008
New Revision: 707615
URL: http://svn.apache.org/viewvc?rev=707615&view=rev
Log:
Revised transactional options to perftest as they could not be used on older boost versions due to ambiguity.
Refactored TxAccept to avoid excessive testing and searching for delivery records.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h
incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=707615&r1=707614&r2=707615&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Oct 24 05:34:29 2008
@@ -171,6 +171,24 @@
cancelled = true;
}
+AckRange DeliveryRecord::findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last)
+{
+ ack_iterator start = find_if(records.begin(), records.end(), boost::bind(&DeliveryRecord::matchOrAfter, _1, first));
+ ack_iterator end = start;
+
+ if (start != records.end()) {
+ if (first == last) {
+ //just acked single element (move end past it)
+ ++end;
+ } else {
+ //need to find end (position it just after the last record in range)
+ end = find_if(start, records.end(), boost::bind(&DeliveryRecord::after, _1, last));
+ }
+ }
+ return AckRange(start, end);
+}
+
+
namespace qpid {
namespace broker {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=707615&r1=707614&r2=707615&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Fri Oct 24 05:34:29 2008
@@ -34,11 +34,24 @@
namespace qpid {
namespace broker {
class SemanticState;
+class DeliveryRecord;
+
+typedef std::list<DeliveryRecord> DeliveryRecords;
+typedef std::list<DeliveryRecord>::iterator ack_iterator;
+
+struct AckRange
+{
+ ack_iterator start;
+ ack_iterator end;
+ AckRange(ack_iterator _start, ack_iterator _end) : start(_start), end(_end) {}
+};
+
/**
* Record of a delivery for which an ack is outstanding.
*/
-class DeliveryRecord{
+class DeliveryRecord
+{
QueuedMessage msg;
mutable Queue::shared_ptr queue;
const std::string tag;
@@ -91,24 +104,14 @@
void deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize);
void setId(DeliveryId _id) { id = _id; }
+ static AckRange findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last);
const QueuedMessage& getMessage() const { return msg; }
framing::SequenceNumber getId() const { return id; }
Queue::shared_ptr getQueue() const { return queue; }
-
friend bool operator<(const DeliveryRecord&, const DeliveryRecord&);
friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
};
-typedef std::list<DeliveryRecord> DeliveryRecords;
-typedef std::list<DeliveryRecord>::iterator ack_iterator;
-
-struct AckRange
-{
- ack_iterator start;
- ack_iterator end;
- AckRange(ack_iterator _start, ack_iterator _end) : start(_start), end(_end) {}
-};
-
struct AcquireFunctor
{
DeliveryIds& results;
@@ -120,7 +123,6 @@
record.acquire(results);
}
};
-
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=707615&r1=707614&r2=707615&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Oct 24 05:34:29 2008
@@ -549,20 +549,8 @@
}
AckRange SemanticState::findRange(DeliveryId first, DeliveryId last)
-{
- ack_iterator start = find_if(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::matchOrAfter, _1, first));
- ack_iterator end = start;
-
- if (start != unacked.end()) {
- if (first == last) {
- //just acked single element (move end past it)
- ++end;
- } else {
- //need to find end (position it just after the last record in range)
- end = find_if(start, unacked.end(), boost::bind(&DeliveryRecord::after, _1, last));
- }
- }
- return AckRange(start, end);
+{
+ return DeliveryRecord::findRange(unacked, first, last);
}
void SemanticState::acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp?rev=707615&r1=707614&r2=707615&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp Fri Oct 24 05:34:29 2008
@@ -26,19 +26,60 @@
using std::mem_fun_ref;
using namespace qpid::broker;
using qpid::framing::SequenceSet;
+using qpid::framing::SequenceNumber;
+
+TxAccept::RangeOp::RangeOp(const AckRange& r) : range(r) {}
+
+void TxAccept::RangeOp::prepare(TransactionContext* ctxt)
+{
+ for_each(range.start, range.end, bind(&DeliveryRecord::dequeue, _1, ctxt));
+}
+
+void TxAccept::RangeOp::commit()
+{
+ for_each(range.start, range.end, bind(&DeliveryRecord::setEnded, _1));
+}
+
+TxAccept::RangeOps::RangeOps(DeliveryRecords& u) : unacked(u) {}
+
+void TxAccept::RangeOps::operator()(SequenceNumber start, SequenceNumber end)
+{
+ ranges.push_back(RangeOp(DeliveryRecord::findRange(unacked, start, end)));
+}
+
+void TxAccept::RangeOps::prepare(TransactionContext* ctxt)
+{
+ for_each(ranges.begin(), ranges.end(), bind(&RangeOp::prepare, _1, ctxt));
+}
+
+void TxAccept::RangeOps::commit()
+{
+ for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1));
+ //now remove if isRedundant():
+ if (!ranges.empty()) {
+ ack_iterator i = ranges.front().range.start;
+ ack_iterator end = ranges.back().range.end;
+ while (i != end) {
+ if (i->isRedundant()) {
+ i = unacked.erase(i);
+ } else {
+ i++;
+ }
+ }
+ }
+}
TxAccept::TxAccept(SequenceSet& _acked, std::list<DeliveryRecord>& _unacked) :
- acked(_acked), unacked(_unacked) {}
+ acked(_acked), unacked(_unacked), ops(unacked)
+{
+ //populate the ops
+ acked.for_each(ops);
+}
bool TxAccept::prepare(TransactionContext* ctxt) throw()
{
try{
- //dequeue messages from their respective queues:
- for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
- if (i->coveredBy(&acked)) {
- i->dequeue(ctxt);
- }
- }
+ ops.prepare(ctxt);
return true;
}catch(const std::exception& e){
QPID_LOG(error, "Failed to prepare: " << e.what());
@@ -51,11 +92,7 @@
void TxAccept::commit() throw()
{
- for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
- if (i->coveredBy(&acked)) i->setEnded();
- }
-
- unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
+ ops.commit();
}
void TxAccept::rollback() throw() {}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h?rev=707615&r1=707614&r2=707615&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h Fri Oct 24 05:34:29 2008
@@ -34,9 +34,31 @@
* Defines the transactional behaviour for accepts received by
* a transactional channel.
*/
- class TxAccept : public TxOp{
+ class TxAccept : public TxOp {
+ struct RangeOp
+ {
+ AckRange range;
+
+ RangeOp(const AckRange& r);
+ void prepare(TransactionContext* ctxt);
+ void commit();
+ };
+
+ struct RangeOps
+ {
+ std::vector<RangeOp> ranges;
+ DeliveryRecords& unacked;
+
+ RangeOps(DeliveryRecords& u);
+
+ void operator()(framing::SequenceNumber start, framing::SequenceNumber end);
+ void prepare(TransactionContext* ctxt);
+ void commit();
+ };
+
framing::SequenceSet& acked;
std::list<DeliveryRecord>& unacked;
+ RangeOps ops;
public:
/**
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=707615&r1=707614&r2=707615&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Fri Oct 24 05:34:29 2008
@@ -97,9 +97,10 @@
bool summary;
uint32_t intervalSub;
uint32_t intervalPub;
- size_t tx_pub;
- bool tx_pub_async;
- size_t tx_sub;
+ size_t tx;
+ size_t txPub;
+ size_t txSub;
+ bool commitAsync;
static const std::string helpText;
@@ -109,7 +110,7 @@
pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false),
subs(1), ack(0),
qt(1), iterations(1), mode(SHARED), summary(false),
- intervalSub(0), intervalPub(0), tx_pub(0), tx_pub_async(false), tx_sub(0)
+ intervalSub(0), intervalPub(0), tx(0), txPub(0), txSub(0), commitAsync(false)
{
addOptions()
("setup", optValue(setup), "Create shared queues.")
@@ -145,9 +146,10 @@
("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume")
("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish")
- ("tx_pub", optValue(tx_pub, "N"), "if non-zero, the transaction batch size for publishing")
- ("tx_pub_async", optValue(tx_pub_async, "yes|no"), "Publishing tx commit async")
- ("tx_sub", optValue(tx_sub, "N"), "if non-zero, the transaction batch size for consuming");
+ ("tx", optValue(tx, "N"), "if non-zero, the transaction batch size for publishing and consuming")
+ ("pub-tx", optValue(txPub, "N"), "if non-zero, the transaction batch size for publishing")
+ ("async-commit", optValue(commitAsync, "yes|no"), "Don't wait for completion of commit")
+ ("sub-tx", optValue(txSub, "N"), "if non-zero, the transaction batch size for consuming");
}
// Computed values
@@ -184,6 +186,18 @@
break;
}
transfers=(totalPubs*count) + (totalSubs*subQuota);
+ if (tx) {
+ if (txPub) {
+ cerr << "WARNING: Using overriden tx value for publishers: " << txPub << std::endl;
+ } else {
+ txPub = tx;
+ }
+ if (txSub) {
+ cerr << "WARNING: Using overriden tx value for subscribers: " << txSub << std::endl;
+ } else {
+ txSub = tx;
+ }
+ }
}
};
@@ -457,12 +471,8 @@
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
- if (opts.tx_pub){
- if (opts.tx_pub_async){
- session.txSelect();
- } else {
- sync(session).txSelect();
- }
+ if (opts.txPub){
+ session.txSelect();
}
SubscriptionManager subs(session);
LocalQueue lq;
@@ -488,8 +498,8 @@
arg::content=msg,
arg::acceptMode=1);
}
- if (opts.tx_pub && ((i+1) % opts.tx_pub == 0)){
- if (opts.tx_pub_async){
+ if (opts.txPub && ((i+1) % opts.txPub == 0)){
+ if (opts.commitAsync){
session.txCommit();
} else {
sync(session).txCommit();
@@ -504,12 +514,8 @@
// Send result to controller.
Message report(lexical_cast<string>(opts.count/time), "pub_done");
session.messageTransfer(arg::content=report, arg::acceptMode=1);
- if (opts.tx_pub){
- if (opts.tx_pub_async){
- session.txCommit();
- }else{
- sync(session).txCommit();
- }
+ if (opts.txPub){
+ sync(session).txCommit();
}
}
session.close();
@@ -552,16 +558,19 @@
void run() { // Subscribe
try {
- if (opts.tx_sub) sync(session).txSelect();
+ if (opts.txSub) sync(session).txSelect();
SubscriptionManager subs(session);
- LocalQueue lq(AckPolicy(opts.tx_sub ? opts.tx_sub : opts.ack));
- subs.setAcceptMode(opts.tx_sub || opts.ack ? 0 : 1);
+ LocalQueue lq(AckPolicy(opts.txSub ? opts.txSub : opts.ack));
+ subs.setAcceptMode(opts.txSub || opts.ack ? 0 : 1);
subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED,
false);
subs.subscribe(lq, queue);
// Notify controller we are ready.
session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1);
- if (opts.tx_sub) sync(session).txCommit();
+ if (opts.txSub) {
+ if (opts.commitAsync) session.txCommit();
+ else sync(session).txCommit();
+ }
for (size_t j = 0; j < opts.iterations; ++j) {
if (j > 0) {
@@ -573,7 +582,10 @@
size_t expect=0;
for (size_t i = 0; i < opts.subQuota; ++i) {
msg=lq.pop();
- if (opts.tx_sub && ((i+1) % opts.tx_sub == 0)) sync(session).txCommit();
+ if (opts.txSub && ((i+1) % opts.txSub == 0)) {
+ if (opts.commitAsync) session.txCommit();
+ else sync(session).txCommit();
+ }
if (opts.intervalSub) ::usleep(opts.intervalSub*1000);
// TODO aconway 2007-11-23: check message order for.
// multiple publishers. Need an array of counters,
@@ -590,17 +602,19 @@
expect = n+1;
}
}
- if (opts.tx_sub || opts.ack)
+ if (opts.txSub || opts.ack)
lq.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch.
- if (opts.tx_sub)
- sync(session).txCommit();
+ if (opts.txSub) {
+ if (opts.commitAsync) session.txCommit();
+ else sync(session).txCommit();
+ }
AbsTime end=now();
// Report to publisher.
Message result(lexical_cast<string>(opts.subQuota/secs(start,end)),
"sub_done");
session.messageTransfer(arg::content=result, arg::acceptMode=1);
- if (opts.tx_sub) sync(session).txCommit();
+ if (opts.txSub) sync(session).txCommit();
}
session.close();
}