You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/08/30 19:06:45 UTC

svn commit: r571246 - in /incubator/qpid/trunk/qpid/cpp: ./ src/qpid/cluster/ src/qpid/framing/ src/tests/

Author: aconway
Date: Thu Aug 30 10:06:44 2007
New Revision: 571246

URL: http://svn.apache.org/viewvc?rev=571246&view=rev
Log:
 - Update cluster code to work with new FrameHandler
 - Update ClassifierHandler to  use Visitor rather than map.
 - Replace heap allocation in cluster classes with boost::optional.

Modified:
    incubator/qpid/trunk/qpid/cpp/configure.ac
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClassifierHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDefaultVisitor.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk

Modified: incubator/qpid/trunk/qpid/cpp/configure.ac
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/configure.ac?rev=571246&r1=571245&r2=571246&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/configure.ac (original)
+++ incubator/qpid/trunk/qpid/cpp/configure.ac Thu Aug 30 10:06:44 2007
@@ -142,10 +142,6 @@
 AC_CHECK_HEADER([openais/cpg.h],[cpg_h=yes])
 test x$cpg_lib = xyes -a x$cpg_h = xyes && enable_CLUSTER=yes 
 
-# FIXME aconway 2007-08-30: Disable cluster till build problems
-# are fixed
-enable_CLUSTER=no
-     
 if test x$enable_CLUSTER = xyes; then
   AC_ARG_ENABLE([cluster],  
     [AS_HELP_STRING([--disable-cluster],

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp?rev=571246&r1=571245&r2=571246&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp Thu Aug 30 10:06:44 2007
@@ -18,59 +18,34 @@
 
 #include "ClassifierHandler.h"
 
+#include "qpid/framing/FrameDefaultVisitor.h"
 #include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/ExchangeDeclareBody.h"
-#include "qpid/framing/ExchangeDeleteBody.h"
-#include "qpid/framing/QueueBindBody.h"
-#include "qpid/framing/QueueDeclareBody.h"
-#include "qpid/framing/QueueDeleteBody.h"
-#include "qpid/framing/QueueUnbindBody.h"
-
 
 namespace qpid {
 namespace cluster {
 
 using namespace framing;
 
-typedef uint32_t FullMethodId;  // Combind class & method ID.
-
-FullMethodId fullId(ClassId c, MethodId m) { return c<<16+m; }
+struct ClassifierHandler::Visitor : public FrameDefaultVisitor {
+    Visitor(AMQFrame& f, ClassifierHandler& c)
+        : chosen(0), frame(f), classifier(c) { f.getBody()->accept(*this); }
+
+    void visit(const ExchangeDeclareBody&) { chosen=&classifier.wiring; }
+    void visit(const ExchangeDeleteBody&) { chosen=&classifier.wiring; }
+    void visit(const QueueBindBody&) { chosen=&classifier.wiring; }
+    void visit(const QueueDeclareBody&) { chosen=&classifier.wiring; }
+    void visit(const QueueDeleteBody&) { chosen=&classifier.wiring; }
+    void visit(const QueueUnbindBody&) { chosen=&classifier.wiring; }
+    void defaultVisit(const AMQBody&) { chosen=&classifier.other; }
+
+    using framing::FrameDefaultVisitor::visit;
+    using framing::FrameDefaultVisitor::defaultVisit;
+
+    FrameHandler::Chain chosen;
+    AMQFrame& frame;
+    ClassifierHandler& classifier;
+};
 
-FullMethodId fullId(const AMQMethodBody* body) {
-    return fullId(body->amqpClassId(), body->amqpMethodId());
-}
-
-template <class M>
-FullMethodId fullId() { return fullId(M::CLASS_ID, M::METHOD_ID); }
-
-
-ClassifierHandler::ClassifierHandler(Chain wiring, Chain other)
-    : FrameHandler(other)
-{
-    map[fullId<ExchangeDeclareBody>()] = wiring;
-    map[fullId<ExchangeDeleteBody>()] = wiring;
-    map[fullId<QueueBindBody>()] = wiring;
-    map[fullId<QueueDeclareBody>()] = wiring;
-    map[fullId<QueueDeleteBody>()] = wiring;
-    map[fullId<QueueUnbindBody>()] = wiring;
-}
-
-void  ClassifierHandler::handle(AMQFrame& frame) {
-    // TODO aconway 2007-07-03: Flatten the frame hierarchy so we
-    // can do a single lookup to dispatch a frame.
-    Chain chosen;
-    AMQMethodBody* method = dynamic_cast<AMQMethodBody*>(frame.getBody());
-
-    // FIXME aconway 2007-07-05: Need to stop bypassed frames
-    // from overtaking mcast frames.
-    //
-    if (method) 
-        chosen=map[fullId(method)];
-    if (chosen)
-        chosen->handle(frame);
-    else
-        next->handle(frame);
-}
- 
+void ClassifierHandler::handle(AMQFrame& f) { Visitor(f, *this).chosen(f); }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClassifierHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClassifierHandler.h?rev=571246&r1=571245&r2=571246&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClassifierHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClassifierHandler.h Thu Aug 30 10:06:44 2007
@@ -20,9 +20,6 @@
  */
 
 #include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/amqp_types.h"
-
-#include <map>
 
 namespace qpid {
 namespace cluster {
@@ -33,12 +30,17 @@
 class ClassifierHandler : public framing::FrameHandler
 {
   public:
-    ClassifierHandler(Chain wiring, Chain other);
-
-    void handle(framing::AMQFrame& frame);
+    ClassifierHandler(framing::FrameHandler& wiring_,
+                      framing::FrameHandler& other_)
+        : wiring(wiring_), other(other_) {}
 
+    void handle(framing::AMQFrame&);
+    
   private:
-    std::map<uint32_t, framing::FrameHandler::Chain> map;
+    struct Visitor;
+  friend struct Visitor;
+    framing::FrameHandler& wiring;
+    framing::FrameHandler& other;
 };
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=571246&r1=571245&r2=571246&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Aug 30 10:06:44 2007
@@ -37,7 +37,7 @@
 }
 
 ostream& operator<<(ostream& out, const Cluster::MemberMap::value_type& m) {
-    return out << m.first << "=" << m.second->url;
+    return out << m.first << "=" << m.second.url;
 }
 
 ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
@@ -46,21 +46,16 @@
     return out;
 }
 
-
-
-
-Cluster::Cluster(
-    const std::string& name_, const std::string& url_,
-    const FrameHandler::Chain& next
-) :
-    FrameHandler(next), 
-    cpg(new Cpg(*this)),
+Cluster::Cluster(const std::string& name_, const std::string& url_, broker::Broker& broker) :
+    FrameHandler(&sessions), 
+    cpg(*this),
     name(name_),
     url(url_), 
-    self(cpg->getLocalNoideId(), getpid())
+    self(Id::self(cpg)),
+    sessions(broker, *this)
 {
     QPID_LOG(trace, *this << " Joining cluster: " << name_);
-    cpg->join(name);
+    cpg.join(name);
     notify();
     dispatcher=Thread(*this);
     // Wait till we show up in the cluster map.
@@ -74,8 +69,7 @@
 Cluster::~Cluster() {
     QPID_LOG(trace, *this << " Leaving cluster.");
     try {
-        cpg->leave(name);
-        cpg.reset();
+        cpg.leave(name);
         dispatcher.join();
     }
     catch (const std::exception& e) {
@@ -90,7 +84,7 @@
     frame.encode(buf);
     buf.flip();
     iovec iov = { buf.start(), frame.size() };
-    cpg->mcast(name, &iov, 1);
+    cpg.mcast(name, &iov, 1);
 }
 
 void Cluster::notify() {
@@ -104,7 +98,6 @@
 }
 
 Cluster::MemberList Cluster::getMembers() const {
-    // TODO aconway 2007-07-04: use read/write lock?
     Mutex::ScopedLock l(lock);
     MemberList result(members.size());
     std::transform(members.begin(), members.end(), result.begin(),
@@ -150,11 +143,7 @@
     MemberList list;
     {
         Mutex::ScopedLock l(lock);
-        shared_ptr<Member>& member=members[from];
-        if (!member) 
-            member.reset(new Member(notifyIn->getUrl()));
-        else 
-            member->url = notifyIn->getUrl();
+        members[from].url=notifyIn->getUrl();
         lock.notifyAll();
         QPID_LOG(trace, *this << ": members joined: " << members);
     }
@@ -186,7 +175,7 @@
 }
 
 void Cluster::run() {
-    cpg->dispatchBlocking();
+    cpg.dispatchBlocking();
 }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=571246&r1=571245&r2=571246&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Aug 30 10:06:44 2007
@@ -19,16 +19,16 @@
  *
  */
 
-#include "qpid/cluster/Cpg.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/shared_ptr.h"
+#include "SessionManager.h"
+#include "Cpg.h"
+
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/log/Logger.h"
 
+#include <boost/optional.hpp>
 #include <boost/function.hpp>
-#include <boost/scoped_ptr.hpp>
 
 #include <map>
 #include <vector>
@@ -42,8 +42,6 @@
  * As FrameHandler, handles frames by sending them to the
  * cluster. Frames received from the cluster are sent to the next
  * FrameHandler in the chain.
- * 
- * 
  */
 class Cluster : public framing::FrameHandler,
                 private sys::Runnable, private Cpg::Handler
@@ -51,24 +49,23 @@
   public:
     /** Details of a cluster member */
     struct Member {
-        typedef shared_ptr<const Member> Ptr;
-        Member(const std::string& url_) : url(url_) {}
+        Member(const std::string& url_=std::string()) : url(url_) {}
         std::string url;        ///< Broker address.
     };
     
-    typedef std::vector<Member::Ptr> MemberList;
+    typedef std::vector<Member> MemberList;
 
     /**
      * Join a cluster.
      * @param name of the cluster.
      * @param url of this broker, sent to the cluster.
-     * @param handler for frames received from the cluster.
      */
-    Cluster(const std::string& name, const std::string& url,
-            const framing::FrameHandler::Chain& next);
+    Cluster(const std::string& name, const std::string& url, broker::Broker&);
 
     virtual ~Cluster();
 
+    framing::HandlerUpdater& getHandlerUpdater() { return sessions; }
+    
     /** Get the current cluster membership. */
     MemberList getMembers() const;
 
@@ -90,7 +87,7 @@
     
   private:
     typedef Cpg::Id Id;
-    typedef std::map<Id, shared_ptr<Member> >  MemberMap;
+    typedef std::map<Id, Member>  MemberMap;
     
     void notify();              ///< Notify cluster of my details.
 
@@ -114,13 +111,14 @@
     void handleClusterFrame(Id from, framing::AMQFrame&);
 
     mutable sys::Monitor lock;
-    boost::scoped_ptr<Cpg> cpg;
+    Cpg cpg;
     Cpg::Name name;
     std::string url;
     Id self;
     MemberMap members;
     sys::Thread dispatcher;
     boost::function<void()> callback;
+    SessionManager sessions;
 
   friend std::ostream& operator <<(std::ostream&, const Cluster&);
   friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=571246&r1=571245&r2=571246&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Thu Aug 30 10:06:44 2007
@@ -20,6 +20,10 @@
 #include "qpid/cluster/SessionManager.h"
 #include "qpid/Plugin.h"
 #include "qpid/Options.h"
+#include "qpid/shared_ptr.h"
+
+#include <boost/optional.hpp>
+#include <boost/utility/in_place_factory.hpp>
 
 namespace qpid {
 namespace cluster {
@@ -38,23 +42,18 @@
     };
 
     ClusterOptions options;
-    shared_ptr<Cluster> cluster;
-    shared_ptr<SessionManager> sessions;
+    boost::optional<Cluster> cluster;
+    boost::optional<SessionManager> sessions;
 
-    Options* getOptions() {
-        return &options;
-    }
+    Options* getOptions() { return &options; }
 
     void initialize(Plugin::Target& target) {
         broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
         // Only provide to a Broker, and only if the --cluster config is set.
         if (broker && !options.clusterName.empty()) {
             assert(!cluster); // A process can only belong to one cluster.
-
-            sessions.reset(new SessionManager(*broker));
-            cluster.reset(new Cluster(options.clusterName, broker->getUrl(), sessions));
-            sessions->setClusterSend(cluster); 
-            broker->add(sessions);
+            cluster = boost::in_place(options.clusterName, broker->getUrl(), boost::ref(*broker));
+            broker->add(make_shared_ptr(&cluster->getHandlerUpdater(), nullDeleter));
         }
     }
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=571246&r1=571245&r2=571246&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Thu Aug 30 10:06:44 2007
@@ -25,6 +25,8 @@
 #include <limits>
 #include <iterator>
 
+#include <unistd.h>
+
 namespace qpid {
 namespace cluster {
 
@@ -172,6 +174,11 @@
 
 ostream& operator <<(ostream& out, const cpg_name& name) {
     return out << string(name.value, name.length);
+}
+
+
+Cpg::Id Cpg::Id::self(Cpg& cpg) {
+    return Id(cpg.getLocalNoideId(), getpid());
 }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?rev=571246&r1=571245&r2=571246&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Thu Aug 30 10:06:44 2007
@@ -61,6 +61,8 @@
         Id(uint32_t nodeid, uint32_t pid) { id=(uint64_t(nodeid)<<32)+ pid; }
         Id(const cpg_address& addr) : id(Id(addr.nodeid, addr.pid)) {}
 
+        static Id self(Cpg& cpg);
+
         operator uint64_t() const { return id; }
         uint32_t nodeId() const { return id >> 32; }
         pid_t pid() const { return id & 0xFFFF; }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp?rev=571246&r1=571245&r2=571246&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp Thu Aug 30 10:06:44 2007
@@ -28,6 +28,8 @@
 #include "qpid/broker/BrokerChannel.h"
 #include "qpid/framing/ChannelAdapter.h"
 
+#include <boost/utility/in_place_factory.hpp>
+
 namespace qpid {
 namespace cluster {
 
@@ -36,7 +38,9 @@
 using namespace broker;
 
 /** Handler to send frames direct to local broker (bypass correlation etc.) */
-struct BrokerHandler : public FrameHandler, private ChannelAdapter, private DeliveryAdapter {
+struct SessionManager::BrokerHandler :
+        public FrameHandler, private ChannelAdapter, private DeliveryAdapter
+{
     Connection connection;
     Channel channel;
     BrokerAdapter adapter;
@@ -74,38 +78,35 @@
     virtual void redeliver(Message::shared_ptr&, DeliveryToken::shared_ptr, DeliveryId) {}
 };
 
-SessionManager::SessionManager(Broker& b) : localBroker(new BrokerHandler(b)) {}
+SessionManager::~SessionManager(){}
+
+SessionManager::SessionManager(Broker& b, FrameHandler& c)
+    : cluster(c), localBroker(new BrokerHandler(b)) {}
 
 void SessionManager::update(ChannelId channel, FrameHandler::Chains& chains) {
     Mutex::ScopedLock l(lock);
     // Create a new local session, store local chains.
-    sessions[channel] = chains;
-    
-    // Replace local "in" chain to mcast wiring and process other frames
-    // as normal.
-    assert(clusterSend);
-    chains.in = make_shared_ptr(
-        new ClassifierHandler(clusterSend, chains.in));
+    assert(!sessions[channel]);
+    boost::optional<Session>& session=sessions[channel];
+    session = boost::in_place(boost::ref(cluster), boost::ref(chains.in));
+    chains.in = &session->classifier;
 }
 
 void SessionManager::handle(AMQFrame& frame) {
     // Incoming from cluster.
     {
         Mutex::ScopedLock l(lock);
-        SessionMap::iterator i = sessions.find(frame.getChannel());
+        SessionMap::iterator i=sessions.find(frame.getChannel());
         if (i == sessions.end()) {
             // Non-local wiring method frame, invoke locally.
-            localBroker->handle(frame);
+            (*localBroker)(frame);
         }
         else {
             // Local frame continuing on local chain
-            i->second.in->handle(frame);
+            assert(i->second);
+            i->second->cont(frame);
         }
     }
-}
-
-void SessionManager::setClusterSend(const FrameHandler::Chain& send) {
-    clusterSend=send;
 }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h?rev=571246&r1=571245&r2=571246&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h Thu Aug 30 10:06:44 2007
@@ -19,12 +19,15 @@
  *
  */
 
+#include "ClassifierHandler.h"
+
 #include "qpid/framing/HandlerUpdater.h"
 #include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/Uuid.h"
 #include "qpid/sys/Mutex.h"
 
 #include <boost/noncopyable.hpp>
+#include <boost/scoped_ptr.hpp>
+#include <boost/optional.hpp>
 
 #include <map>
 
@@ -34,25 +37,31 @@
 class Broker;
 }
 
+namespace framing {
+class Uuid;
+}
+
 namespace cluster {
 
 /**
- * Manage sessions and handler chains for the cluster.
+ * Manage the clusters session map.
  * 
  */
 class SessionManager : public framing::HandlerUpdater, public framing::FrameHandler,
                        private boost::noncopyable
 {
   public:
-    SessionManager(broker::Broker& broker);
+    SessionManager(broker::Broker& broker, framing::FrameHandler& cluster);
+    ~SessionManager();
 
-    /** Set the handler to send to the cluster */
-    void setClusterSend(const framing::FrameHandler::Chain& send);
-    
-    /** As ChannelUpdater update the handler chains. */
+    /** ChannelUpdater: add cluster handlers to session. */
     void update(framing::ChannelId, framing::FrameHandler::Chains&);
 
-    /** As FrameHandler frames received from the cluster */
+    // FIXME aconway 2007-08-30: Need setUp and tearDown instead of just
+    // update, so we can tear down closed sesions.
+    // Or add FrameHandler::destroy(Session) to notify all handlers?
+
+    /** FrameHandler: map frames from the cluster to sessions. */
     void handle(framing::AMQFrame&);
 
     /** Get ChannelID for UUID. Return 0 if no mapping */
@@ -60,11 +69,20 @@
     
   private:
     class SessionOperations;
-    typedef std::map<framing::ChannelId,framing::FrameHandler::Chains> SessionMap;
+    class BrokerHandler;
+
+    struct Session {
+        Session(framing::FrameHandler& cluster, framing::FrameHandler& cont_)
+            : cont(cont_), classifier(cluster,cont_) {}
+        framing::FrameHandler& cont; // Continue local dispatch
+        ClassifierHandler classifier;
+    };
+
+    typedef std::map<framing::ChannelId,boost::optional<Session> > SessionMap;
 
     sys::Mutex lock;
-    framing::FrameHandler::Chain clusterSend;
-    framing::FrameHandler::Chain localBroker;
+    framing::FrameHandler& cluster;
+    boost::scoped_ptr<BrokerHandler> localBroker;
     SessionMap sessions;
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDefaultVisitor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDefaultVisitor.h?rev=571246&r1=571245&r2=571246&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDefaultVisitor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDefaultVisitor.h Thu Aug 30 10:06:44 2007
@@ -45,24 +45,17 @@
  */
 struct FrameDefaultVisitor : public AMQBodyConstVisitor, public MethodBodyDefaultVisitor
 {
-    void visit(const AMQHeaderBody&) { defaultVisit(); }
-    void visit(const AMQContentBody&) { defaultVisit(); }
-    void visit(const AMQHeartbeatBody&) { defaultVisit(); }
-    void visit(const AMQMethodBody& method) { method.accept(static_cast<MethodBodyDefaultVisitor&>(*this)); }
+    virtual void defaultVisit(const AMQBody&) = 0;
 
+    void visit(const AMQHeaderBody& b) { defaultVisit(b); }
+    void visit(const AMQContentBody& b) { defaultVisit(b); }
+    void visit(const AMQHeartbeatBody& b) { defaultVisit(b); }
+    void visit(const AMQMethodBody& b) { b.accept(static_cast<MethodBodyDefaultVisitor&>(*this)); }
+    void defaultVisit(const AMQMethodBody& method) { defaultVisit(static_cast<const AMQBody&>(method)); }
+    
     using AMQBodyConstVisitor::visit;
     using MethodBodyDefaultVisitor::visit;
 };
-
-/**
- * A FrameHandler that is implemented as a visitor.
- */
-struct FrameVisitorHandler : public FrameHandler,
-                             protected FrameDefaultVisitor
-{
-    void handle(AMQFrame& f) { f.getBody()->accept(*this); }
-};
-
 
 }} // namespace qpid::framing
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp?rev=571246&r1=571245&r2=571246&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp Thu Aug 30 10:06:44 2007
@@ -42,8 +42,8 @@
     BOOST_CHECK_EQUAL(1u, cluster.size());
     Cluster::MemberList members = cluster.getMembers();
     BOOST_CHECK_EQUAL(1u, members.size());
-    shared_ptr<const Cluster::Member> me=members.front();
-    BOOST_REQUIRE_EQUAL(me->url, "amqp:one:1");
+    Cluster::Member me=members.front();
+    BOOST_REQUIRE_EQUAL(me.url, "amqp:one:1");
 }
 
 /** Fork a process to test a cluster with two members */
@@ -93,18 +93,18 @@
 BOOST_AUTO_TEST_CASE(testClassifierHandlerWiring) {
     AMQFrame queueDecl(0, QueueDeclareBody(VER));
     AMQFrame messageTrans(0, MessageTransferBody(VER));
-    shared_ptr<CountHandler> wiring(new CountHandler());
-    shared_ptr<CountHandler> other(new CountHandler());
+    CountHandler wiring;
+    CountHandler other;
     
     ClassifierHandler classify(wiring, other);
 
     classify.handle(queueDecl);
-    BOOST_CHECK_EQUAL(1u, wiring->count);
-    BOOST_CHECK_EQUAL(0u, other->count);
+    BOOST_CHECK_EQUAL(1u, wiring.count);
+    BOOST_CHECK_EQUAL(0u, other.count);
     
     classify.handle(messageTrans);
-    BOOST_CHECK_EQUAL(1u, wiring->count);
-    BOOST_CHECK_EQUAL(1u, other->count);
+    BOOST_CHECK_EQUAL(1u, wiring.count);
+    BOOST_CHECK_EQUAL(1u, other.count);
 }
     
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h?rev=571246&r1=571245&r2=571246&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h Thu Aug 30 10:06:44 2007
@@ -58,7 +58,7 @@
 struct TestCluster : public Cluster
 {
     TestCluster(string name, string url)
-        : Cluster(name, url, make_shared_ptr(&received, nullDeleter)) {}
+        : Cluster(name, url, *(qpid::broker::Broker*)0) {}
 
     /** Wait for cluster to be of size n. */
     bool waitFor(size_t n) {

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk?rev=571246&r1=571245&r2=571246&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk Thu Aug 30 10:06:44 2007
@@ -20,7 +20,11 @@
 
 ais_check: ais_tests
 ais_tests:
-	echo $(AIS_TESTS) >$@
+	echo $(AIS_TESTS)
+	echo "# AIS tests" >$@
+	for t in $(AIS_TESTS); do echo ./$$t >$@; done
+	chmod a+x $@
+
 CLEANFILES+=ais_tests
 
 AIS_TESTS+=Cpg