You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/08/17 15:07:27 UTC

svn commit: r1158686 - in /qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker: DeliveryRecord.cpp LegacyLVQ.cpp LegacyLVQ.h MessageDeque.cpp MessageDeque.h MessageMap.cpp MessageMap.h Messages.h PriorityQueue.cpp PriorityQueue.h Queue.cpp Queue.h

Author: kgiusti
Date: Wed Aug 17 13:07:26 2011
New Revision: 1158686

URL: http://svn.apache.org/viewvc?rev=1158686&view=rev
Log:
QPID-3346: checkpoint - incorporate changes based on review feedback

Modified:
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/LegacyLVQ.h
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageDeque.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageDeque.h
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageMap.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageMap.h
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Messages.h
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/PriorityQueue.h
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.h

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1158686&r1=1158685&r2=1158686&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Wed Aug 17 13:07:26 2011
@@ -142,7 +142,7 @@ void DeliveryRecord::reject() 
             //just drop it
             QPID_LOG(info, "Dropping rejected message from " << queue->getName());
         }
-        queue->dequeue(0, msg);
+        dequeue();
         setEnded();
     }
 }
@@ -152,8 +152,7 @@ uint32_t DeliveryRecord::getCredit() con
     return credit;
 }
 
-void DeliveryRecord::acquire(DeliveryIds& results)
-{
+void DeliveryRecord::acquire(DeliveryIds& results) {
     if (queue->acquire(msg, tag)) {
         acquired = true;
         results.push_back(id);

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp?rev=1158686&r1=1158685&r2=1158686&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp Wed Aug 17 13:07:26 2011
@@ -44,11 +44,6 @@ bool LegacyLVQ::remove(const framing::Se
     }
 }
 
-bool LegacyLVQ::next(const QueuedMessage& message, QueuedMessage& next)
-{
-    return this->next(message.position, next);
-}
-
 bool LegacyLVQ::next(const framing::SequenceNumber& position, QueuedMessage& message)
 {
     if (MessageMap::next(position, message)) {

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/LegacyLVQ.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/LegacyLVQ.h?rev=1158686&r1=1158685&r2=1158686&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/LegacyLVQ.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/LegacyLVQ.h Wed Aug 17 13:07:26 2011
@@ -42,7 +42,6 @@ class LegacyLVQ : public MessageMap
     LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0);
     bool remove(const framing::SequenceNumber&, QueuedMessage&);
     bool next(const framing::SequenceNumber&, QueuedMessage&);
-    bool next(const QueuedMessage&, QueuedMessage&);
     bool push(const QueuedMessage& added, QueuedMessage& removed);
     void removeIf(Predicate);
     void setNoBrowse(bool);

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageDeque.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageDeque.cpp?rev=1158686&r1=1158685&r2=1158686&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageDeque.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageDeque.cpp Wed Aug 17 13:07:26 2011
@@ -74,11 +74,6 @@ bool MessageDeque::find(const framing::S
     return find(position, message, false);
 }
 
-bool MessageDeque::next(const QueuedMessage& message, QueuedMessage& next)
-{
-    return this->next(message.position, next);
-}
-
 bool MessageDeque::next(const framing::SequenceNumber& position, QueuedMessage& message)
 {
     if (messages.empty()) {

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageDeque.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageDeque.h?rev=1158686&r1=1158685&r2=1158686&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageDeque.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageDeque.h Wed Aug 17 13:07:26 2011
@@ -41,7 +41,6 @@ class MessageDeque : public Messages
     bool remove(const framing::SequenceNumber&, QueuedMessage&);
     bool find(const framing::SequenceNumber&, QueuedMessage&);
     bool next(const framing::SequenceNumber&, QueuedMessage&);
-    bool next(const QueuedMessage&, QueuedMessage&);
 
     QueuedMessage& front();
     void pop();

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageMap.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageMap.cpp?rev=1158686&r1=1158685&r2=1158686&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageMap.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageMap.cpp Wed Aug 17 13:07:26 2011
@@ -77,11 +77,6 @@ bool MessageMap::find(const framing::Seq
     }
 }
 
-bool MessageMap::next(const QueuedMessage& message, QueuedMessage& next)
-{
-    return this->next(message.position, next);
-}
-
 bool MessageMap::next(const framing::SequenceNumber& position, QueuedMessage& message)
 {
     if (!messages.empty() && position < front().position) {

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageMap.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageMap.h?rev=1158686&r1=1158685&r2=1158686&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageMap.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageMap.h Wed Aug 17 13:07:26 2011
@@ -47,7 +47,6 @@ class MessageMap : public Messages
     virtual bool remove(const framing::SequenceNumber&, QueuedMessage&);
     bool find(const framing::SequenceNumber&, QueuedMessage&);
     virtual bool next(const framing::SequenceNumber&, QueuedMessage&);
-    virtual bool next(const QueuedMessage&, QueuedMessage&);
 
     QueuedMessage& front();
     void pop();

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Messages.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Messages.h?rev=1158686&r1=1158685&r2=1158686&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Messages.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Messages.h Wed Aug 17 13:07:26 2011
@@ -76,15 +76,6 @@ class Messages
      * @return true if there is another message, false otherwise.
      */
     virtual bool next(const framing::SequenceNumber&, QueuedMessage&) = 0;
-
-    /**
-     * Return the next message based on the supplied queued message.
-     * The next messages is passed back via the second parameter.
-     * @todo replace with queue iterator
-     * @return true if there is another message, false otherwise.
-     */
-    virtual bool next(const QueuedMessage&, QueuedMessage&) = 0;
-
     /**
      * Note: Caller is responsible for ensuring that there is a front
      * (e.g. empty() returns false)

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/PriorityQueue.cpp?rev=1158686&r1=1158685&r2=1158686&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/PriorityQueue.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/PriorityQueue.cpp Wed Aug 17 13:07:26 2011
@@ -88,25 +88,6 @@ bool PriorityQueue::find(const framing::
     return find(position, message, false);
 }
 
-bool PriorityQueue::next(const QueuedMessage& message, QueuedMessage& next)
-{
-    uint p = getPriorityLevel(message);
-    QueuedMessage match;
-    match.position = message.position+1;
-    Deque::iterator m = lower_bound(messages[p].begin(), messages[p].end(), match);
-    if (m != messages[p].end()) {
-        next = *m;
-        return true;
-    }
-    while (p-- > 0) {
-        if (!messages[p].empty()) {
-            next = messages[p].front();
-            return true;
-        }
-    }
-    return false;
-}
-
 bool PriorityQueue::next(const framing::SequenceNumber& position, QueuedMessage& message)
 {
     QueuedMessage match;

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/PriorityQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/PriorityQueue.h?rev=1158686&r1=1158685&r2=1158686&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/PriorityQueue.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/PriorityQueue.h Wed Aug 17 13:07:26 2011
@@ -46,7 +46,6 @@ class PriorityQueue : public Messages
     bool remove(const framing::SequenceNumber&, QueuedMessage&);
     bool find(const framing::SequenceNumber&, QueuedMessage&);
     bool next(const framing::SequenceNumber&, QueuedMessage&);
-    bool next(const QueuedMessage&, QueuedMessage&);
 
     QueuedMessage& front();
     void pop();

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1158686&r1=1158685&r2=1158686&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp Wed Aug 17 13:07:26 2011
@@ -115,12 +115,6 @@ class MessageAllocator
 
     /** hook to add any interesting management state to the status map (lock held) */
     virtual void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const {};
-
-    /** for move, purge, reroute - check if message matches against a filter,
-     * return true if message matches.
-     */
-     virtual bool match(const qpid::types::Variant::Map* filter,
-                        const QueuedMessage& message) const;
 };
 
 
@@ -147,7 +141,6 @@ class MessageGroupManager : public Queue
     static const std::string qpidMessageGroupKey;
     static const std::string qpidMessageGroupTimestamp;
     static const std::string qpidMessageGroupDefault;
-    static const std::string qpidMessageGroupFilter;    // key for move/purge filter map
 
     const std::string getGroupId( const QueuedMessage& qm ) const;
 
@@ -174,9 +167,11 @@ class MessageGroupManager : public Queue
 const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key");
 const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
 const std::string MessageGroupManager::qpidMessageGroupDefault("qpid.no_group");     /** @todo KAG: make configurable in Broker options */
-const std::string MessageGroupManager::qpidMessageGroupFilter("qpid.group_id");
 
 }}
+// KAG TBD: END find me a home....
+
+
 
 Queue::Queue(const string& _name, bool _autodelete,
              MessageStore* const _store,
@@ -203,8 +198,7 @@ Queue::Queue(const string& _name, bool _
     deleted(false),
     barrier(*this),
     autoDeleteTimeout(0),
-    allocator(new MessageAllocator( this )),
-    type(FIFO)
+    allocator(new MessageAllocator( this ))
 {
     if (parent != 0 && broker != 0) {
         ManagementAgent* agent = broker->getManagementAgent();
@@ -602,24 +596,101 @@ void Queue::purgeExpired(qpid::sys::Dura
 
 namespace {
     // for use with purge/move below - collect messages that match a given filter
+    //
+    class MessageFilter
+    {
+    public:
+        static const std::string typeKey;
+        static const std::string paramsKey;
+        static MessageFilter *create( const ::qpid::types::Variant::Map *filter );
+        virtual bool match( const QueuedMessage& ) const { return true; }
+        virtual ~MessageFilter() {}
+    protected:
+        MessageFilter() {};
+    };
+    const std::string MessageFilter::typeKey("filter_type");
+    const std::string MessageFilter::paramsKey("filter_params");
+
+    // filter by message header string value exact match
+    class HeaderMatchFilter : public MessageFilter
+    {
+    public:
+        /* Config:
+           { 'filter_type' : 'header_match_str',
+             'filter_params' : { 'header_key' : "<header name>",
+                                 'header_value' : "<value to match>"
+                               }
+           }
+        */
+        static const std::string typeKey;
+        static const std::string headerKey;
+        static const std::string valueKey;
+        HeaderMatchFilter( const std::string& _header, const std::string& _value )
+            : MessageFilter (), header(_header), value(_value) {}
+        bool match( const QueuedMessage& msg ) const
+        {
+            const qpid::framing::FieldTable* headers = msg.payload->getApplicationHeaders();
+            if (!headers) return false;
+            FieldTable::ValuePtr h = headers->get(header);
+            if (!h || !h->convertsTo<std::string>()) return false;
+            return h->get<std::string>() == value;
+        }
+    private:
+        const std::string header;
+        const std::string value;
+    };
+    const std::string HeaderMatchFilter::typeKey("header_match_str");
+    const std::string HeaderMatchFilter::headerKey("header_key");
+    const std::string HeaderMatchFilter::valueKey("header_value");
+
+    // factory to create correct filter based on map
+    MessageFilter* MessageFilter::create( const ::qpid::types::Variant::Map *filter )
+    {
+        using namespace qpid::types;
+        if (filter) {
+            Variant::Map::const_iterator i = filter->find(MessageFilter::typeKey);
+            if (i != filter->end()) {
+
+                if (i->second.asString() == HeaderMatchFilter::typeKey) {
+                    Variant::Map::const_iterator p = filter->find(MessageFilter::paramsKey);
+                    if (p != filter->end() && p->second.getType() == VAR_MAP) {
+                        Variant::Map::const_iterator k = p->second.asMap().find(HeaderMatchFilter::headerKey);
+                        Variant::Map::const_iterator v = p->second.asMap().find(HeaderMatchFilter::valueKey);
+                        if (k != p->second.asMap().end() && v != p->second.asMap().end()) {
+                            std::string headerKey(k->second.asString());
+                            std::string value(v->second.asString());
+                            QPID_LOG(debug, "Message filtering by header value configured.  key: " << headerKey << " value: " << value );
+                            return new HeaderMatchFilter( headerKey, value );
+                        }
+                    }
+                }
+            }
+            QPID_LOG(error, "Ignoring unrecognized message filter: '" << *filter << "'");
+        }
+        return new MessageFilter();
+    }
+
+    // used by removeIf() to collect all messages matching a filter, maximum match count is
+    // optional.
     struct Collector {
         const uint32_t maxMatches;
-        const qpid::types::Variant::Map *filter;
+        MessageFilter& filter;
         std::deque<QueuedMessage> matches;
-        boost::shared_ptr<MessageAllocator> allocator;
-        Collector(boost::shared_ptr<MessageAllocator> a, uint32_t m,
-                  const qpid::types::Variant::Map *f)
-            : maxMatches(m), filter(f), allocator(a) {}
-        void operator() (QueuedMessage& qm)
+        Collector(MessageFilter& filter, uint32_t max)
+            : maxMatches(max), filter(filter) {}
+        bool operator() (QueuedMessage& qm)
         {
             if (maxMatches == 0 || matches.size() < maxMatches) {
-                if (allocator->match( filter, qm )) {
+                if (filter.match( qm )) {
                     matches.push_back(qm);
+                    return true;
                 }
             }
+            return false;
         }
     };
-}
+
+} // end namespace
 
 
 /**
@@ -640,56 +711,45 @@ namespace {
 uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest,
                       const qpid::types::Variant::Map *filter)
 {
-    Collector c(allocator, purge_request, filter);
+    std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
+    Collector c(*mf.get(), purge_request);
 
     Mutex::ScopedLock locker(messageLock);
-    messages->foreach( boost::bind<void>(boost::ref(c), _1) );
-
-    uint32_t count = c.matches.size();
-
-    // first remove all matches
+    messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
     for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
-         qmsg != c.matches.end(); qmsg++) {
-        /** @todo KAG: need a direct remove method here */
-        bool ok = acquire(qmsg->position, *qmsg);
-        (void) ok; assert(ok);
+         qmsg != c.matches.end(); ++qmsg) {
+        // Update observers and message state:
+        acquired(*qmsg);
         dequeue(0, *qmsg);
-    }
-
-    // now reroute if necessary
-    if (dest.get()) {
-        while (!c.matches.empty()) {
-            QueuedMessage msg = c.matches.front();
-            c.matches.pop_front();
-            assert(msg.payload);
-            DeliverableMessage dmsg(msg.payload);
+        // now reroute if necessary
+        if (dest.get()) {
+            assert(qmsg->payload);
+            DeliverableMessage dmsg(qmsg->payload);
             dest->routeWithAlternate(dmsg);
         }
     }
-    return count;
+    return c.matches.size();
 }
 
 uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
                      const qpid::types::Variant::Map *filter)
 {
-    Collector c(allocator, qty, filter);
+    std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
+    Collector c(*mf.get(), qty);
 
     Mutex::ScopedLock locker(messageLock);
-    messages->foreach( boost::bind<void>(boost::ref(c), _1) );
-
-    uint32_t count = c.matches.size();
+    messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
 
-    while (!c.matches.empty()) {
-        QueuedMessage qmsg = c.matches.front();
-        c.matches.pop_front();
-        /** @todo KAG: need a direct remove method here */
-        bool ok = acquire(qmsg.position, qmsg);
-        (void) ok; assert(ok);
-        dequeue(0, qmsg);
-        assert(qmsg.payload);
-        destq->deliver(qmsg.payload);
+    for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
+         qmsg != c.matches.end(); ++qmsg) {
+        // Update observers and message state:
+        acquired(*qmsg);
+        dequeue(0, *qmsg);
+        // and move to destination Queue.
+        assert(qmsg->payload);
+        destq->deliver(qmsg->payload);
     }
-    return count;
+    return c.matches.size();
 }
 
 /** Acquire the front (oldest) message from the in-memory queue.
@@ -939,7 +999,6 @@ void Queue::acquired(const QueuedMessage
     }
 }
 
-
 void Queue::create(const FieldTable& _settings)
 {
     settings = _settings;
@@ -949,7 +1008,6 @@ void Queue::create(const FieldTable& _se
     configureImpl(_settings);
 }
 
-
 int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key)
 {
     qpid::framing::FieldTable::ValuePtr v = settings.get(key);
@@ -1009,29 +1067,23 @@ void Queue::configureImpl(const FieldTab
     if (lvqKey.size()) {
         QPID_LOG(debug, "Configured queue " <<  getName() << " as Last Value Queue with key " << lvqKey);
         messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
-        type = LVQ;
     } else if (_settings.get(qpidLastValueQueueNoBrowse)) {
         QPID_LOG(debug, "Configured queue " <<  getName() << " as Legacy Last Value Queue with 'no-browse' on");
         messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
-        type = LVQ;
     } else if (_settings.get(qpidLastValueQueue)) {
         QPID_LOG(debug, "Configured queue " <<  getName() << " as Legacy Last Value Queue");
         messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
-        type = LVQ;
     } else {
         std::auto_ptr<Messages> m = Fairshare::create(_settings);
         if (m.get()) {
             messages = m;
             QPID_LOG(debug, "Configured queue " <<  getName() << " as priority queue.");
-            type = PRIORITY;
-        }
-    }
-
-    {   // override default message allocator if message groups configured.
-        boost::shared_ptr<MessageAllocator> ma = boost::static_pointer_cast<MessageAllocator>(MessageGroupManager::create( this, _settings ));
-        if (ma) {
-            allocator = ma;
-            type = GROUP;
+        } else { // default (FIFO) queue type
+            // override default message allocator if message groups configured.
+            boost::shared_ptr<MessageAllocator> ma = boost::static_pointer_cast<MessageAllocator>(MessageGroupManager::create( this, _settings ));
+            if (ma) {
+                allocator = ma;
+            }
         }
     }
 
@@ -1593,11 +1645,9 @@ bool MessageGroupManager::nextMessage( C
 
     if (c->preAcquires()) {     // not browsing
         next = messages.front();
-        QueuedMessage current;
         do {
-            current = next;
             /** @todo KAG: horrifingly suboptimal  - optimize */
-            std::string group( getGroupId( current ) );
+            std::string group( getGroupId( next ) );
             GroupMap::iterator gs = messageGroups.find( group );    /** @todo need to cache this somehow */
             assert( gs != messageGroups.end() );
             GroupState& state( gs->second );
@@ -1610,7 +1660,7 @@ bool MessageGroupManager::nextMessage( C
             if (state.owner == c->getName()) {
                 return true;
             }
-        } while (messages.next( current, next ));     /** @todo: .next() is a linear search from front - optimize */
+        } while (messages.next( next.position, next ));     /** @todo: .next() is a linear search from front - optimize */
         return false;
     } else if (messages.next(c->position, next))
         return true;
@@ -1681,15 +1731,6 @@ void MessageGroupManager::query(qpid::ty
     status[GroupQueryKey] = state;
 }
 
-bool MessageGroupManager::match(const qpid::types::Variant::Map* filter,
-                                const QueuedMessage& message) const
-{
-    if (!filter) return true;
-    qpid::types::Variant::Map::const_iterator i = filter->find( qpidMessageGroupFilter );
-    if (i == filter->end()) return true;
-    if (i->second.asString() == getGroupId(message)) return true;
-    return false;
-}
 
 boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q,
                                                                     const qpid::framing::FieldTable& settings )
@@ -1698,16 +1739,6 @@ boost::shared_ptr<MessageGroupManager> M
 
     if (settings.isSet(qpidMessageGroupKey)) {
 
-        Queue::Disposition qt = q->getDisposition();
-
-        if (qt == Queue::LVQ) {
-            QPID_LOG( error, "Message Groups cannot be enabled on LVQ Queues, queue=" << q->getName());
-            return empty;
-        }
-        if (qt == Queue::PRIORITY) {
-            QPID_LOG( error, "Message Groups cannot be enabled for Priority Queues, queue=" << q->getName());
-            return empty;
-        }
         std::string headerKey = settings.getAsString(qpidMessageGroupKey);
         if (headerKey.empty()) {
             QPID_LOG( error, "A Message Group header key must be configured, queue=" << q->getName());
@@ -1756,12 +1787,6 @@ bool MessageAllocator::canAcquire(const 
 }
 
 
-// default match - ignore filter and always match.
-bool MessageAllocator::match(const qpid::types::Variant::Map*,
-                             const QueuedMessage&) const
-{
-    return true;
-}
 
 
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.h?rev=1158686&r1=1158685&r2=1158686&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.h Wed Aug 17 13:07:26 2011
@@ -70,10 +70,6 @@ class MessageAllocator;
 class Queue : public boost::enable_shared_from_this<Queue>,
               public PersistableQueue, public management::Manageable {
 
- public:
-    enum Disposition {FIFO, LVQ, PRIORITY, GROUP};
-
- private:
     struct UsageBarrier
     {
         Queue& parent;
@@ -134,8 +130,6 @@ class Queue : public boost::enable_share
     int autoDeleteTimeout;
     boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
     boost::shared_ptr<MessageAllocator> allocator;
-    Disposition type;
-
 
     void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
     void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -412,8 +406,6 @@ class Queue : public boost::enable_share
 
     uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
     void setDequeueSincePurge(uint32_t value);
-
-    Disposition getDisposition() const { return type; }
 };
 }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org