You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/09/12 20:07:48 UTC
svn commit: r694758 - in /incubator/qpid/trunk/qpid/cpp: src/
src/qpid/cluster/ src/qpid/sys/ src/tests/ xml/
Author: aconway
Date: Fri Sep 12 11:07:47 2008
New Revision: 694758
URL: http://svn.apache.org/viewvc?rev=694758&view=rev
Log:
Added ClusterMap and test. Moved PollableCondition, PollableQueue to sys.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (with props)
incubator/qpid/trunk/qpid/cpp/src/tests/ClusterMapTest.cpp (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/cluster.mk
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster
incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster_hosts
incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Fri Sep 12 11:07:47 2008
@@ -26,7 +26,9 @@
qpid/cluster/Event.h \
qpid/cluster/Event.cpp \
qpid/cluster/DumpClient.h \
- qpid/cluster/DumpClient.cpp
+ qpid/cluster/DumpClient.cpp \
+ qpid/cluster/ClusterMap.h \
+ qpid/cluster/ClusterMap.cpp
cluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Sep 12 11:07:47 2008
@@ -25,7 +25,7 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/ClusterJoiningBody.h"
+#include "qpid/framing/ClusterUrlNoticeBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/log/Statement.h"
@@ -50,7 +50,7 @@
Cluster& cluster;
MemberId member;
ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {}
- void joining(const std::string& u) { cluster.joining (member, u); }
+ void urlNotice(const std::string& u) { cluster.urlNotice(member, u); }
void ready() { cluster.ready(member); }
void members(const framing::FieldTable& , const framing::FieldTable& , const framing::FieldTable& ) {
@@ -58,6 +58,11 @@
}
bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); }
+
+ virtual void map(const FieldTable& ,const FieldTable& ,const FieldTable& ) {
+ // FIXME aconway 2008-09-12: TODO
+ }
+
};
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
@@ -72,13 +77,14 @@
0, // write
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
- deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this, _1)))
+ connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))),
+ state(DISCARD)
{
QPID_LOG(notice, "Cluster member " << self << " joining cluster " << name.str());
broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
cpg.join(name);
- deliverQueue.start(poller);
+ connectionEventQueue.start(poller);
cpgDispatchHandle.startWatch(poller);
}
@@ -103,6 +109,7 @@
void Cluster::leave() {
QPID_LOG(notice, "Cluster member " << self << " leaving cluster " << name.str());
cpg.leave(name);
+ // Cluster will shut down in configChange when the cluster knows we've left.
}
template <class T> void decodePtr(Buffer& buf, T*& ptr) {
@@ -172,8 +179,23 @@
{
try {
MemberId from(nodeid, pid);
- QPID_LOG(debug, "Cluster::deliver from " << from << " to " << self); // FIXME aconway 2008-09-10:
- deliverQueue.push(Event::delivered(from, msg, msg_len));
+ Event e = Event::delivered(from, msg, msg_len);
+ QPID_LOG(trace, "Cluster deliver: " << e);
+
+ // Process cluster controls immediately
+ if (e.getConnectionId().getConnectionPtr() == 0) { // Cluster control
+ Buffer buf(e);
+ AMQFrame frame;
+ while (frame.decode(buf))
+ if (!ClusterOperations(*this, e.getConnectionId().getMember()).invoke(frame))
+ throw Exception("Invalid cluster control");
+ }
+ else { // Process connection controls & data via the connectionEventQueue.
+ if (state != DISCARD) {
+ e.setConnection(getConnection(e.getConnectionId()));
+ connectionEventQueue.push(e);
+ }
+ }
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
@@ -183,24 +205,15 @@
}
}
-void Cluster::deliverEvent(const Event& e) {
- QPID_LOG(trace, "Delivered: " << e);
+void Cluster::connectionEvent(const Event& e) {
Buffer buf(e);
- if (e.getConnection().getConnectionPtr() == 0) { // Cluster control
+ assert(e.getConnection());
+ if (e.getType() == DATA)
+ e.getConnection()->deliverBuffer(buf);
+ else { // control
AMQFrame frame;
- while (frame.decode(buf))
- if (!ClusterOperations(*this, e.getConnection().getMember()).invoke(frame))
- throw Exception("Invalid cluster control");
- }
- else { // Connection data or control
- boost::intrusive_ptr<Connection> c = getConnection(e.getConnection());
- if (e.getType() == DATA)
- c->deliverBuffer(buf);
- else { // control
- AMQFrame frame;
- while (frame.decode(buf))
- c->deliver(frame);
- }
+ while (frame.decode(buf))
+ e.getConnection()->deliver(frame);
}
}
@@ -239,7 +252,7 @@
if (nJoined) // Notfiy new members of my URL.
mcastFrame(
- AMQFrame(in_place<ClusterJoiningBody>(ProtocolVersion(), url.str())),
+ AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())),
ConnectionId(self,0));
if (find(left, left+nLeft, self) != left+nLeft) {
@@ -266,8 +279,15 @@
broker.shutdown();
}
-void Cluster::joining(const MemberId& m, const string& url) {
+// Handle a url-notice control from member m: log it, move this member from
+// DISCARD to STALL when the notice is its own, and record m's URL in urls.
+void Cluster::urlNotice(const MemberId& m, const string& url) {
+ //FIXME aconway 2008-09-12: Redo join logic using ClusterMap. Implement xml map function also.
+ //FIXME aconway 2008-09-11: Note multiple meanings of my own notice -
+ //from DISCARD->STALL and from STALL->READY via map.
+
QPID_LOG(info, "Cluster member " << m << " has URL " << url);
+ // My brain dump is up to this point, stall till it is complete.
+ // Only the DISCARD->STALL transition is implemented here.
+ if (m == self && state == DISCARD)
+ state = STALL;
urls.insert(UrlMap::value_type(m,Url(url)));
}
@@ -289,4 +309,18 @@
broker::Broker& Cluster::getBroker(){ return broker; }
+// Stop dispatching queued connection events.
+// Only the queue is stopped; the state member is not modified here.
+void Cluster::stall() {
+ // Stop processing connection events. We still process config changes
+ // and cluster controls in deliver()
+
+ // FIXME aconway 2008-09-11: Flow control, we should slow down or
+ // stop reading from local connections while stalled to avoid an
+ // unbounded queue.
+ connectionEventQueue.stop();
+}
+
+// Resume dispatching connection events by restarting the queue on the poller.
+// Only the queue is restarted; the state member is not modified here.
+void Cluster::unStall() {
+ connectionEventQueue.start(poller);
+}
+
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Sep 12 11:07:47 2008
@@ -75,11 +75,14 @@
/** Leave the cluster */
void leave();
- void joining(const MemberId&, const std::string& url);
+ void urlNotice(const MemberId&, const std::string& url);
void ready(const MemberId&);
MemberId getSelf() const { return self; }
+ void stall();
+ void unStall();
+
void shutdown();
broker::Broker& getBroker();
@@ -88,15 +91,13 @@
typedef std::map<MemberId, Url> UrlMap;
typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap;
typedef sys::PollableQueue<Event> EventQueue;
+ enum State {
+ DISCARD, // Initially discard connection events up to my own join message.
+ READY, // Normal processing.
+ STALL // Stalled while a new member joins.
+ };
- boost::function<void()> shutdownNext;
-
- /** Handle a delivered frame */
- void deliverFrame(framing::AMQFrame&, const ConnectionId&);
-
- void deliverBuffer(const char*, size_t, const ConnectionId&);
-
- void deliverEvent(const Event&);
+ void connectionEvent(const Event&);
/** CPG deliver callback. */
void deliver(
@@ -136,7 +137,8 @@
ConnectionMap connections;
NoOpConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
- EventQueue deliverQueue;
+ EventQueue connectionEventQueue;
+ State state;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&);
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=694758&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Fri Sep 12 11:07:47 2008
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ClusterMap.h"
+#include "qpid/Url.h"
+#include "qpid/framing/FieldTable.h"
+#include <boost/bind.hpp>
+#include <algorithm>
+#include <functional>
+
+namespace qpid {
+using namespace framing;
+
+namespace cluster {
+
+ClusterMap::ClusterMap() {}
+
+/**
+ * Record a url-notice from id and decide whether id needs a brain dump.
+ *
+ *  - id is an established member: the notice is ignored.
+ *  - id is a known dumpee: it is promoted to member with the given URL.
+ *  - there are no members yet: id becomes the first member.
+ *  - otherwise: id is recorded as a dumpee of the member picked by nextDumper().
+ *
+ * @return the chosen dumper in the last case, a null MemberId() in all others.
+ */
+MemberId ClusterMap::urlNotice(const MemberId& id, const Url& url) {
+    const MemberId noDump;          // Null id: nobody has to dump.
+
+    if (isMember(id))
+        return noDump;              // Ignore notices from established members.
+
+    if (isDumpee(id)) {
+        // Dumpee caught up: drop its dump record and graduate it to member.
+        dumpees.erase(id);
+        members[id] = url;
+        return noDump;
+    }
+
+    if (members.empty()) {
+        // First in cluster, congratulations!
+        members[id] = url;
+        return noDump;
+    }
+
+    // Newcomer to a non-empty cluster: assign it a dumper.
+    const MemberId chosen = nextDumper();
+    Dumpee& record = dumpees[id];
+    record.url = url;
+    record.dumper = chosen;
+    return chosen;
+}
+
+/**
+ * Pick the member that should send the next dump: the one with the fewest
+ * dumps in progress, the first such member in map order winning ties.
+ * The member list must not be empty.
+ */
+MemberId ClusterMap::nextDumper() const {
+    assert(!members.empty());
+    MemberMap::const_iterator candidate = members.begin();
+    MemberId best = candidate->first;
+    int fewest = dumps(best);
+    // Strict '<' keeps the earliest member when dump counts tie.
+    for (++candidate; candidate != members.end(); ++candidate) {
+        const int inProgress = dumps(candidate->first);
+        if (inProgress < fewest) {
+            fewest = inProgress;
+            best = candidate->first;
+        }
+    }
+    return best;
+}
+
+/**
+ * Forget id: drop any dump it was receiving and, if it was an established
+ * member, drop the membership together with every dump it was sending.
+ */
+void ClusterMap::leave(const MemberId& id) {
+    dumpees.erase(id);              // No-op when id is not a dumpee.
+    if (members.erase(id) == 0)
+        return;                     // Was not a member, so it was dumping to nobody.
+
+    // Abandon the dumps the departed member was sending. Step the iterator
+    // past the entry before erasing it so it never refers to a removed node.
+    for (DumpeeMap::iterator next = dumpees.begin(); next != dumpees.end(); ) {
+        DumpeeMap::iterator current = next++;
+        if (current->second.dumper == id)
+            dumpees.erase(current);
+    }
+}
+
+/** Predicate: true for a dumpee entry whose dump is being sent by a given member. */
+struct ClusterMap::MatchDumper {
+    MemberId dumper;
+    MatchDumper(const MemberId& m) : dumper(m) {}
+    bool operator()(const DumpeeMap::value_type& entry) const {
+        return entry.second.dumper == dumper;
+    }
+};
+
+/** Count the dumpees whose dump is currently being sent by id. */
+int ClusterMap::dumps(const MemberId& id) const {
+    const MatchDumper sentById(id);
+    return std::count_if(dumpees.begin(), dumpees.end(), sentById);
+}
+
+/** A dump to dumpee failed: forget the dumpee (no-op if it is not recorded). */
+void ClusterMap::dumpFailed(const MemberId& dumpee) {
+    dumpees.erase(dumpee);
+}
+
+/**
+ * Build a cluster-map control body from the current contents.
+ * Members go into the members table (encoded id -> URL string); each dumpee
+ * adds one entry to the dumpees table (id -> URL) and one to the dumps table
+ * (id -> encoded dumper id).
+ */
+framing::ClusterMapBody ClusterMap::toControl() const {
+    framing::ClusterMapBody body;
+
+    // The table accessors are deliberately called per entry, so a table is
+    // never touched when there is nothing to put in it.
+    MemberMap::const_iterator m = members.begin();
+    while (m != members.end()) {
+        body.getMembers().setString(m->first.str(), m->second.str());
+        ++m;
+    }
+
+    DumpeeMap::const_iterator d = dumpees.begin();
+    while (d != dumpees.end()) {
+        const std::string key = d->first.str();
+        body.getDumpees().setString(key, d->second.url.str());
+        body.getDumps().setString(key, d->second.dumper.str());
+        ++d;
+    }
+    return body;
+}
+
+/**
+ * Replace the current contents with those carried by a cluster-map control.
+ * Table keys are converted to MemberId through MemberId(const std::string&);
+ * values are read as strings. A dumps entry with no matching dumpees entry
+ * still creates a dumpee record, with a default-constructed URL.
+ */
+void ClusterMap::fromControl(const framing::ClusterMapBody& b) {
+    // Reset any current contents.
+    members.clear();
+    dumpees.clear();
+
+    typedef FieldTable::ValueMap::const_iterator Entry;
+    for (Entry m = b.getMembers().begin(); m != b.getMembers().end(); ++m)
+        members[m->first] = Url(m->second->get<std::string>());
+    for (Entry u = b.getDumpees().begin(); u != b.getDumpees().end(); ++u)
+        dumpees[u->first].url = Url(u->second->get<std::string>());
+    for (Entry d = b.getDumps().begin(); d != b.getDumps().end(); ++d)
+        dumpees[d->first].dumper = MemberId(d->second->get<std::string>());
+}
+
+}} // namespace qpid::cluster
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=694758&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Fri Sep 12 11:07:47 2008
@@ -0,0 +1,86 @@
+#ifndef QPID_CLUSTER_CLUSTERMAP_H
+#define QPID_CLUSTER_CLUSTERMAP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "qpid/framing/ClusterMapBody.h"
+#include "qpid/Url.h"
+#include <boost/optional.hpp>
+#include <vector>
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Map of established cluster members and brain-dumps in progress.
+ * A dumper is an established member that is sending catch-up data.
+ * A dumpee is an aspiring member that is receiving catch-up data.
+ *
+ * Members are stored as id -> URL, dumpees as id -> (URL, dumper id).
+ */
+class ClusterMap
+{
+ public:
+ /** Create a map with no members and no dumpees. */
+ ClusterMap();
+
+ /** Update map for url-notice event.
+ * A notice from an established member is ignored. A notice from a
+ * dumpee promotes it to member. A notice received while there are no
+ * members makes the sender the first member. Any other sender is
+ * recorded as a dumpee and assigned a dumper.
+ *@param from Member that sent the notice.
+ *@param url URL for from.
+ *@return MemberId of member that should dump to URL, or a null
+ * MemberId() if no dump is needed.
+ */
+ MemberId urlNotice(const MemberId& from, const Url& url);
+
+ /** Dump failed notice: the given dumpee is removed from the map. */
+ void dumpFailed(const MemberId&);
+
+ /** Update map for leave event: removes the id as dumpee and as member;
+ * if it was a member, dumpees it was dumping to are removed as well.
+ */
+ void leave(const MemberId&);
+
+ /** Number of unfinished dumps for member. */
+ int dumps(const MemberId&) const;
+
+ /** Convert map contents to a cluster control body. */
+ framing::ClusterMapBody toControl() const;
+
+ /** Initialize map contents from a cluster control body.
+ * Any current contents are discarded first.
+ */
+ void fromControl(const framing::ClusterMapBody&);
+
+ /** Number of established members. */
+ size_t memberCount() const { return members.size(); }
+ /** Number of dumpees, i.e. dumps in progress. */
+ size_t dumpeeCount() const { return dumpees.size(); }
+ /** True if id is an established member. */
+ bool isMember(const MemberId& id) const { return members.find(id) != members.end(); }
+ /** True if id is currently recorded as a dumpee. */
+ bool isDumpee(const MemberId& id) const { return dumpees.find(id) != dumpees.end(); }
+
+ private:
+ // Dump in progress: URL of the aspiring member and the member dumping to it.
+ struct Dumpee { Url url; MemberId dumper; };
+ typedef std::map<MemberId, Url> MemberMap;
+ typedef std::map<MemberId, Dumpee> DumpeeMap;
+ // Predicate used by dumps(); defined in ClusterMap.cpp.
+ struct MatchDumper;
+
+ // Member with the fewest dumps in progress; requires at least one member.
+ MemberId nextDumper() const;
+
+ MemberMap members;
+ DumpeeMap dumpees;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_CLUSTERMAP_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Fri Sep 12 11:07:47 2008
@@ -76,7 +76,6 @@
void initialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
if (!broker || values.name.empty()) return; // Only if --cluster-name option was specified.
- QPID_LOG_IF(warning, cluster, "Ignoring multiple initialization of cluster plugin.");
cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker);
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Sep 12 11:07:47 2008
@@ -19,6 +19,7 @@
*
*/
#include "Connection.h"
+#include "Cluster.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/Invoker.h"
#include "qpid/framing/AllInvoker.h"
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Fri Sep 12 11:07:47 2008
@@ -23,9 +23,9 @@
*/
#include "types.h"
-#include "Cluster.h"
#include "WriteEstimate.h"
#include "OutputInterceptor.h"
+#include "NoOpConnectionOutputHandler.h"
#include "qpid/broker/Connection.h"
#include "qpid/amqp_0_10/Connection.h"
@@ -39,6 +39,7 @@
namespace framing { class AMQFrame; }
namespace cluster {
+class Cluster;
/** Intercept broker::Connection calls for shadow and local cluster connections. */
class Connection :
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Fri Sep 12 11:07:47 2008
@@ -20,6 +20,7 @@
*/
#include "ConnectionCodec.h"
#include "Connection.h"
+#include "Cluster.h"
#include "ProxyInputHandler.h"
#include "qpid/broker/Connection.h"
#include "qpid/log/Statement.h"
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Fri Sep 12 11:07:47 2008
@@ -183,4 +183,15 @@
return o << c.first << "-" << c.second;
}
+/**
+ * Encode as an 8-byte string: node id followed by pid, each as a 32-bit
+ * big-endian (network byte order) value.
+ *
+ * The bytes are produced with shifts rather than by casting the char buffer
+ * to uint32_t&: that cast breaks the aliasing rules and is a misaligned
+ * store on targets that require aligned 32-bit access. The output is
+ * byte-for-byte what htonl() followed by a raw copy would give.
+ */
+std::string MemberId::str() const {
+    const uint32_t halves[2] = { first, second };
+    char encoded[8];
+    for (int i = 0; i < 8; ++i) {
+        const int shift = 8 * (3 - i % 4);      // 24, 16, 8, 0 within each half.
+        encoded[i] = static_cast<char>((halves[i / 4] >> shift) & 0xFF);
+    }
+    return std::string(encoded, 8);
+}
+
+/**
+ * Decode a MemberId from the 8-byte form produced by str(): node id then
+ * pid, each 32-bit big-endian. Bytes beyond the first 8 are ignored.
+ *
+ * @throws std::invalid_argument if s holds fewer than 8 bytes. This
+ * constructor is an implicit conversion from any std::string, so the
+ * length is checked instead of reading past the end of the string.
+ */
+MemberId::MemberId(const std::string& s) {
+    if (s.size() < 8)
+        throw std::invalid_argument("MemberId: encoded string is shorter than 8 bytes");
+    // Assemble byte by byte; string data need not be aligned for uint32_t.
+    uint32_t halves[2] = { 0, 0 };
+    for (int i = 0; i < 8; ++i)
+        halves[i / 4] = (halves[i / 4] << 8) | static_cast<unsigned char>(s[i]);
+    first = halves[0];
+    second = halves[1];
+}
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Fri Sep 12 11:07:47 2008
@@ -34,7 +34,7 @@
const size_t Event::OVERHEAD = sizeof(uint8_t) + sizeof(uint64_t);
Event::Event(EventType t, const ConnectionId c, const size_t s)
- : type(t), connection(c), size(s), data(RefCountedBuffer::create(s)) {}
+ : type(t), connectionId(c), size(s), data(RefCountedBuffer::create(s)) {}
Event Event::delivered(const MemberId& m, void* d, size_t s) {
Buffer buf(static_cast<char*>(d), s);
@@ -50,7 +50,7 @@
char header[OVERHEAD];
Buffer b(header, OVERHEAD);
b.putOctet(type);
- b.putLongLong(reinterpret_cast<uint64_t>(connection.getConnectionPtr()));
+ b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getConnectionPtr()));
iovec iov[] = { { header, OVERHEAD }, { const_cast<char*>(getData()), getSize() } };
cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov));
}
@@ -61,7 +61,7 @@
static const char* EVENT_TYPE_NAMES[] = { "data", "control" };
std::ostream& operator << (std::ostream& o, const Event& e) {
- o << "[event: " << e.getConnection()
+ o << "[event: " << e.getConnectionId()
<< " " << EVENT_TYPE_NAMES[e.getType()]
<< " " << e.getSize() << " bytes: ";
std::ostream_iterator<char> oi(o,"");
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Fri Sep 12 11:07:47 2008
@@ -24,6 +24,7 @@
#include "types.h"
#include "Cpg.h"
+#include "Connection.h"
#include "qpid/RefCountedBuffer.h"
#include "qpid/framing/Buffer.h"
#include <iosfwd>
@@ -39,7 +40,7 @@
* Events are sent to/received from the cluster.
* Refcounted so they can be stored on queues.
*/
-struct Event {
+class Event {
public:
/** Create an event to mcast with a buffer of size bytes. */
Event(EventType t=DATA, const ConnectionId c=ConnectionId(), size_t size=0);
@@ -50,17 +51,21 @@
void mcast(const Cpg::Name& name, Cpg& cpg) const;
EventType getType() const { return type; }
- ConnectionId getConnection() const { return connection; }
+ ConnectionId getConnectionId() const { return connectionId; }
size_t getSize() const { return size; }
char* getData() { return data; }
const char* getData() const { return data; }
+ boost::intrusive_ptr<Connection> getConnection() const { return connection; }
+ void setConnection(const boost::intrusive_ptr<Connection>& c) { connection=c; }
+
operator framing::Buffer() const;
private:
static const size_t OVERHEAD;
EventType type;
- ConnectionId connection;
+ ConnectionId connectionId;
+ boost::intrusive_ptr<Connection> connection;
size_t size;
RefCountedBuffer::pointer data;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Fri Sep 12 11:07:47 2008
@@ -20,6 +20,7 @@
*/
#include "OutputInterceptor.h"
#include "Connection.h"
+#include "Cluster.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/log/Statement.h"
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Fri Sep 12 11:07:47 2008
@@ -23,6 +23,8 @@
*/
#include <utility>
#include <iosfwd>
+#include <stdexcept>
+#include <string>
+
#include <stdint.h>
extern "C" {
@@ -39,10 +41,15 @@
/** first=node-id, second=pid */
struct MemberId : std::pair<uint32_t, uint32_t> {
- MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {}
+ explicit MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {}
MemberId(const cpg_address& caddr) : std::pair<uint32_t,uint32_t>(caddr.nodeid, caddr.pid) {}
+ MemberId(const std::string&); // Decode from string.
uint32_t getNode() const { return first; }
uint32_t getPid() const { return second; }
+ operator bool() const { return first || second; }
+
+ // Encode as string, network byte order.
+ std::string str() const;
};
inline bool operator==(const cpg_address& caddr, const MemberId& id) { return id == MemberId(caddr); }
@@ -55,6 +62,13 @@
Connection* getConnectionPtr() const { return second; }
};
+/** State of a cluster member.
+ * NOTE(review): Cluster.h also declares a nested Cluster::State with the
+ * same enumerator names; inside Cluster the nested one hides this enum.
+ * Consider keeping a single definition.
+ */
+enum State {
+ DISCARD, // Initially discard connection events up to my own join message.
+ STALL, // All members stall while a new member joins.
+ READY // Normal processing.
+};
+
std::ostream& operator<<(std::ostream&, const ConnectionId&);
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Fri Sep 12 11:07:47 2008
@@ -24,7 +24,7 @@
#include "qpid/sys/PollableCondition.h"
#include "qpid/sys/Dispatcher.h"
-#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <algorithm>
@@ -54,58 +54,84 @@
/** Callback to process a range of items. */
typedef boost::function<void (const iterator&, const iterator&)> Callback;
- /** Functor tempalate to create a Callback from a functor that handles a single item. */
+ /** @see forEach() */
template <class F> struct ForEach {
F handleOne;
ForEach(const F& f) : handleOne(f) {}
void operator()(const iterator& i, const iterator& j) const { std::for_each(i, j, handleOne); }
};
- /** Function to create ForEach instances */
+
+ /** Create a range callback from a functor that processes a single item. */
template <class F> static ForEach<F> forEach(const F& f) { return ForEach<F>(f); }
/** When the queue is selected by the poller, values are passed to callback cb. */
explicit PollableQueue(const Callback& cb);
/** Push a value onto the queue. Thread safe */
- void push(const T& t) { ScopedLock l(lock); queue.push_back(t); condition.set(); }
+ void push(const T& t);
/** Start polling. */
- void start(const boost::shared_ptr<sys::Poller>& poller) { handle.startWatch(poller); }
+ void start(const boost::shared_ptr<sys::Poller>& poller);
- /** Stop polling. */
- void stop() { handle.stopWatch(); }
+ /** Stop polling and wait for the current callback, if any, to complete. */
+ void stop();
private:
- typedef sys::Mutex::ScopedLock ScopedLock;
- typedef sys::Mutex::ScopedUnlock ScopedUnlock;
+ typedef sys::Monitor::ScopedLock ScopedLock;
+ typedef sys::Monitor::ScopedUnlock ScopedUnlock;
void dispatch(sys::DispatchHandle&);
- sys::Mutex lock;
+ sys::Monitor lock;
Callback callback;
PollableCondition condition;
sys::DispatchHandle handle;
Queue queue;
Queue batch;
+ bool dispatching, stopped;
};
template <class T> PollableQueue<T>::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12:
: callback(cb),
- handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0)
+ handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0),
+ dispatching(false), stopped(true)
{}
+/** Start polling: clear the stopped flag, which dispatch() checks on entry,
+ * and begin watching the handle on the given poller. Done under the lock.
+ */
+template <class T> void PollableQueue<T>::start(const boost::shared_ptr<sys::Poller>& poller) {
+ ScopedLock l(lock);
+ stopped = false;
+ handle.startWatch(poller);
+}
+
+/** Append a value to the queue and set the condition the handle is built on.
+ * Takes the lock, so it may be called concurrently with dispatch().
+ */
+template <class T> void PollableQueue<T>::push(const T& t) {
+ ScopedLock l(lock);
+ queue.push_back(t);
+ condition.set();
+}
+
template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
- ScopedLock l(lock); // Lock for concurrent push()
- batch.clear();
- batch.swap(queue);
+ ScopedLock l(lock);
+ if (stopped) return;
+ dispatching = true;
condition.clear();
+ batch.clear();
+ batch.swap(queue); // Snapshot of current queue contents.
{
// Process outside the lock to allow concurrent push.
ScopedUnlock u(lock);
callback(batch.begin(), batch.end());
- h.rewatch();
}
batch.clear();
+ dispatching = false;
+ if (stopped) lock.notifyAll();
+ else h.rewatch();
+}
+
+template <class T> void PollableQueue<T>::stop() {
+ ScopedLock l(lock);
+ handle.stopWatch();
+ stopped = true;
+ while (dispatching) lock.wait();
}
}} // namespace qpid::sys
Added: incubator/qpid/trunk/qpid/cpp/src/tests/ClusterMapTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClusterMapTest.cpp?rev=694758&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClusterMapTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClusterMapTest.cpp Fri Sep 12 11:07:47 2008
@@ -0,0 +1,191 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+#include "unit_test.h"
+#include "test_tools.h"
+#include "qpid/cluster/ClusterMap.h"
+#include "qpid/framing/ClusterMapBody.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/Url.h"
+#include <boost/assign.hpp>
+
+QPID_AUTO_TEST_SUITE(CluterMapTest)
+
+using namespace std;
+using namespace qpid;
+using namespace cluster;
+using namespace framing;
+
+MemberId id(int i) { return MemberId(i,i); } // Test helper: MemberId constructed from (i, i).
+
+Url url(const char* host) { return Url(TcpAddress(host)); } // Test helper: Url built from a TcpAddress for host.
+
+QPID_AUTO_TEST_CASE(testNotice) { // Checks urlNotice() return values and the member/dumpee/dump counts after each notice.
+ ClusterMap m;
+ BOOST_CHECK(!m.urlNotice(id(0), url("0-ready"))); // id(0) member, no dump.
+ BOOST_CHECK(m.isMember(id(0)));
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
+ BOOST_CHECK_EQUAL(m.memberCount(), 1);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+ BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(1), url("1-dump"))); // Newbie, needs dump
+ BOOST_CHECK(m.isMember(id(0)));
+ BOOST_CHECK(m.isDumpee(id(1)));
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
+ BOOST_CHECK_EQUAL(m.dumps(id(1)), 0);
+ BOOST_CHECK_EQUAL(m.memberCount(), 1); // A dumpee is not counted as a member yet.
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 1);
+
+ BOOST_CHECK(!m.urlNotice(id(1), url("1-ready"))); // id(1) is ready.
+ BOOST_CHECK(m.isMember(id(0)));
+ BOOST_CHECK(m.isMember(id(1)));
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
+ BOOST_CHECK_EQUAL(m.dumps(id(1)), 0);
+ BOOST_CHECK_EQUAL(m.memberCount(), 2);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+ BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(2), url("2-dump"))); // id(2) needs dump
+ BOOST_CHECK(m.isDumpee(id(2)));
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 1);
+
+ BOOST_CHECK_EQUAL(id(1), m.urlNotice(id(3), url("3-dump"))); // 0 busy, dump to id(1).
+ BOOST_CHECK(m.isDumpee(id(3)));
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
+ BOOST_CHECK_EQUAL(m.dumps(id(1)), 1);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 2);
+
+ BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(4), url("4-dump"))); // Equally busy, 0 is first on list.
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 2);
+ BOOST_CHECK_EQUAL(m.dumps(id(1)), 1);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 3);
+
+ // My dumpees both complete
+ BOOST_CHECK(!m.urlNotice(id(2), url("2-ready")));
+ BOOST_CHECK(!m.urlNotice(id(4), url("4-ready")));
+ BOOST_CHECK(m.isMember(id(2)));
+ BOOST_CHECK(m.isMember(id(4)));
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
+ BOOST_CHECK_EQUAL(m.dumps(id(1)), 1); // id(3), dumped by id(1), is still outstanding.
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 1);
+
+ // Final dumpee completes.
+ BOOST_CHECK(!m.urlNotice(id(3), url("3-ready")));
+ BOOST_CHECK(m.isMember(id(3)));
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
+ BOOST_CHECK_EQUAL(m.dumps(id(1)), 0);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+}
+
+QPID_AUTO_TEST_CASE(testLeave) { // Checks leave() and dumpFailed() for members and dumpees, in either order.
+ ClusterMap m;
+ BOOST_CHECK(!m.urlNotice(id(0), url("0-ready")));
+ BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(1), url("1-dump")));
+ BOOST_CHECK(!m.urlNotice(id(1), url("1-ready")));
+ BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(2), url("2-dump")));
+ BOOST_CHECK(!m.urlNotice(id(2), url("2-ready")));
+ BOOST_CHECK_EQUAL(m.memberCount(), 3); // Setup complete: ids 0, 1 and 2 are members.
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+ m.leave(id(1)); // A ready member leaves.
+ BOOST_CHECK_EQUAL(m.memberCount(), 2);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+ BOOST_CHECK(m.isMember(id(0)));
+ BOOST_CHECK(m.isMember(id(2)));
+
+ BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(4), url("4-dump")));
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
+ BOOST_CHECK(m.isDumpee(id(4)));
+ BOOST_CHECK_EQUAL(m.memberCount(), 2);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 1);
+
+ m.dumpFailed(id(4)); // Dumper detected a failure.
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
+ BOOST_CHECK(!m.isDumpee(id(4)));
+ BOOST_CHECK(!m.isMember(id(4)));
+ BOOST_CHECK_EQUAL(m.memberCount(), 2);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+ m.leave(id(4)); // Dumpee leaves, no-op since we already know it failed.
+ BOOST_CHECK_EQUAL(m.memberCount(), 2);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+ BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(5), url("5-dump")));
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
+ BOOST_CHECK(m.isDumpee(id(5)));
+ BOOST_CHECK_EQUAL(m.memberCount(), 2);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 1);
+
+ m.leave(id(5)); // Dumpee detects failure and leaves cluster.
+ BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
+ BOOST_CHECK(!m.isDumpee(id(5)));
+ BOOST_CHECK(!m.isMember(id(5)));
+ BOOST_CHECK_EQUAL(m.memberCount(), 2);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+ m.dumpFailed(id(5)); // Dumper reports failure - no op, we already know.
+ BOOST_CHECK_EQUAL(m.memberCount(), 2);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+}
+
+QPID_AUTO_TEST_CASE(testToControl) { // Checks toControl() output and that fromControl()/toControl() reproduce the same encoding.
+ ClusterMap m;
+ m.urlNotice(id(0), url("0"));
+ m.urlNotice(id(1), url("1dump"));
+ m.urlNotice(id(1), url("1"));
+ m.urlNotice(id(2), url("2dump"));
+ m.urlNotice(id(3), url("3dump"));
+ m.urlNotice(id(4), url("4dump"));
+
+ BOOST_CHECK_EQUAL(m.memberCount(), 2);
+ BOOST_CHECK_EQUAL(m.dumpeeCount(), 3);
+
+ ClusterMapBody b = m.toControl(); // Control body holding the members, dumpees and dumps maps checked below.
+
+ BOOST_CHECK_EQUAL(b.getMembers().count(), 2);
+ BOOST_CHECK_EQUAL(b.getMembers().getString(id(0).str()), url("0").str());
+ BOOST_CHECK_EQUAL(b.getMembers().getString(id(1).str()), url("1").str());
+
+ BOOST_CHECK_EQUAL(b.getDumpees().count(), 3);
+ BOOST_CHECK_EQUAL(b.getDumpees().getString(id(2).str()), url("2dump").str());
+ BOOST_CHECK_EQUAL(b.getDumpees().getString(id(3).str()), url("3dump").str());
+ BOOST_CHECK_EQUAL(b.getDumpees().getString(id(4).str()), url("4dump").str());
+
+ BOOST_CHECK_EQUAL(b.getDumps().count(), 3); // The dumps map is keyed by dumpee id; the value is the dumping member's id.
+ BOOST_CHECK_EQUAL(b.getDumps().getString(id(2).str()), id(0).str());
+ BOOST_CHECK_EQUAL(b.getDumps().getString(id(3).str()), id(1).str());
+ BOOST_CHECK_EQUAL(b.getDumps().getString(id(4).str()), id(0).str());
+
+ std::string s(b.size(), '\0'); // Backing storage sized by b.size().
+ Buffer buf(&s[0], s.size());
+ b.encode(buf);
+
+ ClusterMap m2;
+ m2.fromControl(b); // NOTE(review): rebuilt from b itself, not decoded from the bytes in s - the decode path is not exercised here.
+ ClusterMapBody b2 = m2.toControl();
+ std::string s2(b2.size(), '\0');
+ Buffer buf2(&s2[0], s2.size());
+ b2.encode(buf2);
+
+ // Verify a round-trip encoding produces identical results.
+ BOOST_CHECK_EQUAL(s,s2);
+}
+
+QPID_AUTO_TEST_SUITE_END()
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ClusterMapTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ClusterMapTest.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri Sep 12 11:07:47 2008
@@ -28,7 +28,7 @@
TESTS+=unit_test
check_PROGRAMS+=unit_test
unit_test_LDADD=-lboost_unit_test_framework -lboost_regex \
- $(lib_client) $(lib_broker) # $(lib_amqp_0_10)
+ $(lib_client) $(lib_broker)
unit_test_SOURCES= unit_test.cpp unit_test.h \
BrokerFixture.h SocketProxy.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk Fri Sep 12 11:07:47 2008
@@ -18,4 +18,7 @@
cluster_test_SOURCES=unit_test.cpp cluster_test.cpp
cluster_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework
+unit_test_SOURCES+=ClusterMapTest.cpp
+unit_test_LDADD+=$(lib_cluster)
+
endif
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Fri Sep 12 11:07:47 2008
@@ -59,12 +59,19 @@
using qpid::cluster::Cluster;
using qpid::cluster::getGlobalCluster;
+/** Parse broker & cluster options from argc/argv and return them by value. */
+Broker::Options parseOpts(size_t argc, const char* argv[]) {
+ Broker::Options opts;
+ Plugin::addOptions(opts); // Pick up cluster options.
+ opts.parse(argc, argv, "", true); // Allow-unknown for --load-module
+ return opts;
+}
+
/** Cluster fixture is a vector of ports for the replicas.
* Replica 0 is in the current process, all others are forked as children.
*/
struct ClusterFixture : public vector<uint16_t> {
string name;
- Broker::Options opts;
std::auto_ptr<BrokerFixture> broker0;
boost::ptr_vector<ForkedBroker> forkedBrokers;
@@ -96,7 +103,7 @@
const char* argv[] = {
"qpidd " __FILE__ ,
- "--load-module=../.libs/libqpidcluster.so",
+ "--load-module=../.libs/cluster.so",
"--cluster-name", name.c_str(),
"--auth=no", "--no-data-dir",
"--log-prefix", prefix.c_str(),
@@ -108,11 +115,8 @@
push_back(forkedBrokers.back().getPort());
}
else { // First broker, run in this process.
- Broker::Options opts;
qpid::log::Logger::instance().setPrefix("main");
- Plugin::addOptions(opts); // Pick up cluster options.
- opts.parse(argc, argv, "", true); // Allow-unknown for --load-module
- broker0.reset(new BrokerFixture(opts));
+ broker0.reset(new BrokerFixture(parseOpts(argc, argv)));
push_back(broker0->getPort());
}
}
@@ -136,38 +140,16 @@
return o;
}
-#if 0 // FIXME aconway 2008-09-10: finish & enable
-QPID_AUTO_TEST_CASE(testDumpConsumers) {
- ClusterFixture cluster(1);
- Client a(cluster[0]);
- a.session.queueDeclare("q");
- a.subs.subscribe(a.lq, "q");
-
- cluster.add();
- Client b(cluster[1]);
- try {
- b.connection.newSession(a.session.getId().getName());
- BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName());
- } catch (const SessionBusyException&) {}
-
- // Transfer some messages to the subscription by client a.
- Message m;
- a.session.messageTransfer(arg::bindingKey="q", arg::content=Message("aaa", "q"));
- BOOST_CHECK(a.lq.get(m, TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "aaa");
-
- b.session.messageTransfer(arg::bindingKey="q", arg::content=Message("bbb", "q"));
- BOOST_CHECK(a.lq.get(m, TIME_SEC));
- BOOST_CHECK_EQUAL(m.getData(), "bbb");
-
- // Verify that the queue has been drained on both brokers.
- // This proves that the consumer was replicated when the second broker joined.
- BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0);
-}
-#endif
-
+// FIXME aconway 2008-09-11: This test has to be first otherwise
+// it picks up the cluster name from a previous test and runs the
+// brokers as cluster nodes. Something wrong with option parsing...
+//
QPID_AUTO_TEST_CASE(testDumpClientSharedState) {
- BrokerFixture donor, receiver;
+ // In this test we don't want the cluster plugin to initialize, so set --cluster-name=""
+ const char* argv[] = { "--cluster-name", "" };
+ Broker::Options opts = parseOpts(sizeof(argv)/sizeof(*argv), argv);
+
+ BrokerFixture donor(opts), receiver(opts);
{
Client c(donor.getPort());
FieldTable args;
@@ -246,6 +228,94 @@
}
}
+
+// FIXME aconway 2008-09-12: finish the new join protocol.
+QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testCatchUpSharedState, 1) { // NOTE(review): presumably 1 failing check is tolerated until the join protocol is finished - confirm.
+ ClusterFixture cluster(1);
+ Client c0(cluster[0], "c0");
+ // Create some shared state.
+ c0.session.queueDeclare("q");
+ c0.session.messageTransfer(arg::content=Message("foo","q"));
+ while (c0.session.queueQuery("q").getMessageCount() != 1)
+ ::usleep(1000); // Wait for message to show up on broker 0.
+
+ // Now join new broker, should catch up.
+ cluster.add();
+ c0.session.messageTransfer(arg::content=Message("bar","q"));
+ c0.session.queueDeclare("p");
+ c0.session.messageTransfer(arg::content=Message("poo","p"));
+
+ // Verify new broker has all state.
+ Message m;
+ Client c1(cluster[1], "c1");
+ BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "foo"); // Sent before the new broker was added.
+ BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "bar"); // Sent after the new broker was added.
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0);
+ BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "poo");
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 0);
+}
+
+QPID_AUTO_TEST_CASE(testStall) { // Checks that a stalled broker 0 holds back a message until unStall() is called.
+ ClusterFixture cluster(2);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+
+ // Declare on all to avoid race condition.
+ c0.session.queueDeclare("q");
+ c1.session.queueDeclare("q");
+
+ // Stall 0, verify it does not process deliveries while stalled.
+ getGlobalCluster().stall();
+ c1.session.messageTransfer(arg::content=Message("foo","q"));
+ while (c1.session.queueQuery("q").getMessageCount() != 1)
+ ::usleep(1000); // Wait for message to show up on broker 1.
+ sleep(2); // FIXME aconway 2008-09-11: remove.
+ // But it should not be on broker 0.
+ boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q"); // Inspect the in-process broker's queue directly rather than through a client.
+ BOOST_REQUIRE(q0);
+ BOOST_CHECK_EQUAL(q0->getMessageCount(), 0);
+ // Now unstall and we should get the message.
+ getGlobalCluster().unStall();
+ Message m;
+ BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "foo");
+}
+
+#if 0 // FIXME aconway 2008-09-10: finish & enable
+QPID_AUTO_TEST_CASE(testDumpConsumers) { // Currently compiled out by the enclosing #if 0.
+ ClusterFixture cluster(1);
+ Client a(cluster[0]);
+ a.session.queueDeclare("q");
+ a.subs.subscribe(a.lq, "q");
+
+ cluster.add();
+ Client b(cluster[1]);
+ try {
+ b.connection.newSession(a.session.getId().getName()); // Reuse a's session name on the second broker; expected to throw.
+ BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName());
+ } catch (const SessionBusyException&) {}
+
+ // Transfer some messages to the subscription by client a.
+ Message m;
+ a.session.messageTransfer(arg::content=Message("aaa", "q"));
+ BOOST_CHECK(a.lq.get(m, TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "aaa");
+
+ b.session.messageTransfer(arg::content=Message("bbb", "q")); // Sent via client b, received on a's subscription.
+ BOOST_CHECK(a.lq.get(m, TIME_SEC));
+ BOOST_CHECK_EQUAL(m.getData(), "bbb");
+
+ // Verify that the queue has been drained on both brokers.
+ // This proves that the consumer was replicated when the second broker joined.
+ BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0);
+}
+
+
+#endif
+
QPID_AUTO_TEST_CASE(testForkedBroker) {
// Verify the ForkedBroker works as expected.
const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" };
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster Fri Sep 12 11:07:47 2008
@@ -12,7 +12,7 @@
rm -f cluster*.log
SIZE=$1; shift
CLUSTER=`pwd` # Cluster name=pwd, avoid clashes.
-OPTS="-d --load-module ../.libs/libqpidcluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $*"
+OPTS="-d --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $*"
if test "$SIZE" = "one"; then # Special case of singleton cluster, use default port.
../qpidd -q
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster_hosts
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster_hosts?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster_hosts (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster_hosts Fri Sep 12 11:07:47 2008
@@ -15,7 +15,7 @@
#
QPIDD=${QPIDD:-$PWD/../qpidd}
-LIBQPIDCLUSTER=${LIBQPIDCLUSTER:-$PWD/../.libs/libqpidcluster.so}
+LIBQPIDCLUSTER=${LIBQPIDCLUSTER:-$PWD/../.libs/cluster.so}
NAME=$USER # User name is default cluster name.
RESTART=NO
Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Fri Sep 12 11:07:47 2008
@@ -29,17 +29,14 @@
<!-- Cluster membership -->
- <control name = "joining" code="0x1">
- <field name="joining" type="str16" label="URL of new member joining cluster."/>
+ <control name = "url-notice" code="0x1" label="Url to use for a cluster member">
+ <field name="url" type="str16" label="URL for brain dump to new member."/>
</control>
-
- <control name="ready" code="0x2" label="New member is ready."/>
-
- <control name="members" code="0x3" label="Cluster map sent to new members.">
+ <control name="map" code="0x3" label="Cluster map sent to new members.">
<field name="members" type="map"/> <!-- member-id -> URL -->
- <field name="donors" type="map"/> <!-- member-id -> uint32 (donor-count) -->
- <field name="newbies" type="map"/> <!-- member-id -> URL -->
+ <field name="dumpees" type="map"/> <!-- dumpee-id -> braindump URL -->
+ <field name="dumps" type="map"/> <!-- dumpee-id -> donor-id -->
</control>
<!-- Transferring broker state -->