You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/09/12 20:07:48 UTC

svn commit: r694758 - in /incubator/qpid/trunk/qpid/cpp: src/ src/qpid/cluster/ src/qpid/sys/ src/tests/ xml/

Author: aconway
Date: Fri Sep 12 11:07:47 2008
New Revision: 694758

URL: http://svn.apache.org/viewvc?rev=694758&view=rev
Log:
Added ClusterMap and test. Moved PollableCondition, PollableQueue to sys.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/ClusterMapTest.cpp   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster
    incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster_hosts
    incubator/qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Fri Sep 12 11:07:47 2008
@@ -26,7 +26,9 @@
   qpid/cluster/Event.h \
   qpid/cluster/Event.cpp \
   qpid/cluster/DumpClient.h \
-  qpid/cluster/DumpClient.cpp
+  qpid/cluster/DumpClient.cpp \
+  qpid/cluster/ClusterMap.h \
+  qpid/cluster/ClusterMap.cpp
 
 cluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Sep 12 11:07:47 2008
@@ -25,7 +25,7 @@
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/AMQP_AllOperations.h"
 #include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/ClusterJoiningBody.h"
+#include "qpid/framing/ClusterUrlNoticeBody.h"
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
 #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
 #include "qpid/log/Statement.h"
@@ -50,7 +50,7 @@
     Cluster& cluster;
     MemberId member;
     ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {}
-    void joining(const std::string& u) { cluster.joining (member, u); }
+    void urlNotice(const std::string& u) { cluster.urlNotice(member, u); }
     void ready() { cluster.ready(member); }
 
     void members(const framing::FieldTable& , const framing::FieldTable& , const framing::FieldTable& ) {
@@ -58,6 +58,11 @@
     }
 
     bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); }
+
+    virtual void map(const FieldTable& ,const FieldTable& ,const FieldTable& ) {
+        // FIXME aconway 2008-09-12: TODO
+    }
+
 };
 
 Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
@@ -72,13 +77,14 @@
                       0,                                         // write
                       boost::bind(&Cluster::disconnect, this, _1) // disconnect
     ),
-    deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this, _1)))
+    connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))),
+    state(DISCARD)
 {
     QPID_LOG(notice, "Cluster member " << self << " joining cluster " << name.str());
     broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
     cpg.join(name);
 
-    deliverQueue.start(poller);
+    connectionEventQueue.start(poller);
     cpgDispatchHandle.startWatch(poller);
 }
 
@@ -103,6 +109,7 @@
 void Cluster::leave() {
     QPID_LOG(notice, "Cluster member " << self << " leaving cluster " << name.str());
     cpg.leave(name);
+    // Cluster will shut down in configChange when the cluster knows we've left.
 }
 
 template <class T> void decodePtr(Buffer& buf, T*& ptr) {
@@ -172,8 +179,23 @@
 {
     try {
         MemberId from(nodeid, pid);
-        QPID_LOG(debug, "Cluster::deliver from " << from << " to " << self); // FIXME aconway 2008-09-10: 
-        deliverQueue.push(Event::delivered(from, msg, msg_len));
+        Event e = Event::delivered(from, msg, msg_len);
+        QPID_LOG(trace, "Cluster deliver: " << e);
+
+        // Process cluster controls immediately 
+        if (e.getConnectionId().getConnectionPtr() == 0)  { // Cluster control
+            Buffer buf(e);
+            AMQFrame frame;
+            while (frame.decode(buf)) 
+                if (!ClusterOperations(*this, e.getConnectionId().getMember()).invoke(frame))
+                    throw Exception("Invalid cluster control");
+        }
+        else { // Process connection controls & data via the connectionEventQueue.
+            if (state != DISCARD) { 
+                e.setConnection(getConnection(e.getConnectionId()));
+                connectionEventQueue.push(e);
+            }
+        }
     }
     catch (const std::exception& e) {
         // FIXME aconway 2008-01-30: exception handling.
@@ -183,24 +205,15 @@
     }
 }
 
-void Cluster::deliverEvent(const Event& e) {
-    QPID_LOG(trace, "Delivered: " << e);
+void Cluster::connectionEvent(const Event& e) {
     Buffer buf(e);
-    if (e.getConnection().getConnectionPtr() == 0)  { // Cluster control
+    assert(e.getConnection());
+    if (e.getType() == DATA)
+        e.getConnection()->deliverBuffer(buf);
+    else {              // control
         AMQFrame frame;
-        while (frame.decode(buf)) 
-            if (!ClusterOperations(*this, e.getConnection().getMember()).invoke(frame))
-                throw Exception("Invalid cluster control");
-    }
-    else {                  // Connection data or control
-        boost::intrusive_ptr<Connection> c = getConnection(e.getConnection());
-        if (e.getType() == DATA)
-            c->deliverBuffer(buf);
-        else {              // control
-            AMQFrame frame;
-            while (frame.decode(buf))
-                c->deliver(frame);
-        }
+        while (frame.decode(buf))
+            e.getConnection()->deliver(frame);
     }
 }
 
@@ -239,7 +252,7 @@
     
     if (nJoined)                // Notfiy new members of my URL.
         mcastFrame(
-            AMQFrame(in_place<ClusterJoiningBody>(ProtocolVersion(), url.str())),
+            AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())),
             ConnectionId(self,0));
 
     if (find(left, left+nLeft, self) != left+nLeft) {
@@ -266,8 +279,15 @@
     broker.shutdown();
 }
 
-void Cluster::joining(const MemberId& m, const string& url) {
+void Cluster::urlNotice(const MemberId& m, const string& url) {
+    //FIXME aconway 2008-09-12: Rdo join logic using ClusterMap. Implement xml map function also.
+    //FIXME aconway 2008-09-11: Note multiple meanings of my own notice -
+    //from DISCARD->STALL and from STALL->READY via map.
+
     QPID_LOG(info, "Cluster member " << m << " has URL " << url);
+    // My brain dump is up to this point, stall till it is complete.
+    if (m == self && state == DISCARD) 
+        state = STALL;
     urls.insert(UrlMap::value_type(m,Url(url)));
 }
 
@@ -289,4 +309,18 @@
 
 broker::Broker& Cluster::getBroker(){ return broker; }
 
+void Cluster::stall() {
+    // Stop processing connection events. We still process config changes
+    // and cluster controls in deliver()
+
+    // FIXME aconway 2008-09-11: Flow control, we should slow down or
+    // stop reading from local connections while stalled to avoid an
+    // unbounded queue.
+    connectionEventQueue.stop();
+}
+
+void Cluster::unStall() {
+    connectionEventQueue.start(poller);
+}
+
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Sep 12 11:07:47 2008
@@ -75,11 +75,14 @@
     /** Leave the cluster */
     void leave();
     
-    void joining(const MemberId&, const std::string& url);
+    void urlNotice(const MemberId&, const std::string& url);
     void ready(const MemberId&);
 
     MemberId getSelf() const { return self; }
 
+    void stall();
+    void unStall();
+
     void shutdown();
 
     broker::Broker& getBroker();
@@ -88,15 +91,13 @@
     typedef std::map<MemberId, Url>  UrlMap;
     typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap;
     typedef sys::PollableQueue<Event> EventQueue;
+    enum State {
+        DISCARD,                // Initially discard connection events up to my own join message.
+        READY,                  // Normal processing.
+        STALL                   // Stalled while a new member joins.
+    };
 
-    boost::function<void()> shutdownNext;
-
-    /** Handle a delivered frame */
-    void deliverFrame(framing::AMQFrame&, const ConnectionId&);
-
-    void deliverBuffer(const char*, size_t, const ConnectionId&);
-
-    void deliverEvent(const Event&);
+    void connectionEvent(const Event&);
     
     /** CPG deliver callback. */
     void deliver(
@@ -136,7 +137,8 @@
     ConnectionMap connections;
     NoOpConnectionOutputHandler shadowOut;
     sys::DispatchHandle cpgDispatchHandle;
-    EventQueue deliverQueue;
+    EventQueue connectionEventQueue;
+    State state;
     
   friend std::ostream& operator <<(std::ostream&, const Cluster&);
   friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&);

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=694758&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Fri Sep 12 11:07:47 2008
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ClusterMap.h"
+#include "qpid/Url.h"
+#include "qpid/framing/FieldTable.h"
+#include <boost/bind.hpp>
+#include <algorithm>
+#include <functional>
+
+namespace qpid {
+using namespace framing;
+
+namespace cluster {
+
+ClusterMap::ClusterMap() {}
+
+MemberId ClusterMap::urlNotice(const MemberId& id, const Url& url) {
+    if (isMember(id)) return MemberId(); // Ignore notices from established members.
+    if (isDumpee(id)) {
+        // Dumpee caught up, graduate to member with new URL and remove dumper from list.
+        dumpees.erase(id);
+        members[id] = url;
+    }
+    else if (members.empty()) {
+        // First in cluster, congratulations!
+        members[id] = url;
+    }
+    else {
+        // New member needs brain dump.
+        MemberId dumper = nextDumper();
+        Dumpee& d = dumpees[id];
+        d.url = url;
+        d.dumper = dumper;
+        return dumper;
+    }
+    return MemberId();
+}
+
+MemberId ClusterMap::nextDumper() const {
+    // Choose the first member in member-id order of the group that
+    // has the least number of dumps-in-progress.
+    assert(!members.empty());
+    MemberId dumper = members.begin()->first;
+    int minDumps  = dumps(dumper);
+    MemberMap::const_iterator i = ++members.begin();
+    while (i != members.end()) {
+        int d = dumps(i->first);
+        if (d < minDumps) {
+            minDumps = d;
+            dumper = i->first;
+        }
+        ++i;
+    }
+    return dumper;
+}
+
+void ClusterMap::leave(const MemberId& id) {
+    if (isDumpee(id))
+        dumpees.erase(id);
+    if (isMember(id)) {
+        members.erase(id);
+        DumpeeMap::iterator i = dumpees.begin();
+        while (i != dumpees.end()) {
+            if (i->second.dumper == id) dumpees.erase(i++);
+            else ++i;
+        }
+    }
+}
+
+struct ClusterMap::MatchDumper {
+    MemberId d;
+    MatchDumper(const MemberId& i) : d(i) {}
+    bool operator()(const DumpeeMap::value_type& v) const { return v.second.dumper == d; }
+};
+
+int ClusterMap::dumps(const MemberId& id) const {
+    return std::count_if(dumpees.begin(), dumpees.end(), MatchDumper(id));
+}
+
+void ClusterMap::dumpFailed(const MemberId& dumpee) { dumpees.erase(dumpee); }
+
+framing::ClusterMapBody ClusterMap::toControl() const {
+    framing::ClusterMapBody b;
+    for (MemberMap::const_iterator i = members.begin(); i != members.end(); ++i) 
+        b.getMembers().setString(i->first.str(), i->second.str());
+    for (DumpeeMap::const_iterator i = dumpees.begin(); i != dumpees.end(); ++i)  {
+        b.getDumpees().setString(i->first.str(), i->second.url.str());
+        b.getDumps().setString(i->first.str(), i->second.dumper.str());
+    }
+    return b;
+}
+
+void ClusterMap::fromControl(const framing::ClusterMapBody& b) {
+    *this = ClusterMap();       // Reset any current contents.
+    FieldTable::ValueMap::const_iterator i;
+    for (i = b.getMembers().begin(); i != b.getMembers().end(); ++i) 
+        members[i->first] = Url(i->second->get<std::string>());
+    for (i = b.getDumpees().begin(); i != b.getDumpees().end(); ++i)
+        dumpees[i->first].url = Url(i->second->get<std::string>());
+    for (i = b.getDumps().begin(); i != b.getDumps().end(); ++i)
+        dumpees[i->first].dumper = MemberId(i->second->get<std::string>());
+}
+
+}} // namespace qpid::cluster

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=694758&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Fri Sep 12 11:07:47 2008
@@ -0,0 +1,86 @@
+#ifndef QPID_CLUSTER_CLUSTERMAP_H
+#define QPID_CLUSTER_CLUSTERMAP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "qpid/framing/ClusterMapBody.h"
+#include "qpid/Url.h"
+#include <boost/optional.hpp>
+#include <vector>
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Map of established cluster members and brain-dumps in progress.
+ * A dumper is an established member that is sending catch-up data.
+ * A dumpee is an aspiring member that is receiving catch-up data.
+ */
+class ClusterMap
+{
+  public:
+    ClusterMap();
+    
+    /** Update map for url-notice event.
+     *@param from Member that sent the notice.
+     *@param url  URL for from.
+     *@return MemberId of member that should dump to URL, or a null
+     * MemberId() if no dump is needed.
+     */
+    MemberId urlNotice(const MemberId& from, const Url& url);
+
+    /** Dump failed notice */
+    void dumpFailed(const MemberId&);
+    
+    /** Update map for leave event */
+    void leave(const MemberId&);
+
+    /** Number of unfinished dumps for member. */
+    int dumps(const MemberId&) const;
+
+    /** Convert map contents to a cluster control body. */
+    framing::ClusterMapBody toControl() const;
+
+    /** Initialize map contents from a cluster control body. */
+    void fromControl(const framing::ClusterMapBody&);
+
+    size_t memberCount() const { return members.size(); }    
+    size_t dumpeeCount() const { return dumpees.size(); }    
+    bool isMember(const MemberId& id) const { return members.find(id) != members.end(); }  
+    bool isDumpee(const MemberId& id) const { return dumpees.find(id) != dumpees.end(); }
+
+  private:
+    struct Dumpee { Url url; MemberId dumper; };
+    typedef std::map<MemberId, Url> MemberMap;
+    typedef std::map<MemberId, Dumpee> DumpeeMap;
+    struct MatchDumper;
+    
+    MemberId nextDumper() const;
+
+    MemberMap members;
+    DumpeeMap dumpees;
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_CLUSTERMAP_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Fri Sep 12 11:07:47 2008
@@ -76,7 +76,6 @@
     void initialize(Plugin::Target& target) {
         broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
         if (!broker || values.name.empty()) return;  // Only if --cluster-name option was specified.
-        QPID_LOG_IF(warning, cluster, "Ignoring multiple initialization of cluster plugin.");
         cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker);
         broker->setConnectionFactory(
             boost::shared_ptr<sys::ConnectionCodec::Factory>(

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Sep 12 11:07:47 2008
@@ -19,6 +19,7 @@
  *
  */
 #include "Connection.h"
+#include "Cluster.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/Invoker.h"
 #include "qpid/framing/AllInvoker.h"

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Fri Sep 12 11:07:47 2008
@@ -23,9 +23,9 @@
  */
 
 #include "types.h"
-#include "Cluster.h"
 #include "WriteEstimate.h"
 #include "OutputInterceptor.h"
+#include "NoOpConnectionOutputHandler.h"
 
 #include "qpid/broker/Connection.h"
 #include "qpid/amqp_0_10/Connection.h"
@@ -39,6 +39,7 @@
 namespace framing { class AMQFrame; }
 
 namespace cluster {
+class Cluster;
 
 /** Intercept broker::Connection calls for shadow and local cluster connections. */
 class Connection :

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Fri Sep 12 11:07:47 2008
@@ -20,6 +20,7 @@
  */
 #include "ConnectionCodec.h"
 #include "Connection.h"
+#include "Cluster.h"
 #include "ProxyInputHandler.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/log/Statement.h"

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Fri Sep 12 11:07:47 2008
@@ -183,4 +183,15 @@
     return o << c.first << "-" << c.second;
 }
 
+std::string MemberId::str() const  {
+    char s[8];
+    reinterpret_cast<uint32_t&>(s[0]) = htonl(first);
+    reinterpret_cast<uint32_t&>(s[4]) = htonl(second);
+    return std::string(s,8);
+}
+
+MemberId::MemberId(const std::string& s) {
+    first = ntohl(reinterpret_cast<const uint32_t&>(s[0]));
+    second = ntohl(reinterpret_cast<const uint32_t&>(s[4]));
+}
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Fri Sep 12 11:07:47 2008
@@ -34,7 +34,7 @@
 const size_t Event::OVERHEAD = sizeof(uint8_t) + sizeof(uint64_t);
 
 Event::Event(EventType t, const ConnectionId c, const size_t s)
-    : type(t), connection(c), size(s), data(RefCountedBuffer::create(s)) {}
+    : type(t), connectionId(c), size(s), data(RefCountedBuffer::create(s)) {}
 
 Event Event::delivered(const MemberId& m, void* d, size_t s) {
     Buffer buf(static_cast<char*>(d), s);
@@ -50,7 +50,7 @@
     char header[OVERHEAD];
     Buffer b(header, OVERHEAD);
     b.putOctet(type);
-    b.putLongLong(reinterpret_cast<uint64_t>(connection.getConnectionPtr()));
+    b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getConnectionPtr()));
     iovec iov[] = { { header, OVERHEAD }, { const_cast<char*>(getData()), getSize() } };
     cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov));
 }
@@ -61,7 +61,7 @@
 
 static const char* EVENT_TYPE_NAMES[] = { "data", "control" };
 std::ostream& operator << (std::ostream& o, const Event& e) {
-    o << "[event: " << e.getConnection()
+    o << "[event: " << e.getConnectionId()
       << " " << EVENT_TYPE_NAMES[e.getType()]
       << " " << e.getSize() << " bytes: ";
     std::ostream_iterator<char> oi(o,"");

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Fri Sep 12 11:07:47 2008
@@ -24,6 +24,7 @@
 
 #include "types.h"
 #include "Cpg.h"
+#include "Connection.h"
 #include "qpid/RefCountedBuffer.h"
 #include "qpid/framing/Buffer.h"
 #include <iosfwd>
@@ -39,7 +40,7 @@
  * Events are sent to/received from the cluster.
  * Refcounted so they can be stored on queues.
  */
-struct Event {
+class Event {
   public:
     /** Create an event to mcast with a buffer of size bytes. */
     Event(EventType t=DATA, const ConnectionId c=ConnectionId(), size_t size=0);
@@ -50,17 +51,21 @@
     void mcast(const Cpg::Name& name, Cpg& cpg) const;
     
     EventType getType() const { return type; }
-    ConnectionId getConnection() const { return connection; }
+    ConnectionId getConnectionId() const { return connectionId; }
     size_t getSize() const { return size; }
     char* getData() { return data; }
     const char* getData() const { return data; }
 
+    boost::intrusive_ptr<Connection> getConnection() const { return connection; }
+    void setConnection(const boost::intrusive_ptr<Connection>& c) { connection=c; }
+
     operator framing::Buffer() const;
 
   private:
     static const size_t OVERHEAD;
     EventType type;
-    ConnectionId connection;
+    ConnectionId connectionId;
+    boost::intrusive_ptr<Connection> connection;
     size_t size;
     RefCountedBuffer::pointer data;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Fri Sep 12 11:07:47 2008
@@ -20,6 +20,7 @@
  */
 #include "OutputInterceptor.h"
 #include "Connection.h"
+#include "Cluster.h"
 #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/log/Statement.h"

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Fri Sep 12 11:07:47 2008
@@ -23,6 +23,8 @@
  */
 #include <utility>
 #include <iosfwd>
+#include <string>
+
 #include <stdint.h>
 
 extern "C" {
@@ -39,10 +41,15 @@
 
 /** first=node-id, second=pid */
 struct MemberId : std::pair<uint32_t, uint32_t> {
-    MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {}
+    explicit MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {}
     MemberId(const cpg_address& caddr) : std::pair<uint32_t,uint32_t>(caddr.nodeid, caddr.pid) {}
+    MemberId(const std::string&); // Decode from string.
     uint32_t getNode() const { return first; }
     uint32_t getPid() const { return second; }
+    operator bool() const { return first || second; }
+
+    // Encode as string, network byte order.
+    std::string str() const;
 };
 
 inline bool operator==(const cpg_address& caddr, const MemberId& id) { return id == MemberId(caddr); }
@@ -55,6 +62,13 @@
     Connection* getConnectionPtr() const { return second; }
 };
 
+/** State of a cluster member */
+enum State {
+    DISCARD, // Initially discard connection events up to my own join message.
+    STALL,   // All members stall while a new member joins.
+    READY    // Normal processing.
+};
+
 std::ostream& operator<<(std::ostream&, const ConnectionId&);
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Fri Sep 12 11:07:47 2008
@@ -24,7 +24,7 @@
 
 #include "qpid/sys/PollableCondition.h"
 #include "qpid/sys/Dispatcher.h"
-#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
 #include <boost/function.hpp>
 #include <boost/bind.hpp>
 #include <algorithm>
@@ -54,58 +54,84 @@
     /** Callback to process a range of items. */
     typedef boost::function<void (const iterator&, const iterator&)> Callback;
 
-    /** Functor tempalate to create a Callback from a functor that handles a single item. */
+    /** @see forEach() */
     template <class F> struct ForEach {
         F handleOne;
         ForEach(const F& f) : handleOne(f) {}
         void operator()(const iterator& i, const iterator& j) const { std::for_each(i, j, handleOne); }
     };
-    /** Function to create ForEach instances */
+
+    /** Create a range callback from a functor that processes a single item. */
     template <class F> static ForEach<F> forEach(const F& f) { return ForEach<F>(f); }
     
     /** When the queue is selected by the poller, values are passed to callback cb. */
     explicit PollableQueue(const Callback& cb);
 
     /** Push a value onto the queue. Thread safe */
-    void push(const T& t) { ScopedLock l(lock); queue.push_back(t); condition.set(); }
+    void push(const T& t);
 
     /** Start polling. */ 
-    void start(const boost::shared_ptr<sys::Poller>& poller) { handle.startWatch(poller); }
+    void start(const boost::shared_ptr<sys::Poller>& poller);
 
-    /** Stop polling. */
-    void stop() { handle.stopWatch(); }
+    /** Stop polling and wait for the current callback, if any, to complete. */
+    void stop();
     
   private:
-    typedef sys::Mutex::ScopedLock ScopedLock;
-    typedef sys::Mutex::ScopedUnlock ScopedUnlock;
+    typedef sys::Monitor::ScopedLock ScopedLock;
+    typedef sys::Monitor::ScopedUnlock ScopedUnlock;
 
     void dispatch(sys::DispatchHandle&);
     
-    sys::Mutex lock;
+    sys::Monitor lock;
     Callback callback;
     PollableCondition condition;
     sys::DispatchHandle handle;
     Queue queue;
     Queue batch;
+    bool dispatching, stopped;
 };
 
 template <class T> PollableQueue<T>::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12: 
     : callback(cb),
-      handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0)
+      handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0),
+      dispatching(false), stopped(true)
 {}
 
+template <class T> void PollableQueue<T>::start(const boost::shared_ptr<sys::Poller>& poller) {
+    ScopedLock l(lock);
+    stopped = false;
+    handle.startWatch(poller);
+}
+
+template <class T> void PollableQueue<T>::push(const T& t) {
+    ScopedLock l(lock);
+    queue.push_back(t);
+    condition.set();
+}
+
 template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
-    ScopedLock l(lock);         // Lock for concurrent push() 
-    batch.clear();
-    batch.swap(queue);
+    ScopedLock l(lock);
+    if (stopped) return;
+    dispatching = true;
     condition.clear();
+    batch.clear();
+    batch.swap(queue);          // Snapshot of current queue contents.
     {
         // Process outside the lock to allow concurrent push.
         ScopedUnlock u(lock);
         callback(batch.begin(), batch.end()); 
-        h.rewatch();
     }
     batch.clear();
+    dispatching = false;
+    if (stopped) lock.notifyAll();
+    else h.rewatch();
+}
+
+template <class T> void PollableQueue<T>::stop() {
+    ScopedLock l(lock);
+    handle.stopWatch();
+    stopped = true;
+    while (dispatching) lock.wait();
 }
 
 }} // namespace qpid::sys

Added: incubator/qpid/trunk/qpid/cpp/src/tests/ClusterMapTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClusterMapTest.cpp?rev=694758&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClusterMapTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClusterMapTest.cpp Fri Sep 12 11:07:47 2008
@@ -0,0 +1,191 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+#include "unit_test.h"
+#include "test_tools.h"
+#include "qpid/cluster/ClusterMap.h"
+#include "qpid/framing/ClusterMapBody.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/Url.h"
+#include <boost/assign.hpp>
+
+QPID_AUTO_TEST_SUITE(CluterMapTest)
+
+using namespace std;
+using namespace qpid;
+using namespace cluster;
+using namespace framing;
+
+MemberId id(int i) { return MemberId(i,i); }
+
+Url url(const char* host) { return Url(TcpAddress(host)); }
+
+QPID_AUTO_TEST_CASE(testNotice) {
+    ClusterMap m;
+    BOOST_CHECK(!m.urlNotice(id(0), url("0-ready"))); // id(0) member, no dump.
+    BOOST_CHECK(m.isMember(id(0)));
+    BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
+    BOOST_CHECK_EQUAL(m.memberCount(), 1);
+    BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+    BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(1), url("1-dump"))); // Newbie, needs dump
+    BOOST_CHECK(m.isMember(id(0)));
+    BOOST_CHECK(m.isDumpee(id(1)));
+    BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
+    BOOST_CHECK_EQUAL(m.dumps(id(1)), 0);
+    BOOST_CHECK_EQUAL(m.memberCount(), 1);
+    BOOST_CHECK_EQUAL(m.dumpeeCount(), 1);
+
+    BOOST_CHECK(!m.urlNotice(id(1), url("1-ready"))); // id(1) is ready.
+    BOOST_CHECK(m.isMember(id(0)));
+    BOOST_CHECK(m.isMember(id(1)));
+    BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
+    BOOST_CHECK_EQUAL(m.dumps(id(1)), 0);
+    BOOST_CHECK_EQUAL(m.memberCount(), 2);
+    BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+    BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(2), url("2-dump"))); // id(2) needs dump
+    BOOST_CHECK(m.isDumpee(id(2)));
+    BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
+    BOOST_CHECK_EQUAL(m.dumpeeCount(), 1);
+
+    BOOST_CHECK_EQUAL(id(1), m.urlNotice(id(3), url("3-dump"))); // 0 busy, dump to id(1).
+    BOOST_CHECK(m.isDumpee(id(3)));    
+    BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
+    BOOST_CHECK_EQUAL(m.dumps(id(1)), 1);
+    BOOST_CHECK_EQUAL(m.dumpeeCount(), 2);
+
+    BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(4), url("4-dump"))); // Equally busy, 0 is first on list.
+    BOOST_CHECK_EQUAL(m.dumps(id(0)), 2);
+    BOOST_CHECK_EQUAL(m.dumps(id(1)), 1);
+    BOOST_CHECK_EQUAL(m.dumpeeCount(), 3);    
+
+    // My dumpees both complete
+    BOOST_CHECK(!m.urlNotice(id(2), url("2-ready"))); 
+    BOOST_CHECK(!m.urlNotice(id(4), url("4-ready"))); 
+    BOOST_CHECK(m.isMember(id(2)));
+    BOOST_CHECK(m.isMember(id(4)));
+    BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);    
+    BOOST_CHECK_EQUAL(m.dumps(id(1)), 1);    
+    BOOST_CHECK_EQUAL(m.dumpeeCount(), 1);
+
+    // Final dumpee completes.
+    BOOST_CHECK(!m.urlNotice(id(3), url("3-ready")));
+    BOOST_CHECK(m.isMember(id(3)));
+    BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);    
+    BOOST_CHECK_EQUAL(m.dumps(id(1)), 0);    
+    BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+}
+
+QPID_AUTO_TEST_CASE(testLeave) {
+    ClusterMap m;
+    BOOST_CHECK(!m.urlNotice(id(0), url("0-ready")));
+    BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(1), url("1-dump")));
+    BOOST_CHECK(!m.urlNotice(id(1), url("1-ready")));
+    BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(2), url("2-dump")));
+    BOOST_CHECK(!m.urlNotice(id(2), url("2-ready")));
+    BOOST_CHECK_EQUAL(m.memberCount(), 3);
+    BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+    
+    m.leave(id(1));
+    BOOST_CHECK_EQUAL(m.memberCount(), 2);
+    BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+    BOOST_CHECK(m.isMember(id(0)));
+    BOOST_CHECK(m.isMember(id(2)));
+
+    BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(4), url("4-dump")));
+    BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
+    BOOST_CHECK(m.isDumpee(id(4)));
+    BOOST_CHECK_EQUAL(m.memberCount(), 2);
+    BOOST_CHECK_EQUAL(m.dumpeeCount(), 1);
+
+    m.dumpFailed(id(4));        // Dumper detected a failure.
+    BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
+    BOOST_CHECK(!m.isDumpee(id(4)));
+    BOOST_CHECK(!m.isMember(id(4)));
+    BOOST_CHECK_EQUAL(m.memberCount(), 2);
+    BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+    m.leave(id(4));             // Dumpee leaves, no-op since we already know it failed.
+    BOOST_CHECK_EQUAL(m.memberCount(), 2);
+    BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+    BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(5), url("5-dump")));
+    BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
+    BOOST_CHECK(m.isDumpee(id(5)));
+    BOOST_CHECK_EQUAL(m.memberCount(), 2);
+    BOOST_CHECK_EQUAL(m.dumpeeCount(), 1);
+
+    m.leave(id(5));             // Dumpee detects failure and leaves cluster.
+    BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
+    BOOST_CHECK(!m.isDumpee(id(5)));
+    BOOST_CHECK(!m.isMember(id(5)));
+    BOOST_CHECK_EQUAL(m.memberCount(), 2);
+    BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+
+    m.dumpFailed(id(5));        // Dumper reports failure - no op, we already know.
+    BOOST_CHECK_EQUAL(m.memberCount(), 2);
+    BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
+}
+
+QPID_AUTO_TEST_CASE(testToControl) {
+    ClusterMap m;
+    m.urlNotice(id(0), url("0"));
+    m.urlNotice(id(1), url("1dump"));
+    m.urlNotice(id(1), url("1"));
+    m.urlNotice(id(2), url("2dump"));
+    m.urlNotice(id(3), url("3dump"));
+    m.urlNotice(id(4), url("4dump"));
+
+    BOOST_CHECK_EQUAL(m.memberCount(), 2);
+    BOOST_CHECK_EQUAL(m.dumpeeCount(), 3);
+
+    ClusterMapBody b = m.toControl();
+
+    BOOST_CHECK_EQUAL(b.getMembers().count(), 2);
+    BOOST_CHECK_EQUAL(b.getMembers().getString(id(0).str()), url("0").str());
+    BOOST_CHECK_EQUAL(b.getMembers().getString(id(1).str()), url("1").str());
+
+    BOOST_CHECK_EQUAL(b.getDumpees().count(), 3);
+    BOOST_CHECK_EQUAL(b.getDumpees().getString(id(2).str()), url("2dump").str());
+    BOOST_CHECK_EQUAL(b.getDumpees().getString(id(3).str()), url("3dump").str());
+    BOOST_CHECK_EQUAL(b.getDumpees().getString(id(4).str()), url("4dump").str());
+
+    BOOST_CHECK_EQUAL(b.getDumps().count(), 3);
+    BOOST_CHECK_EQUAL(b.getDumps().getString(id(2).str()), id(0).str());
+    BOOST_CHECK_EQUAL(b.getDumps().getString(id(3).str()), id(1).str());
+    BOOST_CHECK_EQUAL(b.getDumps().getString(id(4).str()), id(0).str());
+
+    std::string s(b.size(), '\0');
+    Buffer buf(&s[0], s.size());
+    b.encode(buf);
+
+    ClusterMap m2;
+    m2.fromControl(b);
+    ClusterMapBody b2 = m2.toControl();
+    std::string s2(b2.size(), '\0');
+    Buffer buf2(&s2[0], s2.size());
+    b2.encode(buf2);
+
+    // Verify a round-trip encoding produces identical results.
+    BOOST_CHECK_EQUAL(s,s2);
+}
+
+QPID_AUTO_TEST_SUITE_END()

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ClusterMapTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ClusterMapTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri Sep 12 11:07:47 2008
@@ -28,7 +28,7 @@
 TESTS+=unit_test
 check_PROGRAMS+=unit_test
 unit_test_LDADD=-lboost_unit_test_framework -lboost_regex  \
-	$(lib_client) $(lib_broker) # $(lib_amqp_0_10)
+	$(lib_client) $(lib_broker)
 
 unit_test_SOURCES= unit_test.cpp unit_test.h \
 	BrokerFixture.h SocketProxy.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk Fri Sep 12 11:07:47 2008
@@ -18,4 +18,7 @@
 cluster_test_SOURCES=unit_test.cpp cluster_test.cpp
 cluster_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework
 
+unit_test_SOURCES+=ClusterMapTest.cpp
+unit_test_LDADD+=$(lib_cluster)
+
 endif

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Fri Sep 12 11:07:47 2008
@@ -59,12 +59,19 @@
 using qpid::cluster::Cluster;
 using qpid::cluster::getGlobalCluster;
 
+/** Parse broker & cluster options */
+Broker::Options parseOpts(size_t argc, const char* argv[]) {
+    Broker::Options opts;
+    Plugin::addOptions(opts); // Pick up cluster options.
+    opts.parse(argc, argv, "", true); // Allow-unknown for --load-module
+    return opts;
+}
+
 /** Cluster fixture is a vector of ports for the replicas.
  * Replica 0 is in the current process, all others are forked as children.
  */
 struct ClusterFixture : public vector<uint16_t>  {
     string name;
-    Broker::Options opts;
     std::auto_ptr<BrokerFixture> broker0;
     boost::ptr_vector<ForkedBroker> forkedBrokers;
 
@@ -96,7 +103,7 @@
 
     const char* argv[] = {
         "qpidd " __FILE__ ,
-        "--load-module=../.libs/libqpidcluster.so",
+        "--load-module=../.libs/cluster.so",
         "--cluster-name", name.c_str(), 
         "--auth=no", "--no-data-dir",
         "--log-prefix", prefix.c_str(),
@@ -108,11 +115,8 @@
         push_back(forkedBrokers.back().getPort());
     }
     else {                      // First broker, run in this process.
-        Broker::Options opts;
         qpid::log::Logger::instance().setPrefix("main");
-        Plugin::addOptions(opts); // Pick up cluster options.
-        opts.parse(argc, argv, "", true); // Allow-unknown for --load-module
-        broker0.reset(new BrokerFixture(opts));
+        broker0.reset(new BrokerFixture(parseOpts(argc, argv)));
         push_back(broker0->getPort());
     }
 }
@@ -136,38 +140,16 @@
     return o;
 }
 
-#if 0                           // FIXME aconway 2008-09-10: finish & enable
-QPID_AUTO_TEST_CASE(testDumpConsumers) {
-    ClusterFixture cluster(1);
-    Client a(cluster[0]);
-    a.session.queueDeclare("q");
-    a.subs.subscribe(a.lq, "q");
-
-    cluster.add();
-    Client b(cluster[1]);
-    try {
-        b.connection.newSession(a.session.getId().getName());
-        BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName());
-    } catch (const SessionBusyException&) {}
-
-    // Transfer some messages to the subscription by client a.
-    Message m;
-    a.session.messageTransfer(arg::bindingKey="q", arg::content=Message("aaa", "q"));
-    BOOST_CHECK(a.lq.get(m, TIME_SEC));
-    BOOST_CHECK_EQUAL(m.getData(), "aaa");
-
-    b.session.messageTransfer(arg::bindingKey="q", arg::content=Message("bbb", "q"));
-    BOOST_CHECK(a.lq.get(m, TIME_SEC));
-    BOOST_CHECK_EQUAL(m.getData(), "bbb");
-
-    // Verify that the queue has been drained on both brokers.
-    // This proves that the consumer was replicated when the second broker joined.
-    BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0);
-}
-#endif
-
+// FIXME aconway 2008-09-11: This test has to be first otherwise
+// it picks up the cluster name from a previous test and runs the
+// brokers as cluster nodes. Something wrong with option parsing...
+// 
 QPID_AUTO_TEST_CASE(testDumpClientSharedState) {
-    BrokerFixture donor, receiver;
+    // In this test we don't want the cluster plugin to initialize, so set --cluster-name=""
+    const char* argv[] = { "--cluster-name", "" };
+    Broker::Options opts = parseOpts(sizeof(argv)/sizeof(*argv), argv);
+
+    BrokerFixture donor(opts), receiver(opts);
     {
         Client c(donor.getPort());
         FieldTable args;
@@ -246,6 +228,94 @@
     }
 }
 
+
+// FIXME aconway 2008-09-12: finish the new join protocol.
+QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testCatchUpSharedState, 1) {
+    ClusterFixture cluster(1);
+    Client c0(cluster[0], "c0");
+    // Create some shared state.
+    c0.session.queueDeclare("q");
+    c0.session.messageTransfer(arg::content=Message("foo","q"));
+    while (c0.session.queueQuery("q").getMessageCount() != 1)
+        ::usleep(1000);         // Wait for message to show up on broker 0.
+
+    // Now join new broker, should catch up.
+    cluster.add();
+    c0.session.messageTransfer(arg::content=Message("bar","q"));
+    c0.session.queueDeclare("p");
+    c0.session.messageTransfer(arg::content=Message("poo","p"));
+
+    // Verify new broker has all state.
+    Message m;
+    Client c1(cluster[1], "c1");
+    BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "foo");
+    BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "bar");
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0);
+    BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "poo");
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 0);
+}
+
+QPID_AUTO_TEST_CASE(testStall) {
+    ClusterFixture cluster(2);
+    Client c0(cluster[0], "c0");
+    Client c1(cluster[1], "c1");
+
+    // Declare on all to avoid race condition.
+    c0.session.queueDeclare("q");
+    c1.session.queueDeclare("q");
+    
+    // Stall 0, verify it does not process deliverys while stalled.
+    getGlobalCluster().stall();
+    c1.session.messageTransfer(arg::content=Message("foo","q"));
+    while (c1.session.queueQuery("q").getMessageCount() != 1)
+        ::usleep(1000);         // Wait for message to show up on broker 1.
+    sleep(2);               // FIXME aconway 2008-09-11: remove.
+    // But it should not be on broker 0.
+    boost::shared_ptr<broker::Queue> q0 = cluster.broker0->broker->getQueues().find("q");
+    BOOST_REQUIRE(q0);
+    BOOST_CHECK_EQUAL(q0->getMessageCount(), 0);
+    // Now unstall and we should get the message.
+    getGlobalCluster().unStall();
+    Message m;
+    BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "foo");
+}
+
+#if 0                           // FIXME aconway 2008-09-10: finish & enable
+QPID_AUTO_TEST_CASE(testDumpConsumers) {
+    ClusterFixture cluster(1);
+    Client a(cluster[0]);
+    a.session.queueDeclare("q");
+    a.subs.subscribe(a.lq, "q");
+
+    cluster.add();
+    Client b(cluster[1]);
+    try {
+        b.connection.newSession(a.session.getId().getName());
+        BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName());
+    } catch (const SessionBusyException&) {}
+
+    // Transfer some messages to the subscription by client a.
+    Message m;
+    a.session.messageTransfer(arg::content=Message("aaa", "q"));
+    BOOST_CHECK(a.lq.get(m, TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "aaa");
+
+    b.session.messageTransfer(arg::content=Message("bbb", "q"));
+    BOOST_CHECK(a.lq.get(m, TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "bbb");
+
+    // Verify that the queue has been drained on both brokers.
+    // This proves that the consumer was replicated when the second broker joined.
+    BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0);
+}
+
+
+#endif
+
 QPID_AUTO_TEST_CASE(testForkedBroker) {
     // Verify the ForkedBroker works as expected.
     const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" };

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster Fri Sep 12 11:07:47 2008
@@ -12,7 +12,7 @@
 rm -f cluster*.log
 SIZE=$1; shift
 CLUSTER=`pwd`		# Cluster name=pwd, avoid clashes.
-OPTS="-d --load-module ../.libs/libqpidcluster.so  --cluster-name=$CLUSTER --no-data-dir --auth=no $*"
+OPTS="-d --load-module ../.libs/cluster.so  --cluster-name=$CLUSTER --no-data-dir --auth=no $*"
 
 if test "$SIZE" = "one"; then	# Special case of singleton cluster, use default port.
     ../qpidd -q

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster_hosts
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster_hosts?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster_hosts (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster_hosts Fri Sep 12 11:07:47 2008
@@ -15,7 +15,7 @@
 # 
 
 QPIDD=${QPIDD:-$PWD/../qpidd}
-LIBQPIDCLUSTER=${LIBQPIDCLUSTER:-$PWD/../.libs/libqpidcluster.so}
+LIBQPIDCLUSTER=${LIBQPIDCLUSTER:-$PWD/../.libs/cluster.so}
 NAME=$USER			# User name is default cluster name.
 RESTART=NO
 

Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=694758&r1=694757&r2=694758&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Fri Sep 12 11:07:47 2008
@@ -29,17 +29,14 @@
 
     <!-- Cluster membership -->
 
-    <control name = "joining" code="0x1">
-      <field name="joining" type="str16" label="URL of new member joining cluster."/>
+    <control name = "url-notice" code="0x1" label="Url to use for a cluster member">
+      <field name="url" type="str16" label="URL for brain dump to new member."/>
     </control>
 
-
-    <control name="ready" code="0x2" label="New member is ready."/>
-
-    <control name="members" code="0x3" label="Cluster map sent to new members.">
+    <control name="map" code="0x3" label="Cluster map sent to new members.">
       <field name="members" type="map"/>     <!-- member-id -> URL -->
-      <field name="donors" type="map"/>	     <!-- member-id -> uint32 (donor-count) -->
-      <field name="newbies" type="map"/>     <!-- member-id -> URL -->
+      <field name="dumpees" type="map"/> <!-- dumpee-id -> braindump URL -->
+      <field name="dumps" type="map"/>	<!-- dumpee-id -> donor-id -->
     </control>
 
     <!-- Transferring broker state -->