You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/02/04 18:04:45 UTC
svn commit: r740793 - in /qpid/trunk/qpid/cpp/src: qpid/broker/Broker.cpp
qpid/broker/Broker.h qpid/cluster/Cluster.cpp qpid/cluster/Cluster.h
qpid/cluster/ClusterPlugin.cpp tests/run_acl_tests
Author: aconway
Date: Wed Feb 4 17:04:45 2009
New Revision: 740793
URL: http://svn.apache.org/viewvc?rev=740793&view=rev
Log:
Cluster sets recovery flag on Broker for first member in cluster.
Disable recovery from local store if the recovery flag is not set.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
qpid/trunk/qpid/cpp/src/tests/run_acl_tests
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=740793&r1=740792&r2=740793&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Feb 4 17:04:45 2009
@@ -149,6 +149,7 @@
*this),
queueCleaner(queues, timer),
queueEvents(poller),
+ recovery(true),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
if (conf.enableMgmt) {
@@ -209,11 +210,17 @@
setStore (new NullMessageStore());
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
-
+
if (store.get() != 0) {
- RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager,
- conf.stagingThreshold);
- store->recover(recoverer);
+ // The cluster plug-in will setRecovery(false) on all but the first
+ // broker to join a cluster.
+ if (getRecovery()) {
+ RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager,
+ conf.stagingThreshold);
+ store->recover(recoverer);
+ }
+ else
+ QPID_LOG(notice, "Recovering from cluster, no recovery from local journal");
}
//ensure standard exchanges exist (done after recovery from store)
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=740793&r1=740792&r2=740793&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Wed Feb 4 17:04:45 2009
@@ -139,6 +139,8 @@
std::vector<Url> getKnownBrokersImpl();
std::string federationTag;
+ bool recovery;
+
public:
@@ -223,6 +225,9 @@
boost::function<std::vector<Url> ()> getKnownBrokers;
static const std::string TCP_TRANSPORT;
+
+ void setRecovery(bool set) { recovery = set; }
+ bool getRecovery() const { return recovery; }
};
}}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=740793&r1=740792&r2=740793&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Feb 4 17:04:45 2009
@@ -21,28 +21,30 @@
#include "UpdateClient.h"
#include "FailoverExchange.h"
+#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
+#include "qmf/org/apache/qpid/cluster/Package.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/SessionState.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/SessionState.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/ClusterUpdateRequestBody.h"
-#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterConfigChangeBody.h"
-#include "qpid/framing/ClusterUpdateOfferBody.h"
-#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
-#include "qpid/log/Statement.h"
+#include "qpid/framing/ClusterReadyBody.h"
+#include "qpid/framing/ClusterShutdownBody.h"
+#include "qpid/framing/ClusterUpdateOfferBody.h"
+#include "qpid/framing/ClusterUpdateRequestBody.h"
#include "qpid/log/Helpers.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/LatencyMetric.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/IdAllocator.h"
+#include "qpid/management/ManagementBroker.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
-#include "qmf/org/apache/qpid/cluster/Package.h"
-#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
+#include "qpid/sys/LatencyMetric.h"
+#include "qpid/sys/Thread.h"
#include <boost/bind.hpp>
#include <boost/cast.hpp>
@@ -101,11 +103,28 @@
poller),
connections(*this),
decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)),
+ initialized(false),
state(INIT),
lastSize(0),
lastBroker(false),
sequence(0)
{
+ failoverExchange.reset(new FailoverExchange(this));
+ if (quorum_) quorum.init();
+ cpg.join(name);
+ // pump the CPG dispatch manually till we get initialized.
+ while (!initialized)
+ cpg.dispatchOne();
+}
+
+Cluster::~Cluster() {
+ if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
+}
+
+void Cluster::initialize() {
+ if (myUrl.empty())
+ myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
+ QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
mAgent = ManagementAgent::Singleton::getInstance();
if (mAgent != 0){
_qmf::Package packageInit(mAgent);
@@ -114,18 +133,11 @@
mgmtObject->set_status("JOINING");
}
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
- failoverExchange.reset(new FailoverExchange(this));
dispatcher.start();
deliverEventQueue.start();
deliverFrameQueue.start();
- QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
- if (quorum_) quorum.init();
- cpg.join(name);
- broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); // Must be last for exception safety.
-}
-
-Cluster::~Cluster() {
- if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
+ // Add finalizer last for exception safety.
+ broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
}
// Called in connection thread to insert a client connection.
@@ -279,6 +291,11 @@
cpg_address */*joined*/, int /*nJoined*/)
{
Mutex::ScopedLock l(lock);
+ if (state == INIT) { // First config change.
+ // Recover only if we are first in cluster.
+ broker.setRecovery(nCurrent == 1);
+ initialized = true;
+ }
QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent)
<< AddrList(left, nLeft, "( ", ")"));
std::string addresses;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=740793&r1=740792&r2=740793&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Feb 4 17:04:45 2009
@@ -64,15 +64,17 @@
public:
typedef boost::intrusive_ptr<Connection> ConnectionPtr;
typedef std::vector<ConnectionPtr> Connections;
-
- /**
- * Join a cluster.
- */
+
+ /** Construct the cluster in plugin earlyInitialize */
Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum,
size_t readMax, size_t writeEstimate);
virtual ~Cluster();
+ /** Join the cluster in plugin initialize. Requires transport
+ * plugins to be available. */
+ void initialize();
+
// Connection map - called in connection threads.
void addLocalConnection(const ConnectionPtr&);
void addShadowConnection(const ConnectionPtr&);
@@ -177,7 +179,7 @@
boost::shared_ptr<sys::Poller> poller;
Cpg cpg;
const std::string name;
- const Url myUrl;
+ Url myUrl;
const MemberId myId;
const size_t readMax;
const size_t writeEstimate;
@@ -197,7 +199,10 @@
// Called only from event delivery thread
Decoder decoder;
-
+
+ // Used only during initialization
+ bool initialized;
+
// Remaining members are protected by lock
mutable sys::Monitor lock;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=740793&r1=740792&r2=740793&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Wed Feb 4 17:04:45 2009
@@ -136,13 +136,13 @@
Options* getOptions() { return &options; }
- void initialize(Plugin::Target& target) {
+ void earlyInitialize(Plugin::Target& target) {
if (values.name.empty()) return; // Only if --cluster-name option was specified.
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
cluster = new Cluster(
values.name,
- values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)),
+ values.url.empty() ? Url() : Url(values.url),
*broker,
values.quorum,
values.readMax, values.writeEstimate*1024
@@ -158,7 +158,9 @@
}
}
- void earlyInitialize(Plugin::Target&) {}
+ void initialize(Plugin::Target& ) {
+ cluster->initialize();
+ }
};
static ClusterPlugin instance; // Static initialization.
Modified: qpid/trunk/qpid/cpp/src/tests/run_acl_tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_acl_tests?rev=740793&r1=740792&r2=740793&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_acl_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_acl_tests Wed Feb 4 17:04:45 2009
@@ -20,6 +20,7 @@
#
# Run the acl tests. $srcdir is set by the Makefile.
+set -x
PYTHON_DIR=$srcdir/../../../python
DATA_DIR=`pwd`/data_dir
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org