You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/01/22 23:53:50 UTC

svn commit: r736841 - in /qpid/trunk/qpid/cpp/src/qpid: broker/ cluster/

Author: gsim
Date: Thu Jan 22 14:53:50 2009
New Revision: 736841

URL: http://svn.apache.org/viewvc?rev=736841&view=rev
Log:
QPID-1567: More changes to make clustering and federation work together

* replicate outgoing link traffic to all nodes

* coordinate amongst nodes so that only one node actually maintains active links,
  with the others able to take over if that node fails


Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
    qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=736841&r1=736840&r2=736841&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Thu Jan 22 14:53:50 2009
@@ -72,6 +72,7 @@
         if (!args.i_durable)
             agent->addObject(mgmtObject);
     }
+    QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest);
 }
 
 Bridge::~Bridge() 
@@ -104,10 +105,11 @@
     session->attach(name, false);
     session->commandPoint(0,0);
         
-    if (args.i_srcIsQueue) {
+    if (args.i_srcIsQueue) {        
         peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options);
         peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
         peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+        QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest);
     } else {
         FieldTable queueSettings;
 
@@ -141,6 +143,9 @@
             if (exchange.get() == 0)
                 throw Exception("Exchange not found for dynamic route");
             exchange->registerDynamicBridge(this);
+            QPID_LOG(debug, "Activated dynamic route for exchange " << args.i_src);
+        } else {
+            QPID_LOG(debug, "Activated static route from exchange " << args.i_src << " to " << args.i_dest);
         }
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=736841&r1=736840&r2=736841&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Thu Jan 22 14:53:50 2009
@@ -117,7 +117,7 @@
     ChannelMap channels;
     //framing::AMQP_ClientProxy::Connection* client;
     ConnectionHandler adapter;
-    bool isLink;
+    const bool isLink;
     bool mgmtClosing;
     const std::string mgmtId;
     boost::function0<void> ioCallback;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=736841&r1=736840&r2=736841&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Thu Jan 22 14:53:50 2009
@@ -106,6 +106,7 @@
     case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break;
     case STATE_FAILED      : mgmtObject->set_state("Failed");      break;
     case STATE_CLOSED      : mgmtObject->set_state("Closed");      break;
+    case STATE_PASSIVE     : mgmtObject->set_state("Passive");      break;
     }
 }
 
@@ -239,6 +240,7 @@
 
     if (state != STATE_OPERATIONAL)
         return;
+    QPID_LOG(debug, "Link::ioThreadProcessing()");
 
     //process any pending creates
     if (!created.empty()) {
@@ -404,6 +406,7 @@
 
     case _qmf::Link::METHOD_BRIDGE :
         _qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args;
+        QPID_LOG(debug, "Link::bridge() request received");
 
         // Durable bridges are only valid on durable links
         if (iargs.i_durable && !durable) {
@@ -437,3 +440,17 @@
 
     return Manageable::STATUS_UNKNOWN_METHOD;
 }
+
+void Link::setPassive(bool passive)
+{
+    Mutex::ScopedLock mutex(lock);
+    if (passive) {
+        setStateLH(STATE_PASSIVE);
+    } else {
+        if (state == STATE_PASSIVE) {
+            setStateLH(STATE_WAITING);
+        } else {
+            QPID_LOG(warning, "Ignoring attempt to activate non-passive link");
+        }
+    }
+}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=736841&r1=736840&r2=736841&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Thu Jan 22 14:53:50 2009
@@ -76,6 +76,7 @@
             static const int STATE_OPERATIONAL = 3;
             static const int STATE_FAILED      = 4;
             static const int STATE_CLOSED      = 5;
+            static const int STATE_PASSIVE     = 6;
 
             static const uint32_t MAX_INTERVAL = 32;
 
@@ -120,6 +121,7 @@
             Broker* getBroker()       { return broker; }
 
             void notifyConnectionForced(const std::string text);
+            void setPassive(bool p);
             
             // PersistableConfig:
             void     setPersistenceId(uint64_t id) const;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=736841&r1=736840&r2=736841&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Thu Jan 22 14:53:50 2009
@@ -31,7 +31,7 @@
 
 #define LINK_MAINT_INTERVAL 2
 
-LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0)
+LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0), passive(false), passiveChanged(false)
 {
     timer.add (intrusive_ptr<TimerTask> (new Periodic(*this)));
 }
@@ -51,6 +51,14 @@
 
     linksToDestroy.clear();
     bridgesToDestroy.clear();
+    if (passiveChanged) {
+        if (passive) { QPID_LOG(info, "Passivating links"); }
+        else { QPID_LOG(info, "Activating links"); }
+        for (LinkMap::iterator i = links.begin(); i != links.end(); i++) {
+            i->second->setPassive(passive);
+        }
+        passiveChanged = false;
+    }
     for (LinkMap::iterator i = links.begin(); i != links.end(); i++)
         i->second->maintenanceVisit();
     //now process any requests for re-addressing
@@ -109,6 +117,7 @@
         link = Link::shared_ptr (new Link (this, store, host, port, transport, durable,
                                            authMechanism, username, password,
                                            broker, parent));
+        if (passive) link->setPassive(true);
         links[key] = link;
         return std::pair<Link::shared_ptr, bool>(link, true);
     }
@@ -129,6 +138,8 @@
                                                      uint16_t     sync)
 {
     Mutex::ScopedLock locker(lock);
+    QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")");
+
     stringstream      keystream;
     keystream << host << ":" << port;
     string linkKey = string(keystream.str());
@@ -291,3 +302,11 @@
     keystream << a.host << ":" << a.port;
     return string(keystream.str());
 }
+
+void LinkRegistry::setPassive(bool p) 
+{
+    Mutex::ScopedLock locker(lock);
+    passiveChanged = p != passive;
+    passive = p;
+    //will activate or passivate links on maintenance visit
+}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=736841&r1=736840&r2=736841&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h Thu Jan 22 14:53:50 2009
@@ -64,6 +64,8 @@
         Timer   timer;
         management::Manageable* parent;
         MessageStore* store;
+        bool passive;
+        bool passiveChanged;
 
         void periodicMaintenance ();
         bool updateAddress(const std::string& oldKey, const TcpAddress& newAddress);
@@ -122,7 +124,17 @@
         std::string getAuthCredentials (const std::string& key);
         std::string getAuthIdentity    (const std::string& key);
 
+        /**
+         * Called by links failing over to new address
+         */
         void changeAddress(const TcpAddress& oldAddress, const TcpAddress& newAddress);
+        /**
+         * Called to alter passive state. In passive state the links
+         * and bridges managed by a link registry will be recorded and
+         * updated but links won't actually establish connections and
+         * bridges won't therefore pull or push any messages.
+         */
+        void setPassive(bool);
     };
 }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=736841&r1=736840&r2=736841&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Jan 22 14:53:50 2009
@@ -323,10 +323,20 @@
             state = NEWBIE;
             QPID_LOG(info, *this << " joining cluster: " << map);
             mcast.mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), myId);
+            ClusterMap::Set members = map.getAlive();
+            members.erase(myId);
+            myElders = members;
+            broker.getLinks().setPassive(true);
         }
     }
-    else if (state >= READY && memberChange)
+    else if (state >= READY && memberChange) {
         memberUpdate(l);
+        myElders = ClusterMap::intersection(myElders, map.getAlive());
+        if (myElders.empty()) {
+            //assume we are oldest, reactivate links if necessary
+            broker.getLinks().setPassive(false);
+        }
+    }
 }
 
 
@@ -496,6 +506,8 @@
     }
     lastSize = size;
 
+    //
+
     if (mgmtObject) {
         mgmtObject->set_clusterSize(size); 
         string urlstr;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=736841&r1=736840&r2=736841&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Jan 22 14:53:50 2009
@@ -184,6 +184,7 @@
     const size_t writeEstimate;
     framing::Uuid clusterId;
     NoOpConnectionOutputHandler shadowOut;
+    ClusterMap::Set myElders;
 
     // Thread safe members
     Multicaster mcast;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=736841&r1=736840&r2=736841&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Thu Jan 22 14:53:50 2009
@@ -114,6 +114,10 @@
     return urls;
 }
 
+ClusterMap::Set ClusterMap::getAlive() const {
+    return alive;
+}
+
 std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) {
     std::ostream_iterator<MemberId> oi(o);
     std::transform(m.begin(), m.end(), oi, boost::bind(&ClusterMap::Map::value_type::first, _1));
@@ -170,4 +174,13 @@
     return boost::optional<Url>();
 }
 
+ClusterMap::Set ClusterMap::intersection(const ClusterMap::Set& a, const ClusterMap::Set& b)
+{
+    Set intersection;
+    std::set_intersection(a.begin(), a.end(),
+                          b.begin(), b.end(),
+                          std::inserter(intersection, intersection.begin()));
+    return intersection;
+
+}
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=736841&r1=736840&r2=736841&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Thu Jan 22 14:53:50 2009
@@ -76,6 +76,7 @@
     size_t aliveCount() const { return alive.size(); }
     size_t memberCount() const { return members.size(); }
     std::vector<Url> memberUrls() const;
+    Set getAlive() const;
 
     bool dumpRequest(const MemberId& id, const std::string& url);       
     /** Return non-empty Url if accepted */
@@ -84,6 +85,10 @@
     /**@return true If this is a new member */ 
     bool ready(const MemberId& id, const Url&);
 
+    /**
+     * Utility method to return intersection of two member sets
+     */
+    static Set intersection(const Set& a, const Set& b);
   private:
     Url getUrl(const Map& map, const  MemberId& id);
     

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=736841&r1=736840&r2=736841&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Thu Jan 22 14:53:50 2009
@@ -62,14 +62,15 @@
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
                        const std::string& wrappedId, ConnectionId myId)
     : cluster(c), self(myId), catchUp(false), output(*this, out),
-      connection(&output, cluster.getBroker(), wrappedId), readCredit(0)
+      connection(&output, cluster.getBroker(), wrappedId), readCredit(0), expectProtocolHeader(false)
 { init(); }
 
 // Local connections
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
-                       const std::string& wrappedId, MemberId myId, bool isCatchUp)
+                       const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink)
     : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
-      connection(&output, cluster.getBroker(), wrappedId), readCredit(0)
+      connection(&output, cluster.getBroker(), wrappedId, isLink), readCredit(0),
+      expectProtocolHeader(isLink)
 { init(); }
 
 void Connection::init() {
@@ -213,7 +214,25 @@
     }
     else {                      // Multicast local connections.
         assert(isLocal());
-        cluster.getMulticast().mcastBuffer(buffer, size, self);
+        const char* remainingData = buffer;
+        size_t remainingSize = size;
+        if (expectProtocolHeader) {
+            //If this is an outgoing link, we will receive a protocol
+            //header which needs to be decoded first
+            framing::ProtocolInitiation pi;
+            Buffer buf(const_cast<char*>(buffer), size);
+            if (pi.decode(buf)) {
+                //TODO: check the version is correct
+                QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")");
+                expectProtocolHeader = false;
+                remainingData = buffer + pi.encodedSize();
+                remainingSize = size - pi.encodedSize();
+            } else {
+                QPID_LOG(debug, "Not enough data for protocol header on outgoing clustered link");
+                return 0;
+            }
+        }
+        cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self);
     }
     return size;
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=736841&r1=736840&r2=736841&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Thu Jan 22 14:53:50 2009
@@ -64,7 +64,7 @@
     typedef sys::PollableQueue<EventFrame> EventFrameQueue;
 
     /** Local connection, use this in ConnectionId */
-    Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp);
+    Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp, bool isLink);
     /** Shadow connection */
     Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, ConnectionId);
     ~Connection();
@@ -172,6 +172,7 @@
     framing::ChannelId currentChannel;
     boost::shared_ptr<broker::TxBuffer> txBuffer;
     int readCredit;
+    bool expectProtocolHeader;
     
   friend std::ostream& operator<<(std::ostream&, const Connection&);
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=736841&r1=736840&r2=736841&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Thu Jan 22 14:53:50 2009
@@ -38,21 +38,22 @@
 sys::ConnectionCodec*
 ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
     if (v == ProtocolVersion(0, 10))
-        return new ConnectionCodec(out, id, cluster, false);
+        return new ConnectionCodec(out, id, cluster, false, false);
     else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10))
-        return new ConnectionCodec(out, id, cluster, true); // Catch-up connection
+        return new ConnectionCodec(out, id, cluster, true, false); // Catch-up connection
     return 0;
 }
 
 // Used for outgoing Link connections, we don't care.
 sys::ConnectionCodec*
 ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) {
-    return next->create(out, id);
+    return new ConnectionCodec(out, id, cluster, false, true);
+    //return next->create(out, id);
 }
 
-ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp)
-    : codec(out, id, false),
-      interceptor(new Connection(cluster, codec, id, cluster.getId(), catchUp)),
+ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp, bool isLink)
+    : codec(out, id, isLink),
+      interceptor(new Connection(cluster, codec, id, cluster.getId(), catchUp, isLink)),
       id(interceptor->getId()),
       localId(id)
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h?rev=736841&r1=736840&r2=736841&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Thu Jan 22 14:53:50 2009
@@ -56,7 +56,7 @@
         sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id);
     };
 
-    ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c, bool catchUp);
+    ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c, bool catchUp, bool isLink);
     ~ConnectionCodec();
 
     // ConnectionCodec functions.



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org