You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/08/29 20:18:46 UTC

svn commit: r690358 - in /incubator/qpid/trunk/qpid/cpp: src/ src/qpid/ src/qpid/amqp_0_10/ src/qpid/broker/ src/qpid/cluster/ src/qpid/sys/ src/tests/ xml/

Author: aconway
Date: Fri Aug 29 11:18:45 2008
New Revision: 690358

URL: http://svn.apache.org/viewvc?rev=690358&view=rev
Log:
Refactored cluster to intercept at ConnectionCode, using sys:: interfaces rather than boost functions.
Use framing::Operations and Invoker to dispatch cluster methods.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ProxyInputHandler.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h   (with props)
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClassifierHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    incubator/qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Fri Aug 29 11:18:45 2008
@@ -6,24 +6,26 @@
 if CPG
 
 libqpidcluster_la_SOURCES = \
+  qpid/cluster/types.h \
   qpid/cluster/Cluster.cpp \
   qpid/cluster/Cluster.h \
   qpid/cluster/Cpg.cpp \
   qpid/cluster/Cpg.h \
   qpid/cluster/Dispatchable.h \
   qpid/cluster/ClusterPlugin.cpp \
-  qpid/cluster/ConnectionInterceptor.h \
-  qpid/cluster/ConnectionInterceptor.cpp \
-  qpid/cluster/ClassifierHandler.h \
-  qpid/cluster/ClassifierHandler.cpp \
-  qpid/cluster/ShadowConnectionOutputHandler.h \
+  qpid/cluster/ConnectionCodec.h \
+  qpid/cluster/ConnectionCodec.cpp \
+  qpid/cluster/Connection.h \
+  qpid/cluster/Connection.cpp \
+  qpid/cluster/NoOpConnectionOutputHandler.h \
   qpid/cluster/PollableCondition.h \
   qpid/cluster/PollableCondition.cpp \
   qpid/cluster/PollableQueue.h \
   qpid/cluster/WriteEstimate.h \
   qpid/cluster/WriteEstimate.cpp \
   qpid/cluster/OutputInterceptor.h \
-  qpid/cluster/OutputInterceptor.cpp
+  qpid/cluster/OutputInterceptor.cpp \
+  qpid/cluster/ProxyInputHandler.h
 
 libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h Fri Aug 29 11:18:45 2008
@@ -76,7 +76,9 @@
     /** Parse url, throw InvalidUrl if invalid. */
     explicit Url(const char* url) { parse(url); }
 
-    template<class T> Url& operator=(T s) { parse(s); return *this; }
+    Url& operator=(const Url& u) { this->std::vector<Address>::operator=(u); cache=u.cache; return *this; }
+    Url& operator=(const char* s) { parse(s); return *this; }
+    Url& operator=(const std::string& s) { parse(s); return *this; }
     
     /** Throw InvalidUrl if the URL does not contain any addresses. */
     void throwIfEmpty() const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Fri Aug 29 11:18:45 2008
@@ -21,16 +21,22 @@
 #include "Connection.h"
 #include "qpid/log/Statement.h"
 #include "qpid/amqp_0_10/exceptions.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/ProtocolInitiation.h"
 
 namespace qpid {
 namespace amqp_0_10 {
 
 using sys::Mutex;
 
-Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
-    : frameQueueClosed(false), output(o),
-      connection(new broker::Connection(this, broker, id, _isClient)),
-      identifier(id), initialized(false), isClient(_isClient), buffered(0) {}
+Connection::Connection(sys::OutputControl& o, const std::string& id, bool _isClient)
+    : frameQueueClosed(false), output(o), identifier(id), initialized(false), isClient(_isClient), buffered(0)
+{}
+
+void Connection::setInputHandler(std::auto_ptr<sys::ConnectionInputHandler> c) {
+    connection = c;
+}
 
 size_t  Connection::decode(const char* buffer, size_t size) {
     framing::Buffer in(const_cast<char*>(buffer), size);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Fri Aug 29 11:18:45 2008
@@ -22,15 +22,19 @@
  *
  */
 #include "qpid/sys/ConnectionCodec.h"
+#include "qpid/sys/ConnectionInputHandler.h"
 #include "qpid/sys/ConnectionOutputHandler.h"
 #include "qpid/sys/Mutex.h"
-#include "qpid/broker/Connection.h"
 #include <boost/intrusive_ptr.hpp>
-#include <deque>
 #include <memory>
+#include <deque>
 
 namespace qpid {
-namespace broker { class Broker; }
+
+namespace sys {
+class ConnectionInputHandlerFactory;
+}
+
 namespace amqp_0_10 {
 
 class Connection  : public sys::ConnectionCodec,
@@ -42,14 +46,15 @@
     bool frameQueueClosed;
     mutable sys::Mutex frameQueueLock;
     sys::OutputControl& output;
-    boost::intrusive_ptr<broker::Connection> connection;
+    std::auto_ptr<sys::ConnectionInputHandler> connection;
     std::string identifier;
     bool initialized;
     bool isClient;
     size_t buffered;
 
   public:
-    Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false);
+    Connection(sys::OutputControl&, const std::string& id, bool isClient);
+    void setInputHandler(std::auto_ptr<sys::ConnectionInputHandler> c);
     size_t decode(const char* buffer, size_t size);
     size_t encode(const char* buffer, size_t size);
     bool isClosed() const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Aug 29 11:18:45 2008
@@ -133,7 +133,7 @@
 	acl(0),
     dataDir(conf.noDataDir ? std::string () : conf.dataDir),
     links(this),
-    factory(*this),
+    factory(new ConnectionFactory(*this)),
     sessionManager(
         qpid::SessionState::Configuration(
             conf.replayFlushLimit*1024, // convert kb to bytes.
@@ -372,7 +372,7 @@
 // TODO: This should iterate over all protocolFactories
 void Broker::accept() {
     for (unsigned int i = 0; i < protocolFactories.size(); ++i)
-        protocolFactories[i]->accept(poller, &factory);
+        protocolFactories[i]->accept(poller, factory.get());
 }
 
 
@@ -382,7 +382,7 @@
     boost::function2<void, int, std::string> failed,
     sys::ConnectionCodec::Factory* f)
 {
-    getProtocolFactory()->connect(poller, host, port, f ? f : &factory, failed);
+    getProtocolFactory()->connect(poller, host, port, f ? f : factory.get(), failed);
 }
 
 void Broker::connect(

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Aug 29 11:18:45 2008
@@ -103,7 +103,7 @@
     QueueRegistry queues;
     ExchangeRegistry exchanges;
     LinkRegistry links;
-    ConnectionFactory factory;
+    boost::shared_ptr<sys::ConnectionCodec::Factory> factory;
     DtxManager dtxManager;
     SessionManager sessionManager;
     management::ManagementAgent* managementAgent;
@@ -178,7 +178,10 @@
     boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const;
 
     /** Expose poller so plugins can register their descriptors. */
-    boost::shared_ptr<sys::Poller> getPoller(); 
+    boost::shared_ptr<sys::Poller> getPoller();
+
+    boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; }
+    void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Aug 29 11:18:45 2008
@@ -49,9 +49,6 @@
 
 Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) :
     ConnectionState(out_, broker_),
-    receivedFn(boost::bind(&Connection::receivedImpl, this, _1)),
-    closedFn(boost::bind(&Connection::closedImpl, this)),
-    doOutputFn(boost::bind(&Connection::doOutputImpl, this)),
     adapter(*this, isLink_),
     isLink(isLink_),
     mgmtClosing(false),
@@ -72,8 +69,6 @@
             mgmtObject = new management::Connection(agent, this, parent, mgmtId, !isLink);
         agent->addObject(mgmtObject);
     }
-
-    Plugin::initializeAll(*this); // Let plug-ins update extension points.
 }
 
 void Connection::requestIOProcessing(boost::function0<void> callback)
@@ -90,9 +85,7 @@
         links.notifyClosed(mgmtId);
 }
 
-void Connection::received(framing::AMQFrame& frame) { receivedFn(frame); }
-
-void Connection::receivedImpl(framing::AMQFrame& frame){
+void Connection::received(framing::AMQFrame& frame) {
     if (frame.getChannel() == 0 && frame.getMethod()) {
         adapter.handle(frame);
     } else {
@@ -172,9 +165,7 @@
 
 void Connection::idleIn(){}
 
-void Connection::closed() { closedFn(); }
-
-void Connection::closedImpl(){ // Physically closed, suspend open sessions.
+void Connection::closed(){ // Physically closed, suspend open sessions.
     try {
         while (!channels.empty()) 
             ptr_map_ptr(channels.begin())->handleDetach();
@@ -194,9 +185,7 @@
 
 bool Connection::hasOutput() { return outputTasks.hasOutput(); }
 
-bool Connection::doOutput() { return doOutputFn(); }
-
-bool Connection::doOutputImpl() {    
+bool Connection::doOutput() {    
     try{
         if (ioCallback)
             ioCallback(); // Lend the IO thread for management processing

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Aug 29 11:18:45 2008
@@ -44,7 +44,6 @@
 #include "SessionHandler.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/management/Connection.h"
-#include "qpid/Plugin.h"
 #include "qpid/RefCounted.h"
 
 #include <boost/ptr_container/ptr_map.hpp>
@@ -56,7 +55,6 @@
 
 class Connection : public sys::ConnectionInputHandler, 
                    public ConnectionState,
-                   public Plugin::Target,
                    public RefCounted
 {
   public:
@@ -95,19 +93,10 @@
     void notifyConnectionForced(const std::string& text);
     void setUserId(const string& uid);
 
-    // Extension points: allow plugins to insert additional functionality.
-    boost::function<void(framing::AMQFrame&)> receivedFn;
-    boost::function<void ()> closedFn;
-    boost::function<bool ()> doOutputFn;
-
   private:
     typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
     typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
 
-    void receivedImpl(framing::AMQFrame& frame);
-    void closedImpl();
-    bool doOutputImpl();
-
     ChannelMap channels;
     framing::AMQP_ClientProxy::Connection* client;
     ConnectionHandler adapter;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp Fri Aug 29 11:18:45 2008
@@ -21,11 +21,14 @@
 #include "ConnectionFactory.h"
 #include "qpid/framing/ProtocolVersion.h"
 #include "qpid/amqp_0_10/Connection.h"
+#include "qpid/broker/Connection.h"
 
 namespace qpid {
 namespace broker {
 
 using framing::ProtocolVersion;
+typedef std::auto_ptr<amqp_0_10::Connection> ConnectionPtr;
+typedef std::auto_ptr<sys::ConnectionInputHandler> InputPtr;
 
 ConnectionFactory::ConnectionFactory(Broker& b) : broker(b) {}
 
@@ -33,15 +36,21 @@
 
 sys::ConnectionCodec*
 ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
-    if (v == ProtocolVersion(0, 10))
-        return new amqp_0_10::Connection(out, broker, id);
+    if (v == ProtocolVersion(0, 10)) {
+        ConnectionPtr c(new amqp_0_10::Connection(out, id, false));
+        c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, false)));
+        return c.release();
+    }
     return 0;
 }
 
 sys::ConnectionCodec*
 ConnectionFactory::create(sys::OutputControl& out, const std::string& id) {
     // used to create connections from one broker to another
-    return new amqp_0_10::Connection(out, broker, id, true);
+    ConnectionPtr c(new amqp_0_10::Connection(out, id, true));
+    c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, true)));
+    return c.release();
 }
 
+    
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h Fri Aug 29 11:18:45 2008
@@ -27,7 +27,8 @@
 namespace broker {
 class Broker;
 
-class ConnectionFactory : public sys::ConnectionCodec::Factory {
+class ConnectionFactory : public sys::ConnectionCodec::Factory
+{
   public:
     ConnectionFactory(Broker& b);
             

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Aug 29 11:18:45 2008
@@ -4,7 +4,7 @@
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+n * You may obtain a copy of the License at
  *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -17,18 +17,17 @@
  */
 
 #include "Cluster.h"
-#include "ConnectionInterceptor.h"
+#include "Connection.h"
 
 #include "qpid/broker/Broker.h"
-#include "qpid/broker/SessionState.h"
-#include "qpid/broker/Connection.h"
 #include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/ClusterNotifyBody.h"
-#include "qpid/framing/ClusterConnectionCloseBody.h"
-#include "qpid/framing/ClusterConnectionDoOutputBody.h"
+#include "qpid/framing/ClusterJoinedBody.h"
 #include "qpid/log/Statement.h"
 #include "qpid/memory.h"
 #include "qpid/shared_ptr.h"
+#include "qpid/framing/AMQP_AllOperations.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/Invoker.h"
 
 #include <boost/bind.hpp>
 #include <boost/cast.hpp>
@@ -39,22 +38,34 @@
 
 namespace qpid {
 namespace cluster {
+
 using namespace qpid::framing;
 using namespace qpid::sys;
 using namespace std;
-using broker::Connection;
 
+// Handle cluster controls from a given member.
+struct ClusterOperations :  public framing::AMQP_AllOperations::ClusterHandler {
+    Cluster& cluster;
+    MemberId member;
+    
+    ClusterOperations(Cluster& c, const MemberId& m) : cluster(c), member(m) {}
+
+    void joined(const std::string& url) {
+        cluster.joined(member, url);
+    }
+};
+    
 ostream& operator <<(ostream& out, const Cluster& cluster) {
     return out << cluster.name.str() << "-" << cluster.self;
 }
 
-ostream& operator<<(ostream& out, const Cluster::MemberMap::value_type& m) {
-    return out << m.first << "=" << m.second.url;
+ostream& operator<<(ostream& out, const Cluster::UrlMap::value_type& m) {
+    return out << m.first << " at " << m.second;
 }
 
-ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
-    ostream_iterator<Cluster::MemberMap::value_type> o(out, " ");
-    copy(members.begin(), members.end(), o);
+ostream& operator <<(ostream& out, const Cluster::UrlMap& urls) {
+    ostream_iterator<Cluster::UrlMap::value_type> o(out, " ");
+    copy(urls.begin(), urls.end(), o);
     return out;
 }
 
@@ -74,9 +85,9 @@
     mcastQueue(boost::bind(&Cluster::mcastQueueCb, this, _1, _2))
 {
     broker->addFinalizer(boost::bind(&Cluster::leave, this));
-    QPID_LOG(trace, "Joining cluster: " << name_);
+    QPID_LOG(trace, "Node " << self << " joining cluster: " << name_);
     cpg.join(name);
-    notify();
+    send(AMQFrame(in_place<ClusterJoinedBody>(ProtocolVersion(), url.str())), ConnectionId(self,0));
 
     // Start dispatching from the poller.
     cpgDispatchHandle.startWatch(poller);
@@ -84,31 +95,15 @@
     mcastQueue.start(poller);
 }
 
-Cluster::~Cluster() {
-    for (ShadowConnectionMap::iterator i = shadowConnectionMap.begin();
-         i != shadowConnectionMap.end();
-         ++i)
-    {
-        i->second->dirtyClose(); 
-    }
-    std::for_each(localConnectionSet.begin(), localConnectionSet.end(), boost::bind(&ConnectionInterceptor::dirtyClose, _1));
-}
-
-void Cluster::initialize(broker::Connection& c) {
-    bool isLocal = c.getOutput().get() != &shadowOut;
-    if (isLocal)
-        localConnectionSet.insert(new ConnectionInterceptor(c, *this));
-}
+Cluster::~Cluster() {}
 
 void Cluster::leave() {
     Mutex::ScopedLock l(lock);
     if (!broker) return;                               // Already left.
     // Leave is called by from Broker destructor after the poller has
     // been shut down. No dispatches can occur.
-    
-    QPID_LOG(debug, "Leaving cluster " << *this);
     cpg.leave(name);
-    // broker= is set to 0 when the final config-change is delivered.
+    // broker is set to 0 when the final config-change is delivered.
     while(broker) {
         Mutex::ScopedUnlock u(lock);
         cpg.dispatchAll();
@@ -126,9 +121,9 @@
     buf.putLongLong(value);
 }
 
-void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) {
-    QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
-    mcastQueue.push(Message(frame, self, connection));
+void Cluster::send(const AMQFrame& frame, const ConnectionId& id) {
+    QPID_LOG(trace, "MCAST [" << id << "] " << frame);
+    mcastQueue.push(Message(frame, id));
 }
 
 void Cluster::mcastQueueCb(const MessageQueue::iterator& begin,
@@ -137,48 +132,40 @@
     // Static is OK because there is only one cluster allowed per
     // process and only one thread in mcastQueueCb at a time.
     static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management.
-    MessageQueue::iterator i = begin;
-    while (i != end) {
-        Buffer buf(buffer, sizeof(buffer));
-        while (i != end && buf.available() > i->frame.size() + sizeof(uint64_t)) {
-            i->frame.encode(buf);
-            encodePtr(buf, i->connection);
-            ++i;
-        }
-        iovec iov = { buffer, buf.getPosition() };
-        cpg.mcast(name, &iov, 1);
+    Buffer buf(buffer, sizeof(buffer));
+    for (MessageQueue::iterator i = begin; i != end; ++i) {
+        AMQFrame& frame =i->first;
+        ConnectionId id =i->second;
+        if (buf.available() < frame.size() + sizeof(uint64_t))
+            break;
+        frame.encode(buf);
+        encodePtr(buf, id.second);
     }
-}
-
-void Cluster::notify() {
-    send(AMQFrame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str())), 0);
+    iovec iov = { buffer, buf.getPosition() };
+    cpg.mcast(name, &iov, 1);
 }
 
 size_t Cluster::size() const {
     Mutex::ScopedLock l(lock);
-    return members.size();
+    return urls.size();
 }
 
-Cluster::MemberList Cluster::getMembers() const {
+std::vector<Url> Cluster::getUrls() const {
     Mutex::ScopedLock l(lock);
-    MemberList result(members.size());
-    std::transform(members.begin(), members.end(), result.begin(),
-                   boost::bind(&MemberMap::value_type::second, _1));
+    std::vector<Url> result(urls.size());
+    std::transform(urls.begin(), urls.end(), result.begin(), boost::bind(&UrlMap::value_type::second, _1));
     return result;        
 }
 
-ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) {
-    ShadowConnectionId id(member, remotePtr);
-    ShadowConnectionMap::iterator i = shadowConnectionMap.find(id);
-    if (i == shadowConnectionMap.end()) { // A new shadow connection.
+boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) {
+    boost::intrusive_ptr<Connection> c = connections[id];
+    if (!c && id.first != self) { // Shadow connection
         std::ostringstream os;
-        os << name << ":"  << member << ":" << remotePtr;
-        assert(broker);
-        broker::Connection* c = new broker::Connection(&shadowOut, *broker, os.str());
-        ShadowConnectionMap::value_type value(id, new ConnectionInterceptor(*c, *this, id));
-        i = shadowConnectionMap.insert(value).first;
+        os << id;
+        c = connections[id] = new Connection(*this, shadowOut, os.str(), id);
     }
-    return i->second;
+    assert(c);
+    return c;
 }
 
 void Cluster::deliver(
@@ -189,16 +176,17 @@
     void* msg,
     int msg_len)
 {
-    Id from(nodeid, pid);
+    MemberId from(nodeid, pid);
     try {
         Buffer buf(static_cast<char*>(msg), msg_len);
         while (buf.available() > 0) {
             AMQFrame frame;
             if (!frame.decode(buf))  // Not enough data.
                 throw Exception("Received incomplete cluster event.");
-            void* connection;
-            decodePtr(buf, connection);
-            deliverQueue.push(Message(frame, from, connection));
+            Connection* cp;
+            decodePtr(buf, cp);
+            QPID_LOG(critical, "deliverQ.push " << frame);
+            deliverQueue.push(Message(frame, ConnectionId(from, cp)));
         }
     }
     catch (const std::exception& e) {
@@ -213,23 +201,21 @@
                              const MessageQueue::iterator& end)
 {
     for (MessageQueue::iterator i = begin; i != end; ++i) {
-        AMQFrame& frame(i->frame);
-        Id from(i->from);
-        ConnectionInterceptor* connection = reinterpret_cast<ConnectionInterceptor*>(i->connection);
+        AMQFrame& frame(i->first);
+        ConnectionId connectionId(i->second);
         try {
-            QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame);
-
+            QPID_LOG(trace, "DLVR [" << connectionId << "]: " << frame);
             if (!broker) {
-                QPID_LOG(warning, "Unexpected DLVR, already left the cluster.");
+                QPID_LOG(error, "Unexpected DLVR after leaving the cluster.");
                 return;
             }
-            if (connection && from != self) // Look up shadow for remote connections
-                connection = getShadowConnection(from, connection);
-
-            if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID) 
-                handleMethod(from, connection, *frame.getMethod());
-            else 
-                connection->deliver(frame);
+            if (connectionId.getConnectionPtr()) // Connection control
+                getConnection(connectionId)->deliver(frame);
+            else {              // Cluster control
+                ClusterOperations cops(*this, connectionId.getMember());
+                bool invoked = framing::invoke(cops, *frame.getBody()).wasHandled();
+                assert(invoked);
+            }
         }
         catch (const std::exception& e) {
             // FIXME aconway 2008-01-30: exception handling.
@@ -240,54 +226,30 @@
     }
 }
 
-// Handle cluster methods
-// FIXME aconway 2008-07-11: Generate/template a better dispatch mechanism.
-void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethodBody& method) {
-    assert(method.amqpClassId() == CLUSTER_CLASS_ID);
-    switch (method.amqpMethodId()) {
-      case CLUSTER_NOTIFY_METHOD_ID: {
-          ClusterNotifyBody& notify=static_cast<ClusterNotifyBody&>(method);
-          Mutex::ScopedLock l(lock);
-          members[from].url=notify.getUrl();
-          lock.notifyAll();
-          break;
-      }
-      case CLUSTER_CONNECTION_CLOSE_METHOD_ID: {
-          if (!connection->isLocal())
-              shadowConnectionMap.erase(connection->getShadowId());
-          else
-              localConnectionSet.erase(connection);
-          connection->deliverClosed();
-          break;
-      }
-      case CLUSTER_CONNECTION_DO_OUTPUT_METHOD_ID: {
-          ClusterConnectionDoOutputBody& doOutput = static_cast<ClusterConnectionDoOutputBody&>(method);
-          connection->deliverDoOutput(doOutput.getBytes());
-          break;
-      }
-      default:
-        assert(0);
-    }
+void Cluster::joined(const MemberId& member, const string& url) {
+    Mutex::ScopedLock l(lock);
+    QPID_LOG(debug, member << " has URL " << url);
+    urls[member] = url;
+    lock.notifyAll();
 }
 
 void Cluster::configChange(
     cpg_handle_t /*handle*/,
     cpg_name */*group*/,
-    cpg_address *current, int nCurrent,
+    cpg_address */*current*/, int /*nCurrent*/,
     cpg_address *left, int nLeft,
-    cpg_address */*joined*/, int nJoined)
+    cpg_address *joined, int nJoined)
 {
+    QPID_LOG(debug, "Cluster change: " << std::make_pair(joined, nJoined) << std::make_pair(left, nLeft));
     Mutex::ScopedLock l(lock);
-    for (int i = 0; i < nLeft; ++i)  
-        members.erase(left[i]);
-    for(int j = 0; j < nCurrent; ++j) 
-        members[current[j]].id = current[j];
-    QPID_LOG(debug, "Cluster members: " << nCurrent << " ("<< nLeft << " left, " << nJoined << " joined):"
-             << members);
-    assert(members.size() == size_t(nCurrent));
-    if (members.find(self) == members.end()) 
+    // We add URLs to the map in joined() we don't keep track of pre-URL members yet.
+    for (int l = 0; l < nLeft; ++l) urls.erase(left[l]);
+
+    if (std::find(left, left+nLeft, self) != left+nLeft) {
         broker = 0;       // We have left the group, this is the final config change.
-    lock.notifyAll();     // Threads waiting for membership changes.  
+        QPID_LOG(debug, "Leaving cluster " << *this);
+    }
+    lock.notifyAll();     // Threads waiting for url changes.  
 }
 
 void Cluster::dispatch(sys::DispatchHandle& h) {
@@ -301,6 +263,16 @@
     broker->shutdown();
 }
 
+void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
+    Mutex::ScopedLock l(lock);
+    connections[c->getId()] = c;
+}
+
+void Cluster::erase(ConnectionId id) {
+    Mutex::ScopedLock l(lock);
+    connections.erase(id);
+}
+
 }} // namespace qpid::cluster
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Aug 29 11:18:45 2008
@@ -19,22 +19,16 @@
  *
  */
 
+#include "qpid/cluster/types.h"
 #include "qpid/cluster/Cpg.h"
-#include "qpid/cluster/ShadowConnectionOutputHandler.h"
 #include "qpid/cluster/PollableQueue.h"
+#include "qpid/cluster/NoOpConnectionOutputHandler.h"
 
 #include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
-#include "qpid/sys/Dispatcher.h"
 #include "qpid/sys/Monitor.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/log/Logger.h"
+#include "qpid/framing/AMQP_AllOperations.h"
 #include "qpid/Url.h"
-#include "qpid/RefCounted.h"
 
-#include <boost/optional.hpp>
-#include <boost/function.hpp>
 #include <boost/intrusive_ptr.hpp>
 
 #include <map>
@@ -43,24 +37,15 @@
 namespace qpid {
 namespace cluster {
 
-class ConnectionInterceptor;
+class Connection;
 
 /**
  * Connection to the cluster.
  * Keeps cluster membership data.
  */
-class Cluster : private Cpg::Handler, public RefCounted
+class Cluster : public RefCounted, private Cpg::Handler
 {
   public:
-    typedef boost::tuple<Cpg::Id, void*> ShadowConnectionId;
-
-    /** Details of a cluster member */
-    struct Member {
-        Cpg::Id  id;
-        Url url;
-    };
-    
-    typedef std::vector<Member> MemberList;
 
     /**
      * Join a cluster.
@@ -71,11 +56,11 @@
 
     virtual ~Cluster();
 
-    /** Initialize interceptors for a new connection */
-    void initialize(broker::Connection&);
+    void insert(const boost::intrusive_ptr<Connection>&); // Insert a local connection
+    void erase(ConnectionId);          // Erase a connection.
     
-    /** Get the current cluster membership. */
-    MemberList getMembers() const;
+    /** Get the URLs of current cluster members. */
+    std::vector<Url> getUrls() const;
 
     /** Number of members in the cluster. */
     size_t size() const;
@@ -83,33 +68,27 @@
     bool empty() const { return size() == 0; }
     
     /** Send frame to the cluster */
-    void send(const framing::AMQFrame&, ConnectionInterceptor*);
+    void send(const framing::AMQFrame&, const ConnectionId&);
 
     /** Leave the cluster */
     void leave();
     
-    // Cluster frame handing functions
-    void notify(const std::string& url);
-    void connectionClose();
+    void joined(const MemberId&, const std::string& url);
+
+    broker::Broker& getBroker() { assert(broker); return *broker; }
 
+    MemberId getSelf() const { return self; }
+    
   private:
-    typedef Cpg::Id Id;
-    typedef std::map<Id, Member>  MemberMap;
-    typedef std::map<ShadowConnectionId, ConnectionInterceptor*> ShadowConnectionMap;
-    typedef std::set<ConnectionInterceptor*> LocalConnectionSet;
+    typedef std::map<MemberId, Url>  UrlMap;
+    typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap;
 
     /** Message sent over the cluster. */
-    struct Message {
-        framing::AMQFrame frame; Id from; void* connection;
-        Message(const framing::AMQFrame& f, const Id i, void* c)
-            : frame(f), from(i), connection(c) {}
-    };
+    typedef std::pair<framing::AMQFrame, ConnectionId> Message;
     typedef PollableQueue<Message> MessageQueue;
 
     boost::function<void()> shutdownNext;
     
-    void notify();              ///< Notify cluster of my details.
-
     /** CPG deliver callback. */
     void deliver(
         cpg_handle_t /*handle*/,
@@ -142,9 +121,9 @@
     /** Callback if CPG fd is disconnected. */
     void disconnect(sys::DispatchHandle&);
 
-    void handleMethod(Id from, ConnectionInterceptor* connection, framing::AMQMethodBody& method);
+    void handleMethod(MemberId from, cluster::Connection* connection, framing::AMQMethodBody& method);
 
-    ConnectionInterceptor* getShadowConnection(const Cpg::Id&, void*);
+    boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&);
 
     mutable sys::Monitor lock;  // Protect access to members.
     broker::Broker* broker;
@@ -152,18 +131,17 @@
     Cpg cpg;
     Cpg::Name name;
     Url url;
-    MemberMap members;
-    Id self;
-    ShadowConnectionMap shadowConnectionMap;
-    LocalConnectionSet localConnectionSet;
-    ShadowConnectionOutputHandler shadowOut;
+    UrlMap urls;
+    MemberId self;
+    ConnectionMap connections;
+    NoOpConnectionOutputHandler shadowOut;
     sys::DispatchHandle cpgDispatchHandle;
     MessageQueue deliverQueue;
     MessageQueue mcastQueue;
 
   friend std::ostream& operator <<(std::ostream&, const Cluster&);
-  friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
-  friend std::ostream& operator <<(std::ostream&, const MemberMap&);
+  friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&);
+  friend std::ostream& operator <<(std::ostream&, const UrlMap&);
 };
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Fri Aug 29 11:18:45 2008
@@ -16,10 +16,13 @@
  *
  */
 
-#include "ConnectionInterceptor.h"
+#include "Connection.h"
+#include "ConnectionCodec.h"
 
-#include "qpid/broker/Broker.h"
 #include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/ConnectionCodec.h"
+
+#include "qpid/broker/Broker.h"
 #include "qpid/Plugin.h"
 #include "qpid/Options.h"
 #include "qpid/shared_ptr.h"
@@ -63,36 +66,25 @@
     ClusterValues values;
     ClusterOptions options;
     boost::intrusive_ptr<Cluster> cluster;
+    boost::scoped_ptr<ConnectionCodec::Factory> factory;
 
     ClusterPlugin() : options(values) {}
 
     Options* getOptions() { return &options; }
 
-    void init(broker::Broker& b) {
-        if (values.name.empty()) return;  // Only if --cluster-name option was specified.
+    void initialize(Plugin::Target& target) {
+        broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
+        if (!broker || values.name.empty()) return;  // Only if --cluster-name option was specified.
         if (cluster) throw Exception("Cluster plugin cannot be initialized twice in one process.");
-        cluster = new Cluster(values.name, values.getUrl(b.getPort()), b);
-        b.addFinalizer(boost::bind(&ClusterPlugin::shutdown, this));
-    }
-
-    template <class T> void init(T& t) {
-        if (cluster) cluster->initialize(t);
-    }
-    
-    template <class T> bool init(Plugin::Target& target) {
-        T* t = dynamic_cast<T*>(&target);
-        if (t) init(*t);
-        return t;
+        cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker);
+        broker->addFinalizer(boost::bind(&ClusterPlugin::shutdown, this));
+        broker->setConnectionFactory(
+            boost::shared_ptr<sys::ConnectionCodec::Factory>(
+                new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
     }
 
     void earlyInitialize(Plugin::Target&) {}
 
-    void initialize(Plugin::Target& target) {
-        if (init<broker::Broker>(target)) return;
-        if (!cluster) return;   // Remaining plugins only valid if cluster initialized.
-        if (init<broker::Connection>(target)) return;
-    }
-
     void shutdown() { cluster = 0; }
 };
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=690358&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Aug 29 11:18:45 2008
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Connection.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/Invoker.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
+#include "qpid/log/Statement.h"
+
+#include <boost/current_function.hpp>
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
+                       const std::string& wrappedId, ConnectionId myId)
+    : cluster(c), self(myId), output(*this, out),
+      connection(&output, cluster.getBroker(), wrappedId)
+{}
+
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
+                       const std::string& wrappedId, MemberId myId)
+    : cluster(c), self(myId, this), output(*this, out),
+      connection(&output, cluster.getBroker(), wrappedId)
+{}
+
+Connection::~Connection() {}
+
+// Forward all received frames to the cluster, continue handling on delivery.
+void Connection::received(framing::AMQFrame& f) {
+    cluster.send(f, self);
+}
+
+// Don't doOutput in the 
+bool Connection::doOutput() { return output.doOutput(); } 
+
+// Handle frames delivered from cluster.
+void Connection::deliver(framing::AMQFrame& f) {
+    // Handle connection controls, deliver other frames to connection.
+    if (!framing::invoke(*this, *f.getBody()).wasHandled())
+        connection.received(f);
+}
+
+void Connection::closed() {
+    try {
+        // Called when the local network connection is closed. We still
+        // need to process any outstanding cluster frames for this
+        // connection to ensure our sessions are up-to-date. We defer
+        // closing the Connection object till deliverClosed(), but replace
+        // its output handler with a null handler since the network output
+        // handler will be deleted.
+        // 
+        connection.setOutputHandler(&discardHandler); 
+        cluster.send(AMQFrame(in_place<ClusterConnectionDeliverCloseBody>()), self);
+    }
+    catch (const std::exception& e) {
+        QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
+    }
+}
+
+void Connection::deliverClose () {
+    connection.closed();
+    cluster.erase(self);
+}
+
+// Delivery of doOutput allows us to run the real connection doOutput()
+// which stocks up the write buffers with data.
+// 
+void Connection::deliverDoOutput(size_t requested) {
+    output.deliverDoOutput(requested);
+}
+
+}} // namespace qpid::cluster
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=690358&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Fri Aug 29 11:18:45 2008
@@ -0,0 +1,101 @@
+#ifndef QPID_CLUSTER_CONNECTION_H
+#define QPID_CLUSTER_CONNECTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "Cluster.h"
+#include "WriteEstimate.h"
+#include "OutputInterceptor.h"
+
+#include "qpid/broker/Connection.h"
+#include "qpid/amqp_0_10/Connection.h"
+#include "qpid/sys/ConnectionInputHandler.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+
+namespace qpid {
+
+namespace framing { class AMQFrame; }
+
+namespace cluster {
+
+/**
+ * Plug-in associated with broker::Connections, both local and shadow.
+ */
+class Connection :
+        public RefCounted,
+        public sys::ConnectionInputHandler,
+        public sys::ConnectionOutputHandler,
+        public framing::AMQP_AllOperations::ClusterConnectionHandler
+        
+{
+  public:
+    /** Local connection, use this in ConnectionId */
+    Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId);
+    /** Shadow connection */
+    Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, ConnectionId);
+    ~Connection();
+    
+    ConnectionId getId() const { return self; }
+    bool isLocal() const { return self.second == this; }
+
+    // self-delivery of intercepted extension points.
+    void deliver(framing::AMQFrame& f);
+    void deliverClose();
+    void deliverDoOutput(size_t requested);
+
+    void codecDeleted();
+    
+    Cluster& getCluster() { return cluster; }
+
+    // ConnectionOutputHandler methods
+    void close() {}
+    void send(framing::AMQFrame&) {}
+    void activateOutput() {}
+    virtual size_t getBuffered() const { assert(0); return 0; }
+
+    // ConnectionInputHandler methods
+    void received(framing::AMQFrame&);
+    void closed();
+    bool doOutput();
+    bool hasOutput() { return connection.hasOutput(); }
+    void idleOut() { idleOut(); }
+    void idleIn() { idleIn(); }
+
+    // ConnectionInputHandlerFactory
+    sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out, const std::string& id, bool isClient);
+
+    broker::Connection& getBrokerConnection() { return connection; }
+  private:
+    void sendDoOutput();
+
+    Cluster& cluster;
+    ConnectionId self;
+    NoOpConnectionOutputHandler discardHandler;
+    WriteEstimate writeEstimate;
+    OutputInterceptor output;
+    broker::Connection connection;
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_CONNECTION_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=690358&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Fri Aug 29 11:18:45 2008
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionCodec.h"
+#include "Connection.h"
+#include "ProxyInputHandler.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/memory.h"
+
+namespace qpid {
+namespace cluster {
+
+sys::ConnectionCodec*
+ConnectionCodec::Factory::create(framing::ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
+    if (v == framing::ProtocolVersion(0, 10)) 
+        return new ConnectionCodec(out, id, cluster);
+    return 0;
+}
+
+sys::ConnectionCodec*
+ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) {
+    // FIXME aconway 2008-08-27: outbound connections need to be made
+    // with proper qpid::client code for failover, get rid of this
+    // broker-side hack.
+    return next->create(out, id);
+}
+
+ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster)
+    : codec(out, id, false),
+      interceptor(new Connection(cluster, codec, id, cluster.getSelf()))
+{
+    std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
+    codec.setInputHandler(ih);
+    cluster.insert(interceptor);
+}
+
+ConnectionCodec::~ConnectionCodec() {}
+
+// ConnectionCodec functions delegate to the codecOutput
+size_t ConnectionCodec::decode(const char* buffer, size_t size) { return codec.decode(buffer, size); }
+size_t ConnectionCodec::encode(const char* buffer, size_t size) { return codec.encode(buffer, size); }
+bool ConnectionCodec::canEncode() { return codec.canEncode(); }
+void ConnectionCodec::closed() { codec.closed(); }
+bool ConnectionCodec::isClosed() const { return codec.isClosed(); }
+framing::ProtocolVersion ConnectionCodec::getVersion() const { return codec.getVersion(); }
+
+}} // namespace qpid::cluster

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h?rev=690358&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Fri Aug 29 11:18:45 2008
@@ -0,0 +1,77 @@
+#ifndef QPID_CLUSTER_CONNCTIONCODEC_H
+#define QPID_CLUSTER_CONNCTIONCODEC_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/amqp_0_10/Connection.h"
+#include "qpid/cluster/Connection.h"
+#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+
+namespace broker {
+class Connection;
+}
+
+namespace cluster {
+class Cluster;
+
+/**
+ * Encapsulates the standard amqp_0_10::ConnectionCodec and sets up
+ * a cluster::Connection for the connection.
+ *
+ * The ConnectionCodec is deleted by the network layer when the
+ * connection closes. The cluster::Connection needs to be kept
+ * around until all cluster business on the connection is complete.
+ *
+ */
+class ConnectionCodec : public sys::ConnectionCodec {
+  public:
+    struct Factory : public sys::ConnectionCodec::Factory {
+        boost::shared_ptr<sys::ConnectionCodec::Factory> next;
+        Cluster& cluster;
+        Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c) : next(f), cluster(c) {}
+        sys::ConnectionCodec* create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id);
+        sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id);
+    };
+
+    ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c);
+    ~ConnectionCodec();
+
+    // ConnectionCodec functions delegate to the codecOutput
+    size_t decode(const char* buffer, size_t size);
+    size_t encode(const char* buffer, size_t size);
+    bool canEncode();
+    void closed();
+    bool isClosed() const;
+    framing::ProtocolVersion getVersion() const;
+    
+
+  private:
+    amqp_0_10::Connection codec;
+    boost::intrusive_ptr<cluster::Connection> interceptor;
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_CONNCTIONCODEC_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Fri Aug 29 11:18:45 2008
@@ -18,7 +18,6 @@
 
 #include "Cpg.h"
 #include "qpid/sys/Mutex.h"
-// Note cpg is currently unix-specific. Refactor if availble on other platforms.
 #include "qpid/sys/posix/PrivatePosix.h"
 #include "qpid/log/Statement.h"
 
@@ -170,27 +169,50 @@
     return "Cannot mcast to CPG group "+group.str();
 }
 
-Cpg::Id Cpg::self() const {
+MemberId Cpg::self() const {
     unsigned int nodeid;
     check(cpg_local_get(handle, &nodeid), "Cannot get local CPG identity");
-    return Id(nodeid, getpid());
+    return MemberId(nodeid, getpid());
 }
 
-ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) {
-    ostream_iterator<Cpg::Id> i(o, " ");
-    std::copy(a.first, a.first+a.second, i);
-    return o;
+ostream& operator <<(ostream& out, const MemberId& id) {
+    return out << std::hex << id.first << ":" << std::dec << id.second;
 }
 
-ostream& operator <<(ostream& out, const Cpg::Id& id) {
-    return out << id.getNodeId() << "-" << id.getPid();
+ostream& operator<<(ostream& o, const ConnectionId& c) {
+    return o << c.first << "-" << c.second;
 }
 
-ostream& operator <<(ostream& out, const cpg_name& name) {
-    return out << string(name.value, name.length);
+ostream& operator<<(ostream& o, const cpg_name& name) {
+    return o << string(name.value, name.length);
 }
 
 }} // namespace qpid::cluster
 
 
+// In proper namespace for ADL.
+
+std::ostream& operator<<(std::ostream& o, const ::cpg_address& a) {
+    const char* reasonString;
+    switch (a.reason) {
+      case CPG_REASON_JOIN: reasonString = "joined"; break;
+      case CPG_REASON_LEAVE: reasonString = "left";break;
+      case CPG_REASON_NODEDOWN: reasonString = "node-down";break;
+      case CPG_REASON_NODEUP: reasonString = "node-up";break;
+      case CPG_REASON_PROCDOWN: reasonString = "process-down";break;
+      default:
+        assert(0);
+        reasonString = "";
+    }
+    return o << qpid::cluster::MemberId(a.nodeid, a.pid) << " " << reasonString;
+}
+
+namespace std {
+ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) {
+    for (cpg_address* p = a.first; p < a.first+a.second; ++p)
+        o << *p << " ";
+    return o;
+}
+}
+
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Fri Aug 29 11:18:45 2008
@@ -19,12 +19,12 @@
  *
  */
 
+#include "qpid/cluster/types.h"
+#include "qpid/cluster/Dispatchable.h"
+
 #include "qpid/Exception.h"
 #include "qpid/sys/IOHandle.h"
-#include "qpid/cluster/Dispatchable.h"
 
-#include <boost/tuple/tuple.hpp>
-#include <boost/tuple/tuple_comparison.hpp>
 #include <boost/scoped_ptr.hpp>
 
 #include <cassert>
@@ -65,14 +65,6 @@
         std::string str() const { return std::string(value, length); }
     };
 
-    // boost::tuple gives us == and < for free.
-    struct Id : public boost::tuple<uint32_t, uint32_t>  {
-        Id(uint32_t n=0, uint32_t p=0) : boost::tuple<uint32_t, uint32_t>(n, p) {}
-        Id(const cpg_address& addr) : boost::tuple<uint32_t, uint32_t>(addr.nodeid, addr.pid) {}
-        uint32_t getNodeId() const { return boost::get<0>(*this); }
-        uint32_t getPid() const { return boost::get<1>(*this); }
-    };
-
     static std::string str(const cpg_name& n) {
         return std::string(n.value, n.length);
     }
@@ -127,7 +119,7 @@
 
     cpg_handle_t getHandle() const { return handle; }
 
-    Id self() const;
+    MemberId self() const;
 
     int getFd();
     
@@ -166,9 +158,7 @@
     bool isShutdown;
 };
 
-std::ostream& operator <<(std::ostream& out, const cpg_name& name);
-std::ostream& operator <<(std::ostream& out, const Cpg::Id& id);
-std::ostream& operator <<(std::ostream& out, const std::pair<cpg_address*,int> addresses);
+std::ostream& operator <<(std::ostream& out, const MemberId& id);
 
 inline bool operator==(const cpg_name& a, const cpg_name& b) {
     return a.length==b.length &&  strncmp(a.value, b.value, a.length) == 0;
@@ -177,5 +167,12 @@
 
 }} // namespace qpid::cluster
 
+// In proper namespaces for ADL
+std::ostream& operator <<(std::ostream& out, const cpg_name& name);
+std::ostream& operator<<(std::ostream& o, const cpg_address& a);
+namespace std {
+std::ostream& operator <<(std::ostream& out, std::pair<cpg_address*,int> addresses);
+}
+
 
 #endif  /*!CPG_H*/

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h?rev=690358&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h Fri Aug 29 11:18:45 2008
@@ -0,0 +1,46 @@
+#ifndef QPID_CLUSTER_NOOPCONNECTIONOUTPUTHANDLER_H
+#define QPID_CLUSTER_NOOPCONNECTIONOUTPUTHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/sys/ConnectionOutputHandler.h>
+
+namespace qpid {
+
+namespace framing { class AMQFrame; }
+
+namespace cluster {
+
+/**
+ * Output handler for frames sent to noop connections.
+ * Simply discards frames.
+ */
+class NoOpConnectionOutputHandler : public sys::ConnectionOutputHandler
+{
+  public:
+    virtual void send(framing::AMQFrame&) {}
+    virtual void close() {}
+    virtual void activateOutput() {}
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_NOOPCONNECTIONOUTPUTHANDLER_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Fri Aug 29 11:18:45 2008
@@ -19,9 +19,10 @@
  *
  */
 #include "OutputInterceptor.h"
-#include "ConnectionInterceptor.h"
-#include "qpid/framing/ClusterConnectionDoOutputBody.h"
+#include "Connection.h"
+#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
 #include "qpid/framing/AMQFrame.h"
+#include "qpid/log/Statement.h"
 #include <boost/current_function.hpp>
 
 
@@ -30,7 +31,7 @@
 
 using namespace framing;
 
-OutputInterceptor::OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h)
+OutputInterceptor::OutputInterceptor(cluster::Connection& p, sys::ConnectionOutputHandler& h)
     : parent(p), next(h), sent(), moreOutput(), doingOutput()
 {}
 
@@ -57,8 +58,6 @@
 // which stocks up the write buffers with data.
 // 
 void OutputInterceptor::deliverDoOutput(size_t requested) {
-    if (parent.getClosed()) return;
-
     Locker l(lock);
     size_t buf = next.getBuffered();
     if (parent.isLocal())
@@ -68,7 +67,7 @@
     sent = 0;
     do {
         sys::Mutex::ScopedUnlock u(lock);
-        moreOutput = doOutputNext(); // Calls send()
+        moreOutput = parent.getBrokerConnection().doOutput();
     } while (sent < requested && moreOutput);
     sent += buf;                // Include buffered data in the sent total.
 
@@ -88,8 +87,8 @@
 // Send a doOutput request if one is not already in flight.
 void OutputInterceptor::sendDoOutput() {
     // Call with lock held.
-    if (parent.isShadow() || parent.getClosed())
-        return;
+    // FIXME aconway 2008-08-28: used to  have || parent.getClosed())
+    if (!parent.isLocal()) return;
 
     doingOutput = true;
     size_t request = writeEstimate.sending(getBuffered());
@@ -98,8 +97,8 @@
     // Send it anyway to keep the doOutput chain going until we are sure there's no more output
     // (in deliverDoOutput)
     // 
-    parent.getCluster().send(AMQFrame(in_place<ClusterConnectionDoOutputBody>(
-                                          framing::ProtocolVersion(), request)), &parent);
+    parent.getCluster().send(AMQFrame(in_place<ClusterConnectionDeliverDoOutputBody>(
+                                          framing::ProtocolVersion(), request)), parent.getId());
     QPID_LOG(trace, &parent << "Send doOutput request for " << request);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h Fri Aug 29 11:18:45 2008
@@ -31,14 +31,14 @@
 namespace framing { class AMQFrame; }
 namespace cluster {
 
-class ConnectionInterceptor;
+class Connection;
 
 /**
  * Interceptor for connection OutputHandler, manages outgoing message replication.
  */
 class OutputInterceptor : public sys::ConnectionOutputHandler {
   public:
-    OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h);
+    OutputInterceptor(cluster::Connection& p, sys::ConnectionOutputHandler& h);
 
     // sys::ConnectionOutputHandler functions
     void send(framing::AMQFrame& f);
@@ -51,9 +51,7 @@
     // Intercept doOutput requests on Connection.
     bool doOutput();
 
-    boost::function<bool ()> doOutputNext;
-    
-    ConnectionInterceptor& parent;
+    cluster::Connection& parent;
     
   private:
     typedef sys::Mutex::ScopedLock Locker;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h Fri Aug 29 11:18:45 2008
@@ -90,7 +90,7 @@
     batch.swap(queue);
     condition.clear();
     ScopedUnlock u(lock);
-    callback(batch.begin(), batch.end()); // Process the batch outside the lock.
+    callback(batch.begin(), batch.end()); // Process outside the lock to allow concurrent push.
     h.rewatch();
 }
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ProxyInputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ProxyInputHandler.h?rev=690358&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ProxyInputHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ProxyInputHandler.h Fri Aug 29 11:18:45 2008
@@ -0,0 +1,57 @@
+#ifndef QPID_CLUSTER_PROXYINPUTHANDLER_H
+#define QPID_CLUSTER_PROXYINPUTHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/ConnectionInputHandler.h"
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+
+namespace framing { class AMQFrame; }
+
+namespace cluster {
+
+/**
+ * Proxies ConnectionInputHandler functions and ensures target.closed() 
+ * is called, on deletion if not before.
+ */
+class ProxyInputHandler : public sys::ConnectionInputHandler
+{
+  public:
+    ProxyInputHandler(boost::intrusive_ptr<cluster::Connection> t) : target(t) {}
+    ~ProxyInputHandler() { closed(); }
+    
+    void received(framing::AMQFrame& f) { target->received(f); }
+    void closed() { if (target) target->closed(); target = 0; }
+    void idleOut() { target->idleOut(); }
+    void idleIn() { target->idleIn(); }
+    bool doOutput() { return target->doOutput(); }
+    bool hasOutput() { return target->hasOutput(); }
+    
+  private:
+    boost::intrusive_ptr<cluster::Connection> target;
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_PROXYINPUTHANDLER_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ProxyInputHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ProxyInputHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=690358&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Fri Aug 29 11:18:45 2008
@@ -0,0 +1,58 @@
+#ifndef QPID_CLUSTER_TYPES_H
+#define QPID_CLUSTER_TYPES_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <utility>
+#include <iosfwd>
+#include <stdint.h>
+
+extern "C" {
+#include <openais/cpg.h>
+}
+
+namespace qpid {
+namespace cluster {
+
+class Connection;
+
+/** first=node-id, second=pid */
+struct MemberId : std::pair<uint32_t, uint32_t> {
+    MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {}
+    MemberId(const cpg_address& caddr) : std::pair<uint32_t,uint32_t>(caddr.nodeid, caddr.pid) {}
+    uint32_t getNode() const { return first; }
+    uint32_t getPid() const { return second; }
+};
+
+inline bool operator==(const cpg_address& caddr, const MemberId& id) { return id == MemberId(caddr); }
+
+std::ostream& operator<<(std::ostream&, const MemberId&);
+
+struct ConnectionId : public std::pair<MemberId, Connection*>  {
+    ConnectionId(const MemberId& m=MemberId(), Connection* c=0) :  std::pair<MemberId, Connection*> (m,c) {}
+    MemberId getMember() const { return first; }
+    Connection* getConnectionPtr() const { return second; }
+};
+std::ostream& operator<<(std::ostream&, const ConnectionId&);
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_TYPES_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h Fri Aug 29 11:18:45 2008
@@ -22,14 +22,14 @@
  *
  */
 #include "qpid/framing/ProtocolVersion.h"
-#include "OutputControl.h"
-#include <memory>
-#include <map>
 
 namespace qpid {
 
 namespace sys {
 
+class InputHandlerFactory;
+class OutputControl;
+
 /**
  * Interface of coder/decoder for a connection of a specific protocol
  * version.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h Fri Aug 29 11:18:45 2008
@@ -33,6 +33,7 @@
         public TimeoutHandler, public OutputTask
     {
     public:
+
         virtual void closed() = 0;
     };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h Fri Aug 29 11:18:45 2008
@@ -42,7 +42,8 @@
      *@param id identify the connection for management purposes.
      */
     virtual ConnectionInputHandler* create(ConnectionOutputHandler* out,
-                                           const std::string& id) = 0;
+                                           const std::string& id,
+                                           bool isClient) = 0;
     
     virtual ~ConnectionInputHandlerFactory(){}
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h Fri Aug 29 11:18:45 2008
@@ -53,12 +53,12 @@
     }
 
     ~ForkedBroker() {
-        try { stop(); } catch(const std::exception& e) {
-            QPID_LOG(error, QPID_MSG("Stopping forked broker: " << e.what()));
+        try { kill(); } catch(const std::exception& e) {
+            QPID_LOG(error, QPID_MSG("Killing forked broker: " << e.what()));
         }
     }
 
-    void stop() {
+    void kill() {
         using qpid::ErrnoException;
         if (pid == 0) return;
         if (::kill(pid, SIGINT) < 0) throw ErrnoException("kill failed");

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Fri Aug 29 11:18:45 2008
@@ -69,8 +69,8 @@
     void add();
     void setup();
     void kill(size_t n) {
-        if (n) forkedBrokers[n-1]->stop();
-        else broker0.shutdown();
+        if (n) forkedBrokers[n-1].kill();
+        else broker0->broker->shutdown();
     }
 };
 
@@ -139,6 +139,14 @@
     BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("amq.direct").getType()); 
 }
 
+QPID_AUTO_TEST_CASE(testSingletonCluster) {
+    // Test against a singleton cluster, verify basic operation.
+    ClusterFixture cluster(1);
+    Client c(cluster[0]);
+    BOOST_CHECK(c.session.queueQuery("q").getQueue().empty());
+    BOOST_CHECK(c.session.exchangeQuery("ex").getType().empty()); 
+}
+
 QPID_AUTO_TEST_CASE(testWiringReplication) {
     ClusterFixture cluster(3);
     Client c0(cluster[0]);

Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Fri Aug 29 11:18:45 2008
@@ -25,19 +25,17 @@
   <class name = "cluster" code = "0x80" label="Qpid clustering extensions.">
     <doc>Qpid extension class to allow clustered brokers to communicate.</doc>
 
-    <control name = "notify" code="0x1">
-      <role name="server" implement="MUST" />
+    <control name = "joined" code="0x1">
       <field name="url" type="str16" />
     </control>
+  </class>
 
-    <control name="connection-close" code="0x2">
-      <role name="server" implement="MUST" />
+  <class name="cluster-connection" code="0x81" label="Qpid clustering extensions.">
+    <control name="deliver-close" code="0x2">
     </control>
 
-    <control name="connection-do-output" code="0x3">
-      <role name="server" implement="MUST" />
+    <control name="deliver-do-output" code="0x3">
       <field name="bytes" type="uint32"/>
     </control>
-
   </class>
 </amqp>