You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/07/09 00:58:38 UTC

svn commit: r675017 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/ qpid/amqp_0_10/ qpid/broker/ qpid/cluster/ qpid/framing/ tests/

Author: aconway
Date: Tue Jul  8 15:58:37 2008
New Revision: 675017

URL: http://svn.apache.org/viewvc?rev=675017&view=rev
Log:
HandlerChain: plug-in handler chain extension points. Replaces Handler<T>::Chain.
Updated Session & Connection handler chains and Cluster.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/HandlerChain.h   (with props)
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=675017&r1=675016&r2=675017&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Tue Jul  8 15:58:37 2008
@@ -248,8 +248,6 @@
   qpid/amqp_0_10/Connection.cpp \
   qpid/broker/Broker.cpp \
   qpid/broker/BrokerSingleton.cpp \
-  qpid/broker/ConnectionManager.h \
-  qpid/broker/ConnectionManager.cpp \
   qpid/broker/Exchange.cpp \
   qpid/broker/Queue.cpp \
   qpid/broker/PersistableMessage.cpp \
@@ -354,6 +352,7 @@
   qpid/amqp_0_10/Exception.h \
   qpid/Msg.h \
   qpid/Options.h \
+  qpid/HandlerChain.h \
   qpid/Plugin.h \
   qpid/ptr_map.h \
   qpid/RangeSet.h \

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/HandlerChain.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/HandlerChain.h?rev=675017&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/HandlerChain.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/HandlerChain.h Tue Jul  8 15:58:37 2008
@@ -0,0 +1,97 @@
+#ifndef QPID_HANDLERCHAIN_H
+#define QPID_HANDLERCHAIN_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/Plugin.h>
+#include <boost/ptr_container/ptr_vector.hpp>
+#include <memory>
+
+namespace qpid {
+
+/**
+ * Chain-of-responsibility design pattern.
+ * 
+ * Construct a chain of objects deriving from Base. Each implements
+ * Base::f by doing its own logic and then calling Base::f on the next
+ * handler (or not if it chooses not to.)
+ *
+ * HandlerChain acts as a smart pointer to the first object in the chain.
+ */
+template <class Base>
+class HandlerChain {
+  public:
+    /** Base class for chainable handlers: a Base that knows the next link. */
+    class Handler : public Base {
+      public:
+        Handler() : next() {}   // next is null until setNext() is called.
+        virtual ~Handler() {}
+        virtual void setNext(Base* next_) { next = next_; }
+
+      protected:
+        Base* next;             // Next link in the chain, not deleted by this handler.
+    };
+
+    typedef std::auto_ptr<Handler> HandlerAutoPtr;
+
+    /**@param target is the object at the end of the chain, it is not owned. */
+    HandlerChain(Base& target) : first(&target) {}
+
+    /** Push h on the front of the chain, taking ownership. h must not be null. */
+    void push(HandlerAutoPtr h) {
+        h->setNext(first);        // Link before push_back(), which leaves h null.
+        handlers.push_back(h);    // Ownership passes from h to handlers.
+        first = &handlers.back(); // Set last, so first always refers to an owned handler.
+    }
+
+    // Smart pointer functions, all refer to the first object in the chain.
+    Base* operator*() { return first; }
+    const Base* operator*() const { return first; }
+    Base* operator->() { return first; }
+    const Base* operator->() const { return first; }
+    operator bool() const { return first; }
+
+  private:
+    boost::ptr_vector<Base> handlers; // Owns the pushed handlers, but not the target.
+    Base* first;                      // Head of the chain: last pushed handler, or the target.
+};
+
+/**
+ * A PluginHandlerChain calls Plugin::initAll(*this) on construction,
+ * allowing plugins to add handlers to the chain.
+ *
+ * @param Tag can be any class, it is used to distinguish different
+ * plugin chains with the same Base type.
+ */
+template <class Base, class Tag=void>
+struct PluginHandlerChain : public HandlerChain<Base>,
+                            public Plugin::Target
+{
+    PluginHandlerChain(Base& target) : HandlerChain<Base>(target) {
+        Plugin::initAll(*this); // Pass this chain to initialize() on every registered plugin.
+    }
+};
+
+
+} // namespace qpid
+
+#endif  /*!QPID_HANDLERCHAIN_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/HandlerChain.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/HandlerChain.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp?rev=675017&r1=675016&r2=675017&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp Tue Jul  8 15:58:37 2008
@@ -20,10 +20,13 @@
 
 #include "Plugin.h"
 #include "qpid/Options.h"
+#include <boost/bind.hpp>
+#include <algorithm>
 
 namespace qpid {
 
 namespace {
+
 Plugin::Plugins& thePlugins() {
     // This is a single threaded singleton implementation so
     // it is important to be sure that the first use of this
@@ -31,8 +34,17 @@
     static Plugin::Plugins plugins;
     return plugins;
 }
+
+void call(boost::function<void()> f) { f(); }
+
+} // namespace
+
+Plugin::Target::~Target() {
+    std::for_each(cleanup.begin(), cleanup.end(), &call);
 }
 
+void Plugin::Target::addCleanup(const boost::function<void()>& f) { cleanup.push_back(f); }
+
 Plugin::Plugin() {
     // Register myself.
     thePlugins().push_back(this);
@@ -44,6 +56,12 @@
 
 const Plugin::Plugins& Plugin::getPlugins() { return thePlugins(); }
 
+namespace {
+template <class F> void each_plugin(const F& f) {
+    std::for_each(Plugin::getPlugins().begin(), Plugin::getPlugins().end(), f);
+}
+}
+
 void Plugin::addOptions(Options& opts) {
     for (Plugins::const_iterator i = getPlugins().begin(); i != getPlugins().end(); ++i) {
         if ((*i)->getOptions())
@@ -51,4 +69,7 @@
     }
 }
 
+void Plugin::earlyInitAll(Target& t) { each_plugin(boost::bind(&Plugin::earlyInitialize, _1, t)); }
+void Plugin::initAll(Target& t) { each_plugin(boost::bind(&Plugin::initialize, _1, t)); }
+
 } // namespace qpid

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h?rev=675017&r1=675016&r2=675017&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h Tue Jul  8 15:58:37 2008
@@ -40,11 +40,17 @@
   public:
     /**
      * Base interface for targets that receive plug-ins.
-     *
-     * The Broker is a plug-in target, there might be others
-     * in future.
+     * Plug-ins can register clean-up functions to execute when
+     * the target is destroyed.
      */
-    struct Target { virtual ~Target() {} };
+    struct Target {
+      public:
+        virtual ~Target();  // Calls every function registered with addCleanup().
+        void addCleanup(const boost::function<void()>& cleanupFunction);
+
+      private:
+        std::vector<boost::function<void()> > cleanup;  // Clean-up functions, in registration order.
+    };
 
     typedef std::vector<Plugin*> Plugins;
     
@@ -69,7 +75,9 @@
     virtual Options* getOptions();
 
     /**
-     * Initialize Plugin functionality on a Target.
+     * Initialize Plugin functionality on a Target, called before
+     * initializing the target.
+     *
      * Plugins should ignore targets they don't recognize.
      *
      * Called before the target itself is initialized.
@@ -77,7 +85,9 @@
     virtual void earlyInitialize(Target&) = 0;
 
     /**
-     * Initialize Plugin functionality on a Target.
+     * Initialize Plugin functionality on a Target. Called after
+     * initializing the target.
+     * 
      * Plugins should ignore targets they don't recognize.
      *
      * Called after the target is fully initialized.
@@ -89,6 +99,12 @@
      */
     static const Plugins& getPlugins();
 
+    /** Call earlyInitialize() on all registered plugins */
+    static void earlyInitAll(Target&);
+
+    /** Call initialize() on all registered plugins */
+    static void initAll(Target&);
+
     /** For each registered plugin, add plugin.getOptions() to opts. */
     static void addOptions(Options& opts);
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=675017&r1=675016&r2=675017&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Tue Jul  8 15:58:37 2008
@@ -29,7 +29,7 @@
 
 Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
     : frameQueueClosed(false), output(o),
-      connection(broker.getConnectionManager().create(this, broker, id, _isClient)),
+      connection(this, broker, id, _isClient),
       identifier(id), initialized(false), isClient(_isClient) {}
 
 size_t  Connection::decode(const char* buffer, size_t size) {
@@ -46,13 +46,13 @@
     framing::AMQFrame frame;
     while(frame.decode(in)) {
         QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
-        connection->received(frame);
+        connection.received(frame);
     }
     return in.getPosition();
 }
 
 bool Connection::canEncode() {
-    if (!frameQueueClosed) connection->doOutput();
+    if (!frameQueueClosed) connection.doOutput();
     Mutex::ScopedLock l(frameQueueLock);
     return (!isClient && !initialized) || !frameQueue.empty();
 }
@@ -91,7 +91,7 @@
 }
 
 void  Connection::closed() {
-    connection->closed();
+    connection.closed();
 }
 
 void Connection::send(framing::AMQFrame& f) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=675017&r1=675016&r2=675017&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Tue Jul  8 15:58:37 2008
@@ -33,7 +33,6 @@
 namespace broker { class Broker; }
 namespace amqp_0_10 {
 
-// FIXME aconway 2008-03-18: Update to 0-10.
 class Connection  : public sys::ConnectionCodec,
                     public sys::ConnectionOutputHandler
 {
@@ -41,7 +40,7 @@
     bool frameQueueClosed;
     mutable sys::Mutex frameQueueLock;
     sys::OutputControl& output;
-    std::auto_ptr<broker::Connection> connection; // FIXME aconway 2008-03-18: 
+    broker::Connection connection;
     std::string identifier;
     bool initialized;
     bool isClient;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=675017&r1=675016&r2=675017&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Jul  8 15:58:37 2008
@@ -23,7 +23,6 @@
  */
 
 #include "ConnectionFactory.h"
-#include "ConnectionManager.h"
 #include "ConnectionToken.h"
 #include "DirectExchange.h"
 #include "DtxManager.h"
@@ -121,7 +120,6 @@
     Options& getOptions() { return config; }
 
     SessionManager& getSessionManager() { return sessionManager; }
-    ConnectionManager& getConnectionManager() { return connectionManager; }
 
     management::ManagementObject*     GetManagementObject (void) const;
     management::Manageable*           GetVhostObject      (void) const;
@@ -159,7 +157,6 @@
     ConnectionFactory factory;
     DtxManager dtxManager;
     SessionManager sessionManager;
-    ConnectionManager connectionManager;
     management::ManagementAgent* managementAgent;
     management::Broker*          mgmtObject;
     Vhost::shared_ptr            vhostObject;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=675017&r1=675016&r2=675017&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Tue Jul  8 15:58:37 2008
@@ -88,7 +88,7 @@
         links.notifyClosed(mgmtId);
 }
 
-void Connection::received(framing::AMQFrame& frame){ inChain(frame); }
+void Connection::received(framing::AMQFrame& frame){ inChain->handle(frame); }
     
 void Connection::receivedLast(framing::AMQFrame& frame){
     if (frame.getChannel() == 0 && frame.getMethod()) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=675017&r1=675016&r2=675017&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Jul  8 15:58:37 2008
@@ -43,6 +43,7 @@
 #include "SessionHandler.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/management/Connection.h"
+#include "qpid/HandlerChain.h"
 
 #include <boost/ptr_container/ptr_map.hpp>
 
@@ -91,8 +92,6 @@
     void notifyConnectionForced(const std::string& text);
     void setUserId(const string& uid);
 
-    framing::FrameHandler::Chain& getInChain() { return inChain; } 
-
   private:
     typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
     typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
@@ -110,8 +109,7 @@
     management::Connection* mgmtObject;
     LinkRegistry& links;
     framing::FrameHandler::MemFunRef<Connection, &Connection::receivedLast> lastInHandler;
-    framing::FrameHandler::Chain inChain;
-
+    PluginHandlerChain<framing::FrameHandler, Connection> inChain;
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp?rev=675017&r1=675016&r2=675017&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp Tue Jul  8 15:58:37 2008
@@ -55,11 +55,8 @@
         throw SessionBusyException(QPID_MSG("Session already attached: " << id));
     Detached::iterator i = std::find(detached.begin(), detached.end(), id);
     std::auto_ptr<SessionState> state;
-    if (i == detached.end())  {
+    if (i == detached.end())  
         state.reset(new SessionState(broker, h, id, config));
-    for_each(observers.begin(), observers.end(),
-                 boost::bind(&Observer::opened, _1,boost::ref(*state)));
-    }
     else {
         state.reset(detached.release(i).release());
         state->attach(h);
@@ -99,8 +96,4 @@
     }
 }
 
-void SessionManager::add(const intrusive_ptr<Observer>& o) {
-    observers.push_back(o);
-}
-
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h?rev=675017&r1=675016&r2=675017&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h Tue Jul  8 15:58:37 2008
@@ -46,14 +46,6 @@
  */
 class SessionManager : private boost::noncopyable {
   public:
-    /**
-     * Observer notified of SessionManager events.
-     */
-    struct Observer : public RefCounted {
-        /** Called when a stateless session is attached. */
-        virtual void opened(SessionState&) {}
-    };
-    
     SessionManager(const qpid::SessionState::Configuration&, Broker&);
     
     ~SessionManager();
@@ -67,9 +59,6 @@
     /** Forget about an attached session. Called by SessionState destructor. */
     void forget(const SessionId&);
 
-    /** Add an Observer. */
-    void add(const boost::intrusive_ptr<Observer>&);
-    
     Broker& getBroker() const { return broker; }
 
     const qpid::SessionState::Configuration& getSessionConfig() const { return config; }
@@ -77,7 +66,6 @@
   private:
     typedef boost::ptr_vector<SessionState> Detached; // Sorted in expiry order.
     typedef std::set<SessionId> Attached;
-    typedef std::vector<boost::intrusive_ptr<Observer> > Observers;
 
     void eraseExpired();             
 
@@ -85,7 +73,6 @@
     Detached detached;
     Attached attached;
     qpid::SessionState::Configuration config;
-    Observers observers;
     Broker& broker;
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=675017&r1=675016&r2=675017&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Jul  8 15:58:37 2008
@@ -224,8 +224,8 @@
         getProxy().getMessage().accept(SequenceSet(msg->getCommandId()));        
 }
 
-void SessionState::handleIn(AMQFrame& f) { inChain.handle(f); }
-void SessionState::handleOut(AMQFrame& f) { outChain.handle(f); }
+void SessionState::handleIn(AMQFrame& f) { inChain->handle(f); }
+void SessionState::handleOut(AMQFrame& f) { outChain->handle(f); }
 
 void SessionState::handleInLast(AMQFrame& frame) {
     SequenceNumber commandId = receiverGetCurrent();
@@ -291,8 +291,4 @@
 
 Broker& SessionState::getBroker() { return broker; }
 
-framing::FrameHandler::Chain& SessionState::getInChain() { return inChain; }
-
-framing::FrameHandler::Chain& SessionState::getOutChain() { return outChain; }
-
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=675017&r1=675016&r2=675017&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Jul  8 15:58:37 2008
@@ -23,6 +23,7 @@
  */
 
 #include "qpid/SessionState.h"
+#include "qpid/HandlerChain.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/SequenceSet.h"
 #include "qpid/sys/Mutex.h"
@@ -58,8 +59,8 @@
 class SessionManager;
 
 /**
- * Broker-side session state includes sessions handler chains, which may
- * themselves have state. 
+ * Broker-side session state includes session's handler chains, which
+ * may themselves have state.
  */
 class SessionState : public qpid::SessionState, 
                      public SessionContext,
@@ -101,8 +102,9 @@
 
     void readyToSend();
 
-    framing::FrameHandler::Chain& getInChain(); 
-    framing::FrameHandler::Chain& getOutChain(); 
+    // Tag types to identify PluginHandlerChains. 
+    struct InTag {};
+    struct OutTag {};
 
   private:
 
@@ -131,7 +133,9 @@
     management::Session* mgmtObject;
     framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleInLast> inLastHandler;
     framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleOutLast> outLastHandler;
-    framing::FrameHandler::Chain inChain, outChain;
+
+    qpid::PluginHandlerChain<framing::FrameHandler, InTag> inChain;
+    qpid::PluginHandlerChain<framing::FrameHandler, OutTag> outChain;
 
   friend class SessionManager;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=675017&r1=675016&r2=675017&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Jul  8 15:58:37 2008
@@ -23,6 +23,7 @@
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/ClusterNotifyBody.h"
 #include "qpid/log/Statement.h"
+#include "qpid/memory.h"
 #include <boost/bind.hpp>
 #include <boost/scoped_array.hpp>
 #include <algorithm>
@@ -36,25 +37,12 @@
 using namespace std;
 using broker::Connection;
 
-namespace {
-
-// FIXME aconway 2008-07-01: sending every frame to cluster,
-// serializing all processing in cluster deliver thread. 
-// This will not perform at all, but provides a correct starting point.
-//
-// TODO:
-// - Fake "Connection" for cluster: owns shadow sessions.
-// - Maintain shadow sessions.
-// - Apply foreign frames to shadow sessions.
-// 
-
-
 // Beginning of inbound chain: send to cluster.
-struct ClusterSendHandler : public FrameHandler {
-    Connection& connection;
+struct ClusterSendHandler : public HandlerChain<FrameHandler>::Handler {
+    Cluster::ConnectionChain& connection;
     Cluster& cluster;
     
-    ClusterSendHandler(Connection& conn, Cluster& clust) : connection(conn), cluster(clust) {}
+    ClusterSendHandler(Cluster::ConnectionChain& conn, Cluster& clust) : connection(conn), cluster(clust) {}
 
     void handle(AMQFrame& f) {
         // FIXME aconway 2008-01-29: Refcount Connections to ensure
@@ -63,16 +51,8 @@
     }
 };
 
-struct ConnectionObserver : public broker::ConnectionManager::Observer {
-    Cluster& cluster;
-    ConnectionObserver(Cluster& c) : cluster(c) {}
-    
-    void created(Connection& c) {
-        // FIXME aconway 2008-06-16: clean up chaining and observers.
-        ClusterSendHandler* sender=new ClusterSendHandler(c, cluster);
-        c.getInChain().insert(sender);
-    }
-};
+void Cluster::initialize(Cluster::ConnectionChain& cc) {
+    cc.push(ConnectionChain::HandlerAutoPtr(new ClusterSendHandler(cc, *this)));
 }
 
 ostream& operator <<(ostream& out, const Cluster& cluster) {
@@ -95,7 +75,6 @@
     cpg(*this),
     name(name_),
     url(url_),
-    observer(new ConnectionObserver(*this)),
     self(cpg.self())
 {
     QPID_LOG(trace, "Joining cluster: " << name_);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=675017&r1=675016&r2=675017&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Jul  8 15:58:37 2008
@@ -22,6 +22,7 @@
 #include "qpid/cluster/Cpg.h"
 #include "qpid/cluster/ShadowConnectionOutputHandler.h"
 
+#include "qpid/HandlerChain.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Runnable.h"
@@ -47,6 +48,8 @@
 class Cluster : private sys::Runnable, private Cpg::Handler
 {
   public:
+    typedef PluginHandlerChain<framing::FrameHandler, broker::Connection> ConnectionChain;
+
     /** Details of a cluster member */
     struct Member {
         Member(const Url& url_=Url()) : url(url_) {}
@@ -62,11 +65,11 @@
      */
     Cluster(const std::string& name, const Url& url, broker::Broker&);
 
+    // Add cluster handlers to broker chains.
+    void initialize(ConnectionChain&);
+
     virtual ~Cluster();
 
-    // FIXME aconway 2008-01-29: 
-    boost::intrusive_ptr<broker::ConnectionManager::Observer> getObserver() { return observer; }
-    
     /** Get the current cluster membership. */
     MemberList getMembers() const;
 
@@ -124,7 +127,6 @@
     MemberMap members;
     sys::Thread dispatcher;
     boost::function<void()> callback;
-    boost::intrusive_ptr<broker::ConnectionManager::Observer> observer;
     Id self;
     ShadowConnectionMap shadowConnectionMap;
     ShadowConnectionOutputHandler shadowOut;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=675017&r1=675016&r2=675017&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Tue Jul  8 15:58:37 2008
@@ -54,24 +54,33 @@
 };
 
 struct ClusterPlugin : public Plugin {
+    typedef PluginHandlerChain<framing::FrameHandler, broker::Connection> ConnectionChain;
 
     ClusterOptions options;
     boost::optional<Cluster> cluster;
 
     Options* getOptions() { return &options; }
+
+    /** Add cluster handlers to t if t is a Chain, otherwise do nothing. */
+    template <class Chain> void init(Plugin::Target& t) {
+        Chain* c = dynamic_cast<Chain*>(&t);
+        if (c) cluster->initialize(*c);
+    }
 
     void earlyInitialize(Plugin::Target&) {}
 
     void initialize(Plugin::Target& target) {
         broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
-        // Only provide to a Broker, and only if the --cluster config is set.
+        // A Broker target creates the cluster, but only if a cluster name is configured.
         if (broker && !options.name.empty()) {
-            assert(!cluster); // A process can only belong to one cluster.
+            if (cluster) throw Exception("Cluster plugin cannot be initialized twice in a process.");
             cluster = boost::in_place(options.name,
                                       options.getUrl(broker->getPort()),
                                       boost::ref(*broker));
-            broker->getConnectionManager().add(cluster->getObserver());	
+            return;
         }
+        if (!cluster) return;   // Ignore chain handlers if we didn't init a cluster.
+        init<ConnectionChain>(target);
     }
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h?rev=675017&r1=675016&r2=675017&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h Tue Jul  8 15:58:37 2008
@@ -28,7 +28,6 @@
 namespace qpid {
 namespace framing {
 
-/** Generic handler that can be linked into chains. */
 template <class T>
 struct Handler {
     typedef T HandledType;
@@ -46,23 +45,6 @@
     /** Pointer to next handler in a linked list. */
     Handler<T>* next;
 
-    /** A Chain is a handler holding a linked list of sub-handlers.
-     * Chain::next is invoked after the full chain, it is not itself part of the chain.
-     * Handlers inserted into the chain are deleted by the Chain dtor.
-     */
-    class Chain : public Handler<T> {
-      public:
-        Chain(Handler<T>& next_) : Handler(&next_), first(&next_) {}
-        ~Chain() { while (first != next) pop(); }
-        void handle(T t) { first->handle(t); }
-        void insert(Handler<T>* h) { h->next = first; first = h; }
-        bool empty() { return first == next; }
-
-      private:
-        void pop() { Handler<T>* p=first; first=first->next; delete p; }
-        Handler<T>* first;
-    };
-
     /** Adapt any void(T) functor as a Handler.
      * Functor<F>(f) will copy f.
      * Functor<F&>(f) will only take a reference to x.
@@ -84,7 +66,7 @@
         MemFunRef(X& x, Handler<T>* next=0) : Handler(next), target(&x) {}
         void handle(T t) { (target->*F)(t); }
 
-        /** Allow calling with -> syntax, compatible with Chains */
+        /** Allow calling with -> syntax, like a qpid::HandlerChain */
         MemFunRef* operator->() { return this; }
 
       private:
@@ -103,15 +85,13 @@
     };
         
     /** Support for implementing an in-out handler pair as a single class.
-     * Public interface is Handler<T>::Chains pair, but implementation
-     * overrides handleIn, handleOut functions in a single class.
+     * Overrides handleIn, handleOut functions in a single class.
      */
     struct InOutHandler : protected InOutHandlerInterface {
         InOutHandler(Handler<T>* nextIn=0, Handler<T>* nextOut=0) : in(*this, nextIn), out(*this, nextOut) {}
         MemFunRef<InOutHandlerInterface, &InOutHandlerInterface::handleIn> in;
         MemFunRef<InOutHandlerInterface, &InOutHandlerInterface::handleOut> out;
     };
-
 };
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=675017&r1=675016&r2=675017&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Tue Jul  8 15:58:37 2008
@@ -174,7 +174,7 @@
     }    
 }
 
-QPID_AUTO_TEST_CASE(testMessageReplication) {
+QPID_AUTO_TEST_CASE(testMessageEnqueue) {
     // Enqueue on one broker, dequeue on another.
     ClusterFixture cluster(2);
     Client c0(cluster[0].getPort());
@@ -190,6 +190,28 @@
     BOOST_CHECK_EQUAL(string("bar"), msg.getData());
 }
 
-// TODO aconway 2008-06-25: dequeue replication, failover.
+QPID_AUTO_TEST_CASE(testMessageDequeue) {
+    // Enqueue two messages on broker 0, dequeue one each on brokers 1 and 2.
+    ClusterFixture cluster(3);
+    Client c0(cluster[0].getPort());
+    c0.session.queueDeclare("q");
+    c0.session.messageTransfer(arg::content=TransferContent("foo", "q"));
+    c0.session.messageTransfer(arg::content=TransferContent("bar", "q"));
+    c0.session.close();
+
+    Message msg;
+
+    Client c1(cluster[1].getPort());    // First message is taken via broker 1.
+    BOOST_CHECK(c1.subs.get(msg, "q"));
+    BOOST_CHECK_EQUAL("foo", msg.getData());
+
+    Client c2(cluster[2].getPort());    // Second message must come via broker 2, not c1 again.
+    BOOST_CHECK(c2.subs.get(msg, "q"));
+    BOOST_CHECK_EQUAL("bar", msg.getData());
+    QueueQueryResult r = c2.session.queueQuery("q");
+    BOOST_CHECK_EQUAL(0, r.getMessageCount());  // Both dequeues are visible on broker 2.
+}
+
+// TODO aconway 2008-06-25: failover.
 
 QPID_AUTO_TEST_SUITE_END()