You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/08/29 20:18:46 UTC
svn commit: r690358 - in /incubator/qpid/trunk/qpid/cpp: src/ src/qpid/
src/qpid/amqp_0_10/ src/qpid/broker/ src/qpid/cluster/ src/qpid/sys/
src/tests/ xml/
Author: aconway
Date: Fri Aug 29 11:18:45 2008
New Revision: 690358
URL: http://svn.apache.org/viewvc?rev=690358&view=rev
Log:
Refactored cluster to intercept at ConnectionCode, using sys:: interfaces rather than boost functions.
Use framing::Operations and Invoker to dispatch cluster methods.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ProxyInputHandler.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (with props)
Removed:
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClassifierHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClassifierHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h
Modified:
incubator/qpid/trunk/qpid/cpp/src/cluster.mk
incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h
incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h
incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Fri Aug 29 11:18:45 2008
@@ -6,24 +6,26 @@
if CPG
libqpidcluster_la_SOURCES = \
+ qpid/cluster/types.h \
qpid/cluster/Cluster.cpp \
qpid/cluster/Cluster.h \
qpid/cluster/Cpg.cpp \
qpid/cluster/Cpg.h \
qpid/cluster/Dispatchable.h \
qpid/cluster/ClusterPlugin.cpp \
- qpid/cluster/ConnectionInterceptor.h \
- qpid/cluster/ConnectionInterceptor.cpp \
- qpid/cluster/ClassifierHandler.h \
- qpid/cluster/ClassifierHandler.cpp \
- qpid/cluster/ShadowConnectionOutputHandler.h \
+ qpid/cluster/ConnectionCodec.h \
+ qpid/cluster/ConnectionCodec.cpp \
+ qpid/cluster/Connection.h \
+ qpid/cluster/Connection.cpp \
+ qpid/cluster/NoOpConnectionOutputHandler.h \
qpid/cluster/PollableCondition.h \
qpid/cluster/PollableCondition.cpp \
qpid/cluster/PollableQueue.h \
qpid/cluster/WriteEstimate.h \
qpid/cluster/WriteEstimate.cpp \
qpid/cluster/OutputInterceptor.h \
- qpid/cluster/OutputInterceptor.cpp
+ qpid/cluster/OutputInterceptor.cpp \
+ qpid/cluster/ProxyInputHandler.h
libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Url.h Fri Aug 29 11:18:45 2008
@@ -76,7 +76,9 @@
/** Parse url, throw InvalidUrl if invalid. */
explicit Url(const char* url) { parse(url); }
- template<class T> Url& operator=(T s) { parse(s); return *this; }
+ Url& operator=(const Url& u) { this->std::vector<Address>::operator=(u); cache=u.cache; return *this; }
+ Url& operator=(const char* s) { parse(s); return *this; }
+ Url& operator=(const std::string& s) { parse(s); return *this; }
/** Throw InvalidUrl if the URL does not contain any addresses. */
void throwIfEmpty() const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Fri Aug 29 11:18:45 2008
@@ -21,16 +21,22 @@
#include "Connection.h"
#include "qpid/log/Statement.h"
#include "qpid/amqp_0_10/exceptions.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/ProtocolInitiation.h"
namespace qpid {
namespace amqp_0_10 {
using sys::Mutex;
-Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
- : frameQueueClosed(false), output(o),
- connection(new broker::Connection(this, broker, id, _isClient)),
- identifier(id), initialized(false), isClient(_isClient), buffered(0) {}
+Connection::Connection(sys::OutputControl& o, const std::string& id, bool _isClient)
+ : frameQueueClosed(false), output(o), identifier(id), initialized(false), isClient(_isClient), buffered(0)
+{}
+
+void Connection::setInputHandler(std::auto_ptr<sys::ConnectionInputHandler> c) {
+ connection = c;
+}
size_t Connection::decode(const char* buffer, size_t size) {
framing::Buffer in(const_cast<char*>(buffer), size);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Fri Aug 29 11:18:45 2008
@@ -22,15 +22,19 @@
*
*/
#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/broker/Connection.h"
#include <boost/intrusive_ptr.hpp>
-#include <deque>
#include <memory>
+#include <deque>
namespace qpid {
-namespace broker { class Broker; }
+
+namespace sys {
+class ConnectionInputHandlerFactory;
+}
+
namespace amqp_0_10 {
class Connection : public sys::ConnectionCodec,
@@ -42,14 +46,15 @@
bool frameQueueClosed;
mutable sys::Mutex frameQueueLock;
sys::OutputControl& output;
- boost::intrusive_ptr<broker::Connection> connection;
+ std::auto_ptr<sys::ConnectionInputHandler> connection;
std::string identifier;
bool initialized;
bool isClient;
size_t buffered;
public:
- Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false);
+ Connection(sys::OutputControl&, const std::string& id, bool isClient);
+ void setInputHandler(std::auto_ptr<sys::ConnectionInputHandler> c);
size_t decode(const char* buffer, size_t size);
size_t encode(const char* buffer, size_t size);
bool isClosed() const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Aug 29 11:18:45 2008
@@ -133,7 +133,7 @@
acl(0),
dataDir(conf.noDataDir ? std::string () : conf.dataDir),
links(this),
- factory(*this),
+ factory(new ConnectionFactory(*this)),
sessionManager(
qpid::SessionState::Configuration(
conf.replayFlushLimit*1024, // convert kb to bytes.
@@ -372,7 +372,7 @@
// TODO: This should iterate over all protocolFactories
void Broker::accept() {
for (unsigned int i = 0; i < protocolFactories.size(); ++i)
- protocolFactories[i]->accept(poller, &factory);
+ protocolFactories[i]->accept(poller, factory.get());
}
@@ -382,7 +382,7 @@
boost::function2<void, int, std::string> failed,
sys::ConnectionCodec::Factory* f)
{
- getProtocolFactory()->connect(poller, host, port, f ? f : &factory, failed);
+ getProtocolFactory()->connect(poller, host, port, f ? f : factory.get(), failed);
}
void Broker::connect(
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Aug 29 11:18:45 2008
@@ -103,7 +103,7 @@
QueueRegistry queues;
ExchangeRegistry exchanges;
LinkRegistry links;
- ConnectionFactory factory;
+ boost::shared_ptr<sys::ConnectionCodec::Factory> factory;
DtxManager dtxManager;
SessionManager sessionManager;
management::ManagementAgent* managementAgent;
@@ -178,7 +178,10 @@
boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const;
/** Expose poller so plugins can register their descriptors. */
- boost::shared_ptr<sys::Poller> getPoller();
+ boost::shared_ptr<sys::Poller> getPoller();
+
+ boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; }
+ void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Aug 29 11:18:45 2008
@@ -49,9 +49,6 @@
Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) :
ConnectionState(out_, broker_),
- receivedFn(boost::bind(&Connection::receivedImpl, this, _1)),
- closedFn(boost::bind(&Connection::closedImpl, this)),
- doOutputFn(boost::bind(&Connection::doOutputImpl, this)),
adapter(*this, isLink_),
isLink(isLink_),
mgmtClosing(false),
@@ -72,8 +69,6 @@
mgmtObject = new management::Connection(agent, this, parent, mgmtId, !isLink);
agent->addObject(mgmtObject);
}
-
- Plugin::initializeAll(*this); // Let plug-ins update extension points.
}
void Connection::requestIOProcessing(boost::function0<void> callback)
@@ -90,9 +85,7 @@
links.notifyClosed(mgmtId);
}
-void Connection::received(framing::AMQFrame& frame) { receivedFn(frame); }
-
-void Connection::receivedImpl(framing::AMQFrame& frame){
+void Connection::received(framing::AMQFrame& frame) {
if (frame.getChannel() == 0 && frame.getMethod()) {
adapter.handle(frame);
} else {
@@ -172,9 +165,7 @@
void Connection::idleIn(){}
-void Connection::closed() { closedFn(); }
-
-void Connection::closedImpl(){ // Physically closed, suspend open sessions.
+void Connection::closed(){ // Physically closed, suspend open sessions.
try {
while (!channels.empty())
ptr_map_ptr(channels.begin())->handleDetach();
@@ -194,9 +185,7 @@
bool Connection::hasOutput() { return outputTasks.hasOutput(); }
-bool Connection::doOutput() { return doOutputFn(); }
-
-bool Connection::doOutputImpl() {
+bool Connection::doOutput() {
try{
if (ioCallback)
ioCallback(); // Lend the IO thread for management processing
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Aug 29 11:18:45 2008
@@ -44,7 +44,6 @@
#include "SessionHandler.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/Connection.h"
-#include "qpid/Plugin.h"
#include "qpid/RefCounted.h"
#include <boost/ptr_container/ptr_map.hpp>
@@ -56,7 +55,6 @@
class Connection : public sys::ConnectionInputHandler,
public ConnectionState,
- public Plugin::Target,
public RefCounted
{
public:
@@ -95,19 +93,10 @@
void notifyConnectionForced(const std::string& text);
void setUserId(const string& uid);
- // Extension points: allow plugins to insert additional functionality.
- boost::function<void(framing::AMQFrame&)> receivedFn;
- boost::function<void ()> closedFn;
- boost::function<bool ()> doOutputFn;
-
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
- void receivedImpl(framing::AMQFrame& frame);
- void closedImpl();
- bool doOutputImpl();
-
ChannelMap channels;
framing::AMQP_ClientProxy::Connection* client;
ConnectionHandler adapter;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp Fri Aug 29 11:18:45 2008
@@ -21,11 +21,14 @@
#include "ConnectionFactory.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/amqp_0_10/Connection.h"
+#include "qpid/broker/Connection.h"
namespace qpid {
namespace broker {
using framing::ProtocolVersion;
+typedef std::auto_ptr<amqp_0_10::Connection> ConnectionPtr;
+typedef std::auto_ptr<sys::ConnectionInputHandler> InputPtr;
ConnectionFactory::ConnectionFactory(Broker& b) : broker(b) {}
@@ -33,15 +36,21 @@
sys::ConnectionCodec*
ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
- if (v == ProtocolVersion(0, 10))
- return new amqp_0_10::Connection(out, broker, id);
+ if (v == ProtocolVersion(0, 10)) {
+ ConnectionPtr c(new amqp_0_10::Connection(out, id, false));
+ c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, false)));
+ return c.release();
+ }
return 0;
}
sys::ConnectionCodec*
ConnectionFactory::create(sys::OutputControl& out, const std::string& id) {
// used to create connections from one broker to another
- return new amqp_0_10::Connection(out, broker, id, true);
+ ConnectionPtr c(new amqp_0_10::Connection(out, id, true));
+ c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, true)));
+ return c.release();
}
+
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.h Fri Aug 29 11:18:45 2008
@@ -27,7 +27,8 @@
namespace broker {
class Broker;
-class ConnectionFactory : public sys::ConnectionCodec::Factory {
+class ConnectionFactory : public sys::ConnectionCodec::Factory
+{
public:
ConnectionFactory(Broker& b);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Aug 29 11:18:45 2008
@@ -4,7 +4,7 @@
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+n * You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -17,18 +17,17 @@
*/
#include "Cluster.h"
-#include "ConnectionInterceptor.h"
+#include "Connection.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/SessionState.h"
-#include "qpid/broker/Connection.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/ClusterNotifyBody.h"
-#include "qpid/framing/ClusterConnectionCloseBody.h"
-#include "qpid/framing/ClusterConnectionDoOutputBody.h"
+#include "qpid/framing/ClusterJoinedBody.h"
#include "qpid/log/Statement.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
+#include "qpid/framing/AMQP_AllOperations.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/Invoker.h"
#include <boost/bind.hpp>
#include <boost/cast.hpp>
@@ -39,22 +38,34 @@
namespace qpid {
namespace cluster {
+
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
-using broker::Connection;
+// Handle cluster controls from a given member.
+struct ClusterOperations : public framing::AMQP_AllOperations::ClusterHandler {
+ Cluster& cluster;
+ MemberId member;
+
+ ClusterOperations(Cluster& c, const MemberId& m) : cluster(c), member(m) {}
+
+ void joined(const std::string& url) {
+ cluster.joined(member, url);
+ }
+};
+
ostream& operator <<(ostream& out, const Cluster& cluster) {
return out << cluster.name.str() << "-" << cluster.self;
}
-ostream& operator<<(ostream& out, const Cluster::MemberMap::value_type& m) {
- return out << m.first << "=" << m.second.url;
+ostream& operator<<(ostream& out, const Cluster::UrlMap::value_type& m) {
+ return out << m.first << " at " << m.second;
}
-ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
- ostream_iterator<Cluster::MemberMap::value_type> o(out, " ");
- copy(members.begin(), members.end(), o);
+ostream& operator <<(ostream& out, const Cluster::UrlMap& urls) {
+ ostream_iterator<Cluster::UrlMap::value_type> o(out, " ");
+ copy(urls.begin(), urls.end(), o);
return out;
}
@@ -74,9 +85,9 @@
mcastQueue(boost::bind(&Cluster::mcastQueueCb, this, _1, _2))
{
broker->addFinalizer(boost::bind(&Cluster::leave, this));
- QPID_LOG(trace, "Joining cluster: " << name_);
+ QPID_LOG(trace, "Node " << self << " joining cluster: " << name_);
cpg.join(name);
- notify();
+ send(AMQFrame(in_place<ClusterJoinedBody>(ProtocolVersion(), url.str())), ConnectionId(self,0));
// Start dispatching from the poller.
cpgDispatchHandle.startWatch(poller);
@@ -84,31 +95,15 @@
mcastQueue.start(poller);
}
-Cluster::~Cluster() {
- for (ShadowConnectionMap::iterator i = shadowConnectionMap.begin();
- i != shadowConnectionMap.end();
- ++i)
- {
- i->second->dirtyClose();
- }
- std::for_each(localConnectionSet.begin(), localConnectionSet.end(), boost::bind(&ConnectionInterceptor::dirtyClose, _1));
-}
-
-void Cluster::initialize(broker::Connection& c) {
- bool isLocal = c.getOutput().get() != &shadowOut;
- if (isLocal)
- localConnectionSet.insert(new ConnectionInterceptor(c, *this));
-}
+Cluster::~Cluster() {}
void Cluster::leave() {
Mutex::ScopedLock l(lock);
if (!broker) return; // Already left.
// Leave is called by from Broker destructor after the poller has
// been shut down. No dispatches can occur.
-
- QPID_LOG(debug, "Leaving cluster " << *this);
cpg.leave(name);
- // broker= is set to 0 when the final config-change is delivered.
+ // broker is set to 0 when the final config-change is delivered.
while(broker) {
Mutex::ScopedUnlock u(lock);
cpg.dispatchAll();
@@ -126,9 +121,9 @@
buf.putLongLong(value);
}
-void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) {
- QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
- mcastQueue.push(Message(frame, self, connection));
+void Cluster::send(const AMQFrame& frame, const ConnectionId& id) {
+ QPID_LOG(trace, "MCAST [" << id << "] " << frame);
+ mcastQueue.push(Message(frame, id));
}
void Cluster::mcastQueueCb(const MessageQueue::iterator& begin,
@@ -137,48 +132,40 @@
// Static is OK because there is only one cluster allowed per
// process and only one thread in mcastQueueCb at a time.
static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management.
- MessageQueue::iterator i = begin;
- while (i != end) {
- Buffer buf(buffer, sizeof(buffer));
- while (i != end && buf.available() > i->frame.size() + sizeof(uint64_t)) {
- i->frame.encode(buf);
- encodePtr(buf, i->connection);
- ++i;
- }
- iovec iov = { buffer, buf.getPosition() };
- cpg.mcast(name, &iov, 1);
+ Buffer buf(buffer, sizeof(buffer));
+ for (MessageQueue::iterator i = begin; i != end; ++i) {
+ AMQFrame& frame =i->first;
+ ConnectionId id =i->second;
+ if (buf.available() < frame.size() + sizeof(uint64_t))
+ break;
+ frame.encode(buf);
+ encodePtr(buf, id.second);
}
-}
-
-void Cluster::notify() {
- send(AMQFrame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str())), 0);
+ iovec iov = { buffer, buf.getPosition() };
+ cpg.mcast(name, &iov, 1);
}
size_t Cluster::size() const {
Mutex::ScopedLock l(lock);
- return members.size();
+ return urls.size();
}
-Cluster::MemberList Cluster::getMembers() const {
+std::vector<Url> Cluster::getUrls() const {
Mutex::ScopedLock l(lock);
- MemberList result(members.size());
- std::transform(members.begin(), members.end(), result.begin(),
- boost::bind(&MemberMap::value_type::second, _1));
+ std::vector<Url> result(urls.size());
+ std::transform(urls.begin(), urls.end(), result.begin(), boost::bind(&UrlMap::value_type::second, _1));
return result;
}
-ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) {
- ShadowConnectionId id(member, remotePtr);
- ShadowConnectionMap::iterator i = shadowConnectionMap.find(id);
- if (i == shadowConnectionMap.end()) { // A new shadow connection.
+boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) {
+ boost::intrusive_ptr<Connection> c = connections[id];
+ if (!c && id.first != self) { // Shadow connection
std::ostringstream os;
- os << name << ":" << member << ":" << remotePtr;
- assert(broker);
- broker::Connection* c = new broker::Connection(&shadowOut, *broker, os.str());
- ShadowConnectionMap::value_type value(id, new ConnectionInterceptor(*c, *this, id));
- i = shadowConnectionMap.insert(value).first;
+ os << id;
+ c = connections[id] = new Connection(*this, shadowOut, os.str(), id);
}
- return i->second;
+ assert(c);
+ return c;
}
void Cluster::deliver(
@@ -189,16 +176,17 @@
void* msg,
int msg_len)
{
- Id from(nodeid, pid);
+ MemberId from(nodeid, pid);
try {
Buffer buf(static_cast<char*>(msg), msg_len);
while (buf.available() > 0) {
AMQFrame frame;
if (!frame.decode(buf)) // Not enough data.
throw Exception("Received incomplete cluster event.");
- void* connection;
- decodePtr(buf, connection);
- deliverQueue.push(Message(frame, from, connection));
+ Connection* cp;
+ decodePtr(buf, cp);
+ QPID_LOG(critical, "deliverQ.push " << frame);
+ deliverQueue.push(Message(frame, ConnectionId(from, cp)));
}
}
catch (const std::exception& e) {
@@ -213,23 +201,21 @@
const MessageQueue::iterator& end)
{
for (MessageQueue::iterator i = begin; i != end; ++i) {
- AMQFrame& frame(i->frame);
- Id from(i->from);
- ConnectionInterceptor* connection = reinterpret_cast<ConnectionInterceptor*>(i->connection);
+ AMQFrame& frame(i->first);
+ ConnectionId connectionId(i->second);
try {
- QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame);
-
+ QPID_LOG(trace, "DLVR [" << connectionId << "]: " << frame);
if (!broker) {
- QPID_LOG(warning, "Unexpected DLVR, already left the cluster.");
+ QPID_LOG(error, "Unexpected DLVR after leaving the cluster.");
return;
}
- if (connection && from != self) // Look up shadow for remote connections
- connection = getShadowConnection(from, connection);
-
- if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID)
- handleMethod(from, connection, *frame.getMethod());
- else
- connection->deliver(frame);
+ if (connectionId.getConnectionPtr()) // Connection control
+ getConnection(connectionId)->deliver(frame);
+ else { // Cluster control
+ ClusterOperations cops(*this, connectionId.getMember());
+ bool invoked = framing::invoke(cops, *frame.getBody()).wasHandled();
+ assert(invoked);
+ }
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
@@ -240,54 +226,30 @@
}
}
-// Handle cluster methods
-// FIXME aconway 2008-07-11: Generate/template a better dispatch mechanism.
-void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethodBody& method) {
- assert(method.amqpClassId() == CLUSTER_CLASS_ID);
- switch (method.amqpMethodId()) {
- case CLUSTER_NOTIFY_METHOD_ID: {
- ClusterNotifyBody& notify=static_cast<ClusterNotifyBody&>(method);
- Mutex::ScopedLock l(lock);
- members[from].url=notify.getUrl();
- lock.notifyAll();
- break;
- }
- case CLUSTER_CONNECTION_CLOSE_METHOD_ID: {
- if (!connection->isLocal())
- shadowConnectionMap.erase(connection->getShadowId());
- else
- localConnectionSet.erase(connection);
- connection->deliverClosed();
- break;
- }
- case CLUSTER_CONNECTION_DO_OUTPUT_METHOD_ID: {
- ClusterConnectionDoOutputBody& doOutput = static_cast<ClusterConnectionDoOutputBody&>(method);
- connection->deliverDoOutput(doOutput.getBytes());
- break;
- }
- default:
- assert(0);
- }
+void Cluster::joined(const MemberId& member, const string& url) {
+ Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, member << " has URL " << url);
+ urls[member] = url;
+ lock.notifyAll();
}
void Cluster::configChange(
cpg_handle_t /*handle*/,
cpg_name */*group*/,
- cpg_address *current, int nCurrent,
+ cpg_address */*current*/, int /*nCurrent*/,
cpg_address *left, int nLeft,
- cpg_address */*joined*/, int nJoined)
+ cpg_address *joined, int nJoined)
{
+ QPID_LOG(debug, "Cluster change: " << std::make_pair(joined, nJoined) << std::make_pair(left, nLeft));
Mutex::ScopedLock l(lock);
- for (int i = 0; i < nLeft; ++i)
- members.erase(left[i]);
- for(int j = 0; j < nCurrent; ++j)
- members[current[j]].id = current[j];
- QPID_LOG(debug, "Cluster members: " << nCurrent << " ("<< nLeft << " left, " << nJoined << " joined):"
- << members);
- assert(members.size() == size_t(nCurrent));
- if (members.find(self) == members.end())
+ // We add URLs to the map in joined() we don't keep track of pre-URL members yet.
+ for (int l = 0; l < nLeft; ++l) urls.erase(left[l]);
+
+ if (std::find(left, left+nLeft, self) != left+nLeft) {
broker = 0; // We have left the group, this is the final config change.
- lock.notifyAll(); // Threads waiting for membership changes.
+ QPID_LOG(debug, "Leaving cluster " << *this);
+ }
+ lock.notifyAll(); // Threads waiting for url changes.
}
void Cluster::dispatch(sys::DispatchHandle& h) {
@@ -301,6 +263,16 @@
broker->shutdown();
}
+void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
+ Mutex::ScopedLock l(lock);
+ connections[c->getId()] = c;
+}
+
+void Cluster::erase(ConnectionId id) {
+ Mutex::ScopedLock l(lock);
+ connections.erase(id);
+}
+
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Aug 29 11:18:45 2008
@@ -19,22 +19,16 @@
*
*/
+#include "qpid/cluster/types.h"
#include "qpid/cluster/Cpg.h"
-#include "qpid/cluster/ShadowConnectionOutputHandler.h"
#include "qpid/cluster/PollableQueue.h"
+#include "qpid/cluster/NoOpConnectionOutputHandler.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
-#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Monitor.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/log/Logger.h"
+#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/Url.h"
-#include "qpid/RefCounted.h"
-#include <boost/optional.hpp>
-#include <boost/function.hpp>
#include <boost/intrusive_ptr.hpp>
#include <map>
@@ -43,24 +37,15 @@
namespace qpid {
namespace cluster {
-class ConnectionInterceptor;
+class Connection;
/**
* Connection to the cluster.
* Keeps cluster membership data.
*/
-class Cluster : private Cpg::Handler, public RefCounted
+class Cluster : public RefCounted, private Cpg::Handler
{
public:
- typedef boost::tuple<Cpg::Id, void*> ShadowConnectionId;
-
- /** Details of a cluster member */
- struct Member {
- Cpg::Id id;
- Url url;
- };
-
- typedef std::vector<Member> MemberList;
/**
* Join a cluster.
@@ -71,11 +56,11 @@
virtual ~Cluster();
- /** Initialize interceptors for a new connection */
- void initialize(broker::Connection&);
+ void insert(const boost::intrusive_ptr<Connection>&); // Insert a local connection
+ void erase(ConnectionId); // Erase a connection.
- /** Get the current cluster membership. */
- MemberList getMembers() const;
+ /** Get the URLs of current cluster members. */
+ std::vector<Url> getUrls() const;
/** Number of members in the cluster. */
size_t size() const;
@@ -83,33 +68,27 @@
bool empty() const { return size() == 0; }
/** Send frame to the cluster */
- void send(const framing::AMQFrame&, ConnectionInterceptor*);
+ void send(const framing::AMQFrame&, const ConnectionId&);
/** Leave the cluster */
void leave();
- // Cluster frame handing functions
- void notify(const std::string& url);
- void connectionClose();
+ void joined(const MemberId&, const std::string& url);
+
+ broker::Broker& getBroker() { assert(broker); return *broker; }
+ MemberId getSelf() const { return self; }
+
private:
- typedef Cpg::Id Id;
- typedef std::map<Id, Member> MemberMap;
- typedef std::map<ShadowConnectionId, ConnectionInterceptor*> ShadowConnectionMap;
- typedef std::set<ConnectionInterceptor*> LocalConnectionSet;
+ typedef std::map<MemberId, Url> UrlMap;
+ typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap;
/** Message sent over the cluster. */
- struct Message {
- framing::AMQFrame frame; Id from; void* connection;
- Message(const framing::AMQFrame& f, const Id i, void* c)
- : frame(f), from(i), connection(c) {}
- };
+ typedef std::pair<framing::AMQFrame, ConnectionId> Message;
typedef PollableQueue<Message> MessageQueue;
boost::function<void()> shutdownNext;
- void notify(); ///< Notify cluster of my details.
-
/** CPG deliver callback. */
void deliver(
cpg_handle_t /*handle*/,
@@ -142,9 +121,9 @@
/** Callback if CPG fd is disconnected. */
void disconnect(sys::DispatchHandle&);
- void handleMethod(Id from, ConnectionInterceptor* connection, framing::AMQMethodBody& method);
+ void handleMethod(MemberId from, cluster::Connection* connection, framing::AMQMethodBody& method);
- ConnectionInterceptor* getShadowConnection(const Cpg::Id&, void*);
+ boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&);
mutable sys::Monitor lock; // Protect access to members.
broker::Broker* broker;
@@ -152,18 +131,17 @@
Cpg cpg;
Cpg::Name name;
Url url;
- MemberMap members;
- Id self;
- ShadowConnectionMap shadowConnectionMap;
- LocalConnectionSet localConnectionSet;
- ShadowConnectionOutputHandler shadowOut;
+ UrlMap urls;
+ MemberId self;
+ ConnectionMap connections;
+ NoOpConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
MessageQueue deliverQueue;
MessageQueue mcastQueue;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
- friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
- friend std::ostream& operator <<(std::ostream&, const MemberMap&);
+ friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&);
+ friend std::ostream& operator <<(std::ostream&, const UrlMap&);
};
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Fri Aug 29 11:18:45 2008
@@ -16,10 +16,13 @@
*
*/
-#include "ConnectionInterceptor.h"
+#include "Connection.h"
+#include "ConnectionCodec.h"
-#include "qpid/broker/Broker.h"
#include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/ConnectionCodec.h"
+
+#include "qpid/broker/Broker.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include "qpid/shared_ptr.h"
@@ -63,36 +66,25 @@
ClusterValues values;
ClusterOptions options;
boost::intrusive_ptr<Cluster> cluster;
+ boost::scoped_ptr<ConnectionCodec::Factory> factory;
ClusterPlugin() : options(values) {}
Options* getOptions() { return &options; }
- void init(broker::Broker& b) {
- if (values.name.empty()) return; // Only if --cluster-name option was specified.
+ void initialize(Plugin::Target& target) {
+ broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
+ if (!broker || values.name.empty()) return; // Only if --cluster-name option was specified.
if (cluster) throw Exception("Cluster plugin cannot be initialized twice in one process.");
- cluster = new Cluster(values.name, values.getUrl(b.getPort()), b);
- b.addFinalizer(boost::bind(&ClusterPlugin::shutdown, this));
- }
-
- template <class T> void init(T& t) {
- if (cluster) cluster->initialize(t);
- }
-
- template <class T> bool init(Plugin::Target& target) {
- T* t = dynamic_cast<T*>(&target);
- if (t) init(*t);
- return t;
+ cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker);
+ broker->addFinalizer(boost::bind(&ClusterPlugin::shutdown, this));
+ broker->setConnectionFactory(
+ boost::shared_ptr<sys::ConnectionCodec::Factory>(
+ new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
}
void earlyInitialize(Plugin::Target&) {}
- void initialize(Plugin::Target& target) {
- if (init<broker::Broker>(target)) return;
- if (!cluster) return; // Remaining plugins only valid if cluster initialized.
- if (init<broker::Connection>(target)) return;
- }
-
void shutdown() { cluster = 0; }
};
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=690358&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Aug 29 11:18:45 2008
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Connection.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/Invoker.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
+#include "qpid/log/Statement.h"
+
+#include <boost/current_function.hpp>
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
+ const std::string& wrappedId, ConnectionId myId)
+ : cluster(c), self(myId), output(*this, out),
+ connection(&output, cluster.getBroker(), wrappedId)
+{}
+
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
+ const std::string& wrappedId, MemberId myId)
+ : cluster(c), self(myId, this), output(*this, out),
+ connection(&output, cluster.getBroker(), wrappedId)
+{}
+
+Connection::~Connection() {}
+
+// Forward all received frames to the cluster, continue handling on delivery.
+void Connection::received(framing::AMQFrame& f) {
+ cluster.send(f, self);
+}
+
+// Don't doOutput in the
+bool Connection::doOutput() { return output.doOutput(); }
+
+// Handle frames delivered from cluster.
+void Connection::deliver(framing::AMQFrame& f) {
+ // Handle connection controls, deliver other frames to connection.
+ if (!framing::invoke(*this, *f.getBody()).wasHandled())
+ connection.received(f);
+}
+
+void Connection::closed() {
+ try {
+ // Called when the local network connection is closed. We still
+ // need to process any outstanding cluster frames for this
+ // connection to ensure our sessions are up-to-date. We defer
+ // closing the Connection object till deliverClosed(), but replace
+ // its output handler with a null handler since the network output
+ // handler will be deleted.
+ //
+ connection.setOutputHandler(&discardHandler);
+ cluster.send(AMQFrame(in_place<ClusterConnectionDeliverCloseBody>()), self);
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
+ }
+}
+
+void Connection::deliverClose () {
+ connection.closed();
+ cluster.erase(self);
+}
+
+// Delivery of doOutput allows us to run the real connection doOutput()
+// which stocks up the write buffers with data.
+//
+void Connection::deliverDoOutput(size_t requested) {
+ output.deliverDoOutput(requested);
+}
+
+}} // namespace qpid::cluster
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=690358&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Fri Aug 29 11:18:45 2008
@@ -0,0 +1,101 @@
+#ifndef QPID_CLUSTER_CONNECTION_H
+#define QPID_CLUSTER_CONNECTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "Cluster.h"
+#include "WriteEstimate.h"
+#include "OutputInterceptor.h"
+
+#include "qpid/broker/Connection.h"
+#include "qpid/amqp_0_10/Connection.h"
+#include "qpid/sys/ConnectionInputHandler.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+
+namespace qpid {
+
+namespace framing { class AMQFrame; }
+
+namespace cluster {
+
+/**
+ * Plug-in associated with broker::Connections, both local and shadow.
+ */
+class Connection :
+ public RefCounted,
+ public sys::ConnectionInputHandler,
+ public sys::ConnectionOutputHandler,
+ public framing::AMQP_AllOperations::ClusterConnectionHandler
+
+{
+ public:
+ /** Local connection, use this in ConnectionId */
+ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId);
+ /** Shadow connection */
+ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, ConnectionId);
+ ~Connection();
+
+ ConnectionId getId() const { return self; }
+ bool isLocal() const { return self.second == this; }
+
+ // self-delivery of intercepted extension points.
+ void deliver(framing::AMQFrame& f);
+ void deliverClose();
+ void deliverDoOutput(size_t requested);
+
+ void codecDeleted();
+
+ Cluster& getCluster() { return cluster; }
+
+ // ConnectionOutputHandler methods
+ void close() {}
+ void send(framing::AMQFrame&) {}
+ void activateOutput() {}
+ virtual size_t getBuffered() const { assert(0); return 0; }
+
+ // ConnectionInputHandler methods
+ void received(framing::AMQFrame&);
+ void closed();
+ bool doOutput();
+ bool hasOutput() { return connection.hasOutput(); }
+ void idleOut() { idleOut(); }
+ void idleIn() { idleIn(); }
+
+ // ConnectionInputHandlerFactory
+ sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out, const std::string& id, bool isClient);
+
+ broker::Connection& getBrokerConnection() { return connection; }
+ private:
+ void sendDoOutput();
+
+ Cluster& cluster;
+ ConnectionId self;
+ NoOpConnectionOutputHandler discardHandler;
+ WriteEstimate writeEstimate;
+ OutputInterceptor output;
+ broker::Connection connection;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_CONNECTION_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=690358&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Fri Aug 29 11:18:45 2008
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionCodec.h"
+#include "Connection.h"
+#include "ProxyInputHandler.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/memory.h"
+
+namespace qpid {
+namespace cluster {
+
+sys::ConnectionCodec*
+ConnectionCodec::Factory::create(framing::ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
+ if (v == framing::ProtocolVersion(0, 10))
+ return new ConnectionCodec(out, id, cluster);
+ return 0;
+}
+
+sys::ConnectionCodec*
+ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) {
+ // FIXME aconway 2008-08-27: outbound connections need to be made
+ // with proper qpid::client code for failover, get rid of this
+ // broker-side hack.
+ return next->create(out, id);
+}
+
+ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster)
+ : codec(out, id, false),
+ interceptor(new Connection(cluster, codec, id, cluster.getSelf()))
+{
+ std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
+ codec.setInputHandler(ih);
+ cluster.insert(interceptor);
+}
+
+ConnectionCodec::~ConnectionCodec() {}
+
+// ConnectionCodec functions delegate to the codecOutput
+size_t ConnectionCodec::decode(const char* buffer, size_t size) { return codec.decode(buffer, size); }
+size_t ConnectionCodec::encode(const char* buffer, size_t size) { return codec.encode(buffer, size); }
+bool ConnectionCodec::canEncode() { return codec.canEncode(); }
+void ConnectionCodec::closed() { codec.closed(); }
+bool ConnectionCodec::isClosed() const { return codec.isClosed(); }
+framing::ProtocolVersion ConnectionCodec::getVersion() const { return codec.getVersion(); }
+
+}} // namespace qpid::cluster
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h?rev=690358&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Fri Aug 29 11:18:45 2008
@@ -0,0 +1,77 @@
+#ifndef QPID_CLUSTER_CONNCTIONCODEC_H
+#define QPID_CLUSTER_CONNCTIONCODEC_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/amqp_0_10/Connection.h"
+#include "qpid/cluster/Connection.h"
+#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+
+namespace broker {
+class Connection;
+}
+
+namespace cluster {
+class Cluster;
+
+/**
+ * Encapsulates the standard amqp_0_10::ConnectionCodec and sets up
+ * a cluster::Connection for the connection.
+ *
+ * The ConnectionCodec is deleted by the network layer when the
+ * connection closes. The cluster::Connection needs to be kept
+ * around until all cluster business on the connection is complete.
+ *
+ */
+class ConnectionCodec : public sys::ConnectionCodec {
+ public:
+ struct Factory : public sys::ConnectionCodec::Factory {
+ boost::shared_ptr<sys::ConnectionCodec::Factory> next;
+ Cluster& cluster;
+ Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c) : next(f), cluster(c) {}
+ sys::ConnectionCodec* create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id);
+ sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id);
+ };
+
+ ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c);
+ ~ConnectionCodec();
+
+ // ConnectionCodec functions delegate to the codecOutput
+ size_t decode(const char* buffer, size_t size);
+ size_t encode(const char* buffer, size_t size);
+ bool canEncode();
+ void closed();
+ bool isClosed() const;
+ framing::ProtocolVersion getVersion() const;
+
+
+ private:
+ amqp_0_10::Connection codec;
+ boost::intrusive_ptr<cluster::Connection> interceptor;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_CONNCTIONCODEC_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Fri Aug 29 11:18:45 2008
@@ -18,7 +18,6 @@
#include "Cpg.h"
#include "qpid/sys/Mutex.h"
-// Note cpg is currently unix-specific. Refactor if availble on other platforms.
#include "qpid/sys/posix/PrivatePosix.h"
#include "qpid/log/Statement.h"
@@ -170,27 +169,50 @@
return "Cannot mcast to CPG group "+group.str();
}
-Cpg::Id Cpg::self() const {
+MemberId Cpg::self() const {
unsigned int nodeid;
check(cpg_local_get(handle, &nodeid), "Cannot get local CPG identity");
- return Id(nodeid, getpid());
+ return MemberId(nodeid, getpid());
}
-ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) {
- ostream_iterator<Cpg::Id> i(o, " ");
- std::copy(a.first, a.first+a.second, i);
- return o;
+ostream& operator <<(ostream& out, const MemberId& id) {
+ return out << std::hex << id.first << ":" << std::dec << id.second;
}
-ostream& operator <<(ostream& out, const Cpg::Id& id) {
- return out << id.getNodeId() << "-" << id.getPid();
+ostream& operator<<(ostream& o, const ConnectionId& c) {
+ return o << c.first << "-" << c.second;
}
-ostream& operator <<(ostream& out, const cpg_name& name) {
- return out << string(name.value, name.length);
+ostream& operator<<(ostream& o, const cpg_name& name) {
+ return o << string(name.value, name.length);
}
}} // namespace qpid::cluster
+// In proper namespace for ADL.
+
+std::ostream& operator<<(std::ostream& o, const ::cpg_address& a) {
+ const char* reasonString;
+ switch (a.reason) {
+ case CPG_REASON_JOIN: reasonString = "joined"; break;
+ case CPG_REASON_LEAVE: reasonString = "left";break;
+ case CPG_REASON_NODEDOWN: reasonString = "node-down";break;
+ case CPG_REASON_NODEUP: reasonString = "node-up";break;
+ case CPG_REASON_PROCDOWN: reasonString = "process-down";break;
+ default:
+ assert(0);
+ reasonString = "";
+ }
+ return o << qpid::cluster::MemberId(a.nodeid, a.pid) << " " << reasonString;
+}
+
+namespace std {
+ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) {
+ for (cpg_address* p = a.first; p < a.first+a.second; ++p)
+ o << *p << " ";
+ return o;
+}
+}
+
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Fri Aug 29 11:18:45 2008
@@ -19,12 +19,12 @@
*
*/
+#include "qpid/cluster/types.h"
+#include "qpid/cluster/Dispatchable.h"
+
#include "qpid/Exception.h"
#include "qpid/sys/IOHandle.h"
-#include "qpid/cluster/Dispatchable.h"
-#include <boost/tuple/tuple.hpp>
-#include <boost/tuple/tuple_comparison.hpp>
#include <boost/scoped_ptr.hpp>
#include <cassert>
@@ -65,14 +65,6 @@
std::string str() const { return std::string(value, length); }
};
- // boost::tuple gives us == and < for free.
- struct Id : public boost::tuple<uint32_t, uint32_t> {
- Id(uint32_t n=0, uint32_t p=0) : boost::tuple<uint32_t, uint32_t>(n, p) {}
- Id(const cpg_address& addr) : boost::tuple<uint32_t, uint32_t>(addr.nodeid, addr.pid) {}
- uint32_t getNodeId() const { return boost::get<0>(*this); }
- uint32_t getPid() const { return boost::get<1>(*this); }
- };
-
static std::string str(const cpg_name& n) {
return std::string(n.value, n.length);
}
@@ -127,7 +119,7 @@
cpg_handle_t getHandle() const { return handle; }
- Id self() const;
+ MemberId self() const;
int getFd();
@@ -166,9 +158,7 @@
bool isShutdown;
};
-std::ostream& operator <<(std::ostream& out, const cpg_name& name);
-std::ostream& operator <<(std::ostream& out, const Cpg::Id& id);
-std::ostream& operator <<(std::ostream& out, const std::pair<cpg_address*,int> addresses);
+std::ostream& operator <<(std::ostream& out, const MemberId& id);
inline bool operator==(const cpg_name& a, const cpg_name& b) {
return a.length==b.length && strncmp(a.value, b.value, a.length) == 0;
@@ -177,5 +167,12 @@
}} // namespace qpid::cluster
+// In proper namespaces for ADL
+std::ostream& operator <<(std::ostream& out, const cpg_name& name);
+std::ostream& operator<<(std::ostream& o, const cpg_address& a);
+namespace std {
+std::ostream& operator <<(std::ostream& out, std::pair<cpg_address*,int> addresses);
+}
+
#endif /*!CPG_H*/
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h?rev=690358&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h Fri Aug 29 11:18:45 2008
@@ -0,0 +1,46 @@
+#ifndef QPID_CLUSTER_NOOPCONNECTIONOUTPUTHANDLER_H
+#define QPID_CLUSTER_NOOPCONNECTIONOUTPUTHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/sys/ConnectionOutputHandler.h>
+
+namespace qpid {
+
+namespace framing { class AMQFrame; }
+
+namespace cluster {
+
+/**
+ * Output handler for frames sent to noop connections.
+ * Simply discards frames.
+ */
+class NoOpConnectionOutputHandler : public sys::ConnectionOutputHandler
+{
+ public:
+ virtual void send(framing::AMQFrame&) {}
+ virtual void close() {}
+ virtual void activateOutput() {}
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_NOOPCONNECTIONOUTPUTHANDLER_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Fri Aug 29 11:18:45 2008
@@ -19,9 +19,10 @@
*
*/
#include "OutputInterceptor.h"
-#include "ConnectionInterceptor.h"
-#include "qpid/framing/ClusterConnectionDoOutputBody.h"
+#include "Connection.h"
+#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/log/Statement.h"
#include <boost/current_function.hpp>
@@ -30,7 +31,7 @@
using namespace framing;
-OutputInterceptor::OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h)
+OutputInterceptor::OutputInterceptor(cluster::Connection& p, sys::ConnectionOutputHandler& h)
: parent(p), next(h), sent(), moreOutput(), doingOutput()
{}
@@ -57,8 +58,6 @@
// which stocks up the write buffers with data.
//
void OutputInterceptor::deliverDoOutput(size_t requested) {
- if (parent.getClosed()) return;
-
Locker l(lock);
size_t buf = next.getBuffered();
if (parent.isLocal())
@@ -68,7 +67,7 @@
sent = 0;
do {
sys::Mutex::ScopedUnlock u(lock);
- moreOutput = doOutputNext(); // Calls send()
+ moreOutput = parent.getBrokerConnection().doOutput();
} while (sent < requested && moreOutput);
sent += buf; // Include buffered data in the sent total.
@@ -88,8 +87,8 @@
// Send a doOutput request if one is not already in flight.
void OutputInterceptor::sendDoOutput() {
// Call with lock held.
- if (parent.isShadow() || parent.getClosed())
- return;
+ // FIXME aconway 2008-08-28: used to have || parent.getClosed())
+ if (!parent.isLocal()) return;
doingOutput = true;
size_t request = writeEstimate.sending(getBuffered());
@@ -98,8 +97,8 @@
// Send it anyway to keep the doOutput chain going until we are sure there's no more output
// (in deliverDoOutput)
//
- parent.getCluster().send(AMQFrame(in_place<ClusterConnectionDoOutputBody>(
- framing::ProtocolVersion(), request)), &parent);
+ parent.getCluster().send(AMQFrame(in_place<ClusterConnectionDeliverDoOutputBody>(
+ framing::ProtocolVersion(), request)), parent.getId());
QPID_LOG(trace, &parent << "Send doOutput request for " << request);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h Fri Aug 29 11:18:45 2008
@@ -31,14 +31,14 @@
namespace framing { class AMQFrame; }
namespace cluster {
-class ConnectionInterceptor;
+class Connection;
/**
* Interceptor for connection OutputHandler, manages outgoing message replication.
*/
class OutputInterceptor : public sys::ConnectionOutputHandler {
public:
- OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h);
+ OutputInterceptor(cluster::Connection& p, sys::ConnectionOutputHandler& h);
// sys::ConnectionOutputHandler functions
void send(framing::AMQFrame& f);
@@ -51,9 +51,7 @@
// Intercept doOutput requests on Connection.
bool doOutput();
- boost::function<bool ()> doOutputNext;
-
- ConnectionInterceptor& parent;
+ cluster::Connection& parent;
private:
typedef sys::Mutex::ScopedLock Locker;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h Fri Aug 29 11:18:45 2008
@@ -90,7 +90,7 @@
batch.swap(queue);
condition.clear();
ScopedUnlock u(lock);
- callback(batch.begin(), batch.end()); // Process the batch outside the lock.
+ callback(batch.begin(), batch.end()); // Process outside the lock to allow concurrent push.
h.rewatch();
}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ProxyInputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ProxyInputHandler.h?rev=690358&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ProxyInputHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ProxyInputHandler.h Fri Aug 29 11:18:45 2008
@@ -0,0 +1,57 @@
+#ifndef QPID_CLUSTER_PROXYINPUTHANDLER_H
+#define QPID_CLUSTER_PROXYINPUTHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/ConnectionInputHandler.h"
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+
+namespace framing { class AMQFrame; }
+
+namespace cluster {
+
+/**
+ * Proxies ConnectionInputHandler functions and ensures target.closed()
+ * is called, on deletion if not before.
+ */
+class ProxyInputHandler : public sys::ConnectionInputHandler
+{
+ public:
+ ProxyInputHandler(boost::intrusive_ptr<cluster::Connection> t) : target(t) {}
+ ~ProxyInputHandler() { closed(); }
+
+ void received(framing::AMQFrame& f) { target->received(f); }
+ void closed() { if (target) target->closed(); target = 0; }
+ void idleOut() { target->idleOut(); }
+ void idleIn() { target->idleIn(); }
+ bool doOutput() { return target->doOutput(); }
+ bool hasOutput() { return target->hasOutput(); }
+
+ private:
+ boost::intrusive_ptr<cluster::Connection> target;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_PROXYINPUTHANDLER_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ProxyInputHandler.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ProxyInputHandler.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=690358&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Fri Aug 29 11:18:45 2008
@@ -0,0 +1,58 @@
+#ifndef QPID_CLUSTER_TYPES_H
+#define QPID_CLUSTER_TYPES_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <utility>
+#include <iosfwd>
+#include <stdint.h>
+
+extern "C" {
+#include <openais/cpg.h>
+}
+
+namespace qpid {
+namespace cluster {
+
+class Connection;
+
+/** first=node-id, second=pid */
+struct MemberId : std::pair<uint32_t, uint32_t> {
+ MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {}
+ MemberId(const cpg_address& caddr) : std::pair<uint32_t,uint32_t>(caddr.nodeid, caddr.pid) {}
+ uint32_t getNode() const { return first; }
+ uint32_t getPid() const { return second; }
+};
+
+inline bool operator==(const cpg_address& caddr, const MemberId& id) { return id == MemberId(caddr); }
+
+std::ostream& operator<<(std::ostream&, const MemberId&);
+
+struct ConnectionId : public std::pair<MemberId, Connection*> {
+ ConnectionId(const MemberId& m=MemberId(), Connection* c=0) : std::pair<MemberId, Connection*> (m,c) {}
+ MemberId getMember() const { return first; }
+ Connection* getConnectionPtr() const { return second; }
+};
+std::ostream& operator<<(std::ostream&, const ConnectionId&);
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_TYPES_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionCodec.h Fri Aug 29 11:18:45 2008
@@ -22,14 +22,14 @@
*
*/
#include "qpid/framing/ProtocolVersion.h"
-#include "OutputControl.h"
-#include <memory>
-#include <map>
namespace qpid {
namespace sys {
+class InputHandlerFactory;
+class OutputControl;
+
/**
* Interface of coder/decoder for a connection of a specific protocol
* version.
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h Fri Aug 29 11:18:45 2008
@@ -33,6 +33,7 @@
public TimeoutHandler, public OutputTask
{
public:
+
virtual void closed() = 0;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionInputHandlerFactory.h Fri Aug 29 11:18:45 2008
@@ -42,7 +42,8 @@
*@param id identify the connection for management purposes.
*/
virtual ConnectionInputHandler* create(ConnectionOutputHandler* out,
- const std::string& id) = 0;
+ const std::string& id,
+ bool isClient) = 0;
virtual ~ConnectionInputHandlerFactory(){}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h Fri Aug 29 11:18:45 2008
@@ -53,12 +53,12 @@
}
~ForkedBroker() {
- try { stop(); } catch(const std::exception& e) {
- QPID_LOG(error, QPID_MSG("Stopping forked broker: " << e.what()));
+ try { kill(); } catch(const std::exception& e) {
+ QPID_LOG(error, QPID_MSG("Killing forked broker: " << e.what()));
}
}
- void stop() {
+ void kill() {
using qpid::ErrnoException;
if (pid == 0) return;
if (::kill(pid, SIGINT) < 0) throw ErrnoException("kill failed");
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Fri Aug 29 11:18:45 2008
@@ -69,8 +69,8 @@
void add();
void setup();
void kill(size_t n) {
- if (n) forkedBrokers[n-1]->stop();
- else broker0.shutdown();
+ if (n) forkedBrokers[n-1].kill();
+ else broker0->broker->shutdown();
}
};
@@ -139,6 +139,14 @@
BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("amq.direct").getType());
}
+QPID_AUTO_TEST_CASE(testSingletonCluster) {
+ // Test against a singleton cluster, verify basic operation.
+ ClusterFixture cluster(1);
+ Client c(cluster[0]);
+ BOOST_CHECK(c.session.queueQuery("q").getQueue().empty());
+ BOOST_CHECK(c.session.exchangeQuery("ex").getType().empty());
+}
+
QPID_AUTO_TEST_CASE(testWiringReplication) {
ClusterFixture cluster(3);
Client c0(cluster[0]);
Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=690358&r1=690357&r2=690358&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Fri Aug 29 11:18:45 2008
@@ -25,19 +25,17 @@
<class name = "cluster" code = "0x80" label="Qpid clustering extensions.">
<doc>Qpid extension class to allow clustered brokers to communicate.</doc>
- <control name = "notify" code="0x1">
- <role name="server" implement="MUST" />
+ <control name = "joined" code="0x1">
<field name="url" type="str16" />
</control>
+ </class>
- <control name="connection-close" code="0x2">
- <role name="server" implement="MUST" />
+ <class name="cluster-connection" code="0x81" label="Qpid clustering extensions.">
+ <control name="deliver-close" code="0x2">
</control>
- <control name="connection-do-output" code="0x3">
- <role name="server" implement="MUST" />
+ <control name="deliver-do-output" code="0x3">
<field name="bytes" type="uint32"/>
</control>
-
</class>
</amqp>