You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/07/31 20:40:27 UTC
svn commit: r799687 - in /qpid/trunk/qpid/cpp: src/qpid/cluster/ src/tests/
xml/
Author: aconway
Date: Fri Jul 31 18:40:26 2009
New Revision: 799687
URL: http://svn.apache.org/viewvc?rev=799687&view=rev
Log:
Fix race condition in cluster error handling.
If different errors occured almost simultaneously on two different
nodes in a cluster, there was a race condition that could cause the
cluster to hang.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
qpid/trunk/qpid/cpp/src/tests/Makefile.am
qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp
qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=799687&r1=799686&r2=799687&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Jul 31 18:40:26 2009
@@ -66,7 +66,7 @@
*
* Events are either
* - Connection events: non-0 connection number and are associated with a connection.
- * - Cluster Events: 0 connection number, are not associated with a connectin.
+ * - Cluster Events: 0 connection number, are not associated with a connection.
*
* Events are further categorized as:
* - Control: carries method frame(s) that affect cluster behavior.
@@ -149,7 +149,7 @@
* sensible reporting of an attempt to mix different versions in a
* cluster.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1;
+const uint32_t Cluster::CLUSTER_VERSION = 2;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -163,7 +163,7 @@
void updateOffer(uint64_t updatee, const Uuid& id, uint32_t version) { cluster.updateOffer(member, updatee, id, version, l); }
void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
- void errorCheck(uint8_t type, uint64_t frameSeq) { cluster.errorCheck(member, type, frameSeq, l); }
+ void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { cluster.errorCheck(member, type, frameSeq, l); }
void shutdown() { cluster.shutdown(member, l); }
@@ -869,15 +869,12 @@
expiryPolicy->deliverExpire(id);
}
-void Cluster::errorCheck(const MemberId&, uint8_t type, uint64_t frameSeq, Lock&) {
- // If we handle an errorCheck at this point (rather than in the
- // ErrorCheck class) then we have processed succesfully past the
- // point of the error.
- if (state >= CATCHUP && type != ERROR_TYPE_NONE) {
- QPID_LOG(notice, *this << " error " << frameSeq << " did not occur locally.");
- mcast.mcastControl(
- ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq), self);
- }
+void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) {
+ // If we see an errorCheck here (rather than in the ErrorCheck
+ // class) then we have processed succesfully past the point of the
+ // error.
+ if (state >= CATCHUP) // Don't respond pre catchup, we don't know what happened
+ error.respondNone(from, type, frameSeq);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=799687&r1=799686&r2=799687&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Jul 31 18:40:26 2009
@@ -152,7 +152,7 @@
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& current, Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
- void errorCheck(const MemberId&, uint8_t type, uint64_t frameSeq, Lock&);
+ void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
void shutdown(const MemberId&, Lock&);
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=799687&r1=799686&r2=799687&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Fri Jul 31 18:40:26 2009
@@ -71,7 +71,7 @@
joiners[id] = url;
}
-ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, uint64_t frameSeq_)
+ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, framing::SequenceNumber frameSeq_)
: frameSeq(frameSeq_)
{
std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive)));
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=799687&r1=799686&r2=799687&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Fri Jul 31 18:40:26 2009
@@ -25,6 +25,7 @@
#include "qpid/cluster/types.h"
#include "qpid/Url.h"
#include "qpid/framing/ClusterConnectionMembershipBody.h"
+#include "qpid/framing/SequenceNumber.h"
#include <boost/function.hpp>
#include <boost/optional.hpp>
@@ -53,7 +54,7 @@
ClusterMap();
ClusterMap(const MemberId& id, const Url& url, bool isReady);
- ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, uint64_t frameSeq);
+ ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, framing::SequenceNumber frameSeq);
/** Update from config change.
*@return true if member set changed.
@@ -92,8 +93,8 @@
*/
static Set intersection(const Set& a, const Set& b);
- uint64_t getFrameSeq() { return frameSeq; }
- uint64_t incrementFrameSeq() { return ++frameSeq; }
+ framing::SequenceNumber getFrameSeq() { return frameSeq; }
+ framing::SequenceNumber incrementFrameSeq() { return ++frameSeq; }
/** Clear out all knowledge of joiners & members, just keep alive set */
void clearStatus() { joiners.clear(); members.clear(); }
@@ -103,7 +104,7 @@
Map joiners, members;
Set alive;
- uint64_t frameSeq;
+ framing::SequenceNumber frameSeq;
friend std::ostream& operator<<(std::ostream&, const Map&);
friend std::ostream& operator<<(std::ostream&, const ClusterMap&);
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=799687&r1=799686&r2=799687&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Jul 31 18:40:26 2009
@@ -311,7 +311,7 @@
output.setSendMax(sendMax);
}
-void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members, const framing::SequenceNumber& frameSeq) {
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
consumerNumbering.clear();
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=799687&r1=799686&r2=799687&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Fri Jul 31 18:40:26 2009
@@ -122,7 +122,7 @@
void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment, uint32_t sendMax);
- void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq);
+ void membership(const framing::FieldTable&, const framing::FieldTable&, const framing::SequenceNumber& frameSeq);
void retractOffer();
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp?rev=799687&r1=799686&r2=799687&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp Fri Jul 31 18:40:26 2009
@@ -45,11 +45,11 @@
}
void ErrorCheck::error(
- Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms, const std::string& msg)
+ Connection& c, ErrorType t, framing::SequenceNumber seq, const MemberSet& ms, const std::string& msg)
{
// Detected a local error, inform cluster and set error state.
assert(t != ERROR_TYPE_NONE); // Must be an error.
- assert(type == ERROR_TYPE_NONE); // Can only be called while processing
+ assert(type == ERROR_TYPE_NONE); // Can't be called when already in an error state.
type = t;
unresolved = ms;
frameSeq = seq;
@@ -59,7 +59,7 @@
<< " error " << frameSeq << " on " << c << ": " << msg
<< " must be resolved with: " << unresolved);
mcast.mcastControl(
- ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember());
+ ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId());
// If there are already frames queued up by a previous error, review
// them with respect to this new error.
for (FrameQueue::iterator i = frames.begin(); i != frames.end(); i = review(i))
@@ -74,41 +74,52 @@
// Review a frame in the queue with respect to the current error.
ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& i) {
FrameQueue::iterator next = i+1;
- if (isUnresolved()) {
- const ClusterErrorCheckBody* errorCheck = 0;
- if (i->frame.getBody())
- errorCheck = dynamic_cast<const ClusterErrorCheckBody*>(
- i->frame.getMethod());
- if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error
+ if(!isUnresolved() || !i->frame.getBody() || !i->frame.getMethod())
+ return next; // Only interested in control frames while unresolved.
+ const AMQMethodBody* method = i->frame.getMethod();
+ if (method->isA<const ClusterErrorCheckBody>()) {
+ const ClusterErrorCheckBody* errorCheck =
+ static_cast<const ClusterErrorCheckBody*>(method);
+
+ if (errorCheck->getFrameSeq() == frameSeq) { // Addresses current error
next = frames.erase(i); // Drop matching error check controls
if (errorCheck->getType() < type) { // my error is worse than his
QPID_LOG(critical, cluster << " error " << frameSeq
<< " did not occur on " << i->getMemberId());
- throw Exception("Aborted by failure that did not occur on all replicas");
+ throw Exception(QPID_MSG("Error " << frameSeq
+ << " did not occur on all members"));
}
else { // his error is worse/same as mine.
- QPID_LOG(notice, cluster << " error " << frameSeq
+ QPID_LOG(info, cluster << " error " << frameSeq
<< " resolved with " << i->getMemberId());
unresolved.erase(i->getMemberId());
checkResolved();
}
}
- else {
- const ClusterConfigChangeBody* configChange = 0;
- if (i->frame.getBody())
- configChange = dynamic_cast<const ClusterConfigChangeBody*>(
- i->frame.getMethod());
- if (configChange) {
- MemberSet members(ClusterMap::decode(configChange->getCurrent()));
- QPID_LOG(debug, cluster << " apply config change to unresolved: "
- << members);
- MemberSet intersect;
- set_intersection(members.begin(), members.end(),
- unresolved.begin(), unresolved.end(),
- inserter(intersect, intersect.begin()));
- unresolved.swap(intersect);
- checkResolved();
- }
+ else if (errorCheck->getFrameSeq() < frameSeq && errorCheck->getType() != NONE
+ && i->connectionId.getMember() != cluster.getId())
+ {
+ // This error occured before the current error so we
+ // have processed past it.
+ next = frames.erase(i); // Drop the error check control
+ respondNone(i->connectionId.getMember(), errorCheck->getType(),
+ errorCheck->getFrameSeq());
+ }
+ // if errorCheck->getFrameSeq() > frameSeq then leave it in the queue.
+ }
+ else if (method->isA<const ClusterConfigChangeBody>()) {
+ const ClusterConfigChangeBody* configChange =
+ static_cast<const ClusterConfigChangeBody*>(method);
+ if (configChange) {
+ MemberSet members(ClusterMap::decode(configChange->getCurrent()));
+ QPID_LOG(debug, cluster << " apply config change to error "
+ << frameSeq << ": " << members);
+ MemberSet intersect;
+ set_intersection(members.begin(), members.end(),
+ unresolved.begin(), unresolved.end(),
+ inserter(intersect, intersect.begin()));
+ unresolved.swap(intersect);
+ checkResolved();
}
}
return next;
@@ -117,10 +128,10 @@
void ErrorCheck::checkResolved() {
if (unresolved.empty()) { // No more potentially conflicted members, we're clear.
type = ERROR_TYPE_NONE;
- QPID_LOG(notice, cluster << " error " << frameSeq << " resolved.");
+ QPID_LOG(info, cluster << " error " << frameSeq << " resolved.");
}
else
- QPID_LOG(notice, cluster << " error " << frameSeq
+ QPID_LOG(info, cluster << " error " << frameSeq
<< " must be resolved with " << unresolved);
}
@@ -131,4 +142,15 @@
return e;
}
+void ErrorCheck::respondNone(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq) {
+ // Don't respond to non-errors or to my own errors.
+ if (type == ERROR_TYPE_NONE || from == cluster.getId())
+ return;
+ QPID_LOG(info, cluster << " error " << frameSeq << " did not occur locally.");
+ mcast.mcastControl(
+ ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq),
+ cluster.getId()
+ );
+}
+
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h?rev=799687&r1=799686&r2=799687&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h Fri Jul 31 18:40:26 2009
@@ -25,6 +25,7 @@
#include "qpid/cluster/types.h"
#include "qpid/cluster/Multicaster.h"
#include "qpid/framing/enum.h"
+#include "qpid/framing/SequenceNumber.h"
#include <boost/function.hpp>
#include <deque>
#include <set>
@@ -49,11 +50,12 @@
public:
typedef std::set<MemberId> MemberSet;
typedef framing::cluster::ErrorType ErrorType;
+ typedef framing::SequenceNumber SequenceNumber;
ErrorCheck(Cluster&);
/** A local error has occured */
- void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&,
+ void error(Connection&, ErrorType, SequenceNumber frameSeq, const MemberSet&,
const std::string& msg);
/** Called when a frame is delivered */
@@ -66,7 +68,8 @@
bool isUnresolved() const { return type != NONE; }
-
+ /** Respond to an error check saying we had no error. */
+ void respondNone(const MemberId&, uint8_t type, SequenceNumber frameSeq);
private:
static const ErrorType NONE = framing::cluster::ERROR_TYPE_NONE;
@@ -78,7 +81,7 @@
Multicaster& mcast;
FrameQueue frames;
MemberSet unresolved;
- uint64_t frameSeq;
+ SequenceNumber frameSeq;
ErrorType type;
Connection* connection;
};
Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=799687&r1=799686&r2=799687&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri Jul 31 18:40:26 2009
@@ -65,6 +65,7 @@
$(lib_client) $(lib_broker) $(lib_console)
unit_test_SOURCES= unit_test.cpp unit_test.h \
+ ClientSessionTest.cpp \
BrokerFixture.h SocketProxy.h \
exception_test.cpp \
RefCounted.cpp \
@@ -75,7 +76,6 @@
QueueOptionsTest.cpp \
InlineAllocator.cpp \
InlineVector.cpp \
- ClientSessionTest.cpp \
SequenceSet.cpp \
StringUtils.cpp \
IncompleteMessageList.cpp \
Modified: qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp?rev=799687&r1=799686&r2=799687&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp Fri Jul 31 18:40:26 2009
@@ -82,10 +82,31 @@
c.subs.subscribe(c.lq, c.name);
}
+// Handle near-simultaneous errors
+QPID_AUTO_TEST_CASE(testCoincidentErrors) {
+ ClusterFixture cluster(2, updateArgs, -1);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+
+ c0.session.queueDeclare("q", durable=true);
+ {
+ ScopedSuppressLogging allQuiet;
+ async(c0.session).messageTransfer(content=pMessage("TEST_STORE_DO: s0[exception]", "q"));
+ async(c1.session).messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception]", "q"));
+
+ int alive=0;
+ try { Client c00(cluster[0], "c00"); ++alive; } catch (...) {}
+ try { Client c11(cluster[1], "c11"); ++alive; } catch (...) {}
+
+ BOOST_CHECK_EQUAL(alive, 1);
+ }
+}
+
+#if 0 // FIXME aconway 2009-07-30:
// Verify normal cluster-wide errors.
QPID_AUTO_TEST_CASE(testNormalErrors) {
// FIXME aconway 2009-04-10: Would like to put a scope just around
- // the statements expected to fail (in BOOST_CHECK_THROW) but that
+ // the statements expected to fail (in BOOST_CHECK_yTHROW) but that
// sproadically lets out messages, possibly because they're in
// Connection thread.
@@ -96,7 +117,7 @@
{
ScopedSuppressLogging allQuiet;
- queueAndSub(c0);
+ queueAndsub(c0);
c0.session.messageTransfer(content=Message("x", "c0"));
BOOST_CHECK_EQUAL(c0.lq.get(TIMEOUT).getData(), "x");
@@ -234,5 +255,5 @@
}
}
#endif
-
+#endif // FIXME aconway 2009-07-30:
QPID_AUTO_TEST_SUITE_END()
Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=799687&r1=799686&r2=799687&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Fri Jul 31 18:40:26 2009
@@ -68,7 +68,7 @@
<!-- Check for error consistency across the cluster -->
<control name="error-check" code="0x14">
<field name="type" type="error-type"/>
- <field name="frame-seq" type="uint64"/>
+ <field name="frame-seq" type="sequence-no"/>
</control>
@@ -170,7 +170,7 @@
<control name="membership" code="0x21" label="Cluster membership details.">
<field name="joiners" type="map"/> <!-- member-id -> URL -->
<field name="members" type="map"/> <!-- member-id -> state -->
- <field name="frame-seq" type="uint64"/> <!-- frame sequence number -->
+ <field name="frame-seq" type="sequence-no"/> <!-- frame sequence number -->
</control>
<!-- Updater cannot fulfill an update offer. -->
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org