You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/07/10 17:42:36 UTC
svn commit: r792991 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp
ErrorCheck.cpp ErrorCheck.h
Author: aconway
Date: Fri Jul 10 15:42:36 2009
New Revision: 792991
URL: http://svn.apache.org/viewvc?rev=792991&view=rev
Log:
Fix cluster handling of multiple errors.
If an error occurred while there were frames on the error queue from a
previous error, the enqueued frames were not being processed for the
new error, which could lead to error-check or config-change frames
being missed.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=792991&r1=792990&r2=792991&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Jul 10 15:42:36 2009
@@ -440,7 +440,7 @@
connection->deliveredFrame(e);
}
else
- QPID_LOG(critical, *this << " FIXME DROP (no connection): " << e);
+ QPID_LOG(debug, *this << " DROP (no connection): " << e);
}
else // Drop connection frames while state < CATCHUP
QPID_LOG(trace, *this << " DROP (joining): " << e);
@@ -534,6 +534,7 @@
void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) {
bool memberChange = map.configChange(current);
+ QPID_LOG(debug, *this << " applied config change: " << map);
if (state == LEFT) return;
if (!map.isAlive(self)) { // Final config change.
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp?rev=792991&r1=792990&r2=792991&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp Fri Jul 10 15:42:36 2009
@@ -39,7 +39,7 @@
: cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0)
{}
-ostream& operator<<(ostream& o, ErrorCheck::MemberSet ms) {
+ostream& operator<<(ostream& o, const ErrorCheck::MemberSet& ms) {
copy(ms.begin(), ms.end(), ostream_iterator<MemberId>(o, " "));
return o;
}
@@ -60,45 +60,58 @@
<< " (unresolved: " << unresolved << ")");
mcast.mcastControl(
ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember());
+ // If there are already frames queued up by a previous error, review
+ // them with respect to this new error.
+ for (FrameQueue::iterator i = frames.begin(); i != frames.end(); i = review(i))
+ ;
}
void ErrorCheck::delivered(const EventFrame& e) {
+ FrameQueue::iterator i = frames.insert(frames.end(), e);
+ review(i);
+}
+
+// Review a frame in the queue with respect to the current error.
+ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& i) {
+ FrameQueue::iterator next = i+1;
if (isUnresolved()) {
const ClusterErrorCheckBody* errorCheck = 0;
- if (e.frame.getBody())
+ if (i->frame.getBody())
errorCheck = dynamic_cast<const ClusterErrorCheckBody*>(
- e.frame.getMethod());
+ i->frame.getMethod());
if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error
+ next = frames.erase(i); // Drop matching error check controls
if (errorCheck->getType() < type) { // my error is worse than his
QPID_LOG(critical, cluster << " error " << frameSeq
- << " did not occur on " << e.getMemberId());
+ << " did not occur on " << i->getMemberId());
throw Exception("Aborted by local failure that did not occur on all replicas");
}
else { // his error is worse/same as mine.
QPID_LOG(debug, cluster << " error " << frameSeq
- << " outcome agrees with " << e.getMemberId());
- unresolved.erase(e.getMemberId());
+ << " outcome agrees with " << i->getMemberId());
+ unresolved.erase(i->getMemberId());
checkResolved();
}
}
else {
- frames.push_back(e); // Only drop matching errorCheck controls.
const ClusterConfigChangeBody* configChange = 0;
- if (e.frame.getBody())
- configChange = dynamic_cast<const ClusterConfigChangeBody*>(e.frame.getMethod());
+ if (i->frame.getBody())
+ configChange = dynamic_cast<const ClusterConfigChangeBody*>(i->frame.getMethod());
if (configChange) {
MemberSet members(ClusterMap::decode(configChange->getCurrent()));
- MemberSet result;
+ QPID_LOG(debug, cluster << " apply config change to unresolved: "
+ << members);
+
+ MemberSet intersect;
set_intersection(members.begin(), members.end(),
unresolved.begin(), unresolved.end(),
- inserter(result, result.begin()));
- unresolved.swap(result);
+ inserter(intersect, intersect.begin()));
+ unresolved.swap(intersect);
checkResolved();
}
}
}
- else
- frames.push_back(e);
+ return next;
}
void ErrorCheck::checkResolved() {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h?rev=792991&r1=792990&r2=792991&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h Fri Jul 10 15:42:36 2009
@@ -41,8 +41,8 @@
/**
* Error checking logic.
*
- * When an error occurs stop processing frames and queue them until we
- * can determine if all nodes experienced the error. If not, we shut down.
+ * When an error occurs queue up frames until we can determine if all
+ * nodes experienced the error. If not, we shut down.
*/
class ErrorCheck
{
@@ -59,18 +59,22 @@
/** Called when a frame is delivered */
void delivered(const EventFrame&);
+ /**@pre canProcess **/
EventFrame getNext();
bool canProcess() const;
+
bool isUnresolved() const;
private:
+ typedef std::deque<EventFrame> FrameQueue;
+ FrameQueue::iterator review(const FrameQueue::iterator&);
void checkResolved();
Cluster& cluster;
Multicaster& mcast;
- std::deque<EventFrame> frames;
- std::set<MemberId> unresolved;
+ FrameQueue frames;
+ MemberSet unresolved;
uint64_t frameSeq;
ErrorType type;
Connection* connection;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org