You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/07/10 17:42:36 UTC

svn commit: r792991 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp ErrorCheck.cpp ErrorCheck.h

Author: aconway
Date: Fri Jul 10 15:42:36 2009
New Revision: 792991

URL: http://svn.apache.org/viewvc?rev=792991&view=rev
Log:
Fix cluster handling of multiple errors.

If an error occurred while there were frames on the error queue from a
previous error, the enqueued frames were not being processed for the
new error, which could lead to error-check or config-change frames
being missed.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=792991&r1=792990&r2=792991&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Jul 10 15:42:36 2009
@@ -440,7 +440,7 @@
             connection->deliveredFrame(e);
         }
         else
-            QPID_LOG(critical, *this << " FIXME DROP (no connection): " << e);
+            QPID_LOG(debug, *this << " DROP (no connection): " << e);
     }
     else // Drop connection frames while state < CATCHUP
         QPID_LOG(trace, *this << " DROP (joining): " << e);
@@ -534,6 +534,7 @@
 
 void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) {
     bool memberChange = map.configChange(current);
+    QPID_LOG(debug, *this << " applied config change: " << map);
     if (state == LEFT) return;
     
     if (!map.isAlive(self)) {  // Final config change.

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp?rev=792991&r1=792990&r2=792991&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp Fri Jul 10 15:42:36 2009
@@ -39,7 +39,7 @@
     : cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0)
 {}
 
-ostream& operator<<(ostream& o, ErrorCheck::MemberSet ms) {
+ostream& operator<<(ostream& o, const ErrorCheck::MemberSet& ms) {
     copy(ms.begin(), ms.end(), ostream_iterator<MemberId>(o, " "));
     return o;
 }
@@ -60,45 +60,64 @@
              << " (unresolved: " << unresolved << ")");
     mcast.mcastControl(
         ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember());
+    // If there are already frames queued up by a previous error, review
+    // them with respect to this new error.
+    for (FrameQueue::iterator i = frames.begin(); i != frames.end(); i = review(i))
+        ;
 }
 
 void ErrorCheck::delivered(const EventFrame& e) {
+    FrameQueue::iterator i = frames.insert(frames.end(), e);
+    review(i);
+}
+
+// Review a frame in the queue with respect to the current error.
+// Returns an iterator to the frame after i. A matching error-check
+// control is erased from the queue, so i must not be used afterwards.
+ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& i) {
+    FrameQueue::iterator next = i+1;
     if (isUnresolved()) {
         const ClusterErrorCheckBody* errorCheck = 0;
-        if (e.frame.getBody())
+        if (i->frame.getBody())
             errorCheck = dynamic_cast<const ClusterErrorCheckBody*>(
-                e.frame.getMethod());
+                i->frame.getMethod());
         if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error
-            if (errorCheck->getType() < type) { // my error is worse than his
+            // erase() destroys *i and the method body errorCheck points
+            // into, so copy what we still need before dropping the frame.
+            const MemberId memberId = i->getMemberId();
+            const bool myErrorIsWorse = errorCheck->getType() < type;
+            next = frames.erase(i);    // Drop matching error check controls
+            if (myErrorIsWorse) { // my error is worse than his
                 QPID_LOG(critical, cluster << " error " << frameSeq
-                         << " did not occur on " << e.getMemberId());
+                         << " did not occur on " << memberId);
                 throw Exception("Aborted by local failure that did not occur on all replicas");
             }
             else {              // his error is worse/same as mine.
                 QPID_LOG(debug, cluster << " error " << frameSeq
-                         << " outcome agrees with " << e.getMemberId());
-                unresolved.erase(e.getMemberId());
+                         << " outcome agrees with " << memberId);
+                unresolved.erase(memberId);
                 checkResolved();
             }
         }
         else {
-            frames.push_back(e); // Only drop matching errorCheck controls.
             const ClusterConfigChangeBody* configChange = 0;
-            if (e.frame.getBody())
-                configChange = dynamic_cast<const ClusterConfigChangeBody*>(e.frame.getMethod());
+            if (i->frame.getBody())
+                configChange = dynamic_cast<const ClusterConfigChangeBody*>(i->frame.getMethod());
             if (configChange) {
                 MemberSet members(ClusterMap::decode(configChange->getCurrent()));
-                MemberSet result;
+                QPID_LOG(debug, cluster << " apply config change to unresolved: "
+                         << members);
+
+                MemberSet intersect;
                 set_intersection(members.begin(), members.end(),
                                  unresolved.begin(), unresolved.end(),
-                                 inserter(result, result.begin()));
-                unresolved.swap(result);
+                                 inserter(intersect, intersect.begin()));
+                unresolved.swap(intersect);
                 checkResolved();
             }
         }
     }
-    else 
-        frames.push_back(e);
+    return next;
 }
 
 void ErrorCheck::checkResolved() {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h?rev=792991&r1=792990&r2=792991&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h Fri Jul 10 15:42:36 2009
@@ -41,8 +41,8 @@
 /**
  * Error checking logic.
  * 
- * When an error occurs stop processing frames and queue them until we
- * can determine if all nodes experienced the error. If not, we shut down.
+ * When an error occurs queue up frames until we can determine if all
+ * nodes experienced the error. If not, we shut down.
  */
 class ErrorCheck
 {
@@ -59,18 +59,22 @@
     /** Called when a frame is delivered */
     void delivered(const EventFrame&);
 
+    /**@pre canProcess **/
     EventFrame getNext();
 
     bool canProcess() const;
+
     bool isUnresolved() const;
     
   private:
+    typedef std::deque<EventFrame>  FrameQueue;
+    FrameQueue::iterator review(const FrameQueue::iterator&);
     void checkResolved();
     
     Cluster& cluster;
     Multicaster& mcast;
-    std::deque<EventFrame> frames;
-    std::set<MemberId> unresolved;
+    FrameQueue frames;
+    MemberSet unresolved;
     uint64_t frameSeq;
     ErrorType type;
     Connection* connection;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org