You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/02/25 23:07:30 UTC

svn commit: r916475 - in /qpid/trunk/qpid/cpp/src: qpid/cluster/Cluster.cpp qpid/cluster/StoreStatus.cpp qpid/cluster/StoreStatus.h tests/cluster_tests.py

Author: aconway
Date: Thu Feb 25 22:07:30 2010
New Revision: 916475

URL: http://svn.apache.org/viewvc?rev=916475&view=rev
Log:
Last member of a cluster always has clean store.

When a cluster is reduced to a single broker, it marks its store as
clean regardless of how it is shut down. If we're down to a single
member we know we want to use its store to recover as there are no
others.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=916475&r1=916474&r2=916475&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Feb 25 22:07:30 2010
@@ -914,6 +914,12 @@
     size_t size = urls.size();
     failoverExchange->updateUrls(urls);
 
+    if (store.hasStore()) {
+        // Mark store clean if I am the only broker, dirty otherwise.
+        if (size == 1) store.clean(Uuid(true));
+        else store.dirty(clusterId);
+    }
+
     if (size == 1 && lastSize > 1 && state >= CATCHUP) {
         QPID_LOG(notice, *this << " last broker standing, update queue policies");
         lastBroker = true;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp?rev=916475&r1=916474&r2=916475&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp Thu Feb 25 22:07:30 2010
@@ -114,7 +114,12 @@
     }
 }
 
+bool StoreStatus::hasStore() const {
+    return state != framing::cluster::STORE_STATE_NO_STORE;
+}
+
 void StoreStatus::dirty(const Uuid& clusterId_) {
+    if (!hasStore()) return;
     assert(clusterId_);
     clusterId = clusterId_;
     shutdownId = Uuid();
@@ -123,6 +128,7 @@
 }
 
 void StoreStatus::clean(const Uuid& shutdownId_) {
+    if (!hasStore()) return;
     assert(shutdownId_);
     state = STORE_STATE_CLEAN_STORE;
     shutdownId = shutdownId_;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h?rev=916475&r1=916474&r2=916475&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h Thu Feb 25 22:07:30 2010
@@ -46,14 +46,14 @@
     const Uuid& getShutdownId() const { return shutdownId; }
     framing::SequenceNumber getConfigSeq() const { return configSeq; }
 
-    void dirty(const Uuid& start); // Start using the store.
-    void clean(const Uuid& stop); // Stop using the store.
+    void dirty(const Uuid& clusterId);  // Mark the store in use by clusterId.
+    void clean(const Uuid& shutdownId); // Mark the store clean at shutdownId
     void setConfigSeq(framing::SequenceNumber seq); // Update the config seq number.
 
     void load();
     void save();
 
-    bool hasStore() { return state != framing::cluster::STORE_STATE_NO_STORE; }
+    bool hasStore() const;
 
   private:
     framing::cluster::StoreState state;

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=916475&r1=916474&r2=916475&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Thu Feb 25 22:07:30 2010
@@ -305,21 +305,47 @@
         self.assertRaises(Exception, lambda: a.ready())
         self.assertRaises(Exception, lambda: b.ready())
 
-    def test_total_failure(self):
-        # Verify we abort with sutiable error message if no clean stores.
-        cluster = self.cluster(0, args=self.args()+["--cluster-size=2"])
-        a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
-        b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=True)
-        a.kill()
-        b.kill()
-        a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
-        b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
-        self.assertRaises(Exception, lambda: a.ready())
-        self.assertRaises(Exception, lambda: b.ready())
+    def assert_dirty_store(self, broker):
+        self.assertRaises(Exception, lambda: broker.ready())
         msg = re.compile("critical.*no clean store")
-        assert msg.search(readfile(a.log))
-        assert msg.search(readfile(b.log))
+        assert msg.search(readfile(broker.log))
+
+    def test_solo_store_clean(self):
+        # A single node cluster should always leave a clean store.
+        cluster = self.cluster(0, self.args())
+        a = cluster.start("a", expect=EXPECT_EXIT_FAIL)
+        a.send_message("q", Message("x", durable=True))
+        a.kill()
+        a = cluster.start("a")
+        self.assertEqual(a.get_message("q").content, "x")
+
+    def test_last_store_clean(self):
+
+        # Verify that only the last node in a cluster to shut down has
+        # a clean store. Start with cluster of 3, reduce to 1 then
+        # increase again to ensure that a node that was once alone but
+        # finally did not finish as the last node does not get a clean
+        # store.
+        cluster = self.cluster(0, self.args())
+        a = cluster.start("a", expect=EXPECT_EXIT_FAIL)
+        b = cluster.start("b", expect=EXPECT_EXIT_FAIL)
+        c = cluster.start("c", expect=EXPECT_EXIT_FAIL)
+        a.send_message("q", Message("x", durable=True))
+        a.kill()
+        b.kill()          # c is last man
+        time.sleep(0.1)   # pause for c to find out hes last.
+        a = cluster.start("a", expect=EXPECT_EXIT_FAIL) # c no longer last man
+        c.kill()          # a is now last man
+        time.sleep(0.1)   # pause for a to find out hes last.
+        a.kill()          # really last
+        # b & c should be dirty
+        b = cluster.start("b", wait=False, expect=EXPECT_EXIT_OK)
+        self.assert_dirty_store(b)
+        c = cluster.start("c", wait=False, expect=EXPECT_EXIT_OK)
+        self.assert_dirty_store(c)
+        # a should be clean
+        a = cluster.start("a")
+        self.assertEqual(a.get_message("q").content, "x")
 
-        # FIXME aconway 2009-12-03: verify manual restore procedure
 
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org