You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/08/07 22:46:19 UTC
svn commit: r683711 - in /incubator/qpid/trunk/qpid/cpp/src:
qpid/cluster/Cpg.cpp qpid/cluster/Cpg.h tests/ForkedBroker.h
Author: aconway
Date: Thu Aug 7 13:46:18 2008
New Revision: 683711
URL: http://svn.apache.org/viewvc?rev=683711&view=rev
Log:
Check CPG flow control.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=683711&r1=683710&r2=683711&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Thu Aug 7 13:46:18 2008
@@ -77,6 +77,46 @@
}
}
+void Cpg::join(const Name& group) {
+ check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group));
+}
+
+void Cpg::leave(const Name& group) {
+ check(cpg_leave(handle,const_cast<Name*>(&group)),cantLeaveMsg(group));
+}
+
+bool Cpg::isFlowControlEnabled() {
+ cpg_flow_control_state_t flowState;
+ check(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status.");
+ return flowState == CPG_FLOW_CONTROL_ENABLED;
+}
+
+// TODO aconway 2008-08-07: better handling of flow control.
+// Wait for flow control to be disabled.
+void Cpg::waitForFlowControl() {
+ int delayNs=1000; // one millisecond
+ int tries=8; // double the delay on each try.
+ while (isFlowControlEnabled() && tries > 0) {
+ QPID_LOG(warning, "CPG flow control enabled, retry in " << delayNs << "ns");
+ ::usleep(delayNs);
+ --tries;
+ delayNs *= 2;
+ };
+ if (tries == 0) {
+ // FIXME aconway 2008-08-07: this is a fatal leave-the-cluster condition.
+ throw Cpg::Exception("CPG flow control enabled, failed to send.");
+ }
+}
+
+void Cpg::mcast(const Name& group, const iovec* iov, int iovLen) {
+ waitForFlowControl();
+ cpg_error_t result;
+ do {
+ result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen);
+ if (result != CPG_ERR_TRY_AGAIN) check(result, cantMcastMsg(group));
+ } while(result == CPG_ERR_TRY_AGAIN);
+}
+
void Cpg::shutdown() {
if (!isShutdown) {
QPID_LOG(debug,"Shutting down CPG");
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?rev=683711&r1=683710&r2=683711&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Thu Aug 7 13:46:18 2008
@@ -117,19 +117,9 @@
void dispatchAll() { dispatch(CPG_DISPATCH_ALL); }
void dispatchBlocking() { dispatch(CPG_DISPATCH_BLOCKING); }
- void join(const Name& group) {
- check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group));
- };
-
- void leave(const Name& group) {
- check(cpg_leave(handle,const_cast<Name*>(&group)),cantLeaveMsg(group));
- }
-
- void mcast(const Name& group, const iovec* iov, int iovLen) {
- check(cpg_mcast_joined(
- handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen),
- cantMcastMsg(group));
- }
+ void join(const Name& group);
+ void leave(const Name& group);
+ void mcast(const Name& group, const iovec* iov, int iovLen);
cpg_handle_t getHandle() const { return handle; }
@@ -138,8 +128,7 @@
private:
static std::string errorStr(cpg_error_t err, const std::string& msg);
static std::string cantJoinMsg(const Name&);
- static std::string cantLeaveMsg(const Name&);
- static std::string cantMcastMsg(const Name&);
+ static std::string cantLeaveMsg(const Name&); std::string cantMcastMsg(const Name&);
static void check(cpg_error_t result, const std::string& msg) {
if (result != CPG_OK) throw Exception(errorStr(result, msg));
@@ -163,6 +152,9 @@
struct cpg_address *joined, int nJoined
);
+ bool isFlowControlEnabled();
+ void waitForFlowControl();
+
cpg_handle_t handle;
Handler& handler;
bool isShutdown;
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h?rev=683711&r1=683710&r2=683711&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h Thu Aug 7 13:46:18 2008
@@ -85,7 +85,7 @@
::close(pipeFds[1]);
FILE* f = ::fdopen(pipeFds[0], "r");
if (!f) throw ErrnoException("fopen failed");
- if (::fscanf(f, "%d", &port) != 1) throw ErrnoException("fscanf failed");
+ if (::fscanf(f, "%d", &port) != 1) throw ErrnoException("ill-formatted port");
}
else { // child
::close(pipeFds[0]);