You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/12/19 22:22:51 UTC
svn commit: r1424125 - in /qpid/trunk/qpid/cpp/src/qpid/broker: Broker.cpp
Broker.h Connection.cpp Connection.h Exchange.cpp Link.cpp Link.h
QueueFlowLimit.cpp SessionHandler.cpp SessionState.cpp SessionState.h
Author: aconway
Date: Wed Dec 19 21:22:50 2012
New Revision: 1424125
URL: http://svn.apache.org/viewvc?rev=1424125&view=rev
Log:
QPID-4514: Remove obsolete cluster code: Broker, Connection, Link.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Dec 19 21:22:50 2012
@@ -215,9 +215,6 @@ Broker::Broker(const Broker::Options& co
*this),
queueCleaner(queues, &timer),
recoveryInProgress(false),
- recovery(true),
- inCluster(false),
- clusterUpdatee(false),
expiryPolicy(new ExpiryPolicy),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)),
deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2))
@@ -289,18 +286,11 @@ Broker::Broker(const Broker::Options& co
exchanges.declare(empty, DirectExchange::typeName, false, noReplicateArgs());
if (store.get() != 0) {
- // The cluster plug-in will setRecovery(false) on all but the first
- // broker to join a cluster.
- if (getRecovery()) {
- RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, protocolRegistry);
- recoveryInProgress = true;
- store->recover(recoverer);
- recoveryInProgress = false;
- }
- else {
- QPID_LOG(notice, "Cluster recovery: recovered journal data discarded and journal files pushed down");
- store->truncateInit(true); // save old files in subdir
- }
+ RecoveryManagerImpl recoverer(
+ queues, exchanges, links, dtxManager, protocolRegistry);
+ recoveryInProgress = true;
+ store->recover(recoverer);
+ recoveryInProgress = false;
}
//ensure standard exchanges exist (done after recovery from store)
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Wed Dec 19 21:22:50 2012
@@ -184,8 +184,6 @@ class Broker : public sys::Runnable, pub
const Message& msg);
std::string federationTag;
bool recoveryInProgress;
- bool recovery;
- bool inCluster, clusterUpdatee;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConsumerFactories consumerFactories;
ProtocolRegistry protocolRegistry;
@@ -282,22 +280,8 @@ class Broker : public sys::Runnable, pub
static QPID_BROKER_EXTERN const std::string TCP_TRANSPORT;
- void setRecovery(bool set) { recovery = set; }
- bool getRecovery() const { return recovery; }
bool inRecovery() const { return recoveryInProgress; }
- /** True of this broker is part of a cluster.
- * Only valid after early initialization of plugins is complete.
- */
- bool isInCluster() const { return inCluster; }
- void setInCluster(bool set) { inCluster = set; }
-
- /** True if this broker is joining a cluster and in the process of
- * receiving a state update.
- */
- bool isClusterUpdatee() const { return clusterUpdatee; }
- void setClusterUpdatee(bool set) { clusterUpdatee = set; }
-
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
/**
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Wed Dec 19 21:22:50 2012
@@ -85,13 +85,10 @@ Connection::Connection(ConnectionOutputH
const qpid::sys::SecuritySettings& external,
bool link_,
uint64_t objectId_,
- bool shadow_,
- bool delayManagement,
bool authenticated_
) :
ConnectionState(out_, broker_),
securitySettings(external),
- shadow(shadow_),
authenticated(authenticated_),
adapter(*this, link_),
link(link_),
@@ -106,11 +103,6 @@ Connection::Connection(ConnectionOutputH
{
outboundTracker.wrap(out);
broker.getConnectionObservers().connection(*this);
- // In a cluster, allow adding the management object to be delayed.
- if (!delayManagement) addManagementObject();
-}
-
-void Connection::addManagementObject() {
assert(agent == 0);
assert(mgmtObject == 0);
Manageable* parent = broker.GetVhostObject();
@@ -119,7 +111,6 @@ void Connection::addManagementObject() {
if (agent != 0) {
// TODO set last bool true if system connection
mgmtObject = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, mgmtId, !link, false, "AMQP 0-10"));
- mgmtObject->set_shadow(shadow);
agent->addObject(mgmtObject, objectId);
}
ConnectionState::setUrl(mgmtId);
@@ -277,20 +268,6 @@ void Connection::notifyConnectionForced(
void Connection::setUserId(const string& userId)
{
ConnectionState::setUserId(userId);
- // In a cluster, the cluster code will raise the connect event
- // when the connection is replicated to the cluster.
- if (!broker.isInCluster()) raiseConnectEvent();
-}
-
-void Connection::raiseConnectEvent() {
- if (mgmtObject != 0) {
- mgmtObject->set_authIdentity(userId);
- agent->raiseEvent(_qmf::EventClientConnect(mgmtId, userId, mgmtObject->get_remoteProperties()));
- }
-
- QPID_LOG_CAT(debug, model, "Create connection. user:" << userId
- << " rhost:" << mgmtId );
-
}
void Connection::setUserProxyAuth(bool b)
@@ -488,7 +465,7 @@ void Connection::abort()
void Connection::setHeartbeatInterval(uint16_t heartbeat)
{
setHeartbeat(heartbeat);
- if (heartbeat > 0 && !isShadow()) {
+ if (heartbeat > 0) {
if (!heartbeatTimer) {
heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this);
timer.add(heartbeatTimer);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Wed Dec 19 21:22:50 2012
@@ -84,8 +84,6 @@ class Connection : public sys::Connectio
const qpid::sys::SecuritySettings&,
bool isLink = false,
uint64_t objectId = 0,
- bool shadow=false,
- bool delayManagement = false,
bool authenticated=true);
~Connection ();
@@ -130,7 +128,6 @@ class Connection : public sys::Connectio
void notifyConnectionForced(const std::string& text);
void setUserId(const std::string& uid);
- void raiseConnectEvent();
// credentials for connected client
const std::string& getUserId() const { return ConnectionState::getUserId(); }
@@ -153,18 +150,9 @@ class Connection : public sys::Connectio
void sendClose();
void setSecureConnection(SecureConnection* secured);
- /** True if this is a shadow connection in a cluster. */
- bool isShadow() const { return shadow; }
-
/** True if this connection is authenticated */
bool isAuthenticated() const { return authenticated; }
- // Used by cluster to update connection status
- sys::AggregateOutput& getOutputTasks() { return outputTasks; }
-
- /** Cluster delays adding management object in the constructor then calls this. */
- void addManagementObject();
-
const qpid::sys::SecuritySettings& getExternalSecuritySettings() const
{
return securitySettings;
@@ -176,9 +164,6 @@ class Connection : public sys::Connectio
bool isLink() { return link; }
void startLinkHeartbeatTimeoutTask();
- // Used by cluster during catch-up, see cluster::OutputInterceptor
- void doIoCallbacks();
-
void setClientProperties(const framing::FieldTable& cp) { clientProperties = cp; }
const framing::FieldTable& getClientProperties() const { return clientProperties; }
@@ -188,7 +173,6 @@ class Connection : public sys::Connectio
ChannelMap channels;
qpid::sys::SecuritySettings securitySettings;
- bool shadow;
bool authenticated;
ConnectionHandler adapter;
const bool link;
@@ -228,6 +212,7 @@ class Connection : public sys::Connectio
OutboundFrameTracker outboundTracker;
void sent(const framing::AMQFrame& f);
+ void doIoCallbacks();
public:
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Wed Dec 19 21:22:50 2012
@@ -210,8 +210,6 @@ Exchange::Exchange(const string& _name,
ive = _args.get(qpidIVE);
if (ive) {
- if (broker && broker->isInCluster())
- throw framing::NotImplementedException("Cannot use Initial Value Exchanges in a cluster");
QPID_LOG(debug, "Configured exchange " << _name << " with Initial Value");
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Wed Dec 19 21:22:50 2012
@@ -209,9 +209,6 @@ void Link::setStateLH (int newState)
state = newState;
- if (hideManagement())
- return;
-
switch (state)
{
case STATE_WAITING : mgmtObject->set_state("Waiting"); break;
@@ -237,8 +234,7 @@ void Link::startConnectionLH ()
QPID_LOG(error, "Link connection to " << host << ":" << port << " failed: "
<< e.what());
setStateLH(STATE_WAITING);
- if (!hideManagement())
- mgmtObject->set_lastError (e.what());
+ mgmtObject->set_lastError (e.what());
}
}
@@ -249,7 +245,7 @@ void Link::established(Connection* c)
addr << host << ":" << port;
QPID_LOG (info, "Inter-broker link established to " << addr.str());
- if (!hideManagement() && agent)
+ if (agent)
agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
bool isClosing = false;
{
@@ -292,7 +288,7 @@ void Link::opened() {
Mutex::ScopedLock mutex(lock);
if (!connection) return;
- if (!hideManagement() && connection->GetManagementObject()) {
+ if (connection->GetManagementObject()) {
mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId());
}
@@ -354,13 +350,11 @@ void Link::closed(int, std::string text)
connection = 0;
- if (!hideManagement()) {
- mgmtObject->set_connectionRef(qpid::management::ObjectId());
- if (state == STATE_OPERATIONAL && agent) {
- stringstream addr;
- addr << host << ":" << port;
+ mgmtObject->set_connectionRef(qpid::management::ObjectId());
+ if (state == STATE_OPERATIONAL && agent) {
+ stringstream addr;
+ addr << host << ":" << port;
agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
- }
}
for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
@@ -372,8 +366,7 @@ void Link::closed(int, std::string text)
if (state != STATE_FAILED && state != STATE_PASSIVE)
{
setStateLH(STATE_WAITING);
- if (!hideManagement())
- mgmtObject->set_lastError (text);
+ mgmtObject->set_lastError (text);
}
}
@@ -514,14 +507,13 @@ void Link::reconnectLH(const Address& a)
port = a.port;
transport = a.protocol;
- if (!hideManagement()) {
- stringstream errorString;
- errorString << "Failing over to " << a;
- mgmtObject->set_lastError(errorString.str());
- mgmtObject->set_host(host);
- mgmtObject->set_port(port);
- mgmtObject->set_transport(transport);
- }
+ stringstream errorString;
+ errorString << "Failing over to " << a;
+ mgmtObject->set_lastError(errorString.str());
+ mgmtObject->set_host(host);
+ mgmtObject->set_port(port);
+ mgmtObject->set_transport(transport);
+
startConnectionLH();
}
@@ -538,12 +530,6 @@ bool Link::tryFailoverLH() {
return false;
}
-// Management updates for a link are inconsistent in a cluster, so they are
-// suppressed.
-bool Link::hideManagement() const {
- return !mgmtObject || ( broker && broker->isInCluster());
-}
-
// Allocate channel from link free pool
framing::ChannelId Link::nextChannel()
{
@@ -585,8 +571,7 @@ void Link::notifyConnectionForced(const
{
Mutex::ScopedLock mutex(lock);
setStateLH(STATE_FAILED);
- if (!hideManagement())
- mgmtObject->set_lastError(text);
+ mgmtObject->set_lastError(text);
}
void Link::setPersistenceId(uint64_t id) const
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Wed Dec 19 21:22:50 2012
@@ -107,7 +107,6 @@ class Link : public PersistableConfig, p
void destroy(); // Cleanup connection before link goes away
void ioThreadProcessing(); // Called on connection's IO thread by request
bool tryFailoverLH(); // Called during maintenance visit
- bool hideManagement() const;
void reconnectLH(const Address&); //called by LinkRegistry
// connection management (called by LinkRegistry)
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Wed Dec 19 21:22:50 2012
@@ -129,11 +129,6 @@ void QueueFlowLimit::enqueued(const Mess
}
if (flowStopped || !index.empty()) {
- // ignore flow control if we are populating the queue due to cluster replication:
- if (broker && broker->isClusterUpdatee()) {
- QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.getSequence());
- return;
- }
QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.getSequence());
msg.getPersistentContext()->getIngressCompletion().startCompleter(); // don't complete until flow resumes
bool unique;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Wed Dec 19 21:22:50 2012
@@ -111,7 +111,7 @@ void SessionHandler::attachAs(const std:
// Delay creating management object till attached(). In a cluster,
// only the active link broker calls attachAs but all brokers
// receive the subsequent attached() call.
- session.reset(new SessionState(connection.getBroker(), *this, id, config, true));
+ session.reset(new SessionState(connection.getBroker(), *this, id, config));
sendAttach(false);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Wed Dec 19 21:22:50 2012
@@ -53,14 +53,14 @@ namespace _qmf = qmf::org::apache::qpid:
SessionState::SessionState(
Broker& b, SessionHandler& h, const SessionId& id,
- const SessionState::Configuration& config, bool delayManagement)
+ const SessionState::Configuration& config)
: qpid::SessionState(id, config),
broker(b), handler(&h),
semanticState(*this),
adapter(semanticState),
asyncCommandCompleter(new AsyncCommandCompleter(this))
{
- if (!delayManagement) addManagementObject();
+ addManagementObject();
attach(h);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Wed Dec 19 21:22:50 2012
@@ -73,7 +73,7 @@ class SessionState : public qpid::Sessio
{
public:
SessionState(Broker&, SessionHandler&, const SessionId&,
- const SessionState::Configuration&, bool delayManagement=false);
+ const SessionState::Configuration&);
~SessionState();
bool isAttached() const { return handler; }
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org