You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/03/30 21:36:49 UTC

svn commit: r1307582 - in /qpid/trunk/qpid/cpp: src/qpid/broker/ src/qpid/cluster/ src/tests/ xml/

Author: aconway
Date: Fri Mar 30 19:36:48 2012
New Revision: 1307582

URL: http://svn.apache.org/viewvc?rev=1307582&view=rev
Log:
QPID-3603: Keep acquired messages on queues for all queue types.

Updated priority and LVQ queues to keep acquired messages and supply
them to browsers if requested. This is necessary so that replicating
subscriptions can back up these queue types without message loss.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h
    qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.h
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h
    qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp?rev=1307582&r1=1307581&r2=1307582&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp Fri Mar 30 19:36:48 2012
@@ -23,6 +23,7 @@
 #include "qpid/framing/FieldTable.h"
 #include "qpid/framing/FieldValue.h"
 #include "qpid/log/Statement.h"
+#include <algorithm>
 #include <boost/format.hpp>
 #include <boost/lexical_cast.hpp>
 #include <boost/assign/list_of.hpp>
@@ -32,7 +33,7 @@ namespace broker {
 
 Fairshare::Fairshare(size_t levels, uint limit) :
     PriorityQueue(levels),
-    limits(levels, limit), priority(levels-1), count(0) {}
+    limits(levels, limit), counts(levels, 0) {}
 
 
 void Fairshare::setLimit(size_t level, uint limit)
@@ -40,70 +41,63 @@ void Fairshare::setLimit(size_t level, u
     limits[level] = limit;
 }
 
-bool Fairshare::limitReached()
-{
-    uint l = limits[priority];
-    return l && ++count > l;
-}
-
-uint Fairshare::currentLevel()
-{
-    if (limitReached()) {
-        return nextLevel();
-    } else {
-        return priority;
-    }
-}
-
-uint Fairshare::nextLevel()
-{
-    count = 1;
-    if (priority) --priority;
-    else priority = levels-1;
-    return priority;
-}
-
 bool Fairshare::isNull()
 {
     for (int i = 0; i < levels; i++) if (limits[i]) return false;
     return true;
 }
 
-bool Fairshare::getState(uint& p, uint& c) const
+bool Fairshare::getState(qpid::framing::FieldTable& state) const
 {
-    p = priority;
-    c = count;
+    for (int i = 0; i < levels; i++) {
+        if (counts[i]) {
+            std::string key = (boost::format("fairshare-count-%1%") % i).str();
+            state.setInt(key, counts[i]);
+        }
+    }
     return true;
 }
 
-bool Fairshare::setState(uint p, uint c)
+bool Fairshare::checkLevel(uint level)
 {
-    priority = p;
-    count = c;
-    return true;
+    if (!limits[level] || counts[level] < limits[level]) {
+        counts[level]++;
+        return true;
+    } else {
+        return false;
+    }
 }
 
-bool Fairshare::findFrontLevel(uint& p, PriorityLevels& messages)
+bool Fairshare::consume(QueuedMessage& message)
 {
-    const uint start = p = currentLevel();
-    do {
-        if (!messages[p].empty()) return true;
-    } while ((p = nextLevel()) != start);
-    return false;
+    for (Available::iterator i = available.begin(); i != available.end(); ++i) {
+        QueuedMessage* next = *i;
+        if (checkLevel(getPriorityLevel(*next))) {
+            messages[next->position].status = QueuedMessage::ACQUIRED;
+            message = *next;
+            available.erase(i);
+            return true;
+        }
+    }
+    if (!available.empty()) {
+        std::fill(counts.begin(), counts.end(), 0);//reset counts
+        return consume(message);
+    } else {
+        return false;
+    }
 }
 
 
-
-bool Fairshare::getState(const Messages& m, uint& priority, uint& count)
+bool Fairshare::getState(const Messages& m, qpid::framing::FieldTable& counts)
 {
     const Fairshare* fairshare = dynamic_cast<const Fairshare*>(&m);
-    return fairshare && fairshare->getState(priority, count);
+    return fairshare && fairshare->getState(counts);
 }
 
-bool Fairshare::setState(Messages& m, uint priority, uint count)
+bool Fairshare::setState(Messages& m, const qpid::framing::FieldTable& counts)
 {
     Fairshare* fairshare = dynamic_cast<Fairshare*>(&m);
-    return fairshare && fairshare->setState(priority, count);
+    return fairshare && fairshare->setState(counts);
 }
 
 int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::vector<std::string>& keys)
@@ -136,7 +130,14 @@ int getIntegerSettingForKey(const qpid::
 {
     return getIntegerSetting(settings, boost::assign::list_of<std::string>(key));
 }
-
+bool Fairshare::setState(const qpid::framing::FieldTable& state)
+{
+    for (int i = 0; i < levels; i++) {
+        std::string key = (boost::format("fairshare-count-%1%") % i).str();
+        counts[i] = state.isSet(key) ? getIntegerSettingForKey(state, key) : 0;
+    }
+    return true;
+}
 int getSetting(const qpid::framing::FieldTable& settings, const std::vector<std::string>& keys, int minvalue, int maxvalue)
 {
     return std::max(minvalue,std::min(getIntegerSetting(settings, keys), maxvalue));

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h?rev=1307582&r1=1307581&r2=1307582&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Fairshare.h Fri Mar 30 19:36:48 2012
@@ -22,6 +22,7 @@
  *
  */
 #include "qpid/broker/PriorityQueue.h"
+#include <vector>
 
 namespace qpid {
 namespace framing {
@@ -38,23 +39,19 @@ class Fairshare : public PriorityQueue
 {
   public:
     Fairshare(size_t levels, uint limit);
-    bool getState(uint& priority, uint& count) const;
-    bool setState(uint priority, uint count);
+    bool getState(qpid::framing::FieldTable& counts) const;
+    bool setState(const qpid::framing::FieldTable& counts);
     void setLimit(size_t level, uint limit);
     bool isNull();
+    bool consume(QueuedMessage&);
     static std::auto_ptr<Messages> create(const qpid::framing::FieldTable& settings);
-    static bool getState(const Messages&, uint& priority, uint& count);
-    static bool setState(Messages&, uint priority, uint count);
+    static bool getState(const Messages&, qpid::framing::FieldTable& counts);
+    static bool setState(Messages&, const qpid::framing::FieldTable& counts);
   private:
     std::vector<uint> limits;
+    std::vector<uint> counts;
 
-    uint priority;
-    uint count;
-
-    uint currentLevel();
-    uint nextLevel();
-    bool limitReached();
-    bool findFrontLevel(uint& p, PriorityLevels&);
+    bool checkLevel(uint level);
 };
 }} // namespace qpid::broker
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp?rev=1307582&r1=1307581&r2=1307582&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp Fri Mar 30 19:36:48 2012
@@ -28,16 +28,26 @@ namespace broker {
 LegacyLVQ::LegacyLVQ(const std::string& k, bool b, Broker* br) : MessageMap(k), noBrowse(b), broker(br) {}
 
 void LegacyLVQ::setNoBrowse(bool b)
-{ 
+{
     noBrowse = b;
 }
+bool LegacyLVQ::deleted(const QueuedMessage& message)
+{
+    Ordering::iterator i = messages.find(message.position);
+    if (i != messages.end() && i->second.payload == message.payload) {
+        erase(i);
+        return true;
+    } else {
+        return false;
+    }
+}
 
 bool LegacyLVQ::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
 {
     Ordering::iterator i = messages.find(position);
-    if (i != messages.end() && i->second.payload == message.payload) {
+    if (i != messages.end() && i->second.payload == message.payload && i->second.status == QueuedMessage::AVAILABLE) {
+        i->second.status = QueuedMessage::ACQUIRED;
         message = i->second;
-        erase(i);
         return true;
     } else {
         return false;
@@ -66,12 +76,17 @@ bool LegacyLVQ::push(const QueuedMessage
 }
 
 const QueuedMessage& LegacyLVQ::replace(const QueuedMessage& original, const QueuedMessage& update)
-{ 
+{
     //add the new message into the original position of the replaced message
     Ordering::iterator i = messages.find(original.position);
-    i->second = update;
-    i->second.position = original.position;
-    return i->second;
+    if (i != messages.end()) {
+        i->second = update;
+        i->second.position = original.position;
+        return i->second;
+    } else {
+        QPID_LOG(error, "Failed to replace message at " << original.position);
+        return update;
+    }
 }
 
 void LegacyLVQ::removeIf(Predicate p)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.h?rev=1307582&r1=1307581&r2=1307582&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.h Fri Mar 30 19:36:48 2012
@@ -40,6 +40,7 @@ class LegacyLVQ : public MessageMap
 {
   public:
     LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0);
+    bool deleted(const QueuedMessage&);
     bool acquire(const framing::SequenceNumber&, QueuedMessage&);
     bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
     bool push(const QueuedMessage& added, QueuedMessage& removed);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp?rev=1307582&r1=1307581&r2=1307582&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp Fri Mar 30 19:36:48 2012
@@ -20,6 +20,7 @@
  */
 #include "qpid/broker/MessageMap.h"
 #include "qpid/broker/QueuedMessage.h"
+#include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace broker {
@@ -27,7 +28,16 @@ namespace {
 const std::string EMPTY;
 }
 
-bool MessageMap::deleted(const QueuedMessage&) { return true; }
+bool MessageMap::deleted(const QueuedMessage& message)
+{
+    Ordering::iterator i = messages.find(message.position);
+    if (i != messages.end()) {
+        erase(i);
+        return true;
+    } else {
+        return false;
+    }
+}
 
 std::string MessageMap::getKey(const QueuedMessage& message)
 {
@@ -38,30 +48,32 @@ std::string MessageMap::getKey(const Que
 
 size_t MessageMap::size()
 {
-    return messages.size();
+    size_t count(0);
+    for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
+        if (i->second.status == QueuedMessage::AVAILABLE) ++count;
+    }
+    return count;
 }
 
 bool MessageMap::empty()
 {
-    return messages.empty();
+    return size() == 0;//TODO: more efficient implementation
 }
 
 void MessageMap::release(const QueuedMessage& message)
 {
-    std::string key = getKey(message);
-    Index::iterator i = index.find(key);
-    if (i == index.end()) {
-        index[key] = message;
-        messages[message.position] = message;
-    } //else message has already been replaced
+    Ordering::iterator i = messages.find(message.position);
+    if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) {
+        i->second.status = QueuedMessage::AVAILABLE;
+    }
 }
 
 bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
 {
     Ordering::iterator i = messages.find(position);
-    if (i != messages.end()) {
+    if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
+        i->second.status = QueuedMessage::ACQUIRED;
         message = i->second;
-        erase(i);
         return true;
     } else {
         return false;
@@ -71,7 +83,7 @@ bool MessageMap::acquire(const framing::
 bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message)
 {
     Ordering::iterator i = messages.find(position);
-    if (i != messages.end()) {
+    if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
         message = i->second;
         return true;
     } else {
@@ -79,10 +91,10 @@ bool MessageMap::find(const framing::Seq
     }
 }
 
-bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool)
+bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
 {
     Ordering::iterator i = messages.lower_bound(position+1);
-    if (i != messages.end()) {
+    if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE  || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) {
         message = i->second;
         return true;
     } else {
@@ -92,14 +104,14 @@ bool MessageMap::browse(const framing::S
 
 bool MessageMap::consume(QueuedMessage& message)
 {
-    Ordering::iterator i = messages.begin();
-    if (i != messages.end()) {
-        message = i->second;
-        erase(i);
-        return true;
-    } else {
-        return false;
+    for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
+        if (i->second.status == QueuedMessage::AVAILABLE) {
+            i->second.status = QueuedMessage::ACQUIRED;
+            message = i->second;
+            return true;
+        }
     }
+    return false;
 }
 
 const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update)
@@ -115,12 +127,17 @@ bool MessageMap::push(const QueuedMessag
     if (result.second) {
         //there was no previous message for this key; nothing needs to
         //be removed, just add the message into its correct position
-        messages[added.position] = added;
+        QueuedMessage& a = messages[added.position];
+        a = added;
+        a.status = QueuedMessage::AVAILABLE;
+        QPID_LOG(debug, "Added message at " << a.position);
         return false;
     } else {
         //there is already a message with that key which needs to be replaced
         removed = result.first->second;
         result.first->second = replace(result.first->second, added);
+        result.first->second.status = QueuedMessage::AVAILABLE;
+        QPID_LOG(debug, "Displaced message at " << removed.position << " with " << result.first->second.position << ": " << result.first->first);
         return true;
     }
 }
@@ -128,15 +145,24 @@ bool MessageMap::push(const QueuedMessag
 void MessageMap::foreach(Functor f)
 {
     for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
-        f(i->second);
+        if (i->second.status == QueuedMessage::AVAILABLE) f(i->second);
     }
 }
 
 void MessageMap::removeIf(Predicate p)
 {
-    for (Ordering::iterator i = messages.begin(); i != messages.end(); i++) {
-        if (p(i->second)) {
-            erase(i);
+    for (Ordering::iterator i = messages.begin(); i != messages.end();) {
+        if (i->second.status == QueuedMessage::AVAILABLE && p(i->second)) {
+            index.erase(getKey(i->second));
+            //Note: Removing from messages means that the subsequent
+            //call to deleted() for the same message will return
+            //false. At present that is not a problem. If this were
+            //changed to hold onto the message until dequeued
+            //(e.g. with REMOVED state), then the erase() below would
+            //need to take that into account.
+            messages.erase(i++);
+        } else {
+            ++i;
         }
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h?rev=1307582&r1=1307581&r2=1307582&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h Fri Mar 30 19:36:48 2012
@@ -43,7 +43,7 @@ class MessageMap : public Messages
     size_t size();
     bool empty();
 
-    bool deleted(const QueuedMessage&);
+    virtual bool deleted(const QueuedMessage&);
     void release(const QueuedMessage&);
     virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&);
     bool find(const framing::SequenceNumber&, QueuedMessage&);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp?rev=1307582&r1=1307581&r2=1307582&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp Fri Mar 30 19:36:48 2012
@@ -28,120 +28,125 @@ namespace qpid {
 namespace broker {
 
 PriorityQueue::PriorityQueue(int l) : 
-    levels(l),
-    messages(levels, Deque()),
-    frontLevel(0), haveFront(false), cached(false) {}
+    levels(l) {}
 
-bool PriorityQueue::deleted(const QueuedMessage&) { return true; }
+bool PriorityQueue::deleted(const QueuedMessage& message)
+{
+    Index::iterator i = messages.find(message.position);
+    if (i != messages.end()) {
+        //remove from available list if necessary
+        if (i->second.status == QueuedMessage::AVAILABLE) {
+            Available::iterator j = std::find(available.begin(), available.end(), &i->second);
+            if (j != available.end()) available.erase(j);
+        }
+        //remove from messages map
+        messages.erase(i);
+        return true;
+    } else {
+        return false;
+    }
+}
 
 size_t PriorityQueue::size()
 {
-    size_t total(0);
-    for (int i = 0; i < levels; ++i) {
-        total += messages[i].size();
-    }
-    return total;
+    return available.size();
 }
 
 void PriorityQueue::release(const QueuedMessage& message)
 {
-    uint p = getPriorityLevel(message);
-    messages[p].insert(lower_bound(messages[p].begin(), messages[p].end(), message), message);
-    clearCache();
-}
-
-bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove)
-{
-    QueuedMessage comp;
-    comp.position = position;
-    for (int i = 0; i < levels; ++i) {
-        if (!messages[i].empty()) {
-            unsigned long diff = position.getValue() - messages[i].front().position.getValue();
-            long maxEnd = diff < messages[i].size() ? diff : messages[i].size();        
-            Deque::iterator l = lower_bound(messages[i].begin(),messages[i].begin()+maxEnd,comp);
-            if (l != messages[i].end() && l->position == position) {
-                message = *l;
-                if (remove) {
-                    messages[i].erase(l);
-                    clearCache();
-                }
-                return true;
-            }
-        }
+    Index::iterator i = messages.find(message.position);
+    if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) {
+        i->second.status = QueuedMessage::AVAILABLE;
+        //insert message back into the correct place in available queue, based on priority:
+        Available::iterator j = upper_bound(available.begin(), available.end(), &i->second, boost::bind(&PriorityQueue::compare, this, _1, _2));
+        available.insert(j, &i->second);
     }
-    return false;
 }
 
 bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
 {
-    return find(position, message, true);
+    Index::iterator i = messages.find(position);
+    if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
+        i->second.status = QueuedMessage::ACQUIRED;
+        message = i->second;
+        //remove it from available list (could make this faster by using ordering):
+        Available::iterator j = std::find(available.begin(), available.end(), &i->second);
+        assert(j != available.end());
+        available.erase(j);
+        return true;
+    } else {
+        return false;
+    }
 }
 
 bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message)
 {
-    return find(position, message, false);
+    Index::iterator i = messages.find(position);
+    if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
+        message = i->second;
+        return true;
+    } else {
+        return false;
+    }
 }
 
-bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool)
+bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
 {
-    QueuedMessage match;
-    match.position = position+1;
-    Deque::iterator lowest;
-    bool found = false;
-    for (int i = 0; i < levels; ++i) {
-        Deque::iterator m = lower_bound(messages[i].begin(), messages[i].end(), match); 
-        if (m != messages[i].end()) {
-            if (m->position == match.position) {
-                message = *m;
-                return true;
-            } else if (!found || m->position < lowest->position) {
-                lowest = m;
-                found = true;
-            }
-        }
-    }
-    if (found) {
-        message = *lowest;
+    Index::iterator i = messages.lower_bound(position+1);
+    if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE  || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) {
+        message = i->second;
+        return true;
+    } else {
+        return false;
     }
-    return found;
 }
 
 bool PriorityQueue::consume(QueuedMessage& message)
 {
-    if (checkFront()) {
-        message = messages[frontLevel].front();
-        messages[frontLevel].pop_front();
-        clearCache();
+    if (!available.empty()) {
+        QueuedMessage* next = available.front();
+        messages[next->position].status = QueuedMessage::ACQUIRED;
+        message = *next;
+        available.pop_front();
         return true;
     } else {
         return false;
     }
 }
 
+bool PriorityQueue::compare(const QueuedMessage* a, const QueuedMessage* b) const
+{
+    int priorityA = getPriorityLevel(*a);
+    int priorityB = getPriorityLevel(*b);
+    if (priorityA == priorityB) return a->position < b->position;
+    else return priorityA > priorityB;
+}
+
 bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
 {
-    messages[getPriorityLevel(added)].push_back(added);
-    clearCache();
+    Index::iterator i = messages.insert(Index::value_type(added.position, added)).first;
+    i->second.status = QueuedMessage::AVAILABLE;
+    //insert message into the correct place in available queue, based on priority:
+    Available::iterator j = upper_bound(available.begin(), available.end(), &i->second, boost::bind(&PriorityQueue::compare, this, _1, _2));
+    available.insert(j, &i->second);
     return false;//adding a message never causes one to be removed for deque
 }
 
 void PriorityQueue::foreach(Functor f)
 {
-    for (int i = 0; i < levels; ++i) {
-        std::for_each(messages[i].begin(), messages[i].end(), f);
+    for (Available::iterator i = available.begin(); i != available.end(); ++i) {
+        f(**i);
     }
 }
 
 void PriorityQueue::removeIf(Predicate p)
 {
-    for (int priority = 0; priority < levels; ++priority) {
-        for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) {
-            if (p(*i)) {
-                i = messages[priority].erase(i);
-                clearCache();
-            } else {
-                ++i;
-            }
+    for (Available::iterator i = available.begin(); i != available.end();) {
+        if (p(**i)) {
+            messages[(*i)->position].status = QueuedMessage::REMOVED;
+            i = available.erase(i);
+        } else {
+            ++i;
         }
     }
 }
@@ -156,30 +161,6 @@ uint PriorityQueue::getPriorityLevel(con
     return std::min(priority - firstLevel, (uint)levels-1);
 }
 
-void PriorityQueue::clearCache()
-{
-    cached = false;
-}
-
-bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m)
-{
-    for (int p = levels-1; p >= 0; --p) {
-        if (!m[p].empty()) {
-            l = p;
-            return true;
-        }
-    }
-    return false;
-}
-
-bool PriorityQueue::checkFront()
-{
-    if (!cached) {
-        haveFront = findFrontLevel(frontLevel, messages);
-        cached = true;
-    }
-    return haveFront;
-}
 
 uint PriorityQueue::getPriority(const QueuedMessage& message)
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h?rev=1307582&r1=1307581&r2=1307582&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h Fri Mar 30 19:36:48 2012
@@ -23,8 +23,8 @@
  */
 #include "qpid/broker/Messages.h"
 #include "qpid/sys/IntegerTypes.h"
-#include <deque>
-#include <vector>
+#include <list>
+#include <map>
 
 namespace qpid {
 namespace broker {
@@ -46,28 +46,22 @@ class PriorityQueue : public Messages
     bool acquire(const framing::SequenceNumber&, QueuedMessage&);
     bool find(const framing::SequenceNumber&, QueuedMessage&);
     bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
-    bool consume(QueuedMessage&);
+    virtual bool consume(QueuedMessage&);
     bool push(const QueuedMessage& added, QueuedMessage& removed);
 
     void foreach(Functor);
     void removeIf(Predicate);
     static uint getPriority(const QueuedMessage&);
   protected:
-    typedef std::deque<QueuedMessage> Deque;
-    typedef std::vector<Deque> PriorityLevels;
-    virtual bool findFrontLevel(uint& p, PriorityLevels&);
+    typedef std::list<QueuedMessage*> Available;
+    typedef std::map<framing::SequenceNumber, QueuedMessage> Index;
 
     const int levels;
-  private:
-    PriorityLevels messages;
-    uint frontLevel;
-    bool haveFront;
-    bool cached;
-    
-    bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove);
-    uint getPriorityLevel(const QueuedMessage&) const;
-    void clearCache();
-    bool checkFront();
+    Index messages;
+    Available available;
+
+    bool compare(const QueuedMessage* a, const QueuedMessage* b) const;
+    uint getPriorityLevel(const QueuedMessage& m) const;
 };
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1307582&r1=1307581&r2=1307582&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Mar 30 19:36:48 2012
@@ -585,9 +585,9 @@ void Connection::queuePosition(const str
     findQueue(qname)->setPosition(position);
 }
 
-void Connection::queueFairshareState(const std::string& qname, const uint8_t priority, const uint8_t count)
+void Connection::queueFairshareState(const std::string& qname, const framing::FieldTable& counts)
 {
-    if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), priority, count)) {
+    if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), counts)) {
         QPID_LOG(error, "Failed to set fair share state on queue " << qname << "; this will result in inconsistencies.");
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=1307582&r1=1307581&r2=1307582&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Fri Mar 30 19:36:48 2012
@@ -156,7 +156,7 @@ class Connection :
                         uint32_t credit);
 
     void queuePosition(const std::string&, const framing::SequenceNumber&);
-    void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count);
+    void queueFairshareState(const std::string&, const framing::FieldTable& count);
     void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&);
 
     void txStart();

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1307582&r1=1307581&r2=1307582&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Fri Mar 30 19:36:48 2012
@@ -389,9 +389,9 @@ void UpdateClient::updateQueue(client::A
     q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1));
     q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, s, q->getName(), _1));
     ClusterConnectionProxy(s).queuePosition(q->getName(), q->getPosition());
-    uint priority, count;
-    if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) {
-        ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count);
+    qpid::framing::FieldTable counts;
+    if (qpid::broker::Fairshare::getState(q->getMessages(), counts)) {
+        ClusterConnectionProxy(s).queueFairshareState(q->getName(), counts);
     }
 
     ClusterConnectionProxy(s).queueDequeueSincePurgeState(q->getName(), q->getDequeueSincePurge());

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1307582&r1=1307581&r2=1307582&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Fri Mar 30 19:36:48 2012
@@ -513,18 +513,21 @@ class BrokerTest(TestCase):
         finally: r.close()
         return contents
 
-    def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content):
+    def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg=None):
         """Assert that the contents of messages on queue (as retrieved
         using session and timeout) exactly match the strings in
         expect_contents"""
         actual_contents = self.browse(session, queue, timeout, transform=transform)
-        self.assertEqual(expect_contents, actual_contents)
+        if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
+        self.assertEqual(expect_contents, actual_contents, msg)
 
-    def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content):
+    def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content, msg=None):
         """Wait up to timeout for contents of queue to match expect_contents"""
         test = lambda: self.browse(session, queue, 0, transform=transform) == expect_contents
         retry(test, timeout, delay)
-        self.assertEqual(expect_contents, self.browse(session, queue, 0, transform=transform))
+        actual_contents = self.browse(session, queue, 0, transform=transform)
+        if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
+        self.assertEqual(expect_contents, actual_contents, msg)
 
 def join(thread, timeout=10):
     thread.join(timeout)

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1307582&r1=1307581&r2=1307582&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Fri Mar 30 19:36:48 2012
@@ -100,8 +100,8 @@ class HaCluster(object):
 
 def qr_node(value="all"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
 
-class ShortTests(BrokerTest):
-    """Short HA functionality tests."""
+class HaTest(BrokerTest):
+    """Base class for HA test cases, defines convenience functions"""
 
     # Wait for an address to become valid.
     def wait(self, session, address):
@@ -135,6 +135,9 @@ class ShortTests(BrokerTest):
         """Connect to a backup broker as an admin connection"""
         return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs)
 
+class ReplicationTests(HaTest):
+    """Correctness tests for HA replication."""
+
     def test_replication(self):
         """Test basic replication of configuration and messages before and
         after backup has connected"""
@@ -491,6 +494,51 @@ class ShortTests(BrokerTest):
         # self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
         self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority)
 
+    def test_backup_acquired(self):
+        """Verify that acquired messages are backed up, for all queue types."""
+        class Test:
+            def __init__(self, queue, arguments, expect):
+                self.queue = queue
+                self.address = "%s;{create:always,node:{x-declare:{arguments:{%s}}}}"%(
+                    self.queue, ",".join(arguments + ["'qpid.replicate':all"]))
+                self.expect = [str(i) for i in expect]
+
+            def send(self, connection):
+                """Send messages, then acquire one but don't acknowledge"""
+                s = connection.session()
+                for m in range(10): s.sender(self.address).send(str(m))
+                s.receiver(self.address).fetch()
+
+            def wait(self, brokertest, backup):
+                brokertest.wait_backup(backup, self.queue)
+
+            def verify(self, brokertest, backup):
+                brokertest.assert_browse_backup(
+                    backup, self.queue, self.expect, msg=self.queue)
+
+        tests = [
+            Test("plain",[],range(10)),
+            Test("ring", ["'qpid.policy_type':ring", "'qpid.max_count':5"], range(5,10)),
+            Test("priority",["'qpid.priorities':10"], range(10)),
+            Test("fairshare", ["'qpid.priorities':10,'qpid.fairshare':5"], range(10)),
+            Test("lvq", ["'qpid.last_value_queue_key':lvq-key"], [9])
+            ]
+
+        primary  = HaBroker(self, name="primary")
+        primary.promote()
+        backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
+        c = primary.connect()
+        for t in tests: t.send(c) # Send messages, leave one unacknowledged.
+
+        backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
+        # Wait for backups to catch up.
+        for t in tests:
+            t.wait(self, backup1)
+            t.wait(self, backup2)
+        # Verify acquired message was replicated
+        for t in tests: t.verify(self, backup1)
+        for t in tests: t.verify(self, backup2)
+
 def fairshare(msgs, limit, levels):
     """
     Generator to return prioritised messages in expected order for a given fairshare limit

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=1307582&r1=1307581&r2=1307582&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Fri Mar 30 19:36:48 2012
@@ -304,8 +304,7 @@
     <!-- Set the fairshare delivery related state of a replicated queue. -->
     <control name="queue-fairshare-state" code="0x38">
       <field name="queue" type="str8"/>
-      <field name="position" type="uint8"/>
-      <field name="count" type="uint8"/>
+      <field name="counts" type="map"/>
     </control>
 
     <!-- Replicate a QueueObserver for a given queue. -->



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org