You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/11/18 20:40:54 UTC

svn commit: r1036589 - in /qpid/trunk/qpid: cpp/src/qpid/cluster/ExpiryPolicy.cpp cpp/src/qpid/cluster/ExpiryPolicy.h cpp/src/tests/cluster_tests.py python/qpid/brokertest.py

Author: aconway
Date: Thu Nov 18 19:40:53 2010
New Revision: 1036589

URL: http://svn.apache.org/viewvc?rev=1036589&view=rev
Log:
QPID-2874 Clustered broker crashes in assertion in cluster/ExpiryPolicy.cpp

- Added missing lock to ExpiryPolicy
- 1-N mapping of expiry ID to message when receiving an update.
- Regression test.

A fan-out message (sent to multiple queues e.g. by fanout or topic
exchange) is a single message on multiple queues with a single expiry
ID. During an update however each instance is sent as a separate
message so we need to allow 1-N mapping of expiry ID to message during
update.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/python/qpid/brokertest.py

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp?rev=1036589&r1=1036588&r2=1036589&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp Thu Nov 18 19:40:53 2010
@@ -41,40 +41,76 @@ struct ExpiryTask : public sys::TimerTas
     const uint64_t expiryId;
 };
 
+// Called while receiving an update
+void ExpiryPolicy::setId(uint64_t id) {
+    sys::Mutex::ScopedLock l(lock);
+    expiryId = id;
+}
+
+// Called while giving an update
+uint64_t ExpiryPolicy::getId() const {
+    sys::Mutex::ScopedLock l(lock);
+    return expiryId;
+}
+
+// Called in enqueuing connection thread
 void ExpiryPolicy::willExpire(broker::Message& m) {
-    uint64_t id = expiryId++;
-    assert(unexpiredById.find(id) == unexpiredById.end());
-    assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
-    unexpiredById[id] = &m;
-    unexpiredByMessage[&m] = id;
+    uint64_t id;
+    {
+        // When messages are fanned out to multiple queues, update sends
+        // them as independent messages, so we can have multiple messages
+        // with the same expiry ID.
+        //
+        // TODO: fix update to avoid duplicating messages.
+        sys::Mutex::ScopedLock l(lock);
+        id = expiryId++;        // if this is an update, this expiryId may already exist
+        assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
+        unexpiredById.insert(IdMessageMap::value_type(id, &m));
+        unexpiredByMessage[&m] = id;
+    }
     timer.add(new ExpiryTask(this, id, m.getExpiration()));
 }
 
+// Called in dequeueing connection thread
 void ExpiryPolicy::forget(broker::Message& m) {
+    sys::Mutex::ScopedLock l(lock);
     MessageIdMap::iterator i = unexpiredByMessage.find(&m);
     assert(i != unexpiredByMessage.end());
     unexpiredById.erase(i->second);
     unexpiredByMessage.erase(i);
 }
 
+// Called in dequeueing connection or cleanup thread.
 bool ExpiryPolicy::hasExpired(broker::Message& m) {
+    sys::Mutex::ScopedLock l(lock);
     return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
 }
 
+// Called in timer thread
 void ExpiryPolicy::sendExpire(uint64_t id) {
+    {
+        sys::Mutex::ScopedLock l(lock);
+        // Don't multicast an expiry notice if message is already forgotten.
+        if (unexpiredById.find(id) == unexpiredById.end()) return;
+    }
     mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
 }
 
+// Called in CPG deliver thread.
 void ExpiryPolicy::deliverExpire(uint64_t id) {
-    IdMessageMap::iterator i = unexpiredById.find(id);
-    if (i != unexpiredById.end()) {
+    sys::Mutex::ScopedLock l(lock);
+    std::pair<IdMessageMap::iterator, IdMessageMap::iterator> expired = unexpiredById.equal_range(id);
+    IdMessageMap::iterator i = expired.first;
+    while (i != expired.second) {
         i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true; 
         unexpiredByMessage.erase(i->second);
-        unexpiredById.erase(i);
+        unexpiredById.erase(i++);
     }
 }
 
+// Called in update thread on the updater.
 boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) {
+    sys::Mutex::ScopedLock l(lock);
     MessageIdMap::iterator i = unexpiredByMessage.find(&m);
     return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second;
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h?rev=1036589&r1=1036588&r2=1036589&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h Thu Nov 18 19:40:53 2010
@@ -61,20 +61,24 @@ class ExpiryPolicy : public broker::Expi
     // Cluster delivers expiry notice.
     void deliverExpire(uint64_t);
 
-    void setId(uint64_t id) { expiryId = id; }
-    uint64_t getId() const { return expiryId; }
+    void setId(uint64_t id);
+    uint64_t getId() const;
     
     boost::optional<uint64_t> getId(broker::Message&);
     
   private:
     typedef std::map<broker::Message*,  uint64_t> MessageIdMap;
-    typedef std::map<uint64_t, broker::Message*> IdMessageMap;
+    // When messages are fanned out to multiple queues, update sends
+    // them as independent messages, so we can have multiple messages
+    // with the same expiry ID.
+    typedef std::multimap<uint64_t, broker::Message*> IdMessageMap;
 
     struct Expired : public broker::ExpiryPolicy {
         bool hasExpired(broker::Message&);
         void willExpire(broker::Message&);
     };
 
+    mutable sys::Mutex lock;
     MessageIdMap unexpiredByMessage;
     IdMessageMap unexpiredById;
     uint64_t expiryId;

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1036589&r1=1036588&r2=1036589&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Thu Nov 18 19:40:53 2010
@@ -22,7 +22,7 @@ import os, signal, sys, time, imp, re, s
 from qpid import datatypes, messaging
 from qpid.brokertest import *
 from qpid.harness import Skipped
-from qpid.messaging import Message
+from qpid.messaging import Message, Empty
 from threading import Thread, Lock
 from logging import getLogger
 from itertools import chain
@@ -41,7 +41,6 @@ log = getLogger("qpid.cluster_tests")
 # Import scripts as modules
 qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC"))
 
-
 def readfile(filename):
     """Returns te content of file named filename as a string"""
     f = file(filename)
@@ -223,6 +222,29 @@ acl allow all all
         out = qs.communicate()[0]
         assert out.find("amq.failover") > 0
 
+    def evaluate_address(self, session, address):
+        """Create a receiver just to evaluate an address for its side effects"""
+        r = session.receiver(address)
+        r.close()
+
+    def test_expire_fanout(self):
+        """Regression test for QPID-2874: Clustered broker crashes in assertion in
+        cluster/ExpiryPolicy.cpp.
+        Caused by a fan-out message being updated as separate messages"""
+        cluster = self.cluster(1)
+        session0 = cluster[0].connect().session()
+        # Create 2 queues bound to fanout exchange.
+        self.evaluate_address(session0, "q1;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q1}]}}")
+        self.evaluate_address(session0, "q2;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q2}]}}")
+        queues = ["q1", "q2"]
+        # Send a fanout message with a long timeout
+        s = session0.sender("amq.fanout")
+        s.send(Message("foo", ttl=100), sync=False)
+        # Start a new member, check that every bound queue received the message
+        cluster.start()
+        session1 = cluster[1].connect().session()
+        for q in queues: self.assert_browse(session1, q, ["foo"])
+
 class LongTests(BrokerTest):
     """Tests that can run for a long time if -DDURATION=<minutes> is set"""
     def duration(self):

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=1036589&r1=1036588&r2=1036589&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Thu Nov 18 19:40:53 2010
@@ -523,6 +523,20 @@ class BrokerTest(TestCase):
         cluster = Cluster(self, count, args, expect=expect, wait=wait)
         return cluster
 
+    def assert_browse(self, session, queue, expect_contents, timeout=0):
+        """Assert that the contents of messages on queue (as retrieved
+        using session and timeout) exactly match the strings in
+        expect_contents"""
+
+        r = session.receiver("%s;{mode:browse}"%(queue))
+        actual_contents = []
+        try:
+            for c in expect_contents: actual_contents.append(r.fetch(timeout=timeout).content)
+            while True: actual_contents.append(r.fetch(timeout=0).content) # Check for extra messages.
+        except messaging.Empty: pass
+        r.close()
+        self.assertEqual(expect_contents, actual_contents)
+
 class RethrownException(Exception):
     """Captures the stack trace of the current exception to be thrown later"""
     def __init__(self, msg=""):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org