You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/02/25 23:07:30 UTC
svn commit: r916475 - in /qpid/trunk/qpid/cpp/src: qpid/cluster/Cluster.cpp
qpid/cluster/StoreStatus.cpp qpid/cluster/StoreStatus.h
tests/cluster_tests.py
Author: aconway
Date: Thu Feb 25 22:07:30 2010
New Revision: 916475
URL: http://svn.apache.org/viewvc?rev=916475&view=rev
Log:
Last member of a cluster always has clean store.
When a cluster is reduced to a single broker, it marks its store as
clean regardless of how it is shut down. If we're down to a single
member we know we want to use its store to recover as there are no
others.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h
qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=916475&r1=916474&r2=916475&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Feb 25 22:07:30 2010
@@ -914,6 +914,12 @@
size_t size = urls.size();
failoverExchange->updateUrls(urls);
+ if (store.hasStore()) {
+ // Mark store clean if I am the only broker, dirty otherwise.
+ if (size == 1) store.clean(Uuid(true));
+ else store.dirty(clusterId);
+ }
+
if (size == 1 && lastSize > 1 && state >= CATCHUP) {
QPID_LOG(notice, *this << " last broker standing, update queue policies");
lastBroker = true;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp?rev=916475&r1=916474&r2=916475&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp Thu Feb 25 22:07:30 2010
@@ -114,7 +114,12 @@
}
}
+bool StoreStatus::hasStore() const {
+ return state != framing::cluster::STORE_STATE_NO_STORE;
+}
+
void StoreStatus::dirty(const Uuid& clusterId_) {
+ if (!hasStore()) return;
assert(clusterId_);
clusterId = clusterId_;
shutdownId = Uuid();
@@ -123,6 +128,7 @@
}
void StoreStatus::clean(const Uuid& shutdownId_) {
+ if (!hasStore()) return;
assert(shutdownId_);
state = STORE_STATE_CLEAN_STORE;
shutdownId = shutdownId_;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h?rev=916475&r1=916474&r2=916475&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h Thu Feb 25 22:07:30 2010
@@ -46,14 +46,14 @@
const Uuid& getShutdownId() const { return shutdownId; }
framing::SequenceNumber getConfigSeq() const { return configSeq; }
- void dirty(const Uuid& start); // Start using the store.
- void clean(const Uuid& stop); // Stop using the store.
+ void dirty(const Uuid& clusterId); // Mark the store in use by clusterId.
+ void clean(const Uuid& shutdownId); // Mark the store clean at shutdownId
void setConfigSeq(framing::SequenceNumber seq); // Update the config seq number.
void load();
void save();
- bool hasStore() { return state != framing::cluster::STORE_STATE_NO_STORE; }
+ bool hasStore() const;
private:
framing::cluster::StoreState state;
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=916475&r1=916474&r2=916475&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Thu Feb 25 22:07:30 2010
@@ -305,21 +305,47 @@
self.assertRaises(Exception, lambda: a.ready())
self.assertRaises(Exception, lambda: b.ready())
- def test_total_failure(self):
- # Verify we abort with sutiable error message if no clean stores.
- cluster = self.cluster(0, args=self.args()+["--cluster-size=2"])
- a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
- b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=True)
- a.kill()
- b.kill()
- a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
- b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
- self.assertRaises(Exception, lambda: a.ready())
- self.assertRaises(Exception, lambda: b.ready())
+ def assert_dirty_store(self, broker):
+ self.assertRaises(Exception, lambda: broker.ready())
msg = re.compile("critical.*no clean store")
- assert msg.search(readfile(a.log))
- assert msg.search(readfile(b.log))
+ assert msg.search(readfile(broker.log))
+
+ def test_solo_store_clean(self):
+ # A single node cluster should always leave a clean store.
+ cluster = self.cluster(0, self.args())
+ a = cluster.start("a", expect=EXPECT_EXIT_FAIL)
+ a.send_message("q", Message("x", durable=True))
+ a.kill()
+ a = cluster.start("a")
+ self.assertEqual(a.get_message("q").content, "x")
+
+ def test_last_store_clean(self):
+
+ # Verify that only the last node in a cluster to shut down has
+ # a clean store. Start with cluster of 3, reduce to 1 then
+ # increase again to ensure that a node that was once alone but
+ # finally did not finish as the last node does not get a clean
+ # store.
+ cluster = self.cluster(0, self.args())
+ a = cluster.start("a", expect=EXPECT_EXIT_FAIL)
+ b = cluster.start("b", expect=EXPECT_EXIT_FAIL)
+ c = cluster.start("c", expect=EXPECT_EXIT_FAIL)
+ a.send_message("q", Message("x", durable=True))
+ a.kill()
+ b.kill() # c is last man
+ time.sleep(0.1) # pause for c to find out hes last.
+ a = cluster.start("a", expect=EXPECT_EXIT_FAIL) # c no longer last man
+ c.kill() # a is now last man
+ time.sleep(0.1) # pause for a to find out hes last.
+ a.kill() # really last
+ # b & c should be dirty
+ b = cluster.start("b", wait=False, expect=EXPECT_EXIT_OK)
+ self.assert_dirty_store(b)
+ c = cluster.start("c", wait=False, expect=EXPECT_EXIT_OK)
+ self.assert_dirty_store(c)
+ # a should be clean
+ a = cluster.start("a")
+ self.assertEqual(a.get_message("q").content, "x")
- # FIXME aconway 2009-12-03: verify manual restore procedure
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org