You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/10/24 14:34:30 UTC

svn commit: r707615 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/DeliveryRecord.cpp qpid/broker/DeliveryRecord.h qpid/broker/SemanticState.cpp qpid/broker/TxAccept.cpp qpid/broker/TxAccept.h tests/perftest.cpp

Author: gsim
Date: Fri Oct 24 05:34:29 2008
New Revision: 707615

URL: http://svn.apache.org/viewvc?rev=707615&view=rev
Log:
Revised transactional options to perftest as they could not be used on older boost versions due to ambiguity.
Refactored TxAccept to avoid excessive testing and searching for delivery records.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h
    incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=707615&r1=707614&r2=707615&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Oct 24 05:34:29 2008
@@ -171,6 +171,24 @@
         cancelled = true;
 }
 
+AckRange DeliveryRecord::findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last)
+{
+    ack_iterator start = find_if(records.begin(), records.end(), boost::bind(&DeliveryRecord::matchOrAfter, _1, first));
+    ack_iterator end = start;
+     
+    if (start != records.end()) {
+        if (first == last) {
+            //just acked single element (move end past it)
+            ++end;
+        } else {
+            //need to find end (position it just after the last record in range)
+            end = find_if(start, records.end(), boost::bind(&DeliveryRecord::after, _1, last));
+        }
+    }
+    return AckRange(start, end);
+}
+
+
 namespace qpid {
 namespace broker {
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=707615&r1=707614&r2=707615&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Fri Oct 24 05:34:29 2008
@@ -34,11 +34,24 @@
 namespace qpid {
 namespace broker {
 class SemanticState;
+class DeliveryRecord;
+
+typedef std::list<DeliveryRecord> DeliveryRecords; 
+typedef std::list<DeliveryRecord>::iterator ack_iterator; 
+
+struct AckRange
+{
+    ack_iterator start;
+    ack_iterator end;    
+    AckRange(ack_iterator _start, ack_iterator _end) : start(_start), end(_end) {}
+};
+
 
 /**
  * Record of a delivery for which an ack is outstanding.
  */
-class DeliveryRecord{
+class DeliveryRecord
+{
     QueuedMessage msg;
     mutable Queue::shared_ptr queue;
     const std::string tag;
@@ -91,24 +104,14 @@
     void deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize);
     void setId(DeliveryId _id) { id = _id; }
 
+    static AckRange findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last);
     const QueuedMessage& getMessage() const { return msg; }
     framing::SequenceNumber getId() const { return id; }
     Queue::shared_ptr getQueue() const { return queue; }
-
     friend bool operator<(const DeliveryRecord&, const DeliveryRecord&);         
     friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
 };
 
-typedef std::list<DeliveryRecord> DeliveryRecords; 
-typedef std::list<DeliveryRecord>::iterator ack_iterator; 
-
-struct AckRange
-{
-    ack_iterator start;
-    ack_iterator end;    
-    AckRange(ack_iterator _start, ack_iterator _end) : start(_start), end(_end) {}
-};
-
 struct AcquireFunctor
 {
     DeliveryIds& results;
@@ -120,7 +123,6 @@
         record.acquire(results);
     }
 };
-
 }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=707615&r1=707614&r2=707615&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Oct 24 05:34:29 2008
@@ -549,20 +549,8 @@
 }
 
 AckRange SemanticState::findRange(DeliveryId first, DeliveryId last)
-{    
-    ack_iterator start = find_if(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::matchOrAfter, _1, first));
-    ack_iterator end = start;
-     
-    if (start != unacked.end()) {
-        if (first == last) {
-            //just acked single element (move end past it)
-            ++end;
-        } else {
-            //need to find end (position it just after the last record in range)
-            end = find_if(start, unacked.end(), boost::bind(&DeliveryRecord::after, _1, last));
-        }
-    }
-    return AckRange(start, end);
+{   
+    return DeliveryRecord::findRange(unacked, first, last);
 }
 
 void SemanticState::acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp?rev=707615&r1=707614&r2=707615&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp Fri Oct 24 05:34:29 2008
@@ -26,19 +26,60 @@
 using std::mem_fun_ref;
 using namespace qpid::broker;
 using qpid::framing::SequenceSet;
+using qpid::framing::SequenceNumber;
+
+TxAccept::RangeOp::RangeOp(const AckRange& r) : range(r) {}
+
+void TxAccept::RangeOp::prepare(TransactionContext* ctxt)
+{
+    for_each(range.start, range.end, bind(&DeliveryRecord::dequeue, _1, ctxt));
+}
+
+void TxAccept::RangeOp::commit()
+{
+    for_each(range.start, range.end, bind(&DeliveryRecord::setEnded, _1));
+}
+
+TxAccept::RangeOps::RangeOps(DeliveryRecords& u) : unacked(u) {} 
+
+void TxAccept::RangeOps::operator()(SequenceNumber start, SequenceNumber end)
+{
+    ranges.push_back(RangeOp(DeliveryRecord::findRange(unacked, start, end)));
+}
+
+void TxAccept::RangeOps::prepare(TransactionContext* ctxt)
+{
+    for_each(ranges.begin(), ranges.end(), bind(&RangeOp::prepare, _1, ctxt));
+}
+
+void TxAccept::RangeOps::commit()
+{
+    for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1));
+    //now remove if isRedundant():
+    if (!ranges.empty()) {
+        ack_iterator i = ranges.front().range.start;
+        ack_iterator end = ranges.back().range.end;
+        while (i != end) {
+            if (i->isRedundant()) {
+                i = unacked.erase(i);
+            } else {
+                i++;
+            }
+        }
+    }
+}
 
 TxAccept::TxAccept(SequenceSet& _acked, std::list<DeliveryRecord>& _unacked) : 
-    acked(_acked), unacked(_unacked) {}
+    acked(_acked), unacked(_unacked), ops(unacked) 
+{
+    //populate the ops
+    acked.for_each(ops);
+}
 
 bool TxAccept::prepare(TransactionContext* ctxt) throw()
 {
     try{
-        //dequeue messages from their respective queues:
-        for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
-            if (i->coveredBy(&acked)) {
-                i->dequeue(ctxt);
-            }
-        }
+        ops.prepare(ctxt);
         return true;
     }catch(const std::exception& e){
         QPID_LOG(error, "Failed to prepare: " << e.what());
@@ -51,11 +92,7 @@
 
 void TxAccept::commit() throw() 
 {
-    for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
-        if (i->coveredBy(&acked)) i->setEnded();
-    }
-
-    unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
+    ops.commit();
 }
 
 void TxAccept::rollback() throw() {}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h?rev=707615&r1=707614&r2=707615&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h Fri Oct 24 05:34:29 2008
@@ -34,9 +34,31 @@
          * Defines the transactional behaviour for accepts received by
          * a transactional channel.
          */
-        class TxAccept : public TxOp{
+        class TxAccept : public TxOp {
+            struct RangeOp
+            {
+                AckRange range;
+    
+                RangeOp(const AckRange& r);
+                void prepare(TransactionContext* ctxt);
+                void commit();
+            };
+
+            struct RangeOps
+            {
+                std::vector<RangeOp> ranges;
+                DeliveryRecords& unacked;
+    
+                RangeOps(DeliveryRecords& u);
+
+                void operator()(framing::SequenceNumber start, framing::SequenceNumber end);
+                void prepare(TransactionContext* ctxt);
+                void commit();    
+            };
+
             framing::SequenceSet& acked;
             std::list<DeliveryRecord>& unacked;
+            RangeOps ops;
 
         public:
             /**

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=707615&r1=707614&r2=707615&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Fri Oct 24 05:34:29 2008
@@ -97,9 +97,10 @@
     bool summary;
     uint32_t intervalSub;
     uint32_t intervalPub;
-    size_t tx_pub;
-    bool tx_pub_async;
-    size_t tx_sub;
+    size_t tx;
+    size_t txPub;
+    size_t txSub;
+    bool commitAsync;
 
     static const std::string helpText;
     
@@ -109,7 +110,7 @@
         pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false), syncPub(false),
         subs(1), ack(0),
         qt(1), iterations(1), mode(SHARED), summary(false),
-        intervalSub(0), intervalPub(0), tx_pub(0), tx_pub_async(false), tx_sub(0)
+        intervalSub(0), intervalPub(0), tx(0), txPub(0), txSub(0), commitAsync(false)
     {
         addOptions()
             ("setup", optValue(setup), "Create shared queues.")
@@ -145,9 +146,10 @@
             ("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume")
             ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish")
 
-            ("tx_pub", optValue(tx_pub, "N"), "if non-zero, the transaction batch size for publishing")
-            ("tx_pub_async", optValue(tx_pub_async, "yes|no"), "Publishing tx commit async")
-            ("tx_sub", optValue(tx_sub, "N"), "if non-zero, the transaction batch size for consuming");
+            ("tx", optValue(tx, "N"), "if non-zero, the transaction batch size for publishing and consuming")
+            ("pub-tx", optValue(txPub, "N"), "if non-zero, the transaction batch size for publishing")
+            ("async-commit", optValue(commitAsync, "yes|no"), "Don't wait for completion of commit")
+            ("sub-tx", optValue(txSub, "N"), "if non-zero, the transaction batch size for consuming");
     }
 
     // Computed values
@@ -184,6 +186,18 @@
             break;
         }
         transfers=(totalPubs*count) + (totalSubs*subQuota);
+        if (tx) {
+            if (txPub) {
+                cerr << "WARNING: Using overriden tx value for publishers: " << txPub << std::endl;
+            } else {
+                txPub = tx;
+            }
+            if (txSub) {
+                cerr << "WARNING: Using overriden tx value for subscribers: " << txSub << std::endl;
+            } else {
+                txSub = tx;
+            }
+        }
     }
 };
 
@@ -457,12 +471,8 @@
                 msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
 
 
-            if (opts.tx_pub){ 
-                if (opts.tx_pub_async){
-                    session.txSelect();
-                } else {
-                    sync(session).txSelect();
-                }
+            if (opts.txPub){ 
+                session.txSelect();
             }
             SubscriptionManager subs(session);
             LocalQueue lq;
@@ -488,8 +498,8 @@
                             arg::content=msg,
                             arg::acceptMode=1);
                     }
-                    if (opts.tx_pub && ((i+1) % opts.tx_pub == 0)){
-                        if (opts.tx_pub_async){
+                    if (opts.txPub && ((i+1) % opts.txPub == 0)){
+                        if (opts.commitAsync){
                             session.txCommit();
                         } else {
                             sync(session).txCommit();
@@ -504,12 +514,8 @@
                 // Send result to controller.
                 Message report(lexical_cast<string>(opts.count/time), "pub_done");
                 session.messageTransfer(arg::content=report, arg::acceptMode=1);
-                if (opts.tx_pub){
-                    if (opts.tx_pub_async){
-                        session.txCommit();
-                    }else{
-                        sync(session).txCommit();
-                    }
+                if (opts.txPub){
+                    sync(session).txCommit();
                 }
             }
             session.close();
@@ -552,16 +558,19 @@
 
     void run() {                // Subscribe
         try {            
-            if (opts.tx_sub) sync(session).txSelect();
+            if (opts.txSub) sync(session).txSelect();
             SubscriptionManager subs(session);
-            LocalQueue lq(AckPolicy(opts.tx_sub ? opts.tx_sub : opts.ack));
-            subs.setAcceptMode(opts.tx_sub || opts.ack ? 0 : 1);
+            LocalQueue lq(AckPolicy(opts.txSub ? opts.txSub : opts.ack));
+            subs.setAcceptMode(opts.txSub || opts.ack ? 0 : 1);
             subs.setFlowControl(opts.subQuota, SubscriptionManager::UNLIMITED,
                                 false);
             subs.subscribe(lq, queue);
             // Notify controller we are ready.
             session.messageTransfer(arg::content=Message("ready", "sub_ready"), arg::acceptMode=1);
-            if (opts.tx_sub) sync(session).txCommit();
+            if (opts.txSub) {
+                if (opts.commitAsync) session.txCommit();
+                else sync(session).txCommit();
+            }
             
             for (size_t j = 0; j < opts.iterations; ++j) {
                 if (j > 0) {
@@ -573,7 +582,10 @@
                 size_t expect=0;
                 for (size_t i = 0; i < opts.subQuota; ++i) {
                     msg=lq.pop();
-                    if (opts.tx_sub && ((i+1) % opts.tx_sub == 0)) sync(session).txCommit();
+                    if (opts.txSub && ((i+1) % opts.txSub == 0)) {
+                        if (opts.commitAsync) session.txCommit();
+                        else sync(session).txCommit();
+                    }
                     if (opts.intervalSub) ::usleep(opts.intervalSub*1000);
                     // TODO aconway 2007-11-23: check message order for. 
                     // multiple publishers. Need an array of counters,
@@ -590,17 +602,19 @@
                         expect = n+1;
                     }
                 }
-                if (opts.tx_sub || opts.ack)
+                if (opts.txSub || opts.ack)
                     lq.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch.
-                if (opts.tx_sub)
-                    sync(session).txCommit();
+                if (opts.txSub) {
+                    if (opts.commitAsync) session.txCommit();
+                    else sync(session).txCommit();
+                }
                 AbsTime end=now();
 
                 // Report to publisher.
                 Message result(lexical_cast<string>(opts.subQuota/secs(start,end)),
                                "sub_done");
                 session.messageTransfer(arg::content=result, arg::acceptMode=1);
-                if (opts.tx_sub) sync(session).txCommit();
+                if (opts.txSub) sync(session).txCommit();
             }
             session.close();
         }