You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2012/03/20 21:50:50 UTC

svn commit: r1303143 - in /qpid/trunk/qpid: cpp/src/qpid/broker/MessageGroupManager.cpp cpp/src/qpid/broker/MessageGroupManager.h tests/src/py/qpid_tests/broker_0_10/msg_groups.py

Author: kgiusti
Date: Tue Mar 20 20:50:50 2012
New Revision: 1303143

URL: http://svn.apache.org/viewvc?rev=1303143&view=rev
Log:
QPID-3899: prevent TTL expiration of grouped msg from crashing broker.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1303143&r1=1303142&r2=1303143&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp Tue Mar 20 20:50:50 2012
@@ -43,9 +43,24 @@ const std::string MessageGroupManager::q
 const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
 
 
+MessageGroupManager::GroupState::MessageFifo::iterator
+MessageGroupManager::GroupState::findMsg(const framing::SequenceNumber &position)
+{
+    // Locate the entry in this group's member list for the message at the
+    // given queue position.  Returns an iterator to that entry, or
+    // members.end() when the group holds no message at that position.
+    // NOTE: the binary search assumes members is sorted by position
+    // (entries are appended as messages are enqueued).
+    MessageState mState(position);
+    MessageFifo::iterator found = std::lower_bound(members.begin(), members.end(), mState);
+    // lower_bound() yields end() if the group is empty or every member sorts
+    // before position; end() must never be dereferenced, so test for it first.
+    return (found != members.end() && found->position == position) ? found : members.end();
+}
+
 void MessageGroupManager::unFree( const GroupState& state )
 {
-    GroupFifo::iterator pos = freeGroups.find( state.members.front() );
+    GroupFifo::iterator pos = freeGroups.find( state.members.front().position );  // freeGroups is keyed by the position of a group's first member
     assert( pos != freeGroups.end() && pos->second == &state );
     freeGroups.erase( pos );
 }
@@ -60,8 +75,8 @@ void MessageGroupManager::disown( GroupS
 {
     state.owner.clear();
     assert(state.members.size());
-    assert(freeGroups.find(state.members.front()) == freeGroups.end());
-    freeGroups[state.members.front()] = &state;
+    assert(freeGroups.find(state.members.front().position) == freeGroups.end());
+    freeGroups[state.members.front().position] = &state;
 }
 
 MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMessage& qm )
@@ -106,7 +121,8 @@ void MessageGroupManager::enqueued( cons
     // @todo KAG optimization - store reference to group state in QueuedMessage
     // issue: const-ness??
     GroupState& state = findGroup(qm);
-    state.members.push_back(qm.position);
+    GroupState::MessageState mState(qm.position);
+    state.members.push_back(mState);
     uint32_t total = state.members.size();
     QPID_LOG( trace, "group queue " << qName <<
               ": added message to group id=" << state.group << " total=" << total );
@@ -123,7 +139,9 @@ void MessageGroupManager::acquired( cons
     // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
     // issue: const-ness??
     GroupState& state = findGroup(qm);
-    assert(state.members.size());   // there are msgs present
+    GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
+    assert(m != state.members.end());
+    m->acquired = true;
     state.acquired += 1;
     QPID_LOG( trace, "group queue " << qName <<
               ": acquired message in group id=" << state.group << " acquired=" << state.acquired );
@@ -137,6 +155,9 @@ void MessageGroupManager::requeued( cons
     GroupState& state = findGroup(qm);
     assert( state.acquired != 0 );
     state.acquired -= 1;
+    GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
+    assert(m != state.members.end());
+    m->acquired = false;
     if (state.acquired == 0 && state.owned()) {
         QPID_LOG( trace, "group queue " << qName <<
                   ": consumer name=" << state.owner << " released group id=" << state.group);
@@ -152,13 +173,17 @@ void MessageGroupManager::dequeued( cons
     // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
     // issue: const-ness??
     GroupState& state = findGroup(qm);
-    assert( state.members.size() != 0 );
-    assert( state.acquired != 0 );
-    state.acquired -= 1;
+    GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
+    assert(m != state.members.end());
+    if (m->acquired) {
+        assert( state.acquired != 0 );
+        state.acquired -= 1;
+    }
 
-    // likely to be at or near begin() if dequeued in order
+    // special case if qm is first (oldest) message in the group:
+    // may need to re-insert it back on the freeGroups list, as the index will change
     bool reFreeNeeded = false;
-    if (state.members.front() == qm.position) {
+    if (m == state.members.begin()) {
         if (!state.owned()) {
             // will be on the freeGroups list if mgmt is dequeueing rather than a consumer!
             // if on freelist, it is indexed by first member, which is about to be removed!
@@ -167,15 +192,7 @@ void MessageGroupManager::dequeued( cons
         }
         state.members.pop_front();
     } else {
-        GroupState::PositionFifo::iterator pos = state.members.begin() + 1;
-        GroupState::PositionFifo::iterator end = state.members.end();
-        while (pos != end) {
-            if (*pos == qm.position) {
-                state.members.erase(pos);
-                break;
-            }
-            ++pos;
-        }
+        state.members.erase(m);
     }
 
     uint32_t total = state.members.size();
@@ -220,11 +237,11 @@ bool MessageGroupManager::nextConsumable
         GroupState& group = findGroup(next);
         if (!group.owned()) {
             //TODO: make acquire more efficient when we already have the message in question
-            if (group.members.front() == next.position && messages.acquire(next.position, next)) {    // only take from head!
+            if (group.members.front().position == next.position && messages.acquire(next.position, next)) {    // only take from head!
                 return true;
             }
             QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group
-                     << "'s head message still pending. pos=" << group.members.front());
+                     << "'s head message still pending. pos=" << group.members.front().position);
         } else if (group.owner == c->getName() && messages.acquire(next.position, next)) {
             return true;
         }
@@ -284,7 +301,7 @@ void MessageGroupManager::query(qpid::ty
         info[GROUP_TIMESTAMP] = 0;
         if (g->second.members.size() != 0) {
             QueuedMessage qm;
-            if (messages.find(g->second.members.front(), qm) &&
+            if (messages.find(g->second.members.front().position, qm) &&
                 qm.payload &&
                 qm.payload->hasProperties<framing::DeliveryProperties>()) {
                 info[GROUP_TIMESTAMP] = qm.payload->getProperties<framing::DeliveryProperties>()->getTimestamp();
@@ -353,6 +370,7 @@ namespace {
     const std::string GROUP_OWNER("owner");
     const std::string GROUP_ACQUIRED_CT("acquired-ct");
     const std::string GROUP_POSITIONS("positions");
+    const std::string GROUP_ACQUIRED_MSGS("acquired-msgs");
     const std::string GROUP_STATE("group-state");
 }
 
@@ -371,10 +389,14 @@ void MessageGroupManager::getState(qpid:
         group.setString(GROUP_OWNER, g->second.owner);
         group.setInt(GROUP_ACQUIRED_CT, g->second.acquired);
         framing::Array positions(TYPE_CODE_UINT32);
-        for (GroupState::PositionFifo::const_iterator p = g->second.members.begin();
-             p != g->second.members.end(); ++p)
-            positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p )));
+        framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN);
+        for (GroupState::MessageFifo::const_iterator p = g->second.members.begin();
+             p != g->second.members.end(); ++p) {
+            positions.push_back(framing::Array::ValuePtr(new IntegerValue( p->position )));
+            acquiredMsgs.push_back(framing::Array::ValuePtr(new BoolValue( p->acquired )));
+        }
         group.setArray(GROUP_POSITIONS, positions);
+        group.setArray(GROUP_ACQUIRED_MSGS, acquiredMsgs);
         groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group)));
     }
     state.setArray(GROUP_STATE, groupState);
@@ -425,13 +447,25 @@ void MessageGroupManager::setState(const
                      qName << "\": position encoding error!");
             return;
         }
+        framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN);
+        ok = group.getArray(GROUP_ACQUIRED_MSGS, acquiredMsgs);
+        if (!ok || positions.count() != acquiredMsgs.count()) {
+            QPID_LOG(error, "Invalid message group state information for queue \"" <<
+                     qName << "\": acquired flag encoding error!");
+            return;
+        }
+
+        Array::const_iterator a = acquiredMsgs.begin();
+        for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) {
+            GroupState::MessageState mState((*p)->getIntegerValue<uint32_t, 4>());
+            mState.acquired = (*a++)->getIntegerValue<bool>();
+            state.members.push_back(mState);
+        }
 
-        for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p)
-            state.members.push_back((*p)->getIntegerValue<uint32_t, 4>());
         messageGroups[state.group] = state;
         if (!state.owned()) {
             assert(state.members.size());
-            freeGroups[state.members.front()] = &messageGroups[state.group];
+            freeGroups[state.members.front().position] = &messageGroups[state.group];
         }
     }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h?rev=1303143&r1=1303142&r2=1303143&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h Tue Mar 20 20:50:50 2012
@@ -45,15 +45,25 @@ class MessageGroupManager : public State
 
     struct GroupState {
         // note: update getState()/setState() when changing this object's state implementation
-        typedef std::deque<framing::SequenceNumber> PositionFifo;
+
+        // per-message state for this group: queue position and acquired flag (ordered by position only)
+        struct MessageState {
+            framing::SequenceNumber position;   // position of the message on the queue
+            bool                    acquired;   // set by acquired(), cleared by requeued()
+            MessageState() : acquired(false) {}
+            MessageState(const framing::SequenceNumber& p) : position(p), acquired(false) {}
+            bool operator<(const MessageState& b) const { return position < b.position; }  // const: usable on const entries
+        };
+        typedef std::deque<MessageState> MessageFifo;
 
         std::string group;  // group identifier
         std::string owner;  // consumer with outstanding acquired messages
         uint32_t acquired;  // count of outstanding acquired messages
-        PositionFifo members;   // msgs belonging to this group
+        MessageFifo members;   // msgs belonging to this group, in enqueue order
 
         GroupState() : acquired(0) {}
         bool owned() const {return !owner.empty();}
+        MessageFifo::iterator findMsg(const framing::SequenceNumber &);
     };
 
     typedef sys::unordered_map<std::string, struct GroupState> GroupMap;

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py?rev=1303143&r1=1303142&r2=1303143&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py Tue Mar 20 20:50:50 2012
@@ -1122,6 +1122,70 @@ class MultiConsumerMsgGroupTests(Base):
         snd.close()
 
 
+    def test_ttl_expire(self):
+        """ Verify that expired (TTL) group messages are skipped correctly
+        """
+        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+                              " node: {x-declare: {arguments:" +
+                              " {'qpid.group_header_key':'THE-GROUP'," +
+                              "'qpid.shared_msg_group':1}}}}")
+
+        groups = ["A","B","C","A","B","C"]
+        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+        index = 0
+        for m in messages:
+            m.content['index'] = index
+            index += 1
+            if m.properties['THE-GROUP'] == 'B':
+                m.ttl = 1;
+            snd.send(m)
+
+        sleep(2)  # let all B's expire
+
+        # create consumers on separate sessions: C1,C2
+        s1 = self.setup_session()
+        c1 = s1.receiver("msg-group-q", options={"capacity":0})
+        s2 = self.setup_session()
+        c2 = s2.receiver("msg-group-q", options={"capacity":0})
+
+        # C1 should acquire A-0, then C2 should acquire C-2, Group B should
+        # expire and never be fetched
+
+        m1 = c1.fetch(0);
+        assert m1.properties['THE-GROUP'] == 'A'
+        assert m1.content['index'] == 0
+
+        m2 = c2.fetch(0);
+        assert m2.properties['THE-GROUP'] == 'C'
+        assert m2.content['index'] == 2
+
+        m1 = c1.fetch(0);
+        assert m1.properties['THE-GROUP'] == 'A'
+        assert m1.content['index'] == 3
+
+        m2 = c2.fetch(0);
+        assert m2.properties['THE-GROUP'] == 'C'
+        assert m2.content['index'] == 5
+
+        # there should be no more left for either consumer
+        try:
+            mx = c1.fetch(0)
+            assert False     # should never get here
+        except Empty:
+            pass
+        try:
+            mx = c2.fetch(0)
+            assert False     # should never get here
+        except Empty:
+            pass
+
+        c1.session.acknowledge()
+        c2.session.acknowledge()
+        c1.close()
+        c2.close()
+        snd.close()
+
+
 class StickyConsumerMsgGroupTests(Base):
     """
     Tests for the behavior of sticky-consumer message groups.  These tests



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org