You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/07/08 17:22:37 UTC
svn commit: r674855 - in /incubator/qpid/trunk/qpid/cpp/src:
qpid/cluster/Cluster.cpp qpid/cluster/Cpg.cpp qpid/cluster/Cpg.h
tests/ForkedBroker.h tests/cluster_test.cpp
Author: aconway
Date: Tue Jul 8 08:22:37 2008
New Revision: 674855
URL: http://svn.apache.org/viewvc?rev=674855&view=rev
Log:
Removed static Cpg::handlers, fixed ForkedBroker shutdown.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=674855&r1=674854&r2=674855&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Jul 8 08:22:37 2008
@@ -114,6 +114,7 @@
QPID_LOG(trace, *this << " Leaving cluster.");
try {
cpg.leave(name);
+ cpg.shutdown();
dispatcher.join();
}
catch (const std::exception& e) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=674855&r1=674854&r2=674855&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Tue Jul 8 08:22:37 2008
@@ -32,36 +32,14 @@
using namespace std;
-// Global vector of Cpg pointers by handle.
-// TODO aconway 2007-06-12: Replace this with cpg_get/set_context,
-// coming in in RHEL 5.1.
-class Cpg::Handles
-{
- public:
- void put(cpg_handle_t handle, Cpg::Handler* handler) {
- sys::Mutex::ScopedLock l(lock);
- uint32_t index=uint32_t(handle); // Lower 32 bits is an array index.
- if (index >= handles.size())
- handles.resize(index+1, 0);
- handles[index] = handler;
- }
-
- Cpg::Handler* get(cpg_handle_t handle) {
- sys::Mutex::ScopedLock l(lock);
- uint32_t index=uint32_t(handle); // Lower 32 bits is an array index.
- assert(index < handles.size());
- assert(handles[index]);
- return handles[index];
- }
-
- private:
- sys::Mutex lock;
- vector<Cpg::Handler*> handles;
-};
-
-Cpg::Handles Cpg::handles;
+Cpg* Cpg::cpgFromHandle(cpg_handle_t handle) {
+ void* cpg=0;
+ check(cpg_context_get(handle, &cpg), "Cannot get CPG instance.");
+ if (!cpg) throw Exception("Cannot get CPG instance.");
+ return reinterpret_cast<Cpg*>(cpg);
+}
-// Global callback functions call per-object callbacks via handles vector.
+// Global callback functions.
void Cpg::globalDeliver (
cpg_handle_t handle,
struct cpg_name *group,
@@ -70,9 +48,7 @@
void* msg,
int msg_len)
{
- Cpg::Handler* handler=handles.get(handle);
- if (handler)
- handler->deliver(handle, group, nodeid, pid, msg, msg_len);
+ cpgFromHandle(handle)->handler.deliver(handle, group, nodeid, pid, msg, msg_len);
}
void Cpg::globalConfigChange(
@@ -83,15 +59,13 @@
struct cpg_address *joined, int nJoined
)
{
- Cpg::Handler* handler=handles.get(handle);
- if (handler)
- handler->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
+ cpgFromHandle(handle)->handler.configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
}
Cpg::Cpg(Handler& h) : handler(h) {
cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
- handles.put(handle, &handler);
+ check(cpg_context_set(handle, this), "Cannot set CPG context");
QPID_LOG(debug, "Initialize CPG handle 0x" << std::hex << handle);
}
@@ -104,10 +78,10 @@
}
void Cpg::shutdown() {
- if (handles.get(handle)) {
- QPID_LOG(debug, "Finalize CPG handle " << std::hex << handle);
- handles.put(handle, 0);
+ if (handle) {
+ cpg_context_set(handle, 0);
check(cpg_finalize(handle), "Error in shutdown of CPG");
+ handle = 0;
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?rev=674855&r1=674854&r2=674855&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Tue Jul 8 08:22:37 2008
@@ -38,6 +38,8 @@
* Lightweight C++ interface to cpg.h operations.
* Manages a single CPG handle, initialized in ctor, finialzed in destructor.
* On error all functions throw Cpg::Exception
+ *
+ * NOTE: only one at a time can exist per process.
*/
class Cpg : public Dispatchable {
public:
@@ -95,7 +97,7 @@
*/
Cpg(Handler&);
- /** Destructor calls shutdown. */
+ /** Destructor calls shutdown if not already calledx. */
~Cpg();
/** Disconnect from CPG */
@@ -134,22 +136,17 @@
Id self() const;
private:
- class Handles;
- struct ClearHandleOnExit;
- friend class Handles;
- friend struct ClearHandleOnExit;
-
static std::string errorStr(cpg_error_t err, const std::string& msg);
static std::string cantJoinMsg(const Name&);
static std::string cantLeaveMsg(const Name&);
static std::string cantMcastMsg(const Name&);
static void check(cpg_error_t result, const std::string& msg) {
- // TODO aconway 2007-06-01: Logging and exceptions.
- if (result != CPG_OK)
- throw Exception(errorStr(result, msg));
+ if (result != CPG_OK) throw Exception(errorStr(result, msg));
}
+ static Cpg* cpgFromHandle(cpg_handle_t);
+
static void globalDeliver(
cpg_handle_t handle,
struct cpg_name *group,
@@ -166,7 +163,6 @@
struct cpg_address *joined, int nJoined
);
- static Handles handles;
cpg_handle_t handle;
Handler& handler;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h?rev=674855&r1=674854&r2=674855&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h Tue Jul 8 08:22:37 2008
@@ -22,6 +22,7 @@
*
*/
+#include "qpid/Exception.h"
#include "qpid/sys/Fork.h"
#include "qpid/log/Logger.h"
#include "qpid/broker/Broker.h"
@@ -48,7 +49,7 @@
*
*/
class ForkedBroker : public qpid::sys::ForkWithMessage {
- pid_t childPid;
+ pid_t pid;
uint16_t port;
qpid::broker::Broker::Options opts;
std::string prefix;
@@ -56,22 +57,33 @@
public:
struct ChildExit {}; // Thrown in child processes.
- ForkedBroker(const qpid::broker::Broker::Options& opts_, const std::string& prefix_=std::string())
- : childPid(0), port(0), opts(opts_), prefix(prefix_) { fork(); }
-
- ~ForkedBroker() { stop(); }
+ ForkedBroker(const qpid::broker::Broker::Options& opts_=qpid::broker::Broker::Options(),
+ const std::string& prefix_=std::string())
+ : pid(0), port(0), opts(opts_), prefix(prefix_) { fork(); }
+
+ ~ForkedBroker() {
+ try { stop(); }
+ catch(const std::exception& e) {
+ QPID_LOG(error, e.what());
+ }
+ }
void stop() {
- if (childPid > 0) {
- ::kill(childPid, SIGINT);
- ::waitpid(childPid, 0, 0);
+ if (pid > 0) { // I am the parent, clean up children.
+ if (::kill(pid, SIGINT) < 0)
+ throw qpid::Exception(QPID_MSG("Can't kill process " << pid << ": " << qpid::strError(errno)));
+ int status = 0;
+ if (::waitpid(pid, &status, 0) < 0)
+ throw qpid::Exception(QPID_MSG("Waiting for process " << pid << ": " << qpid::strError(errno)));
+ if (WEXITSTATUS(status) != 0)
+ throw qpid::Exception(QPID_MSG("Process " << pid << " exited with status: " << WEXITSTATUS(status)));
}
}
- void parent(pid_t pid) {
- childPid = pid;
+ void parent(pid_t pid_) {
+ pid = pid_;
qpid::log::Logger::instance().setPrefix("parent");
- std::string portStr = wait(2);
+ std::string portStr = wait(5);
port = boost::lexical_cast<uint16_t>(portStr);
}
@@ -88,6 +100,7 @@
// Force exit in the child process, otherwise we will try to
// carry with parent tests.
+ broker.reset(); // Run broker dtor before we exit.
exit(0);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=674855&r1=674854&r2=674855&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Tue Jul 8 08:22:37 2008
@@ -147,6 +147,16 @@
}
+QPID_AUTO_TEST_CASE(testForkedBroker) {
+ // Verify the ForkedBroker works as expected.
+ Broker::Options opts;
+ opts.auth="no";
+ opts.noDataDir=true;
+ ForkedBroker broker(opts);
+ Client c(broker.getPort());
+ BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("amq.direct").getType());
+}
+
QPID_AUTO_TEST_CASE(testWiringReplication) {
ClusterFixture cluster(2); // FIXME aconway 2008-07-02: 3 brokers
Client c0(cluster[0].getPort());