You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/01/27 23:20:37 UTC
svn commit: r903864 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp
Connection.cpp Connection.h
Author: aconway
Date: Wed Jan 27 22:20:36 2010
New Revision: 903864
URL: http://svn.apache.org/viewvc?rev=903864&view=rev
Log:
In clustered broker: move construction of broker::Connections to the cluster dispatch thread.
Constructing a connection can involve sending management information so needs to be
in the cluster dispatch context.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=903864&r1=903863&r2=903864&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jan 27 22:20:36 2010
@@ -294,14 +294,8 @@
// Called in connection thread to insert a client connection.
void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
- QPID_LOG(info, *this << " new local connection " << c->getId());
- localConnections.insert(c);
assert(c->getId().getMember() == self);
- // Announce the connection to the cluster.
- if (c->isLocalClient())
- mcast.mcastControl(ClusterConnectionAnnounceBody(ProtocolVersion(),
- c->getBrokerConnection().getSSF() ),
- c->getId());
+ localConnections.insert(c);
}
// Called in connection thread to insert an updated shadow connection.
@@ -497,22 +491,18 @@
if (i != connections.end()) return i->second;
ConnectionPtr cp;
// If the frame is an announcement for a new connection, add it.
- if (e.frame.getBody() && e.frame.getMethod() &&
- e.frame.getMethod()->isA<ClusterConnectionAnnounceBody>())
+ const ClusterConnectionAnnounceBody *announce = castAnnounce(e.frame.getBody());
+ if (e.frame.getBody() && e.frame.getMethod() && announce)
{
if (id.getMember() == self) { // Announces one of my own
cp = localConnections.getErase(id);
- assert(cp);
+ assert(cp);
}
else { // New remote connection, create a shadow.
std::ostringstream mgmtId;
- unsigned int ssf;
- const ClusterConnectionAnnounceBody *announce = castAnnounce(e.frame.getBody());
-
+ unsigned int ssf = (announce && announce->hasSsf()) ? announce->getSsf() : 0;
mgmtId << id;
- ssf = (announce && announce->hasSsf()) ? announce->getSsf() : 0;
- QPID_LOG(debug, *this << "new connection's ssf =" << ssf );
- cp = new Connection(*this, shadowOut, mgmtId.str(), id, ssf );
+ cp = new Connection(*this, shadowOut, mgmtId.str(), id, ssf);
}
connections.insert(ConnectionMap::value_type(id, cp));
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=903864&r1=903863&r2=903864&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Jan 27 22:20:36 2010
@@ -37,6 +37,7 @@
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
+#include "qpid/framing/ClusterConnectionAnnounceBody.h"
#include "qpid/framing/ConnectionCloseBody.h"
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
@@ -73,42 +74,63 @@
// Shadow connection
- Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId,
- const ConnectionId& id, unsigned int ssf)
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId,
+ const ConnectionId& id, unsigned int ssf)
: cluster(c), self(id), catchUp(false), output(*this, out),
- connection(&output, cluster.getBroker(), shadowPrefix+logId, ssf), expectProtocolHeader(false),
+ connectionCtor(&output, cluster.getBroker(), shadowPrefix+logId, ssf),
+ expectProtocolHeader(false),
mcastFrameHandler(cluster.getMulticast(), self),
consumerNumbering(c.getUpdateReceiver().consumerNumbering)
-{ init(); }
+{}
// Local connection
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& logId, MemberId member,
bool isCatchUp, bool isLink, unsigned int ssf
) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
- connection(&output, cluster.getBroker(),
- isCatchUp ? shadowPrefix+logId : logId,
- ssf,
- isLink,
- isCatchUp ? ++catchUpId : 0),
- expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self),
+ connectionCtor(&output, cluster.getBroker(),
+ isCatchUp ? shadowPrefix+logId : logId,
+ ssf,
+ isLink,
+ isCatchUp ? ++catchUpId : 0),
+ expectProtocolHeader(isLink),
+ mcastFrameHandler(cluster.getMulticast(), self),
consumerNumbering(c.getUpdateReceiver().consumerNumbering)
-{ init(); }
+{
+ cluster.addLocalConnection(this);
+ if (isLocalClient()) {
+ // Local clients are announced to the cluster
+ // and initialized when the announce is received.
+ QPID_LOG(info, "new client connection " << *this);
+ giveReadCredit(cluster.getSettings().readMax);
+ cluster.getMulticast().mcastControl(
+ ClusterConnectionAnnounceBody(ProtocolVersion(), getSsf()), getId());
+ }
+ else {
+ // Catch-up connections initialized immediately.
+ assert(catchUp);
+ QPID_LOG(info, "new catch-up connection " << *this);
+ init();
+ }
+}
void Connection::init() {
- QPID_LOG(debug, cluster << " new connection: " << *this);
+ connection = connectionCtor.construct();
+ QPID_LOG(debug, cluster << " initialized connection: " << *this
+ << " ssf=" << connection->getSSF());
if (isLocalClient()) {
- connection.setClusterOrderOutput(mcastFrameHandler); // Actively send cluster-order frames from local node
- cluster.addLocalConnection(this);
- giveReadCredit(cluster.getSettings().readMax);
+ // Actively send cluster-order frames from local node
+ connection->setClusterOrderOutput(mcastFrameHandler);
}
- else { // Shadow or catch-up connection
- connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames
- connection.setClientThrottling(false); // Disable client throttling, done by active node.
- connection.setShadow(); // Mark the broker connection as a shadow.
+ else { // Shadow or catch-up connection
+ // Passive, discard cluster-order frames
+ connection->setClusterOrderOutput(nullFrameHandler);
+ // Disable client throttling, done by active node.
+ connection->setClientThrottling(false);
+ connection->setShadow(); // Mark the connection as a shadow.
}
if (!isCatchUp())
- connection.setErrorListener(this);
+ connection->setErrorListener(this);
}
void Connection::giveReadCredit(int credit) {
@@ -116,8 +138,13 @@
output.giveReadCredit(credit);
}
+void Connection::announce(uint32_t ssf) {
+ assert(ssf == connectionCtor.ssf);
+ init();
+}
+
Connection::~Connection() {
- connection.setErrorListener(0);
+ if (connection.get()) connection->setErrorListener(0);
QPID_LOG(debug, cluster << " deleted connection: " << *this);
}
@@ -131,14 +158,15 @@
if (isLocal()) { // Local catch-up connection.
currentChannel = f.getChannel();
if (!framing::invoke(*this, *f.getBody()).wasHandled())
- connection.received(f);
+
+ connection->received(f);
}
else { // Shadow or updated catch-up connection.
if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
if (isShadow())
cluster.addShadowConnection(this);
AMQFrame ok((ConnectionCloseOkBody()));
- connection.getOutput().send(ok);
+ connection->getOutput().send(ok);
output.closeOutput();
catchUp = false;
}
@@ -155,7 +183,7 @@
}
}
if (!message.empty())
- connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message);
+ connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message);
return !message.empty();
}
@@ -177,9 +205,9 @@
&& !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
{
if (f.type == DATA) // incoming data frames to broker::Connection
- connection.received(const_cast<AMQFrame&>(f.frame));
+ connection->received(const_cast<AMQFrame&>(f.frame));
else { // frame control, send frame via SessionState
- broker::SessionState* ss = connection.getChannel(currentChannel).getSession();
+ broker::SessionState* ss = connection->getChannel(currentChannel).getSession();
if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
}
}
@@ -194,7 +222,7 @@
}
else if (isUpdated()) {
QPID_LOG(debug, cluster << " closed update connection " << *this);
- connection.closed();
+ connection->closed();
}
else if (isLocal()) {
QPID_LOG(debug, cluster << " local close of replicated connection " << *this);
@@ -213,13 +241,13 @@
// Self-delivery of close message, close the connection.
void Connection::deliverClose () {
assert(!catchUp);
- connection.closed();
+ connection->closed();
cluster.erase(self);
}
// The connection has been killed for misbehaving
void Connection::abort() {
- connection.abort();
+ if (connection.get()) connection->abort();
cluster.erase(self);
}
@@ -257,7 +285,7 @@
}
broker::SessionState& Connection::sessionState() {
- return *connection.getChannel(currentChannel).getSession();
+ return *connection->getChannel(currentChannel).getSession();
}
broker::SemanticState& Connection::semanticState() {
@@ -294,26 +322,26 @@
receivedIncomplete);
QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
// The output tasks will be added later in the update process.
- connection.getOutputTasks().removeAll();
+ connection->getOutputTasks().removeAll();
}
void Connection::outputTask(uint16_t channel, const std::string& name) {
- broker::SessionState* session = connection.getChannel(channel).getSession();
+ broker::SessionState* session = connection->getChannel(channel).getSession();
if (!session)
throw Exception(QPID_MSG(cluster << " channel not attached " << *this
<< "[" << channel << "] "));
OutputTask* task = &session->getSemanticState().find(name);
- connection.getOutputTasks().addOutputTask(task);
+ connection->getOutputTasks().addOutputTask(task);
}
void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) {
ConnectionId shadowId = ConnectionId(memberId, connectionId);
QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
self = shadowId;
- connection.setUserId(username);
+ connection->setUserId(username);
// OK to use decoder here because cluster is stalled for update.
cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
- connection.setErrorListener(this);
+ connection->setErrorListener(this);
output.setSendMax(sendMax);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=903864&r1=903863&r2=903864&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Wed Jan 27 22:20:36 2010
@@ -73,7 +73,7 @@
~Connection();
ConnectionId getId() const { return self; }
- broker::Connection& getBrokerConnection() { return connection; }
+ broker::Connection& getBrokerConnection() { return *connection; }
/** Local connections may be clients or catch-up connections */
bool isLocal() const;
@@ -95,8 +95,8 @@
void received(framing::AMQFrame&);
void closed();
bool doOutput();
- void idleOut() { connection.idleOut(); }
- void idleIn() { connection.idleIn(); }
+ void idleOut() { if (connection.get()) connection->idleOut(); }
+ void idleIn() { if (connection.get()) connection->idleIn(); }
// ConnectionCodec methods - called by IO layer with a read buffer.
size_t decode(const char* buffer, size_t size);
@@ -156,7 +156,7 @@
void exchange(const std::string& encoded);
void giveReadCredit(int credit);
- void announce(uint32_t) {} // handled by Cluster.
+ void announce(uint32_t ssf);
void abort();
void deliverClose();
@@ -165,11 +165,36 @@
void addQueueListener(const std::string& queue, uint32_t listener);
void managementSchema(const std::string& data);
+ uint32_t getSsf() const { return connectionCtor.ssf; }
+
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
};
-
+
+ // Arguments to construct a broker::Connection
+ struct ConnectionCtor {
+ sys::ConnectionOutputHandler* out;
+ broker::Broker& broker;
+ std::string mgmtId;
+ unsigned int ssf;
+ bool isLink;
+ uint64_t objectId;
+
+ ConnectionCtor(
+ sys::ConnectionOutputHandler* out_,
+ broker::Broker& broker_,
+ const std::string& mgmtId_,
+ unsigned int ssf_,
+ bool isLink_=false,
+ uint64_t objectId_=0
+ ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_), isLink(isLink_), objectId(objectId_) {}
+
+ std::auto_ptr<broker::Connection> construct() {
+ return std::auto_ptr<broker::Connection>(
+ new broker::Connection(out, broker, mgmtId, ssf, isLink, objectId));
+ }
+ };
static NullFrameHandler nullFrameHandler;
@@ -191,7 +216,8 @@
bool catchUp;
OutputInterceptor output;
framing::FrameDecoder localDecoder;
- broker::Connection connection;
+ ConnectionCtor connectionCtor;
+ std::auto_ptr<broker::Connection> connection;
framing::SequenceNumber deliverSeq;
framing::ChannelId currentChannel;
boost::shared_ptr<broker::TxBuffer> txBuffer;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org