You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/12/19 22:22:51 UTC

svn commit: r1424125 - in /qpid/trunk/qpid/cpp/src/qpid/broker: Broker.cpp Broker.h Connection.cpp Connection.h Exchange.cpp Link.cpp Link.h QueueFlowLimit.cpp SessionHandler.cpp SessionState.cpp SessionState.h

Author: aconway
Date: Wed Dec 19 21:22:50 2012
New Revision: 1424125

URL: http://svn.apache.org/viewvc?rev=1424125&view=rev
Log:
QPID-4514: Remove obsolete cluster code: Broker, Connection, Link.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Dec 19 21:22:50 2012
@@ -215,9 +215,6 @@ Broker::Broker(const Broker::Options& co
         *this),
     queueCleaner(queues, &timer),
     recoveryInProgress(false),
-    recovery(true),
-    inCluster(false),
-    clusterUpdatee(false),
     expiryPolicy(new ExpiryPolicy),
     getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)),
     deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2))
@@ -289,18 +286,11 @@ Broker::Broker(const Broker::Options& co
     exchanges.declare(empty, DirectExchange::typeName, false, noReplicateArgs());
 
     if (store.get() != 0) {
-        // The cluster plug-in will setRecovery(false) on all but the first
-        // broker to join a cluster.
-        if (getRecovery()) {
-            RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, protocolRegistry);
-            recoveryInProgress = true;
-            store->recover(recoverer);
-            recoveryInProgress = false;
-        }
-        else {
-            QPID_LOG(notice, "Cluster recovery: recovered journal data discarded and journal files pushed down");
-            store->truncateInit(true); // save old files in subdir
-        }
+        RecoveryManagerImpl recoverer(
+            queues, exchanges, links, dtxManager, protocolRegistry);
+        recoveryInProgress = true;
+        store->recover(recoverer);
+        recoveryInProgress = false;
     }
 
     //ensure standard exchanges exist (done after recovery from store)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Wed Dec 19 21:22:50 2012
@@ -184,8 +184,6 @@ class Broker : public sys::Runnable, pub
                            const Message& msg);
     std::string federationTag;
     bool recoveryInProgress;
-    bool recovery;
-    bool inCluster, clusterUpdatee;
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
     ConsumerFactories consumerFactories;
     ProtocolRegistry protocolRegistry;
@@ -282,22 +280,8 @@ class Broker : public sys::Runnable, pub
 
     static QPID_BROKER_EXTERN const std::string TCP_TRANSPORT;
 
-    void setRecovery(bool set) { recovery = set; }
-    bool getRecovery() const { return recovery; }
     bool inRecovery() const { return recoveryInProgress; }
 
-    /** True of this broker is part of a cluster.
-     * Only valid after early initialization of plugins is complete.
-     */
-    bool isInCluster() const { return inCluster; }
-    void setInCluster(bool set) { inCluster = set; }
-
-    /** True if this broker is joining a cluster and in the process of
-     * receiving a state update.
-     */
-    bool isClusterUpdatee() const { return clusterUpdatee; }
-    void setClusterUpdatee(bool set) { clusterUpdatee = set; }
-
     management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
 
     /**

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Wed Dec 19 21:22:50 2012
@@ -85,13 +85,10 @@ Connection::Connection(ConnectionOutputH
                        const qpid::sys::SecuritySettings& external,
                        bool link_,
                        uint64_t objectId_,
-                       bool shadow_,
-                       bool delayManagement,
                        bool authenticated_
 ) :
     ConnectionState(out_, broker_),
     securitySettings(external),
-    shadow(shadow_),
     authenticated(authenticated_),
     adapter(*this, link_),
     link(link_),
@@ -106,11 +103,6 @@ Connection::Connection(ConnectionOutputH
 {
     outboundTracker.wrap(out);
     broker.getConnectionObservers().connection(*this);
-    // In a cluster, allow adding the management object to be delayed.
-    if (!delayManagement) addManagementObject();
-}
-
-void Connection::addManagementObject() {
     assert(agent == 0);
     assert(mgmtObject == 0);
     Manageable* parent = broker.GetVhostObject();
@@ -119,7 +111,6 @@ void Connection::addManagementObject() {
         if (agent != 0) {
             // TODO set last bool true if system connection
             mgmtObject = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, mgmtId, !link, false, "AMQP 0-10"));
-            mgmtObject->set_shadow(shadow);
             agent->addObject(mgmtObject, objectId);
         }
         ConnectionState::setUrl(mgmtId);
@@ -277,20 +268,6 @@ void Connection::notifyConnectionForced(
 void Connection::setUserId(const string& userId)
 {
     ConnectionState::setUserId(userId);
-    // In a cluster, the cluster code will raise the connect event
-    // when the connection is replicated to the cluster.
-    if (!broker.isInCluster()) raiseConnectEvent();
-}
-
-void Connection::raiseConnectEvent() {
-    if (mgmtObject != 0) {
-        mgmtObject->set_authIdentity(userId);
-        agent->raiseEvent(_qmf::EventClientConnect(mgmtId, userId, mgmtObject->get_remoteProperties()));
-    }
-
-    QPID_LOG_CAT(debug, model, "Create connection. user:" << userId
-        << " rhost:" << mgmtId );
-
 }
 
 void Connection::setUserProxyAuth(bool b)
@@ -488,7 +465,7 @@ void Connection::abort()
 void Connection::setHeartbeatInterval(uint16_t heartbeat)
 {
     setHeartbeat(heartbeat);
-    if (heartbeat > 0 && !isShadow()) {
+    if (heartbeat > 0) {
         if (!heartbeatTimer) {
             heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this);
             timer.add(heartbeatTimer);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Wed Dec 19 21:22:50 2012
@@ -84,8 +84,6 @@ class Connection : public sys::Connectio
                const qpid::sys::SecuritySettings&,
                bool isLink = false,
                uint64_t objectId = 0,
-               bool shadow=false,
-               bool delayManagement = false,
                bool authenticated=true);
 
     ~Connection ();
@@ -130,7 +128,6 @@ class Connection : public sys::Connectio
 
     void notifyConnectionForced(const std::string& text);
     void setUserId(const std::string& uid);
-    void raiseConnectEvent();
 
     // credentials for connected client
     const std::string& getUserId() const { return ConnectionState::getUserId(); }
@@ -153,18 +150,9 @@ class Connection : public sys::Connectio
     void sendClose();
     void setSecureConnection(SecureConnection* secured);
 
-    /** True if this is a shadow connection in a cluster. */
-    bool isShadow() const { return shadow; }
-
     /** True if this connection is authenticated */
     bool isAuthenticated() const { return authenticated; }
 
-    // Used by cluster to update connection status
-    sys::AggregateOutput& getOutputTasks() { return outputTasks; }
-
-    /** Cluster delays adding management object in the constructor then calls this. */
-    void addManagementObject();
-
     const qpid::sys::SecuritySettings& getExternalSecuritySettings() const
     {
         return securitySettings;
@@ -176,9 +164,6 @@ class Connection : public sys::Connectio
     bool isLink() { return link; }
     void startLinkHeartbeatTimeoutTask();
 
-    // Used by cluster during catch-up, see cluster::OutputInterceptor
-    void doIoCallbacks();
-
     void setClientProperties(const framing::FieldTable& cp) { clientProperties = cp; }
     const framing::FieldTable& getClientProperties() const { return clientProperties; }
 
@@ -188,7 +173,6 @@ class Connection : public sys::Connectio
 
     ChannelMap channels;
     qpid::sys::SecuritySettings securitySettings;
-    bool shadow;
     bool authenticated;
     ConnectionHandler adapter;
     const bool link;
@@ -228,6 +212,7 @@ class Connection : public sys::Connectio
     OutboundFrameTracker outboundTracker;
 
     void sent(const framing::AMQFrame& f);
+    void doIoCallbacks();
 
   public:
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Wed Dec 19 21:22:50 2012
@@ -210,8 +210,6 @@ Exchange::Exchange(const string& _name, 
 
     ive = _args.get(qpidIVE);
     if (ive) {
-        if (broker && broker->isInCluster())
-            throw framing::NotImplementedException("Cannot use Initial Value Exchanges in a cluster");
         QPID_LOG(debug, "Configured exchange " <<  _name  << " with Initial Value");
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Wed Dec 19 21:22:50 2012
@@ -209,9 +209,6 @@ void Link::setStateLH (int newState)
 
     state = newState;
 
-    if (hideManagement())
-        return;
-
     switch (state)
     {
     case STATE_WAITING     : mgmtObject->set_state("Waiting");     break;
@@ -237,8 +234,7 @@ void Link::startConnectionLH ()
         QPID_LOG(error, "Link connection to " << host << ":" << port << " failed: "
                  << e.what());
         setStateLH(STATE_WAITING);
-        if (!hideManagement())
-            mgmtObject->set_lastError (e.what());
+        mgmtObject->set_lastError (e.what());
     }
 }
 
@@ -249,7 +245,7 @@ void Link::established(Connection* c)
     addr << host << ":" << port;
     QPID_LOG (info, "Inter-broker link established to " << addr.str());
 
-    if (!hideManagement() && agent)
+    if (agent)
         agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
     bool isClosing = false;
     {
@@ -292,7 +288,7 @@ void Link::opened() {
     Mutex::ScopedLock mutex(lock);
     if (!connection) return;
 
-    if (!hideManagement() && connection->GetManagementObject()) {
+    if (connection->GetManagementObject()) {
         mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId());
     }
 
@@ -354,13 +350,11 @@ void Link::closed(int, std::string text)
 
     connection = 0;
 
-    if (!hideManagement()) {
-        mgmtObject->set_connectionRef(qpid::management::ObjectId());
-        if (state == STATE_OPERATIONAL && agent) {
-            stringstream addr;
-            addr << host << ":" << port;
+    mgmtObject->set_connectionRef(qpid::management::ObjectId());
+    if (state == STATE_OPERATIONAL && agent) {
+        stringstream addr;
+        addr << host << ":" << port;
             agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
-        }
     }
 
     for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
@@ -372,8 +366,7 @@ void Link::closed(int, std::string text)
     if (state != STATE_FAILED && state != STATE_PASSIVE)
     {
         setStateLH(STATE_WAITING);
-        if (!hideManagement())
-            mgmtObject->set_lastError (text);
+        mgmtObject->set_lastError (text);
     }
 }
 
@@ -514,14 +507,13 @@ void Link::reconnectLH(const Address& a)
     port = a.port;
     transport = a.protocol;
 
-    if (!hideManagement()) {
-        stringstream errorString;
-        errorString << "Failing over to " << a;
-        mgmtObject->set_lastError(errorString.str());
-        mgmtObject->set_host(host);
-        mgmtObject->set_port(port);
-        mgmtObject->set_transport(transport);
-    }
+    stringstream errorString;
+    errorString << "Failing over to " << a;
+    mgmtObject->set_lastError(errorString.str());
+    mgmtObject->set_host(host);
+    mgmtObject->set_port(port);
+    mgmtObject->set_transport(transport);
+
     startConnectionLH();
 }
 
@@ -538,12 +530,6 @@ bool Link::tryFailoverLH() {
     return false;
 }
 
-// Management updates for a link are inconsistent in a cluster, so they are
-// suppressed.
-bool Link::hideManagement() const {
-    return !mgmtObject || ( broker && broker->isInCluster());
-}
-
 // Allocate channel from link free pool
 framing::ChannelId Link::nextChannel()
 {
@@ -585,8 +571,7 @@ void Link::notifyConnectionForced(const 
 {
     Mutex::ScopedLock mutex(lock);
     setStateLH(STATE_FAILED);
-    if (!hideManagement())
-        mgmtObject->set_lastError(text);
+    mgmtObject->set_lastError(text);
 }
 
 void Link::setPersistenceId(uint64_t id) const

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Wed Dec 19 21:22:50 2012
@@ -107,7 +107,6 @@ class Link : public PersistableConfig, p
     void destroy();                  // Cleanup connection before link goes away
     void ioThreadProcessing();       // Called on connection's IO thread by request
     bool tryFailoverLH();            // Called during maintenance visit
-    bool hideManagement() const;
     void reconnectLH(const Address&); //called by LinkRegistry
 
     // connection management (called by LinkRegistry)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Wed Dec 19 21:22:50 2012
@@ -129,11 +129,6 @@ void QueueFlowLimit::enqueued(const Mess
     }
 
     if (flowStopped || !index.empty()) {
-        // ignore flow control if we are populating the queue due to cluster replication:
-        if (broker && broker->isClusterUpdatee()) {
-            QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.getSequence());
-            return;
-        }
         QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.getSequence());
         msg.getPersistentContext()->getIngressCompletion().startCompleter();    // don't complete until flow resumes
         bool unique;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Wed Dec 19 21:22:50 2012
@@ -111,7 +111,7 @@ void SessionHandler::attachAs(const std:
     // Delay creating management object till attached(). In a cluster,
     // only the active link broker calls attachAs but all brokers
     // receive the subsequent attached() call.
-    session.reset(new SessionState(connection.getBroker(), *this, id, config, true));
+    session.reset(new SessionState(connection.getBroker(), *this, id, config));
     sendAttach(false);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Wed Dec 19 21:22:50 2012
@@ -53,14 +53,14 @@ namespace _qmf = qmf::org::apache::qpid:
 
 SessionState::SessionState(
     Broker& b, SessionHandler& h, const SessionId& id,
-    const SessionState::Configuration& config, bool delayManagement)
+    const SessionState::Configuration& config)
     : qpid::SessionState(id, config),
       broker(b), handler(&h),
       semanticState(*this),
       adapter(semanticState),
       asyncCommandCompleter(new AsyncCommandCompleter(this))
 {
-    if (!delayManagement) addManagementObject();
+    addManagementObject();
     attach(h);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=1424125&r1=1424124&r2=1424125&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Wed Dec 19 21:22:50 2012
@@ -73,7 +73,7 @@ class SessionState : public qpid::Sessio
 {
   public:
     SessionState(Broker&, SessionHandler&, const SessionId&,
-                 const SessionState::Configuration&, bool delayManagement=false);
+                 const SessionState::Configuration&);
     ~SessionState();
     bool isAttached() const { return handler; }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org