You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/11/18 20:40:54 UTC
svn commit: r1036589 - in /qpid/trunk/qpid:
cpp/src/qpid/cluster/ExpiryPolicy.cpp cpp/src/qpid/cluster/ExpiryPolicy.h
cpp/src/tests/cluster_tests.py python/qpid/brokertest.py
Author: aconway
Date: Thu Nov 18 19:40:53 2010
New Revision: 1036589
URL: http://svn.apache.org/viewvc?rev=1036589&view=rev
Log:
QPID-2874 Clustered broker crashes in assertion in cluster/ExpiryPolicy.cpp
- Added missing lock to ExpiryPolicy
- 1-N mapping of expiry ID to message when receiving an update.
- Regression test.
A fan-out message (sent to multiple queues e.g. by fanout or topic
exchange) is a single message on multiple queues with a single expiry
ID. During an update however each instance is sent as a separate
message so we need to allow 1-N mapping of expiry ID to message during
update.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
qpid/trunk/qpid/python/qpid/brokertest.py
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp?rev=1036589&r1=1036588&r2=1036589&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp Thu Nov 18 19:40:53 2010
@@ -41,40 +41,76 @@ struct ExpiryTask : public sys::TimerTas
const uint64_t expiryId;
};
+// Called while receiving an update
+void ExpiryPolicy::setId(uint64_t id) {
+ sys::Mutex::ScopedLock l(lock);
+ expiryId = id;
+}
+
+// Called while giving an update
+uint64_t ExpiryPolicy::getId() const {
+ sys::Mutex::ScopedLock l(lock);
+ return expiryId;
+}
+
+// Called in enqueuing connection thread
void ExpiryPolicy::willExpire(broker::Message& m) {
- uint64_t id = expiryId++;
- assert(unexpiredById.find(id) == unexpiredById.end());
- assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
- unexpiredById[id] = &m;
- unexpiredByMessage[&m] = id;
+ uint64_t id;
+ {
+ // When messages are fanned out to multiple queues, update sends
+ // them as independent messages so we can have multiple messages
+ // with the same expiry ID.
+ //
+ // TODO: fix update to avoid duplicating messages.
+ sys::Mutex::ScopedLock l(lock);
+ id = expiryId++; // if this is an update, this expiryId may already exist
+ assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
+ unexpiredById.insert(IdMessageMap::value_type(id, &m));
+ unexpiredByMessage[&m] = id;
+ }
timer.add(new ExpiryTask(this, id, m.getExpiration()));
}
+// Called in dequeueing connection thread
void ExpiryPolicy::forget(broker::Message& m) {
+ sys::Mutex::ScopedLock l(lock);
MessageIdMap::iterator i = unexpiredByMessage.find(&m);
assert(i != unexpiredByMessage.end());
unexpiredById.erase(i->second);
unexpiredByMessage.erase(i);
}
+// Called in dequeueing connection or cleanup thread.
bool ExpiryPolicy::hasExpired(broker::Message& m) {
+ sys::Mutex::ScopedLock l(lock);
return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
}
+// Called in timer thread
void ExpiryPolicy::sendExpire(uint64_t id) {
+ {
+ sys::Mutex::ScopedLock l(lock);
+ // Don't multicast an expiry notice if message is already forgotten.
+ if (unexpiredById.find(id) == unexpiredById.end()) return;
+ }
mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
}
+// Called in CPG deliver thread.
void ExpiryPolicy::deliverExpire(uint64_t id) {
- IdMessageMap::iterator i = unexpiredById.find(id);
- if (i != unexpiredById.end()) {
+ sys::Mutex::ScopedLock l(lock);
+ std::pair<IdMessageMap::iterator, IdMessageMap::iterator> expired = unexpiredById.equal_range(id);
+ IdMessageMap::iterator i = expired.first;
+ while (i != expired.second) {
i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true;
unexpiredByMessage.erase(i->second);
- unexpiredById.erase(i);
+ unexpiredById.erase(i++);
}
}
+// Called in update thread on the updater.
boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) {
+ sys::Mutex::ScopedLock l(lock);
MessageIdMap::iterator i = unexpiredByMessage.find(&m);
return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h?rev=1036589&r1=1036588&r2=1036589&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h Thu Nov 18 19:40:53 2010
@@ -61,20 +61,24 @@ class ExpiryPolicy : public broker::Expi
// Cluster delivers expiry notice.
void deliverExpire(uint64_t);
- void setId(uint64_t id) { expiryId = id; }
- uint64_t getId() const { return expiryId; }
+ void setId(uint64_t id);
+ uint64_t getId() const;
boost::optional<uint64_t> getId(broker::Message&);
private:
typedef std::map<broker::Message*, uint64_t> MessageIdMap;
- typedef std::map<uint64_t, broker::Message*> IdMessageMap;
+ // When messages are fanned out to multiple queues, update sends
+ // them as independent messages so we can have multiple messages
+ // with the same expiry ID.
+ typedef std::multimap<uint64_t, broker::Message*> IdMessageMap;
struct Expired : public broker::ExpiryPolicy {
bool hasExpired(broker::Message&);
void willExpire(broker::Message&);
};
+ mutable sys::Mutex lock;
MessageIdMap unexpiredByMessage;
IdMessageMap unexpiredById;
uint64_t expiryId;
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1036589&r1=1036588&r2=1036589&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Thu Nov 18 19:40:53 2010
@@ -22,7 +22,7 @@ import os, signal, sys, time, imp, re, s
from qpid import datatypes, messaging
from qpid.brokertest import *
from qpid.harness import Skipped
-from qpid.messaging import Message
+from qpid.messaging import Message, Empty
from threading import Thread, Lock
from logging import getLogger
from itertools import chain
@@ -41,7 +41,6 @@ log = getLogger("qpid.cluster_tests")
# Import scripts as modules
qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC"))
-
def readfile(filename):
"""Returns te content of file named filename as a string"""
f = file(filename)
@@ -223,6 +222,29 @@ acl allow all all
out = qs.communicate()[0]
assert out.find("amq.failover") > 0
+ def evaluate_address(self, session, address):
+ """Create a receiver just to evaluate an address for its side effects"""
+ r = session.receiver(address)
+ r.close()
+
+ def test_expire_fanout(self):
+ """Regression test for QPID-2874: Clustered broker crashes in assertion in
+ cluster/ExpiryPolicy.cpp.
+ Caused by a fan-out message being updated as separate messages"""
+ cluster = self.cluster(1)
+ session0 = cluster[0].connect().session()
+ # Create 2 queues bound to fanout exchange.
+ self.evaluate_address(session0, "q1;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q1}]}}")
+ self.evaluate_address(session0, "q2;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q2}]}}")
+ queues = ["q1", "q2"]
+ # Send a fanout message with a long timeout
+ s = session0.sender("amq.fanout")
+ s.send(Message("foo", ttl=100), sync=False)
+ # Start a new member, check the messages
+ cluster.start()
+ session1 = cluster[1].connect().session()
+ for q in queues: self.assert_browse(session1, "q1", ["foo"])
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=1036589&r1=1036588&r2=1036589&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Thu Nov 18 19:40:53 2010
@@ -523,6 +523,20 @@ class BrokerTest(TestCase):
cluster = Cluster(self, count, args, expect=expect, wait=wait)
return cluster
+ def assert_browse(self, session, queue, expect_contents, timeout=0):
+ """Assert that the contents of messages on queue (as retrieved
+ using session and timeout) exactly match the strings in
+ expect_contents"""
+
+ r = session.receiver("%s;{mode:browse}"%(queue))
+ actual_contents = []
+ try:
+ for c in expect_contents: actual_contents.append(r.fetch(timeout=timeout).content)
+ while True: actual_contents.append(r.fetch(timeout=0).content) # Check for extra messages.
+ except messaging.Empty: pass
+ r.close()
+ self.assertEqual(expect_contents, actual_contents)
+
class RethrownException(Exception):
"""Captures the stack trace of the current exception to be thrown later"""
def __init__(self, msg=""):
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org