You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/02/04 18:04:45 UTC

svn commit: r740793 - in /qpid/trunk/qpid/cpp/src: qpid/broker/Broker.cpp qpid/broker/Broker.h qpid/cluster/Cluster.cpp qpid/cluster/Cluster.h qpid/cluster/ClusterPlugin.cpp tests/run_acl_tests

Author: aconway
Date: Wed Feb  4 17:04:45 2009
New Revision: 740793

URL: http://svn.apache.org/viewvc?rev=740793&view=rev
Log:
Cluster sets recovery flag on Broker for first member in cluster.
Disable recovery from local store if the recovery flag is not set.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    qpid/trunk/qpid/cpp/src/tests/run_acl_tests

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=740793&r1=740792&r2=740793&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Feb  4 17:04:45 2009
@@ -149,6 +149,7 @@
         *this),
     queueCleaner(queues, timer),
     queueEvents(poller),
+    recovery(true),
     getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
 {
     if (conf.enableMgmt) {
@@ -209,11 +210,17 @@
         setStore (new NullMessageStore());
 
     exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
-    
+
     if (store.get() != 0) {
-        RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, 
-                                      conf.stagingThreshold);
-        store->recover(recoverer);
+        // The cluster plug-in will setRecovery(false) on all but the first
+        // broker to join a cluster.
+        if (getRecovery()) {
+            RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, 
+                                          conf.stagingThreshold);
+            store->recover(recoverer);
+        }
+        else
+            QPID_LOG(notice, "Recovering from cluster, no recovery from local journal");
     }
 
     //ensure standard exchanges exist (done after recovery from store)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=740793&r1=740792&r2=740793&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Wed Feb  4 17:04:45 2009
@@ -139,6 +139,8 @@
     std::vector<Url> getKnownBrokersImpl();
     std::string federationTag;
 
+    bool recovery;
+
   public:
 
   
@@ -223,6 +225,9 @@
     boost::function<std::vector<Url> ()> getKnownBrokers;
 
     static const std::string TCP_TRANSPORT;
+
+    void setRecovery(bool set) { recovery = set; } // Enable/disable recovery from the local store.
+    bool getRecovery() const { return recovery; }  // True if the broker should recover from its store.
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=740793&r1=740792&r2=740793&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Feb  4 17:04:45 2009
@@ -21,28 +21,30 @@
 #include "UpdateClient.h"
 #include "FailoverExchange.h"
 
+#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
+#include "qmf/org/apache/qpid/cluster/Package.h"
 #include "qpid/broker/Broker.h"
-#include "qpid/broker/SessionState.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/SessionState.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/AMQP_AllOperations.h"
 #include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/ClusterUpdateRequestBody.h"
-#include "qpid/framing/ClusterReadyBody.h"
 #include "qpid/framing/ClusterConfigChangeBody.h"
-#include "qpid/framing/ClusterUpdateOfferBody.h"
-#include "qpid/framing/ClusterShutdownBody.h"
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
 #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
-#include "qpid/log/Statement.h"
+#include "qpid/framing/ClusterReadyBody.h"
+#include "qpid/framing/ClusterShutdownBody.h"
+#include "qpid/framing/ClusterUpdateOfferBody.h"
+#include "qpid/framing/ClusterUpdateRequestBody.h"
 #include "qpid/log/Helpers.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/LatencyMetric.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/IdAllocator.h"
+#include "qpid/management/ManagementBroker.h"
 #include "qpid/memory.h"
 #include "qpid/shared_ptr.h"
-#include "qmf/org/apache/qpid/cluster/Package.h"
-#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
+#include "qpid/sys/LatencyMetric.h"
+#include "qpid/sys/Thread.h"
 
 #include <boost/bind.hpp>
 #include <boost/cast.hpp>
@@ -101,11 +103,28 @@
                       poller),
     connections(*this),
     decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)),
+    initialized(false),
     state(INIT),
     lastSize(0),
     lastBroker(false),
     sequence(0)
 {
+    failoverExchange.reset(new FailoverExchange(this));
+    if (quorum_) quorum.init();
+    cpg.join(name);
+    // pump the CPG dispatch manually till we get initialized. 
+    while (!initialized)
+        cpg.dispatchOne();
+}
+
+Cluster::~Cluster() {
+    if (updateThread.id()) updateThread.join(); // Join the previous update thread.
+}
+
+void Cluster::initialize() {
+    if (myUrl.empty())
+        myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
+    QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
     mAgent = ManagementAgent::Singleton::getInstance();
     if (mAgent != 0){
         _qmf::Package  packageInit(mAgent);
@@ -114,18 +133,11 @@
         mgmtObject->set_status("JOINING");
     }
     broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
-    failoverExchange.reset(new FailoverExchange(this));
     dispatcher.start();
     deliverEventQueue.start();
     deliverFrameQueue.start();
-    QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
-    if (quorum_) quorum.init();
-    cpg.join(name);
-    broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); // Must be last for exception safety.
-}
-
-Cluster::~Cluster() {
-    if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
+    // Add finalizer last for exception safety.
+    broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); 
 }
 
 // Called in connection thread to insert a client connection.
@@ -279,6 +291,11 @@
     cpg_address */*joined*/, int /*nJoined*/)
 {
     Mutex::ScopedLock l(lock);
+    if (state == INIT) {        // First config change.
+        // Recover only if we are first in cluster.
+        broker.setRecovery(nCurrent == 1);
+        initialized = true;
+    }
     QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent) 
              << AddrList(left, nLeft, "( ", ")"));
     std::string addresses;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=740793&r1=740792&r2=740793&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Feb  4 17:04:45 2009
@@ -64,15 +64,17 @@
   public:
     typedef boost::intrusive_ptr<Connection> ConnectionPtr;
     typedef std::vector<ConnectionPtr> Connections;
-    
-    /**
-     * Join a cluster. 
-     */
+
+    /** Construct the cluster in plugin earlyInitialize */ 
     Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum,
             size_t readMax, size_t writeEstimate);
 
     virtual ~Cluster();
 
+    /** Join the cluster in plugin initialize. Requires transport
+     * plugins to be available. */
+    void initialize();
+
     // Connection map - called in connection threads.
     void addLocalConnection(const ConnectionPtr&); 
     void addShadowConnection(const ConnectionPtr&); 
@@ -177,7 +179,7 @@
     boost::shared_ptr<sys::Poller> poller;
     Cpg cpg;
     const std::string name;
-    const Url myUrl;
+    Url myUrl;
     const MemberId myId;
     const size_t readMax;
     const size_t writeEstimate;
@@ -197,7 +199,10 @@
 
     // Called only from event delivery thread
     Decoder decoder;
-    
+
+    // Used only during initialization
+    bool initialized;
+
     // Remaining members are protected by lock
     mutable sys::Monitor lock;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=740793&r1=740792&r2=740793&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Wed Feb  4 17:04:45 2009
@@ -136,13 +136,13 @@
 
     Options* getOptions() { return &options; }
 
-    void initialize(Plugin::Target& target) {
+    void earlyInitialize(Plugin::Target& target) {
         if (values.name.empty()) return; // Only if --cluster-name option was specified.
         Broker* broker = dynamic_cast<Broker*>(&target);
         if (!broker) return;
         cluster = new Cluster(
             values.name,
-            values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)),
+            values.url.empty() ? Url() : Url(values.url),
             *broker,
             values.quorum,
             values.readMax, values.writeEstimate*1024
@@ -158,7 +158,9 @@
         }
     }
 
-    void earlyInitialize(Plugin::Target&) {}
+    void initialize(Plugin::Target& ) {
+        if (cluster) cluster->initialize(); // No cluster exists if earlyInitialize() returned early (no --cluster-name, or target is not a Broker).
+    }
 };
 
 static ClusterPlugin instance; // Static initialization.

Modified: qpid/trunk/qpid/cpp/src/tests/run_acl_tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_acl_tests?rev=740793&r1=740792&r2=740793&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_acl_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_acl_tests Wed Feb  4 17:04:45 2009
@@ -20,6 +20,7 @@
 #
 
 # Run the acl tests. $srcdir is set by the Makefile.
+set -x
 PYTHON_DIR=$srcdir/../../../python
 DATA_DIR=`pwd`/data_dir
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org