You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/07/17 02:03:51 UTC

svn commit: r677471 - in /incubator/qpid/trunk/qpid/cpp: src/ src/qpid/ src/qpid/amqp_0_10/ src/qpid/broker/ src/qpid/cluster/ src/qpid/framing/ src/tests/ xml/

Author: aconway
Date: Wed Jul 16 17:03:50 2008
New Revision: 677471

URL: http://svn.apache.org/viewvc?rev=677471&view=rev
Log:

Cluster: shadow connections, fix lifecycle & valgrind issues.
 - tests/ForkedBroker: improved broker forking, exec full qpidd.
 - Plugin::addFinalizer - more flexible way to shutdown plugins.
 - Reworked cluster extension points using boost::function.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h   (with props)
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/HandlerChain.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/qpid/Options.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/Options.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/RefCounted.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/run_test
    incubator/qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Wed Jul 16 17:03:50 2008
@@ -353,7 +353,6 @@
   qpid/amqp_0_10/Exception.h \
   qpid/Msg.h \
   qpid/Options.h \
-  qpid/HandlerChain.h \
   qpid/Plugin.h \
   qpid/ptr_map.h \
   qpid/RangeSet.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Wed Jul 16 17:03:50 2008
@@ -12,6 +12,8 @@
   qpid/cluster/Cpg.h \
   qpid/cluster/Dispatchable.h \
   qpid/cluster/ClusterPlugin.cpp \
+  qpid/cluster/ConnectionInterceptor.h \
+  qpid/cluster/ConnectionInterceptor.cpp \
   qpid/cluster/ClassifierHandler.h \
   qpid/cluster/ClassifierHandler.cpp \
   qpid/cluster/ShadowConnectionOutputHandler.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Options.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Options.cpp?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Options.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Options.cpp Wed Jul 16 17:03:50 2008
@@ -176,7 +176,7 @@
 
 
 
-void Options::parse(int argc, char** argv, const std::string& configFile, bool allowUnknown)
+void Options::parse(int argc, char const* const* argv, const std::string& configFile, bool allowUnknown)
 {
     string defaultConfigFile = configFile; // May be changed by env/cmdline
     string parsing;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Options.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Options.h?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Options.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Options.h Wed Jul 16 17:03:50 2008
@@ -209,7 +209,7 @@
      * Note the filename argument can reference an options variable that
      * is updated by argc/argv or environment variable parsing.
      */
-    void parse(int argc, char** argv,
+    void parse(int argc, char const* const* argv,
                const std::string& configfile=std::string(),
                bool  allowUnknown = false);
     

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp Wed Jul 16 17:03:50 2008
@@ -35,15 +35,20 @@
     return plugins;
 }
 
-void call(boost::function<void()> f) { f(); }
+void invoke(boost::function<void()> f) { f(); }
 
 } // namespace
 
-Plugin::Target::~Target() {
-    std::for_each(cleanup.begin(), cleanup.end(), &call);
+Plugin::Target::~Target() { finalize(); }
+
+void Plugin::Target::finalize() {
+    for_each(finalizers.begin(), finalizers.end(), invoke);
+    finalizers.clear();
 }
 
-void Plugin::Target::addCleanup(const boost::function<void()>& f) { cleanup.push_back(f); }
+void Plugin::Target::addFinalizer(const boost::function<void()>& f) {
+    finalizers.push_back(f);
+}
 
 Plugin::Plugin() {
     // Register myself.
@@ -69,7 +74,7 @@
     }
 }
 
-void Plugin::earlyInitAll(Target& t) { each_plugin(boost::bind(&Plugin::earlyInitialize, _1, t)); }
-void Plugin::initAll(Target& t) { each_plugin(boost::bind(&Plugin::initialize, _1, t)); }
+void Plugin::earlyInitAll(Target& t) { each_plugin(boost::bind(&Plugin::earlyInitialize, _1, boost::ref(t))); }
+void Plugin::initializeAll(Target& t) { each_plugin(boost::bind(&Plugin::initialize, _1, boost::ref(t))); }
 
 } // namespace qpid

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h Wed Jul 16 17:03:50 2008
@@ -21,11 +21,9 @@
  *
  */
 
-#include "qpid/shared_ptr.h"
 #include <boost/noncopyable.hpp>
-#include <vector>
 #include <boost/function.hpp>
-
+#include <vector>
 
 /**@file Generic plug-in framework. */
 
@@ -35,30 +33,36 @@
 /**
  * Plug-in base class.
  */
-class Plugin : boost::noncopyable
-{
+class Plugin : private boost::noncopyable {
   public:
+    typedef std::vector<Plugin*> Plugins;
+    
     /**
-     * Base interface for targets that receive plug-ins.
-     * Plug-ins can register clean-up functions to execute when
-     * the target is destroyed.
+     * Base interface for targets that can receive plug-ins.
+     * Also allows plug-ins to attach a a function to be called
+     * when the target is 'finalized'.
      */
-    struct Target {
+    class Target : private boost::noncopyable
+    {
       public:
+        /** Calls finalize() if not already called. */
         virtual ~Target();
-        void addCleanup(const boost::function<void()>& cleanupFunction);
+
+        /** Run all the finalizers */
+        void finalize();
+
+        /** Add a function to run when finalize() is called */
+        void addFinalizer(const boost::function<void()>&);
 
       private:
-        std::vector<boost::function<void()> > cleanup;
+        std::vector<boost::function<void()> > finalizers;
     };
 
-    typedef std::vector<Plugin*> Plugins;
-    
     /**
-     * Construct registers the plug-in to appear in getPlugins().
+     * Constructor registers the plug-in to appear in getPlugins().
      * 
      * A concrete Plugin is instantiated as a global or static
-     * member variable in a library so it is registered during static
+     * member variable in a library so it is registered during 
      * initialization when the library is loaded.
      */
     Plugin();
@@ -103,7 +107,7 @@
     static void earlyInitAll(Target&);
 
     /** Call initialize() on all registered plugins */
-    static void initAll(Target&);
+    static void initializeAll(Target&);
 
     /** For each registered plugin, add plugin.getOptions() to opts. */
     static void addOptions(Options& opts);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/RefCounted.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/RefCounted.h?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/RefCounted.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/RefCounted.h Wed Jul 16 17:03:50 2008
@@ -46,23 +46,6 @@
     virtual ~RefCounted() {};
 };
 
-/**
- * Reference-counted member of a reference-counted parent class.
- * Delegates reference counts to the parent so that the parent is
- * deleted only when there are no references to the parent or any of
- * its children.
- * TODO: Delete this class if it's unused as I don't think this class makes much sense:
- */
-struct RefCountedChild {
-    RefCounted& parent;
-
-protected:
-    RefCountedChild(RefCounted& parent_) : parent(parent_) {}
-
-public:
-    void addRef() const { parent.addRef(); }
-    void release() const { parent.release(); }
-};
 
 } // namespace qpid
 
@@ -70,8 +53,6 @@
 namespace boost {
 inline void intrusive_ptr_add_ref(const qpid::RefCounted* p) { p->addRef(); }
 inline void intrusive_ptr_release(const qpid::RefCounted* p) { p->release(); }
-inline void intrusive_ptr_add_ref(const qpid::RefCountedChild* p) { p->addRef(); }
-inline void intrusive_ptr_release(const qpid::RefCountedChild* p) { p->release(); }
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Wed Jul 16 17:03:50 2008
@@ -29,7 +29,7 @@
 
 Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
     : frameQueueClosed(false), output(o),
-      connection(this, broker, id, _isClient),
+      connection(new broker::Connection(this, broker, id, _isClient)),
       identifier(id), initialized(false), isClient(_isClient) {}
 
 size_t  Connection::decode(const char* buffer, size_t size) {
@@ -46,13 +46,13 @@
     framing::AMQFrame frame;
     while(frame.decode(in)) {
         QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
-        connection.received(frame);
+         connection->received(frame);
     }
     return in.getPosition();
 }
 
 bool Connection::canEncode() {
-    if (!frameQueueClosed) connection.doOutput();
+    if (!frameQueueClosed) connection->doOutput();
     Mutex::ScopedLock l(frameQueueLock);
     return (!isClient && !initialized) || !frameQueue.empty();
 }
@@ -91,7 +91,7 @@
 }
 
 void  Connection::closed() {
-    connection.closed();
+    connection->closed();
 }
 
 void Connection::send(framing::AMQFrame& f) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Wed Jul 16 17:03:50 2008
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_CONNECTION_H
-#define QPID_BROKER_CONNECTION_H
+#ifndef QPID_AMQP_0_10_CONNECTION_H
+#define QPID_AMQP_0_10_CONNECTION_H
 
 /*
  *
@@ -24,8 +24,8 @@
 #include "qpid/sys/ConnectionCodec.h"
 #include "qpid/sys/ConnectionOutputHandler.h"
 #include "qpid/sys/Mutex.h"
-#include "Connection.h"
 #include "qpid/broker/Connection.h"
+#include <boost/intrusive_ptr.hpp>
 #include <queue>
 #include <memory>
 
@@ -40,7 +40,7 @@
     bool frameQueueClosed;
     mutable sys::Mutex frameQueueLock;
     sys::OutputControl& output;
-    broker::Connection connection;
+    boost::intrusive_ptr<broker::Connection> connection;
     std::string identifier;
     bool initialized;
     bool isClient;
@@ -60,4 +60,4 @@
 
 }} // namespace qpid::amqp_0_10
 
-#endif  /*!QPID_BROKER_CONNECTION_H*/
+#endif  /*!QPID_AMQP_0_10_CONNECTION_H*/

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Jul 16 17:03:50 2008
@@ -285,6 +285,7 @@
     // call any function that is not async-signal safe.
     // Any unsafe shutdown actions should be done in the destructor.
     poller->shutdown();
+    finalize();                 // Finalize any plugins.
 }
 
 Broker::~Broker() {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Wed Jul 16 17:03:50 2008
@@ -49,14 +49,14 @@
 
 Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) :
     ConnectionState(out_, broker_),
+    receivedFn(boost::bind(&Connection::receivedImpl, this, _1)),
+    closedFn(boost::bind(&Connection::closedImpl, this)),
     adapter(*this, isLink_),
     isLink(isLink_),
     mgmtClosing(false),
     mgmtId(mgmtId_),
     mgmtObject(0),
-    links(broker_.getLinks()),
-    lastInHandler(*this),
-    inChain(lastInHandler)
+    links(broker_.getLinks())
 {
     Manageable* parent = broker.GetVhostObject();
 
@@ -71,6 +71,8 @@
             mgmtObject = new management::Connection(agent, this, parent, mgmtId, !isLink);
         agent->addObject(mgmtObject);
     }
+
+    Plugin::initializeAll(*this); // Let plug-ins update extension points.
 }
 
 void Connection::requestIOProcessing(boost::function0<void> callback)
@@ -79,7 +81,6 @@
     out->activateOutput();
 }
 
-
 Connection::~Connection()
 {
     if (mgmtObject != 0)
@@ -88,9 +89,9 @@
         links.notifyClosed(mgmtId);
 }
 
-void Connection::received(framing::AMQFrame& frame){ inChain->handle(frame); }
-    
-void Connection::receivedLast(framing::AMQFrame& frame){
+void Connection::received(framing::AMQFrame& frame) { receivedFn(frame); }
+
+void Connection::receivedImpl(framing::AMQFrame& frame){
     if (frame.getChannel() == 0 && frame.getMethod()) {
         adapter.handle(frame);
     } else {
@@ -170,10 +171,13 @@
 
 void Connection::idleIn(){}
 
-void Connection::closed(){ // Physically closed, suspend open sessions.
+void Connection::closed() { closedFn(); }
+
+void Connection::closedImpl(){ // Physically closed, suspend open sessions.
     try {
         while (!channels.empty()) 
             ptr_map_ptr(channels.begin())->handleDetach();
+        // FIXME aconway 2008-07-15: exclusive is per-session not per-connection in 0-10.
         while (!exclusiveQueues.empty()) {
             Queue::shared_ptr q(exclusiveQueues.front());
             q->releaseExclusiveOwnership();
@@ -183,8 +187,7 @@
             exclusiveQueues.erase(exclusiveQueues.begin());
         }
     } catch(std::exception& e) {
-        QPID_LOG(error, " Unhandled exception while closing session: " <<
-                 e.what());
+        QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
         assert(0);
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Wed Jul 16 17:03:50 2008
@@ -1,3 +1,6 @@
+#ifndef QPID_BROKER_CONNECTION_H
+#define QPID_BROKER_CONNECTION_H
+
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -18,8 +21,6 @@
  * under the License.
  *
  */
-#ifndef _Connection_
-#define _Connection_
 
 #include <memory>
 #include <sstream>
@@ -43,7 +44,8 @@
 #include "SessionHandler.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/management/Connection.h"
-#include "qpid/HandlerChain.h"
+#include "qpid/Plugin.h"
+#include "qpid/RefCounted.h"
 
 #include <boost/ptr_container/ptr_map.hpp>
 
@@ -53,11 +55,11 @@
 class LinkRegistry;
 
 class Connection : public sys::ConnectionInputHandler, 
-                   public ConnectionState
+                   public ConnectionState,
+                   public Plugin::Target,
+                   public RefCounted
 {
   public:
-    typedef boost::shared_ptr<Connection> shared_ptr;
-
     Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false);
     ~Connection ();
 
@@ -74,8 +76,8 @@
     void received(framing::AMQFrame& frame);
     void idleOut();
     void idleIn();
-    void closed();
     bool doOutput();
+    void closed();
 
     void closeChannel(framing::ChannelId channel);
 
@@ -92,12 +94,16 @@
     void notifyConnectionForced(const std::string& text);
     void setUserId(const string& uid);
 
+    // Extension points: allow plugins to insert additional functionality.
+    boost::function<void(framing::AMQFrame&)> receivedFn;
+    boost::function<void()> closedFn; 
+
   private:
     typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
     typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
 
-    // End of the received handler chain.
-    void receivedLast(framing::AMQFrame& frame);
+    void receivedImpl(framing::AMQFrame& frame);
+    void closedImpl();
 
     ChannelMap channels;
     framing::AMQP_ClientProxy::Connection* client;
@@ -108,10 +114,8 @@
     boost::function0<void> ioCallback;
     management::Connection* mgmtObject;
     LinkRegistry& links;
-    framing::FrameHandler::MemFunRef<Connection, &Connection::receivedLast> lastInHandler;
-    PluginHandlerChain<framing::FrameHandler, Connection> inChain;
 };
 
 }}
 
-#endif
+#endif  /*!QPID_BROKER_CONNECTION_H*/

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h Wed Jul 16 17:03:50 2008
@@ -70,6 +70,8 @@
     sys::ConnectionOutputHandler& getOutput() const { return *out; }
     framing::ProtocolVersion getVersion() const { return version; }
 
+    void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out = o; }
+
   protected:
     framing::ProtocolVersion version;
     sys::ConnectionOutputHandler* out;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Wed Jul 16 17:03:50 2008
@@ -54,11 +54,7 @@
       adapter(semanticState),
       msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
       enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)),
-      mgmtObject(0),
-      inLastHandler(*this),
-      outLastHandler(*this),
-      inChain(inLastHandler),
-      outChain(outLastHandler)
+      mgmtObject(0)
 {
     Manageable* parent = broker.GetVhostObject ();
     if (parent != 0) {
@@ -75,9 +71,6 @@
 
 SessionState::~SessionState() {
     // Remove ID from active session list.
-    // FIXME aconway 2008-05-12: Need to distinguish outgoing sessions established by bridge,
-    // they don't belong in the manager. For now rely on uniqueness of UUIDs.
-    // 
     broker.getSessionManager().forget(getId());
     if (mgmtObject != 0)
         mgmtObject->resourceDestroy ();
@@ -126,7 +119,6 @@
     Mutex::ScopedLock l(lock);
     if (isAttached()) 
         getConnection().outputTasks.activateOutput();
-    // FIXME aconway 2008-05-22: should we hold the lock over activateOutput??
 }
 
 ManagementObject* SessionState::GetManagementObject (void) const
@@ -224,10 +216,7 @@
         getProxy().getMessage().accept(SequenceSet(msg->getCommandId()));        
 }
 
-void SessionState::handleIn(AMQFrame& f) { inChain->handle(f); }
-void SessionState::handleOut(AMQFrame& f) { outChain->handle(f); }
-
-void SessionState::handleInLast(AMQFrame& frame) {
+void SessionState::handleIn(AMQFrame& frame) {
     SequenceNumber commandId = receiverGetCurrent();
     try {
         //TODO: make command handling more uniform, regardless of whether
@@ -258,7 +247,7 @@
     }
 }
 
-void SessionState::handleOutLast(AMQFrame& frame) {
+void SessionState::handleOut(AMQFrame& frame) {
     assert(handler);
     handler->out(frame);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Wed Jul 16 17:03:50 2008
@@ -23,7 +23,6 @@
  */
 
 #include "qpid/SessionState.h"
-#include "qpid/HandlerChain.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/SequenceSet.h"
 #include "qpid/sys/Mutex.h"
@@ -102,10 +101,6 @@
 
     void readyToSend();
 
-    // Tag types to identify PluginHandlerChains. 
-    struct InTag {};
-    struct OutTag {};
-
   private:
 
     void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
@@ -131,11 +126,6 @@
     IncompleteMessageList incomplete;
     IncompleteMessageList::CompletionListener enqueuedOp;
     management::Session* mgmtObject;
-    framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleInLast> inLastHandler;
-    framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleOutLast> outLastHandler;
-
-    qpid::PluginHandlerChain<framing::FrameHandler, InTag> inChain;
-    qpid::PluginHandlerChain<framing::FrameHandler, OutTag> outChain;
 
   friend class SessionManager;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jul 16 17:03:50 2008
@@ -17,15 +17,19 @@
  */
 
 #include "Cluster.h"
+#include "ConnectionInterceptor.h"
+
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/ClusterNotifyBody.h"
+#include "qpid/framing/ClusterConnectionCloseBody.h"
 #include "qpid/log/Statement.h"
 #include "qpid/memory.h"
+#include "qpid/shared_ptr.h"
 #include <boost/bind.hpp>
-#include <boost/scoped_array.hpp>
+#include <boost/cast.hpp>
 #include <algorithm>
 #include <iterator>
 #include <map>
@@ -37,24 +41,6 @@
 using namespace std;
 using broker::Connection;
 
-// Beginning of inbound chain: send to cluster.
-struct ClusterSendHandler : public HandlerChain<FrameHandler>::Handler {
-    Cluster::ConnectionChain& connection;
-    Cluster& cluster;
-    
-    ClusterSendHandler(Cluster::ConnectionChain& conn, Cluster& clust) : connection(conn), cluster(clust) {}
-
-    void handle(AMQFrame& f) {
-        // FIXME aconway 2008-01-29: Refcount Connections to ensure
-        // Connection not destroyed till message is self delivered.
-        cluster.send(f, &connection, next); // Indirectly send to next via cluster.
-    }
-};
-
-void Cluster::initialize(Cluster::ConnectionChain& cc) {
-    cc.push(ConnectionChain::HandlerAutoPtr(new ClusterSendHandler(cc, *this)));
-}
-
 ostream& operator <<(ostream& out, const Cluster& cluster) {
     return out << cluster.name.str() << "-" << cluster.self;
 }
@@ -69,14 +55,14 @@
     return out;
 }
 
-// FIXME aconway 2008-07-02: create a Connection for the cluster.
 Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
-    broker(b),
     cpg(*this),
+    broker(&b),
     name(name_),
     url(url_),
     self(cpg.self())
 {
+    broker->addFinalizer(boost::bind(&Cluster::leave, this));
     QPID_LOG(trace, "Joining cluster: " << name_);
     cpg.join(name);
     notify();
@@ -90,15 +76,27 @@
 }
 
 Cluster::~Cluster() {
-    QPID_LOG(trace, *this << " Leaving cluster.");
-    try {
-        cpg.leave(name);
-        cpg.shutdown();
-        dispatcher.join();
-    }
-    catch (const std::exception& e) {
-        QPID_LOG(error, "Exception leaving cluster " << *this << ": "
-                 << e.what());
+    cpg.shutdown();
+    dispatcher.join();
+}
+
+// local connection initializes plugins
+void Cluster::initialize(broker::Connection& c) {
+    bool isLocal = &c.getOutput() != &shadowOut;
+    if (isLocal)
+        new ConnectionInterceptor(c, *this);
+}
+
+void Cluster::leave() {
+    if (!broker.get()) return;  // Already left
+    QPID_LOG(info, QPID_MSG("Leaving cluster " << *this));
+    // Must not be called in the dispatch thread.
+    assert(Thread::current().id() != dispatcher.id());
+    cpg.leave(name);
+    // Wait till final config-change is delivered and broker is released.
+    {
+        Mutex::ScopedLock l(lock);
+        while(broker.get()) lock.wait();
     }
 }
 
@@ -112,22 +110,20 @@
     buf.putLongLong(value);
 }
 
-void Cluster::send(AMQFrame& frame, void* connection, FrameHandler* next) {
+void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) {
     QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
-    // TODO aconway 2008-07-03: More efficient buffer management.
+    // FIXME aconway 2008-07-03: More efficient buffer management.
     // Cache coded form of decoded frames for re-encoding?
     Buffer buf(buffer);
-    assert(frame.size() + 128 < sizeof(buffer));
+    assert(frame.size() + 64 < sizeof(buffer));
     frame.encode(buf);
     encodePtr(buf, connection);
-    encodePtr(buf, next);
     iovec iov = { buffer, buf.getPosition() };
     cpg.mcast(name, &iov, 1);
 }
 
 void Cluster::notify() {
-    AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str()));
-    send(frame, 0, 0);
+    send(AMQFrame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str())), 0);
 }
 
 size_t Cluster::size() const {
@@ -143,19 +139,17 @@
     return result;        
 }
 
-boost::shared_ptr<broker::Connection>
-Cluster::getShadowConnection(const Cpg::Id& member, void* connectionPtr) {
-    // FIXME aconway 2008-07-02: locking - called by deliver in
-    // cluster thread so no locks but may need to revisit as model
-    // changes.
-    ShadowConnectionId id(member, connectionPtr);
-    boost::shared_ptr<broker::Connection>& ptr = shadowConnectionMap[id];
-    if (!ptr) {
+ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) {
+    ShadowConnectionId id(member, remotePtr);
+    ShadowConnectionMap::iterator i = shadowConnectionMap.find(id);
+    if (i == shadowConnectionMap.end()) { // A new shadow connection.
         std::ostringstream os;
-        os << name << ":"  << member << ":" << std::hex << connectionPtr;
-        ptr.reset(new broker::Connection(&shadowOut, broker, os.str()));
+        os << name << ":"  << member << ":" << remotePtr;
+        broker::Connection* c = new broker::Connection(&shadowOut, *broker, os.str());
+        ShadowConnectionMap::value_type value(id, new ConnectionInterceptor(*c, *this, id));
+        i = shadowConnectionMap.insert(value).first;
     }
-    return ptr;
+    return i->second;
 }
 
 void Cluster::deliver(
@@ -171,78 +165,75 @@
         Buffer buf(static_cast<char*>(msg), msg_len);
         AMQFrame frame;
         frame.decode(buf);
-        void* connectionId;
-        decodePtr(buf, connectionId);
+        ConnectionInterceptor* connection;
+        decodePtr(buf, connection);
+        QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame);
+
+        if (!broker.get()) {
+            QPID_LOG(warning, "Ignoring late DLVR, already left the cluster.");
+            return;
+        }
 
-        QPID_LOG(trace, "DLVR [" << from << " " << connectionId << "] " << frame);
+        if (connection && from != self) // Look up shadow for remote connections
+            connection = getShadowConnection(from, connection);
 
-        if (connectionId == 0) // A cluster control frame.
-            handleClusterFrame(from, frame);
-        else if (from == self) { // My own frame, carries a next pointer.
-            FrameHandler* next; 
-            decodePtr(buf, next);
-            next->handle(frame);
-        }
-        else {                  // Foreign frame, forward to shadow connection.
-            // FIXME aconway 2008-07-02: ptr_map instead of shared_ptr.
-            boost::shared_ptr<broker::Connection> shadow = getShadowConnection(from, connectionId);
-            shadow->received(frame);
-        }
+        if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID) 
+            handleMethod(from, connection, *frame.getMethod());
+        else 
+            connection->deliver(frame);
     }
     catch (const std::exception& e) {
         // FIXME aconway 2008-01-30: exception handling.
-        QPID_LOG(error, "Error handling frame from cluster " << e.what());
+        QPID_LOG(critical, "Error in cluster delivery: " << e.what());
+        assert(0);
+        throw;
     }
 }
 
-bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
-                   Duration timeout) const
-{
-    AbsTime deadline(now(), timeout);
-    Mutex::ScopedLock l(lock);
-    while (!predicate(*this) && lock.wait(deadline))
-        ;
-    return (predicate(*this));
-}
-
-// Handle cluster control frame .
-void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
-    // TODO aconway 2007-06-20: use visitor pattern here.
-    ClusterNotifyBody* notifyIn=
-        dynamic_cast<ClusterNotifyBody*>(frame.getBody());
-    assert(notifyIn);
-    MemberList list;
-    {
-        Mutex::ScopedLock l(lock);
-        members[from].url=notifyIn->getUrl();
-        lock.notifyAll();
-        QPID_LOG(debug, "Cluster join: " << members);
+// Handle cluster methods
+// FIXME aconway 2008-07-11: Generate/template a better dispatch mechanism.
+void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethodBody& method) {
+    assert(method.amqpClassId() == CLUSTER_CLASS_ID);
+    switch (method.amqpMethodId()) {
+      case CLUSTER_NOTIFY_METHOD_ID: {
+          ClusterNotifyBody& notify=static_cast<ClusterNotifyBody&>(method);
+          Mutex::ScopedLock l(lock);
+          members[from].url=notify.getUrl();
+          lock.notifyAll();
+          break;
+      }
+      case CLUSTER_CONNECTION_CLOSE_METHOD_ID: {
+          if (!connection->isLocal())
+              shadowConnectionMap.erase(connection->getShadowId());
+          connection->deliverClosed();
+          break;
+      }
+      default:
+        assert(0);
     }
 }
 
 void Cluster::configChange(
     cpg_handle_t /*handle*/,
     cpg_name */*group*/,
-    cpg_address */*current*/, int /*nCurrent*/,
+    cpg_address *current, int nCurrent,
     cpg_address *left, int nLeft,
-    cpg_address *joined, int nJoined)
+    cpg_address */*joined*/, int nJoined)
 {
-    bool newMembers=false;
-    MemberList updated;
-    {
-        Mutex::ScopedLock l(lock);
-        if (nLeft) {
-            for (int i = 0; i < nLeft; ++i) 
-                members.erase(Id(left[i]));
-            QPID_LOG(debug, "Cluster leave: " << members);
-            lock.notifyAll();
-        }
-        newMembers = nJoined > 1 || (nJoined==1 && Id(joined[0]) != self);
-        // We don't record members joining here, we record them when
-        // we get their ClusterNotify message.
+    Mutex::ScopedLock l(lock);
+    for (int i = 0; i < nLeft; ++i)  
+        members.erase(left[i]);
+    for(int j = 0; j < nCurrent; ++j) 
+        members[current[j]].id = current[j];
+    QPID_LOG(debug, "Cluster members: " << nCurrent << " ("<< nLeft << " left, " << nJoined << " joined):"
+             << members);
+    assert(members.size() == size_t(nCurrent));
+    if (members.find(self) == members.end()) {
+        QPID_LOG(debug, "Left cluster " << *this);
+        broker = 0;             // Release broker reference.
     }
-    if (newMembers)             // Notify new members of my presence.
-        notify();
+
+    lock.notifyAll();     // Threads waiting for membership changes.  
 }
 
 void Cluster::run() {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Jul 16 17:03:50 2008
@@ -22,14 +22,14 @@
 #include "qpid/cluster/Cpg.h"
 #include "qpid/cluster/ShadowConnectionOutputHandler.h"
 
-#include "qpid/HandlerChain.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/Connection.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/log/Logger.h"
 #include "qpid/Url.h"
-
+#include "qpid/RefCounted.h"
 
 #include <boost/optional.hpp>
 #include <boost/function.hpp>
@@ -41,19 +41,21 @@
 namespace qpid {
 namespace cluster {
 
+class ConnectionInterceptor;
+
 /**
  * Connection to the cluster.
  * Keeps cluster membership data.
  */
-class Cluster : private sys::Runnable, private Cpg::Handler
+class Cluster : private sys::Runnable, private Cpg::Handler, public RefCounted
 {
   public:
-    typedef PluginHandlerChain<framing::FrameHandler, broker::Connection> ConnectionChain;
+    typedef boost::tuple<Cpg::Id, void*> ShadowConnectionId;
 
     /** Details of a cluster member */
     struct Member {
-        Member(const Url& url_=Url()) : url(url_) {}
-        Url url;        ///< Broker address.
+        Cpg::Id  id;
+        Url url;
     };
     
     typedef std::vector<Member> MemberList;
@@ -65,11 +67,11 @@
      */
     Cluster(const std::string& name, const Url& url, broker::Broker&);
 
-    // Add cluster handlers to broker chains.
-    void initialize(ConnectionChain&);
-
     virtual ~Cluster();
 
+    /** Initialize interceptors for a new connection */
+    void initialize(broker::Connection&);
+    
     /** Get the current cluster membership. */
     MemberList getMembers() const;
 
@@ -78,22 +80,22 @@
 
     bool empty() const { return size() == 0; }
     
-    /** Wait for predicate(*this) to be true, up to timeout.
-     *@return True if predicate became true, false if timed out.
-     *Note the predicate may not be true after wait returns,
-     *all the caller can say is it was true at some earlier point.
-     */
-    bool wait(boost::function<bool(const Cluster&)> predicate,
-              sys::Duration timeout=sys::TIME_INFINITE) const;
-
     /** Send frame to the cluster */
-    void send(framing::AMQFrame&, void* connection, framing::FrameHandler*);
+    void send(const framing::AMQFrame&, ConnectionInterceptor*);
+
+    /** Leave the cluster */
+    void leave();
+    
+    // Cluster frame handing functions
+    void notify(const std::string& url);
+    void connectionClose();
     
   private:
     typedef Cpg::Id Id;
     typedef std::map<Id, Member>  MemberMap;
-    typedef boost::tuple<Cpg::Id, void*> ShadowConnectionId;
-    typedef std::map<ShadowConnectionId, boost::shared_ptr<broker::Connection> > ShadowConnectionMap;
+    typedef std::map<ShadowConnectionId, ConnectionInterceptor*> ShadowConnectionMap;
+
+    boost::function<void()> shutdownNext;
     
     void notify();              ///< Notify cluster of my details.
 
@@ -114,19 +116,18 @@
     );
 
     void run();
-    
-    void handleClusterFrame(Id from, framing::AMQFrame&);
 
-    boost::shared_ptr<broker::Connection> getShadowConnection(const Cpg::Id&, void*);
+    void handleMethod(Id from, ConnectionInterceptor* connection, framing::AMQMethodBody& method);
+
+    ConnectionInterceptor* getShadowConnection(const Cpg::Id&, void*);
 
-    mutable sys::Monitor lock;
-    broker::Broker& broker;
+    mutable sys::Monitor lock;  // Protect access to members.
     Cpg cpg;
+    boost::intrusive_ptr<broker::Broker> broker;
     Cpg::Name name;
     Url url;
     MemberMap members;
     sys::Thread dispatcher;
-    boost::function<void()> callback;
     Id self;
     ShadowConnectionMap shadowConnectionMap;
     ShadowConnectionOutputHandler shadowOut;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Wed Jul 16 17:03:50 2008
@@ -15,8 +15,8 @@
  * limitations under the License.
  *
  */
-#include <boost/program_options/value_semantic.hpp>
 
+#include "ConnectionInterceptor.h"
 
 
 #include "qpid/broker/Broker.h"
@@ -25,61 +25,81 @@
 #include "qpid/Options.h"
 #include "qpid/shared_ptr.h"
 
-#include <boost/optional.hpp>
 #include <boost/utility/in_place_factory.hpp>
 
-
 namespace qpid {
 namespace cluster {
 
 using namespace std;
 
-struct ClusterOptions : public Options {
+struct ClusterValues {
     string name;
     string url;
 
-    ClusterOptions() : Options("Cluster Options") {
+    Url getUrl(uint16_t port) const {
+        if (url.empty()) return Url::getIpAddressesUrl(port);
+        return Url(url);
+    }
+};
+
+/** Note separating options from values to work around boost version differences.
+ *  Old boost takes a reference to options objects, but new boost makes a copy.
+ *  New boost allows a shared_ptr but that's not compatible with old boost.
+ */
+struct ClusterOptions : public Options {
+    ClusterValues& values; 
+
+    ClusterOptions(ClusterValues& v) : Options("Cluster Options"), values(v) {
         addOptions()
-            ("cluster-name", optValue(name, "NAME"), "Name of cluster to join")
-            ("cluster-url", optValue(url,"URL"),
+            ("cluster-name", optValue(values.name, "NAME"), "Name of cluster to join")
+            ("cluster-url", optValue(values.url,"URL"),
              "URL of this broker, advertized to the cluster.\n"
              "Defaults to a URL listing all the local IP addresses\n")
             ;
     }
-
-    Url getUrl(uint16_t port) const {
-        if (url.empty()) return Url::getIpAddressesUrl(port);
-        return Url(url);
-    }
 };
 
 struct ClusterPlugin : public Plugin {
-    typedef PluginHandlerChain<framing::FrameHandler, broker::Connection> ConnectionChain;
 
+    ClusterValues values;
     ClusterOptions options;
-    boost::optional<Cluster> cluster;
+    boost::intrusive_ptr<Cluster> cluster;
+
+    ClusterPlugin() : options(values) {}
+
+    Options* getOptions() { return &options; }
 
-    template <class Chain> void init(Plugin::Target& t) {
-        Chain* c = dynamic_cast<Chain*>(&t);
-        if (c) cluster->initialize(*c);
+    void init(broker::Broker& b) {
+        if (values.name.empty()) return;  // Only if --cluster-name option was specified.
+        if (cluster) throw Exception("Cluster plugin cannot be initialized twice in one process.");
+        cluster = new Cluster(values.name, values.getUrl(b.getPort()), b);
+        b.addFinalizer(boost::bind(&ClusterPlugin::shutdown, this));
+    }
+
+    template <class T> void init(T& t) {
+        if (cluster) cluster->initialize(t);
+    }
+    
+    template <class T> bool init(Plugin::Target& target) {
+        T* t = dynamic_cast<T*>(&target);
+        if (t) init(*t);
+        return t;
     }
 
     void earlyInitialize(Plugin::Target&) {}
 
     void initialize(Plugin::Target& target) {
-        broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
-        if (broker && !options.name.empty()) {
-            if (cluster) throw Exception("Cluster plugin cannot be initialized twice in a process.");
-            cluster = boost::in_place(options.name,
-                                      options.getUrl(broker->getPort()),
-                                      boost::ref(*broker));
-            return;
-        }
-        if (!cluster) return;   // Ignore chain handlers if we didn't init a cluster.
-        init<ConnectionChain>(target);
+        if (init<broker::Broker>(target)) return;
+        if (!cluster) return;   // Remaining plugins only valid if cluster initialized.
+        if (init<broker::Connection>(target)) return;
     }
+
+    void shutdown() { cluster = 0; }
 };
 
 static ClusterPlugin instance; // Static initialization.
+
+// For test purposes.
+boost::intrusive_ptr<Cluster> getGlobalCluster() { return instance.cluster; }
     
 }} // namespace qpid::cluster

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp?rev=677471&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp Wed Jul 16 17:03:50 2008
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionInterceptor.h"
+#include "qpid/framing/ClusterConnectionCloseBody.h"
+#include "qpid/framing/AMQFrame.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+
+template <class T, class U, class V> void shift(T& a, U& b, const V& c) { a = b; b = c; }
+
+ConnectionInterceptor::ConnectionInterceptor(
+    broker::Connection& conn, Cluster& clust, Cluster::ShadowConnectionId shadowId_)
+    : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_)
+{
+    connection->addFinalizer(boost::bind(operator delete, this));
+    // Attach  my functions to Connection extension points.
+    shift(receivedNext, connection->receivedFn, boost::bind(&ConnectionInterceptor::received, this, _1));
+    shift(closedNext, connection->closedFn, boost::bind(&ConnectionInterceptor::closed, this));
+}
+
+ConnectionInterceptor::~ConnectionInterceptor() {
+    assert(isClosed);
+    assert(connection == 0);
+}
+
+void ConnectionInterceptor::received(framing::AMQFrame& f) {
+    if (isClosed) return;
+    cluster.send(f, this);
+}
+
+void ConnectionInterceptor::deliver(framing::AMQFrame& f) {
+    receivedNext(f);
+}
+
+void ConnectionInterceptor::closed() {
+    if (isClosed) return;
+    try {
+        // Called when the local network connection is closed. We still
+        // need to process any outstanding cluster frames for this
+        // connection to ensure our sessions are up-to-date. We defer
+        // closing the Connection object till deliverClosed(), but replace
+        // its output handler with a null handler since the network output
+        // handler will be deleted.
+        // 
+        connection->setOutputHandler(&discardHandler); 
+        cluster.send(AMQFrame(in_place<ClusterConnectionCloseBody>()), this);
+        isClosed = true;
+    }
+    catch (const std::exception& e) {
+        QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
+    }
+}
+
+void ConnectionInterceptor::deliverClosed() {
+    closedNext();
+    // Drop reference so connection will be deleted, which in turn
+    // will delete this via finalizer added in ctor.
+    connection = 0;             
+}
+
+}} // namespace qpid::cluster

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h?rev=677471&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h Wed Jul 16 17:03:50 2008
@@ -0,0 +1,77 @@
+#ifndef QPID_CLUSTER_CONNECTIONPLUGIN_H
+#define QPID_CLUSTER_CONNECTIONPLUGIN_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Cluster.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+
+namespace qpid {
+namespace framing { class AMQFrame; }
+namespace cluster {
+
+/**
+ * Plug-in associated with broker::Connections, both local and shadow.
+ */
+class ConnectionInterceptor {
+  public:
+    ConnectionInterceptor(broker::Connection&, Cluster&,
+                          Cluster::ShadowConnectionId shadowId=Cluster::ShadowConnectionId(0,0));
+    ~ConnectionInterceptor();
+
+    // Called on self-delivery
+    void deliver(framing::AMQFrame& f);
+
+    // Called on self-delivery of my own cluster.connection-close
+    void deliverClosed();
+
+    Cluster::ShadowConnectionId getShadowId() const { return shadowId; }
+
+    bool isLocal() const { return shadowId == Cluster::ShadowConnectionId(0,0); }
+    
+  private:
+    struct NullConnectionHandler : public qpid::sys::ConnectionOutputHandler {
+        void close() {}
+        void send(framing::AMQFrame&) {}
+        void doOutput() {}
+        void activateOutput() {}
+    };
+    
+    // Functions to add to Connection extension points.
+    void received(framing::AMQFrame&);
+    void closed();
+
+    boost::function<void(framing::AMQFrame&)> receivedNext;
+    boost::function<void()> closedNext;
+
+    boost::intrusive_ptr<broker::Connection> connection;
+    Cluster& cluster;
+    NullConnectionHandler discardHandler;
+    bool isClosed;
+    Cluster::ShadowConnectionId shadowId;
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_CONNECTIONPLUGIN_H*/
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Wed Jul 16 17:03:50 2008
@@ -62,7 +62,7 @@
     cpgFromHandle(handle)->handler.configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
 }
 
-Cpg::Cpg(Handler& h) : handler(h) {
+Cpg::Cpg(Handler& h) : handler(h), isShutdown(false) {
     cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
     check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
     check(cpg_context_set(handle, this), "Cannot set CPG context");
@@ -78,10 +78,10 @@
 }
 
 void Cpg::shutdown() {
-    if (handle) {
-        cpg_context_set(handle, 0);
+    if (!isShutdown) {
+        QPID_LOG(debug,"Shutting down CPG");
+        isShutdown=true;
         check(cpg_finalize(handle), "Error in shutdown of CPG");
-        handle = 0;
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Wed Jul 16 17:03:50 2008
@@ -165,6 +165,7 @@
 
     cpg_handle_t handle;
     Handler& handler;
+    bool isShutdown;
 };
 
 std::ostream& operator <<(std::ostream& out, const cpg_name& name);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h Wed Jul 16 17:03:50 2008
@@ -66,7 +66,7 @@
         MemFunRef(X& x, Handler<T>* next=0) : Handler(next), target(&x) {}
         void handle(T t) { (target->*F)(t); }
 
-        /** Allow calling with -> syntax, like a qpid::HandlerChain */
+        /** Allow calling with -> syntax */
         MemFunRef* operator->() { return this; }
 
       private:

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h Wed Jul 16 17:03:50 2008
@@ -1,5 +1,5 @@
 #ifndef TESTS_FORKEDBROKER_H
-#define TESTS_FORKEDBROKER_H
+
 
 /*
  *
@@ -23,16 +23,11 @@
  */
 
 #include "qpid/Exception.h"
-#include "qpid/sys/Fork.h"
-#include "qpid/log/Logger.h"
+#include "qpid/log/Statement.h"
 #include "qpid/broker/Broker.h"
-#include "qpid/broker/SignalHandler.h"
-
 #include <boost/lexical_cast.hpp>
-
 #include <string>
-
-#include <signal.h>
+#include <stdio.h>
 #include <sys/wait.h>
 
 /**
@@ -48,63 +43,66 @@
  * process.)
  * 
  */
-class ForkedBroker : public qpid::sys::ForkWithMessage {
-    pid_t pid;
-    uint16_t port;
-    qpid::broker::Broker::Options opts;
-    std::string prefix;
-
+class ForkedBroker {
   public:
-    struct ChildExit {};   // Thrown in child processes.
+    ForkedBroker(std::vector<const char*> argv) { init(argv); }
 
-    ForkedBroker(const qpid::broker::Broker::Options& opts_=qpid::broker::Broker::Options(),
-                 const std::string& prefix_=std::string())
-        : pid(0), port(0), opts(opts_), prefix(prefix_) { fork(); } 
+    ForkedBroker(int argc, const char* const argv[]) {
+        std::vector<const char*> args(argv, argv+argc);
+        init(args);
+    }
 
     ~ForkedBroker() {
-        try { stop(); }
-        catch(const std::exception& e) {
-            QPID_LOG(error, e.what());
+        try { stop(); } catch(const std::exception& e) {
+            QPID_LOG(error, QPID_MSG("Stopping forked broker: " << e.what()));
         }
     }
 
     void stop() {
-        if (pid > 0) {     // I am the parent, clean up children.
-            if (::kill(pid, SIGINT) < 0)
-                throw qpid::Exception(QPID_MSG("Can't kill process " << pid << ": " << qpid::strError(errno)));
-            int status = 0;
-            if (::waitpid(pid, &status, 0) < 0)
-                throw qpid::Exception(QPID_MSG("Waiting for process " << pid << ": " << qpid::strError(errno)));
-            if (WEXITSTATUS(status) != 0)
-                throw qpid::Exception(QPID_MSG("Process " << pid << " exited with status: " << WEXITSTATUS(status)));
-        }
+        using qpid::ErrnoException;
+        if (pid == 0) return;
+        if (::kill(pid, SIGINT) < 0) throw ErrnoException("kill failed");
+        int status;
+        if (::waitpid(pid, &status, 0) < 0) throw ErrnoException("wait for forked process failed");
+        if (WEXITSTATUS(status) != 0)
+            throw qpid::Exception(QPID_MSG("forked broker exited with: " << WEXITSTATUS(status)));
+        pid = 0;
     }
 
-    void parent(pid_t pid_) {
-        pid = pid_;
-        qpid::log::Logger::instance().setPrefix("parent");
-        std::string portStr = wait(5);
-        port = boost::lexical_cast<uint16_t>(portStr);
-    }
+    uint16_t getPort() { return port; }
 
-    void child() {
-        prefix += boost::lexical_cast<std::string>(long(getpid()));
-        qpid::log::Logger::instance().setPrefix(prefix);
-        opts.port = 0;
-        boost::intrusive_ptr<qpid::broker::Broker> broker(new qpid::broker::Broker(opts));
-        qpid::broker::SignalHandler::setBroker(broker);
-        QPID_LOG(info, "ForkedBroker started on " << broker->getPort());
-        ready(boost::lexical_cast<std::string>(broker->getPort())); // Notify parent.
-        broker->run();
-        QPID_LOG(notice, "ForkedBroker exiting.");
-
-        // Force exit in the child process, otherwise we will try to
-        // carry with parent tests. 
-        broker = 0;             // Run broker dtor before we exit.
-        exit(0);
+  private:
+
+    void init(const std::vector<const char*>& args) {
+        using qpid::ErrnoException;
+        pid = 0;
+        port = 0;
+        int pipeFds[2];
+        if(::pipe(pipeFds) < 0) throw ErrnoException("Can't create pipe");
+        pid = ::fork();
+        if (pid < 0) throw ErrnoException("Fork failed");
+        if (pid) {              // parent
+            ::close(pipeFds[1]);
+            FILE* f = ::fdopen(pipeFds[0], "r");
+            if (!f) throw ErrnoException("fopen failed");
+            if (::fscanf(f, "%d", &port) != 1) throw ErrnoException("fscanf failed");
+        }
+        else {                  // child
+            ::close(pipeFds[0]);
+            int fd = ::dup2(pipeFds[1], 1);
+            if (fd < 0) throw ErrnoException("dup2 failed");
+            const char* prog = "../qpidd";
+            std::vector<const char*> args2(args);
+            args2.push_back("--port=0");
+            args2.push_back("--mgmt-enable=no"); // TODO aconway 2008-07-16: why does mgmt cause problems?
+            args2.push_back(0);
+            execv(prog, const_cast<char* const*>(&args2[0]));
+            throw ErrnoException("execv failed");
+        }
     }
 
-    uint16_t getPort() { return port; }
+    pid_t pid;
+    int port;
 };
 
 #endif  /*!TESTS_FORKEDBROKER_H*/

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk Wed Jul 16 17:03:50 2008
@@ -10,9 +10,8 @@
 # 
 
 
-# FIXME aconway 2008-07-04: disabled till process leak is plugged.
-# ais_check checks conditions for cluster tests and run them if ok.
-#TESTS+=ais_check
+# ais_check checks pre-requisites for cluster tests and runs them if ok.
+TESTS+=ais_check
 EXTRA_DIST+=ais_check
 
 check_PROGRAMS+=cluster_test

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed Jul 16 17:03:50 2008
@@ -16,13 +16,13 @@
  *
  */
 
-
 #include "test_tools.h"
 #include "unit_test.h"
 #include "ForkedBroker.h"
 #include "BrokerFixture.h"
 
 #include "qpid/cluster/Cpg.h"
+#include "qpid/cluster/Cluster.h"
 #include "qpid/framing/AMQBody.h"
 #include "qpid/client/Connection.h"
 #include "qpid/client/Session.h"
@@ -37,10 +37,13 @@
 #include <vector>
 #include <algorithm>
 
-#include <signal.h>
+namespace qpid {
+namespace cluster {
+boost::intrusive_ptr<Cluster> getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp
+}} // namespace qpid::cluster
 
-QPID_AUTO_TEST_SUITE(CpgTestSuite)
 
+QPID_AUTO_TEST_SUITE(CpgTestSuite)
 
 using namespace std;
 using namespace qpid;
@@ -49,27 +52,60 @@
 using namespace qpid::client;
 using qpid::broker::Broker;
 using boost::ptr_vector;
+using qpid::cluster::Cluster;
+using qpid::cluster::getGlobalCluster;
 
-struct ClusterFixture : public ptr_vector<ForkedBroker> {
+/** Cluster fixture is a vector of ports for the replicas.
+ * Replica 0 is in the current process, all others are forked as children.
+ */
+struct ClusterFixture : public vector<uint16_t>  {
     string name;
+    Broker::Options opts;
+    std::auto_ptr<BrokerFixture> broker0;
+    boost::ptr_vector<ForkedBroker> forkedBrokers;
 
-    ClusterFixture(size_t n=0) : name(Uuid(true).str()) { add(n); }
+    ClusterFixture(size_t n);
     void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
     void add();
+    void setup();
 };
 
+ClusterFixture::ClusterFixture(size_t n) : name(Uuid(true).str()) {
+    add(n);
+    // Wait for all n members to join the cluster
+    int retry=20;            // TODO aconway 2008-07-16: nasty sleeps, clean this up.
+    while (retry && getGlobalCluster()->size() != n) {
+        ::sleep(1);
+        --retry;
+    }
+    BOOST_CHECK_EQUAL(n, getGlobalCluster()->size());
+}
+
 void ClusterFixture::add() {
-    broker::Broker::Options opts;
-    Plugin::addOptions(opts); // For cluster options.
+    std::ostringstream os;
+    os << "broker" << size();
+    std::string prefix = os.str();
+
     const char* argv[] = {
-        "", "--cluster-name", name.c_str(), "--auth=no", "--no-data-dir"
+        "qpidd " __FILE__ ,
+        "--load-module=../.libs/libqpidcluster.so",
+        "--cluster-name", name.c_str(), 
+        "--auth=no", "--no-data-dir",
+        "--log-prefix", prefix.c_str(),
     };
-    opts.parse(sizeof(argv)/sizeof(*argv), const_cast<char**>(argv));
-    ostringstream prefix;
-    prefix << "b" << size() << "-";
-    QPID_LOG(info, "ClusterFixture adding broker " << prefix.str());
-    push_back(new ForkedBroker(opts, prefix.str()));
-    QPID_LOG(info, "ClusterFixture added broker " << prefix.str());
+    size_t argc = sizeof(argv)/sizeof(argv[0]);
+
+    if (size())  {              // Not the first broker, fork.
+        forkedBrokers.push_back(new ForkedBroker(argc, argv));
+        push_back(forkedBrokers.back().getPort());
+    }
+    else {                      // First broker, run in this process.
+        Broker::Options opts;
+        Plugin::addOptions(opts); // Pick up cluster options.
+        opts.parse(argc, argv, "", true); // Allow-unknown for --load-module
+        broker0.reset(new BrokerFixture(opts));
+        push_back(broker0->getPort());
+    }
 }
 
 // For debugging: op << for CPG types.
@@ -149,26 +185,25 @@
 
 QPID_AUTO_TEST_CASE(testForkedBroker) {
     // Verify the ForkedBroker works as expected.
-    Broker::Options opts;
-    opts.auth="no";
-    opts.noDataDir=true;
-    ForkedBroker broker(opts);
+    const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" };
+    ForkedBroker broker(sizeof(argv)/sizeof(argv[0]), argv);
     Client c(broker.getPort());
     BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("amq.direct").getType()); 
 }
 
 QPID_AUTO_TEST_CASE(testWiringReplication) {
-    ClusterFixture cluster(2);  // FIXME aconway 2008-07-02: 3 brokers
-    Client c0(cluster[0].getPort());
+    ClusterFixture cluster(3);
+    Client c0(cluster[0]);
     BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty());
     BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty()); 
     c0.session.queueDeclare("q");
     c0.session.exchangeDeclare("ex", arg::type="direct");
     c0.session.close();
+    c0.connection.close();
     // Verify all brokers get wiring update.
     for (size_t i = 0; i < cluster.size(); ++i) {
         BOOST_MESSAGE("i == "<< i);
-        Client c(cluster[i].getPort());
+        Client c(cluster[i]);
         BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue());
         BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType());
     }    
@@ -177,12 +212,12 @@
 QPID_AUTO_TEST_CASE(testMessageEnqueue) {
     // Enqueue on one broker, dequeue on another.
     ClusterFixture cluster(2);
-    Client c0(cluster[0].getPort());
+    Client c0(cluster[0]);
     c0.session.queueDeclare("q");
     c0.session.messageTransfer(arg::content=TransferContent("foo", "q"));
     c0.session.messageTransfer(arg::content=TransferContent("bar", "q"));
     c0.session.close();
-    Client c1(cluster[1].getPort());
+    Client c1(cluster[1]);
     Message msg;
     BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC));
     BOOST_CHECK_EQUAL(string("foo"), msg.getData());
@@ -190,10 +225,14 @@
     BOOST_CHECK_EQUAL(string("bar"), msg.getData());
 }
 
+#if 0
+
+// FIXME aconway 2008-07-16: Implement cluster dequeue notification, enable this test.
+
 QPID_AUTO_TEST_CASE(testMessageDequeue) {
     // Enqueue on one broker, dequeue on two others.
     ClusterFixture cluster (3);
-    Client c0(cluster[0].getPort());
+    Client c0(cluster[0]);
     c0.session.queueDeclare("q");
     c0.session.messageTransfer(arg::content=TransferContent("foo", "q"));
     c0.session.messageTransfer(arg::content=TransferContent("bar", "q"));
@@ -201,11 +240,11 @@
 
     Message msg;
 
-    Client c1(cluster[1].getPort());
+    Client c1(cluster[1]);
     BOOST_CHECK(c1.subs.get(msg, "q"));
     BOOST_CHECK_EQUAL("foo", msg.getData());
     
-    Client c2(cluster[2].getPort());
+    Client c2(cluster[2]);
     BOOST_CHECK(c1.subs.get(msg, "q"));
     BOOST_CHECK_EQUAL("bar", msg.getData());
     QueueQueryResult r = c2.session.queueQuery("q");
@@ -214,4 +253,6 @@
 
 // TODO aconway 2008-06-25: failover.
 
+#endif
+
 QPID_AUTO_TEST_SUITE_END()

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/run_test
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/run_test?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/run_test (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/run_test Wed Jul 16 17:03:50 2008
@@ -38,9 +38,11 @@
 --demangle=yes
 --suppressions=$srcdir/.valgrind.supp
 --num-callers=25
---trace-children=yes
 --log-file=$VG_LOG --
 "
+# FIXME aconway 2008-07-16: removed --trace-children=yes, problems with cluster tests forking
+# qpidd libtool script. Investigate & restore --trace-children if possible.
+
 ERROR=0
 if grep -l "^# Generated by .*libtool" "$1" >/dev/null 2>&1; then
     # This is a libtool "executable". Valgrind it if VALGRIND specified.

Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=677471&r1=677470&r2=677471&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Wed Jul 16 17:03:50 2008
@@ -22,12 +22,14 @@
 
 <amqp major="0" minor="10" port="5672">
 
-  <class name = "cluster" code = "0x80" label="clustering extensions">
+  <class name = "cluster" code = "0x80" label="Qpid clustering extensions.">
     <doc>Qpid extension class to allow clustered brokers to communicate.</doc>
-    <control name = "notify" code="0x1" label="notify cluster of a new member">
-      <doc>Notify the cluster of a member URL</doc>
-      <!-- No chassis element, this is handled by separte cluster code for now.-->
+
+    <control name = "notify" code="0x1">
       <field name="url" type="str16" />
     </control>
+
+    <control name="connection-close" code="0x2">
+    </control>
   </class>
 </amqp>