You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/06/18 23:25:03 UTC

svn commit: r786294 - in /qpid/trunk/qpid/cpp: src/qpid/cluster/Cluster.cpp src/qpid/cluster/Cluster.h src/qpid/cluster/Connection.cpp src/qpid/cluster/Connection.h src/qpid/cluster/ErrorCheck.cpp src/qpid/cluster/ErrorCheck.h xml/cluster.xml

Author: aconway
Date: Thu Jun 18 21:25:00 2009
New Revision: 786294

URL: http://svn.apache.org/viewvc?rev=786294&view=rev
Log:
Make error-check a cluster-connection control rather than a cluster control.

Fixes a bug that occurs if an error happens during update. As cluster controls, error-checks
were being processed out of sequence with the connection data they referred to.
Making them connection controls ensures they are processed in the proper order.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=786294&r1=786293&r2=786294&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Jun 18 21:25:00 2009
@@ -105,7 +105,6 @@
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
 #include "qpid/framing/ClusterConnectionAbortBody.h"
 #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
-#include "qpid/framing/ClusterErrorCheckBody.h"
 #include "qpid/framing/ClusterReadyBody.h"
 #include "qpid/framing/ClusterShutdownBody.h"
 #include "qpid/framing/ClusterUpdateOfferBody.h"
@@ -134,7 +133,7 @@
 using namespace qpid::sys;
 using namespace std;
 using namespace qpid::cluster;
-using namespace qpid::framing::cluster;
+using namespace qpid::framing::cluster_connection;
 using qpid::management::ManagementAgent;
 using qpid::management::ManagementObject;
 using qpid::management::Manageable;
@@ -152,7 +151,7 @@
     void configChange(const std::string& current) { cluster.configChange(member, current, l); }
     void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
     void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
-    void errorCheck(uint8_t type, uint64_t seq) { cluster.errorCheck(member, type, seq, l); }
+
     void shutdown() { cluster.shutdown(member, l); }
 
     bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -765,16 +764,4 @@
     expiryPolicy->deliverExpire(id);
 }
 
-void Cluster::errorCheck(const MemberId& m, uint8_t type, uint64_t frameSeq, Lock&) {
-    // If we receive an errorCheck here, it's because we  have processed past the point
-    // of the error so respond with ERROR_TYPE_NONE
-    assert(map.getFrameSeq() >= frameSeq);
-    if (type != framing::cluster::ERROR_TYPE_NONE) { // Don't respond to NONE.
-        QPID_LOG(debug, "Error " << frameSeq << " on " << m << " did not occur locally");
-        mcast.mcastControl(
-            ClusterErrorCheckBody(ProtocolVersion(),
-                                  framing::cluster::ERROR_TYPE_NONE, frameSeq), self);
-    }
-}
-
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=786294&r1=786293&r2=786294&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Jun 18 21:25:00 2009
@@ -144,7 +144,6 @@
     void ready(const MemberId&, const std::string&, Lock&);
     void configChange(const MemberId&, const std::string& current, Lock& l);
     void messageExpired(const MemberId&, uint64_t, Lock& l);
-    void errorCheck(const MemberId&, uint8_t, uint64_t, Lock&);
     void shutdown(const MemberId&, Lock&);
 
     // Helper functions

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=786294&r1=786293&r2=786294&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Thu Jun 18 21:25:00 2009
@@ -38,6 +38,7 @@
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
 #include "qpid/framing/ConnectionCloseBody.h"
 #include "qpid/framing/ConnectionCloseOkBody.h"
+#include "qpid/framing/ClusterConnectionErrorCheckBody.h"
 #include "qpid/log/Statement.h"
 
 #include <boost/current_function.hpp>
@@ -54,7 +55,7 @@
 namespace cluster {
 
 using namespace framing;
-using namespace framing::cluster;
+using namespace framing::cluster_connection;
 
 qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
 
@@ -444,5 +445,19 @@
     cluster.flagError(*this, ERROR_TYPE_CONNECTION);
 }
 
+void  Connection::errorCheck(uint8_t type, uint64_t frameSeq) {
+    // If we handle an errorCheck at this point (rather than in the
+    // ErrorCheck class) then we have processed successfully past the
+    // point of the error, so respond with ERROR_TYPE_NONE.
+    if (type != ERROR_TYPE_NONE) { // Don't respond to NONE.
+        QPID_LOG(debug, cluster << " error " << frameSeq << " on " << *this
+                 << " did not occur locally.");
+        cluster.getMulticast().mcastControl(
+            ClusterConnectionErrorCheckBody(
+                ProtocolVersion(), ERROR_TYPE_NONE, frameSeq), self);
+    }
+}
+
+
 }} // namespace qpid::cluster
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=786294&r1=786293&r2=786294&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Thu Jun 18 21:25:00 2009
@@ -152,6 +152,7 @@
     void giveReadCredit(int credit);
     void abort();
     void deliverClose();
+    void errorCheck(uint8_t type, uint64_t frameSeq);
 
     OutputInterceptor& getOutput() { return output; }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp?rev=786294&r1=786293&r2=786294&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp Thu Jun 18 21:25:00 2009
@@ -22,7 +22,7 @@
 #include "EventFrame.h"
 #include "ClusterMap.h"
 #include "Cluster.h"
-#include "qpid/framing/ClusterErrorCheckBody.h"
+#include "qpid/framing/ClusterConnectionErrorCheckBody.h"
 #include "qpid/framing/ClusterConfigChangeBody.h"
 #include "qpid/log/Statement.h"
 
@@ -33,7 +33,7 @@
 
 using namespace std;
 using namespace framing;
-using namespace framing::cluster;
+using namespace framing::cluster_connection;
 
 ErrorCheck::ErrorCheck(Cluster& c)
     : cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0)
@@ -55,14 +55,16 @@
     connection = &c;
     QPID_LOG(debug, cluster << (type == ERROR_TYPE_SESSION ? " Session" : " Connection")
              << " error " << frameSeq << " unresolved: " << unresolved);
-    mcast.mcastControl(ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId());
+    mcast.mcastControl(
+        ClusterConnectionErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId());
 }
 
 void ErrorCheck::delivered(const EventFrame& e) {
     if (isUnresolved()) {
-        const ClusterErrorCheckBody* errorCheck = 0;
+        const ClusterConnectionErrorCheckBody* errorCheck = 0;
         if (e.frame.getBody())
-            errorCheck = dynamic_cast<const ClusterErrorCheckBody*>(e.frame.getMethod());
+            errorCheck = dynamic_cast<const ClusterConnectionErrorCheckBody*>(
+                e.frame.getMethod());
         if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error
             if (errorCheck->getType() < type) { // my error is worse than his
                 QPID_LOG(critical, cluster << " Error " << frameSeq << " did not occur on " << e.getMemberId());

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h?rev=786294&r1=786293&r2=786294&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h Thu Jun 18 21:25:00 2009
@@ -48,7 +48,7 @@
 {
   public:
     typedef std::set<MemberId> MemberSet;
-    typedef framing::cluster::ErrorType ErrorType;
+    typedef framing::cluster_connection::ErrorType ErrorType;
     
     ErrorCheck(Cluster&);
 

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=786294&r1=786293&r2=786294&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Thu Jun 18 21:25:00 2009
@@ -48,19 +48,6 @@
       <field name="id" type="uint64"/>
     </control>
     
-    <domain name="error-type" type="uint8" label="Types of error">
-      <enum>
-	<choice name="none" value="0"/>
-	<choice name="session" value="1"/>
-	<choice name="connection" value="2"/>
-      </enum>
-    </domain>
-	
-    <control name="error-check" code="0x13">
-      <field name="type" type="error-type"/>
-      <field name="frame-seq" type="uint64"/>
-    </control>
-    
     <control name="shutdown" code="0x20" label="Shut down entire cluster"/>
 
   </class>
@@ -80,6 +67,20 @@
       <field name="limit" type="uint32"/>
     </control>
 
+    <domain name="error-type" type="uint8" label="Types of error">
+      <enum>
+	<choice name="none" value="0"/>
+	<choice name="session" value="1"/>
+	<choice name="connection" value="2"/>
+      </enum>
+    </domain>
+	
+    <!-- Check for error consistency across the cluster -->
+    <control name="error-check" code="0x4">
+      <field name="type" type="error-type"/>
+      <field name="frame-seq" type="uint64"/>
+    </control>
+    
     <!-- Update controls. Sent to a new broker in joining mode.
 	 A connection is updateed as followed:
 	 - open as a normal connection.



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org