You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/08/07 22:46:19 UTC

svn commit: r683711 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/cluster/Cpg.cpp qpid/cluster/Cpg.h tests/ForkedBroker.h

Author: aconway
Date: Thu Aug  7 13:46:18 2008
New Revision: 683711

URL: http://svn.apache.org/viewvc?rev=683711&view=rev
Log:
Check CPG flow control.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=683711&r1=683710&r2=683711&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Thu Aug  7 13:46:18 2008
@@ -77,6 +77,46 @@
     }
 }
 
+void Cpg::join(const Name& group) {
+    check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group));
+}
+    
+void Cpg::leave(const Name& group) {
+    check(cpg_leave(handle,const_cast<Name*>(&group)),cantLeaveMsg(group));
+}
+
+bool Cpg::isFlowControlEnabled() {
+    cpg_flow_control_state_t flowState;
+    check(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status.");
+    return flowState == CPG_FLOW_CONTROL_ENABLED;
+}
+
+// TODO aconway 2008-08-07: better handling of flow control.
+// Wait for flow control to be disabled.
+void Cpg::waitForFlowControl() {
+    int delayNs=1000;           // one millisecond
+    int tries=8;                // double the delay on each try.
+    while (isFlowControlEnabled() && tries > 0) {
+        QPID_LOG(warning, "CPG flow control enabled, retry in " << delayNs << "ns");
+        ::usleep(delayNs);
+        --tries;
+        delayNs *= 2;
+    };
+    if (tries == 0) {
+        // FIXME aconway 2008-08-07: this is a fatal leave-the-cluster condition.
+        throw Cpg::Exception("CPG flow control enabled, failed to send.");
+    }
+}
+
+void Cpg::mcast(const Name& group, const iovec* iov, int iovLen) {
+    waitForFlowControl();
+    cpg_error_t result;
+    do {
+        result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen);
+        if (result != CPG_ERR_TRY_AGAIN) check(result, cantMcastMsg(group));
+    } while(result == CPG_ERR_TRY_AGAIN);
+}
+
 void Cpg::shutdown() {
     if (!isShutdown) {
         QPID_LOG(debug,"Shutting down CPG");

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?rev=683711&r1=683710&r2=683711&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Thu Aug  7 13:46:18 2008
@@ -117,19 +117,9 @@
     void dispatchAll() { dispatch(CPG_DISPATCH_ALL); }
     void dispatchBlocking() { dispatch(CPG_DISPATCH_BLOCKING); }
 
-    void join(const Name& group) {
-        check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group));
-    };
-    
-    void leave(const Name& group) {
-        check(cpg_leave(handle,const_cast<Name*>(&group)),cantLeaveMsg(group));
-    }
-
-    void mcast(const Name& group, const iovec* iov, int iovLen) {
-        check(cpg_mcast_joined(
-                  handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen),
-              cantMcastMsg(group));
-    }
+    void join(const Name& group);    
+    void leave(const Name& group);
+    void mcast(const Name& group, const iovec* iov, int iovLen);
 
     cpg_handle_t getHandle() const { return handle; }
 
@@ -138,8 +128,7 @@
   private:
     static std::string errorStr(cpg_error_t err, const std::string& msg);
     static std::string cantJoinMsg(const Name&);
-    static std::string cantLeaveMsg(const Name&);
-    static std::string cantMcastMsg(const Name&);
+    static std::string cantLeaveMsg(const Name&); std::string cantMcastMsg(const Name&);
     
     static void check(cpg_error_t result, const std::string& msg) {
         if (result != CPG_OK) throw Exception(errorStr(result, msg));
@@ -163,6 +152,9 @@
         struct cpg_address *joined, int nJoined
     );
 
+    bool isFlowControlEnabled();
+    void waitForFlowControl();
+
     cpg_handle_t handle;
     Handler& handler;
     bool isShutdown;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h?rev=683711&r1=683710&r2=683711&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h Thu Aug  7 13:46:18 2008
@@ -85,7 +85,7 @@
             ::close(pipeFds[1]);
             FILE* f = ::fdopen(pipeFds[0], "r");
             if (!f) throw ErrnoException("fopen failed");
-            if (::fscanf(f, "%d", &port) != 1) throw ErrnoException("fscanf failed");
+            if (::fscanf(f, "%d", &port) != 1) throw ErrnoException("ill-formatted port");
         }
         else {                  // child
             ::close(pipeFds[0]);