You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/05/06 19:58:50 UTC

svn commit: r772384 - in /qpid/trunk/qpid/cpp/src: qpid/broker/DeliveryRecord.cpp qpid/broker/DeliveryRecord.h qpid/broker/SemanticState.cpp qpid/broker/SemanticState.h qpid/broker/TxAccept.cpp tests/DeliveryRecordTest.cpp

Author: aconway
Date: Wed May  6 17:58:50 2009
New Revision: 772384

URL: http://svn.apache.org/viewvc?rev=772384&view=rev
Log:
DeliveryRecord optimizations.

Replace linear search with binary search.
Collapse multi-pass mark-then-erase to a single pass.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp
    qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=772384&r1=772383&r2=772384&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Wed May  6 17:58:50 2009
@@ -48,29 +48,13 @@
                                                   credit(msg.payload ? msg.payload->getRequiredCredit() : _credit)
 {}
 
-void DeliveryRecord::setEnded()
+bool DeliveryRecord::setEnded()
 {
     ended = true;
     //reset msg pointer, don't need to hold on to it anymore
     msg.payload = boost::intrusive_ptr<Message>();
-
     QPID_LOG(debug, "DeliveryRecord::setEnded() id=" << id);
-}
-
-bool DeliveryRecord::matches(DeliveryId tag) const{
-    return id == tag;
-}
-
-bool DeliveryRecord::matchOrAfter(DeliveryId tag) const{
-    return matches(tag) || after(tag);
-}
-
-bool DeliveryRecord::after(DeliveryId tag) const{
-    return id > tag;
-}
-
-bool DeliveryRecord::coveredBy(const framing::SequenceSet* const range) const{
-    return range->contains(id);
+    return isRedundant();
 }
 
 void DeliveryRecord::redeliver(SemanticState* const session) {
@@ -120,17 +104,17 @@
     }
 }
 
-void DeliveryRecord::complete() 
-{
+void DeliveryRecord::complete()  {
     completed = true; 
 }
 
-void DeliveryRecord::accept(TransactionContext* ctxt) {
+bool DeliveryRecord::accept(TransactionContext* ctxt) {
     if (acquired && !ended) {
         queue->dequeue(ctxt, msg);
         setEnded();
         QPID_LOG(debug, "Accepted " << id);
     }
+    return isRedundant();
 }
 
 void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
@@ -179,18 +163,10 @@
 
 AckRange DeliveryRecord::findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last)
 {
-    DeliveryRecords::iterator start = find_if(records.begin(), records.end(), boost::bind(&DeliveryRecord::matchOrAfter, _1, first));
-    DeliveryRecords::iterator end = start;
-     
-    if (start != records.end()) {
-        if (first == last) {
-            //just acked single element (move end past it)
-            ++end;
-        } else {
-            //need to find end (position it just after the last record in range)
-            end = find_if(start, records.end(), boost::bind(&DeliveryRecord::after, _1, last));
-        }
-    }
+    DeliveryRecords::iterator start = lower_bound(records.begin(), records.end(), first);
+    // Find end - position it just after the last record in range
+    DeliveryRecords::iterator end = lower_bound(records.begin(), records.end(), last);
+    if (end != records.end() && end->getId() == last) ++end;
     return AckRange(start, end);
 }
 
@@ -206,9 +182,5 @@
     return out;
 }
 
-bool operator<(const DeliveryRecord& a, const DeliveryRecord& b)
-{
-    return a.id < b.id;
-}
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=772384&r1=772383&r2=772384&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Wed May  6 17:58:50 2009
@@ -1,3 +1,6 @@
+#ifndef QPID_BROKER_DELIVERYRECORD_H
+#define QPID_BROKER_DELIVERYRECORD_H
+
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -18,11 +21,9 @@
  * under the License.
  *
  */
-#ifndef _DeliveryRecord_
-#define _DeliveryRecord_
 
 #include <algorithm>
-#include <list>
+#include <deque>
 #include <vector>
 #include <ostream>
 #include "qpid/framing/SequenceSet.h"
@@ -44,15 +45,14 @@
 {
     QueuedMessage msg;
     mutable Queue::shared_ptr queue;
-    const std::string tag;
+    std::string tag;
     DeliveryId id;
-    bool acquired;
-    bool acceptExpected;
-    bool cancelled;
-
-    bool completed;
-    bool ended;
-    const bool windowing;
+    bool acquired : 1;
+    bool acceptExpected : 1;
+    bool cancelled : 1;
+    bool completed : 1;
+    bool ended : 1;
+    bool windowing : 1;
 
     /**
      * Record required credit on construction as the pointer to the
@@ -61,7 +61,7 @@
      * to reallocate credit when it is completed (which could happen
      * after that).
      */
-    const uint32_t credit;
+    uint32_t credit;
 
   public:
     QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg,
@@ -73,10 +73,7 @@
                                       uint32_t credit=0       // Only used if msg is empty.
     );
     
-    QPID_BROKER_EXTERN bool matches(DeliveryId tag) const;
-    bool matchOrAfter(DeliveryId tag) const;
-    bool after(DeliveryId tag) const;
-    bool coveredBy(const framing::SequenceSet* const range) const;
+    bool coveredBy(const framing::SequenceSet* const range) const { return range->contains(id); }
     
     void dequeue(TransactionContext* ctxt = 0) const;
     void requeue() const;
@@ -86,8 +83,8 @@
     void redeliver(SemanticState* const);
     void acquire(DeliveryIds& results);
     void complete();
-    void accept(TransactionContext* ctxt);
-    void setEnded();
+    bool accept(TransactionContext* ctxt); // Returns isRedundant()
+    bool setEnded();            // Returns isRedundant()
     void committed() const;
 
     bool isAcquired() const { return acquired; }
@@ -104,15 +101,19 @@
     void deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize);
     void setId(DeliveryId _id) { id = _id; }
 
-    typedef std::list<DeliveryRecord> DeliveryRecords;
+    typedef std::deque<DeliveryRecord> DeliveryRecords;
     static AckRange findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last);
     const QueuedMessage& getMessage() const { return msg; }
     framing::SequenceNumber getId() const { return id; }
     Queue::shared_ptr getQueue() const { return queue; }
-    friend QPID_BROKER_EXTERN bool operator<(const DeliveryRecord&, const DeliveryRecord&);         
+
     friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
 };
 
+inline bool operator<(const DeliveryRecord& a, const DeliveryRecord& b) { return a.getId() < b.getId(); }
+inline bool operator<(const framing::SequenceNumber& a, const DeliveryRecord& b) { return a < b.getId(); }
+inline bool operator<(const DeliveryRecord& a, const framing::SequenceNumber& b) { return a.getId() < b; }
+
 struct AcquireFunctor
 {
     DeliveryIds& results;
@@ -138,4 +139,4 @@
 }
 
 
-#endif
+#endif  /*!QPID_BROKER_DELIVERYRECORD_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=772384&r1=772383&r2=772384&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed May  6 17:58:50 2009
@@ -408,12 +408,13 @@
         outputTasks.activateOutput();
 }
 
-void SemanticState::complete(DeliveryRecord& delivery)
+bool SemanticState::complete(DeliveryRecord& delivery)
 {    
     ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
     if (i != consumers.end()) {
         i->second->complete(delivery);
     }
+    return delivery.isRedundant();
 }
 
 void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery)
@@ -440,7 +441,7 @@
         //unconfirmed messages re redelivered and therefore have their
         //id adjusted, confirmed messages are not and so the ordering
         //w.r.t id is lost
-        unacked.sort();
+        sort(unacked.begin(), unacked.end());
     }
 }
 
@@ -638,24 +639,23 @@
             dtxBuffer->enlist(txAck);    
 
             //mark the relevant messages as 'ended' in unacked
-            for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded));
-
             //if the messages are already completed, they can be
             //removed from the record
-            unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
-
+            DeliveryRecords::iterator removed = remove_if(range.start, range.end, mem_fun_ref(&DeliveryRecord::setEnded));
+            unacked.erase(removed, range.end);
         }
     } else {
-        for_each(range.start, range.end, boost::bind(&DeliveryRecord::accept, _1, (TransactionContext*) 0));
-        unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
+        DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&DeliveryRecord::accept, _1, (TransactionContext*) 0));
+        unacked.erase(removed, range.end);
     }
 }
 
 void SemanticState::completed(DeliveryId first, DeliveryId last)
 {
     AckRange range = findRange(first, last);
-    for_each(range.start, range.end, boost::bind(&SemanticState::complete, this, _1));
-    unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
+    
+    DeliveryRecords::iterator removed = remove_if(range.start, range.end, boost::bind(&SemanticState::complete, this, _1));
+    unacked.erase(removed, range.end);
     requestDispatch();
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=772384&r1=772383&r2=772384&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Wed May  6 17:58:50 2009
@@ -151,7 +151,7 @@
     void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
     void checkDtxTimeout();
 
-    void complete(DeliveryRecord&);
+    bool complete(DeliveryRecord&);
     AckRange findRange(DeliveryId first, DeliveryId last);
     void requestDispatch();
     void requestDispatch(ConsumerImpl&);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp?rev=772384&r1=772383&r2=772384&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp Wed May  6 17:58:50 2009
@@ -58,15 +58,10 @@
     std::for_each(ranges.begin(), ranges.end(), bind(&RangeOp::commit, _1));
     //now remove if isRedundant():
     if (!ranges.empty()) {
-        DeliveryRecords::iterator i = ranges.front().range.start;
+        DeliveryRecords::iterator begin = ranges.front().range.start;
         DeliveryRecords::iterator end = ranges.back().range.end;
-        while (i != end) {
-            if (i->isRedundant()) {
-                i = unacked.erase(i);
-            } else {
-                i++;
-            }
-        }
+        DeliveryRecords::iterator removed = remove_if(begin, end, mem_fun_ref(&DeliveryRecord::isRedundant));
+        unacked.erase(removed, end);
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp?rev=772384&r1=772383&r2=772384&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp Wed May  6 17:58:50 2009
@@ -53,7 +53,7 @@
 
     SequenceNumber expected(0);
     for (list<DeliveryRecord>::iterator i = records.begin(); i != records.end(); i++) {
-        BOOST_CHECK(i->matches(++expected));
+        BOOST_CHECK(i->getId() == ++expected);
     }
 }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org