You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/04/01 19:51:04 UTC

svn commit: r930055 - in /qpid/trunk/qpid: cpp/src/qpid/cluster/ cpp/src/qpid/framing/ cpp/src/tests/ python/qpid/

Author: aconway
Date: Thu Apr  1 17:51:04 2010
New Revision: 930055

URL: http://svn.apache.org/viewvc?rev=930055&view=rev
Log:
Update cluster store status with a single atomic write() operation.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h
    qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.cpp
    qpid/trunk/qpid/cpp/src/tests/StoreStatus.cpp
    qpid/trunk/qpid/cpp/src/tests/Uuid.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/cpp/src/tests/testagent.mk
    qpid/trunk/qpid/python/qpid/brokertest.py

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=930055&r1=930054&r2=930055&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Apr  1 17:51:04 2010
@@ -297,8 +297,7 @@ Cluster::Cluster(const ClusterSettings& 
     // Load my store status before we go into initialization
     if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
         store.load();
-        if (store.getClusterId())
-            clusterId = store.getClusterId(); // Use stored ID if there is one.
+        clusterId = store.getClusterId(); 
         QPID_LOG(notice, "Cluster store state: " << store)
     }
     cpg.join(name);
@@ -626,7 +625,6 @@ void Cluster::initMapCompleted(Lock& l) 
             QPID_LOG(info, *this << " not active for links.");
         }
         setClusterId(initMap.getClusterId(), l);
-        if (store.hasStore()) store.dirty(clusterId);
 
         if (initMap.isUpdateNeeded())  { // Joining established cluster.
             broker.setRecovery(false); // Ditch my current store.
@@ -919,7 +917,7 @@ void Cluster::updateOutError(const std::
 
 void Cluster ::shutdown(const MemberId& , const Uuid& id, Lock& l) {
     QPID_LOG(notice, *this << " cluster shut down by administrator.");
-    if (store.hasStore()) store.clean(Uuid(id));
+    if (store.hasStore()) store.clean(id);
     leave(l);
 }
 
@@ -967,9 +965,16 @@ void Cluster::memberUpdate(Lock& l) {
     if (store.hasStore()) {
         // Mark store clean if I am the only broker, dirty otherwise.
         if (size == 1 ) {
-            if (!store.isClean()) store.clean(Uuid(true));
-        } else {
-            if (!store.isDirty()) store.dirty(clusterId);
+            if (store.getState() != STORE_STATE_CLEAN_STORE) {
+                QPID_LOG(notice, "Sole member of cluster, marking store clean.");
+                store.clean(Uuid(true));
+            }
+        }
+        else {
+            if (store.getState() != STORE_STATE_DIRTY_STORE) {
+                QPID_LOG(notice, "No longer sole cluster member, marking store dirty.");
+                store.dirty();
+            }
         }
     }
 
@@ -1034,6 +1039,7 @@ broker::Broker& Cluster::getBroker() con
 
 void Cluster::setClusterId(const Uuid& uuid, Lock&) {
     clusterId = uuid;
+    if (store.hasStore()) store.setClusterId(uuid);
     if (mgmtObject) {
         stringstream stream;
         stream << self;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp?rev=930055&r1=930054&r2=930055&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp Thu Apr  1 17:51:04 2010
@@ -25,7 +25,9 @@
 #include <boost/filesystem/path.hpp>
 #include <boost/filesystem/fstream.hpp>
 #include <boost/filesystem/operations.hpp>
+#include <boost/scoped_array.hpp>
 #include <fstream>
+#include <sstream>
 
 namespace qpid {
 namespace cluster {
@@ -42,53 +44,29 @@ StoreStatus::StoreStatus(const std::stri
 namespace {
 
 const char* SUBDIR="cluster";
-const char* CLUSTER_ID_FILE="cluster.uuid";
-const char* SHUTDOWN_ID_FILE="shutdown.uuid";
+const char* STORE_STATUS="store.status";
 
-void throw_exceptions(ios& ios) {
-    // Have stream throw an exception on error.
-    ios.exceptions(std::ios::badbit | std::ios::failbit);
-}
-
-Uuid loadUuid(const fs::path& path) {
-    Uuid ret;
-    if (exists(path)) {
-        fs::ifstream i(path);
-        try {
-            throw_exceptions(i);
-            i >> ret;
-        } catch (const std::exception& e) {
-            QPID_LOG(error, "Cant load UUID from " << path.string() << ": " << e.what());
-            throw;
-        }
-    }
-    return ret;
-}
-
-void saveUuid(const fs::path& path, const Uuid& uuid) {
-    fs::ofstream o(path);
-    try {
-        throw_exceptions(o);
-        o << uuid;
-    } catch (const std::exception& e) {
-        QPID_LOG(error, "Cant save UUID to " << path.string() << ": " << e.what());
-        throw;
-    }
-}
-
-framing::SequenceNumber loadSeqNum(const fs::path& path) {
-    uint32_t n = 0;
-    if (exists(path)) {
-        fs::ifstream i(path);
-        try {
-            throw_exceptions(i);
-            i >> n;
-        } catch (const std::exception& e) {
-            QPID_LOG(error, "Cant load sequence number from " << path.string() << ": " << e.what());
-            throw;
-        }
-    }
-    return framing::SequenceNumber(n);
+string readFile(const fs::path& path) {
+    fs::ifstream is;
+    is.exceptions(std::ios::badbit | std::ios::failbit);
+    is.open(path);
+    // get length of file:
+    is.seekg (0, ios::end);
+    size_t length = is.tellg();
+    is.seekg (0, ios::beg);
+    // load data
+    boost::scoped_array<char> buffer(new char[length]);
+    is.read(buffer.get(), length);
+    is.close();
+    return string(buffer.get(), length);
+}
+
+void writeFile(const fs::path& path, const string& data) {
+    fs::ofstream os;
+    os.exceptions(std::ios::badbit | std::ios::failbit);
+    os.open(path);
+    os.write(data.data(), data.size());
+    os.close();
 }
 
 } // namespace
@@ -98,14 +76,25 @@ void StoreStatus::load() {
     if (dataDir.empty()) {
         throw Exception(QPID_MSG("No data-dir: When a store is loaded together with clustering, --data-dir must be specified."));
     }
-    fs::path dir = fs::path(dataDir, fs::native)/SUBDIR;
     try {
+        fs::path dir = fs::path(dataDir, fs::native)/SUBDIR;
         create_directory(dir);
-        clusterId = loadUuid(dir/CLUSTER_ID_FILE);
-        shutdownId = loadUuid(dir/SHUTDOWN_ID_FILE);
-        if (clusterId && shutdownId) state = STORE_STATE_CLEAN_STORE;
-        else if (clusterId) state = STORE_STATE_DIRTY_STORE;
-        else state = STORE_STATE_EMPTY_STORE;
+        fs::path file = dir/STORE_STATUS;
+        if (fs::exists(file)) {
+            string data = readFile(file);
+            istringstream is(data);
+            is.exceptions(std::ios::badbit | std::ios::failbit);
+            is >> ws >> clusterId >> ws >> shutdownId;
+            if (!clusterId)
+                throw Exception(QPID_MSG("Invalid cluster store state, no cluster-id"));
+            if (shutdownId) state = STORE_STATE_CLEAN_STORE;
+            else state = STORE_STATE_DIRTY_STORE;
+        }
+        else {                  // Starting from empty store
+            clusterId = Uuid(true);
+            save();
+            state = STORE_STATE_EMPTY_STORE;
+        }
     }
     catch (const std::exception&e) {
         throw Exception(QPID_MSG("Cannot load cluster store status: " << e.what()));
@@ -114,13 +103,13 @@ void StoreStatus::load() {
 
 void StoreStatus::save() {
     if (dataDir.empty()) return;
-    fs::path dir = fs::path(dataDir, fs::native)/SUBDIR;
     try {
-        create_directory(dir);
-        saveUuid(dir/CLUSTER_ID_FILE, clusterId);
-        saveUuid(dir/SHUTDOWN_ID_FILE, shutdownId);
+        ostringstream os;
+        os << clusterId << endl << shutdownId << endl;
+        fs::path file = fs::path(dataDir, fs::native)/SUBDIR/STORE_STATUS;
+        writeFile(file, os.str());
     }
-    catch (const std::exception&e) {
+    catch (const std::exception& e) {
         throw Exception(QPID_MSG("Cannot save cluster store status: " << e.what()));
     }
 }
@@ -129,20 +118,27 @@ bool StoreStatus::hasStore() const {
     return state != framing::cluster::STORE_STATE_NO_STORE;
 }
 
-void StoreStatus::dirty(const Uuid& clusterId_) {
-    if (!hasStore()) return;
-    assert(clusterId_);
-    clusterId = clusterId_;
-    shutdownId = Uuid();
+void StoreStatus::dirty() {
+    assert(hasStore());
+    if (shutdownId) {
+        shutdownId = Uuid();
+        save();
+    }
     state = STORE_STATE_DIRTY_STORE;
-    save();
 }
 
 void StoreStatus::clean(const Uuid& shutdownId_) {
-    if (!hasStore()) return;
+    assert(hasStore());
     assert(shutdownId_);
+    if (shutdownId_ != shutdownId) {
+        shutdownId = shutdownId_;
+        save();
+    }
     state = STORE_STATE_CLEAN_STORE;
-    shutdownId = shutdownId_;
+}
+
+void StoreStatus::setClusterId(const Uuid& clusterId_) {
+    clusterId = clusterId_;
     save();
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h?rev=930055&r1=930054&r2=930055&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h Thu Apr  1 17:51:04 2010
@@ -42,21 +42,19 @@ class StoreStatus
     StoreStatus(const std::string& dir);
 
     framing::cluster::StoreState getState() const { return state; }
-    bool isClean() { return state == framing::cluster::STORE_STATE_CLEAN_STORE; }
-    bool isDirty() { return state == framing::cluster::STORE_STATE_DIRTY_STORE; }
 
     const Uuid& getClusterId() const { return clusterId; }
+    void setClusterId(const Uuid&);
     const Uuid& getShutdownId() const { return shutdownId; }
 
-    void dirty(const Uuid& clusterId);  // Mark the store in use by clusterId.
-    void clean(const Uuid& shutdownId); // Mark the store clean at shutdownId
-
     void load();
-    void save();
-
+    void dirty();               // Mark the store in use.
+    void clean(const Uuid& shutdownId); // Mark the store clean.
     bool hasStore() const;
 
   private:
+    void save();
+
     framing::cluster::StoreState state;
     Uuid clusterId, shutdownId;
     std::string dataDir;

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.cpp?rev=930055&r1=930054&r2=930055&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.cpp Thu Apr  1 17:51:04 2010
@@ -81,8 +81,10 @@ ostream& operator<<(ostream& out, Uuid u
 istream& operator>>(istream& in, Uuid& uuid) {
     char unparsed[UNPARSED_SIZE + 1] = {0};
     in.get(unparsed, sizeof(unparsed));
-    if (uuid_parse(unparsed, uuid.c_array()) != 0) 
-        in.setstate(ios::failbit);
+    if (!in.fail()) {
+        if (uuid_parse(unparsed, uuid.c_array()) != 0) 
+            in.setstate(ios::failbit);
+    }
     return in;
 }
 

Modified: qpid/trunk/qpid/cpp/src/tests/StoreStatus.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/StoreStatus.cpp?rev=930055&r1=930054&r2=930055&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/StoreStatus.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/StoreStatus.cpp Thu Apr  1 17:51:04 2010
@@ -65,7 +65,8 @@ QPID_AUTO_TEST_CASE(testSaveLoadDirty) {
     Uuid clusterId = Uuid(true);
     StoreStatus ss(TEST_DIR);
     ss.load();
-    ss.dirty(clusterId);
+    ss.setClusterId(clusterId);
+    ss.dirty();
     BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_DIRTY_STORE);
 
     StoreStatus ss2(TEST_DIR);
@@ -81,7 +82,7 @@ QPID_AUTO_TEST_CASE(testSaveLoadClean) {
     Uuid shutdownId = Uuid(true);
     StoreStatus ss(TEST_DIR);
     ss.load();
-    ss.dirty(clusterId);
+    ss.setClusterId(clusterId);
     ss.clean(shutdownId);
     BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_CLEAN_STORE);
 
@@ -99,9 +100,10 @@ QPID_AUTO_TEST_CASE(testMarkDirty) {
     Uuid shutdownId = Uuid(true);
     StoreStatus ss(TEST_DIR);
     ss.load();
-    ss.dirty(clusterId);
+    ss.setClusterId(clusterId);
+    ss.dirty();
     ss.clean(shutdownId);
-    ss.dirty(clusterId);
+    ss.dirty();
     
     StoreStatus ss2(TEST_DIR);
     ss2.load();

Modified: qpid/trunk/qpid/cpp/src/tests/Uuid.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Uuid.cpp?rev=930055&r1=930054&r2=930055&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Uuid.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/Uuid.cpp Thu Apr  1 17:51:04 2010
@@ -50,6 +50,7 @@ QPID_AUTO_TEST_CASE(testUuidCtor) {
 
 boost::array<uint8_t, 16>  sample =  {{'\x1b', '\x4e', '\x28', '\xba', '\x2f', '\xa1', '\x11', '\xd2', '\x88', '\x3f', '\xb9', '\xa7', '\x61', '\xbd', '\xe3', '\xfb'}};
 const string sampleStr("1b4e28ba-2fa1-11d2-883f-b9a761bde3fb");
+const string zeroStr("00000000-0000-0000-0000-000000000000");
 
 QPID_AUTO_TEST_CASE(testUuidIstream) {
     Uuid uuid;
@@ -57,6 +58,12 @@ QPID_AUTO_TEST_CASE(testUuidIstream) {
     in >> uuid;
     BOOST_CHECK(!in.fail());
     BOOST_CHECK(uuid == sample);
+
+    istringstream is(zeroStr);
+    Uuid zero;
+    is >> zero;
+    BOOST_CHECK(!in.fail());
+    BOOST_CHECK_EQUAL(zero, Uuid());
 }
 
 QPID_AUTO_TEST_CASE(testUuidOstream) {
@@ -65,6 +72,23 @@ QPID_AUTO_TEST_CASE(testUuidOstream) {
     out << uuid;
     BOOST_CHECK(out.good());
     BOOST_CHECK_EQUAL(out.str(), sampleStr);
+
+    ostringstream os;
+    os << Uuid();
+    BOOST_CHECK(out.good());
+    BOOST_CHECK_EQUAL(os.str(), zeroStr);
+}
+
+QPID_AUTO_TEST_CASE(testUuidIOstream) {
+    Uuid a(true), b(true);
+    ostringstream os;
+    os << a << endl << b;
+    Uuid aa, bb;
+    istringstream is(os.str());
+    is >> aa >> ws >> bb;
+    BOOST_CHECK(os.good());
+    BOOST_CHECK_EQUAL(a, aa);
+    BOOST_CHECK_EQUAL(b, bb);
 }
 
 QPID_AUTO_TEST_CASE(testUuidEncodeDecode) {

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=930055&r1=930054&r2=930055&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Thu Apr  1 17:51:04 2010
@@ -310,7 +310,6 @@ class StoreTests(BrokerTest):
         c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=True)
         a.send_message("q", Message("4", durable=True))
         a.kill()
-        time.sleep(0.1)   # pause for b to write status.        
         b.kill()
         self.assertEqual(c.get_message("q").content, "4")
         c.send_message("q", Message("clean", durable=True))
@@ -356,11 +355,6 @@ class StoreTests(BrokerTest):
         self.assertRaises(Exception, lambda: a.ready())
         self.assertRaises(Exception, lambda: b.ready())
 
-    def assert_dirty_store(self, broker):
-        assert retry(lambda: os.path.exists(broker.log)), "Missing log file %s"%broker.log
-        msg = re.compile("critical.*no clean store")
-        assert retry(lambda: msg.search(readfile(broker.log))), "Expected dirty store message in %s"%broker.log
-
     def test_solo_store_clean(self):
         # A single node cluster should always leave a clean store.
         cluster = self.cluster(0, self.args())
@@ -378,29 +372,25 @@ class StoreTests(BrokerTest):
         # store.
         cluster = self.cluster(0, self.args())
         a = cluster.start("a", expect=EXPECT_EXIT_FAIL)
+        self.assertEqual(a.store_state(), "clean")
         b = cluster.start("b", expect=EXPECT_EXIT_FAIL)
         c = cluster.start("c", expect=EXPECT_EXIT_FAIL)
+        self.assertEqual(b.store_state(), "dirty")
+        self.assertEqual(c.store_state(), "dirty")
+        retry(lambda: a.store_state() == "dirty") 
+
         a.send_message("q", Message("x", durable=True))
         a.kill()
-        # FIXME aconway 2010-03-29: this test has too many sleeps.
-        # Need to tighten up status persistence to be more atomic and less
-        # prone to interruption.
-        time.sleep(0.1)   # pause for b to update status.
-        b.kill()          # c is last man
-        time.sleep(0.1)   # pause for c to find out hes last.
+        b.kill()                # c is last man, will mark store clean
+        retry(lambda: c.store_state() == "clean") 
         a = cluster.start("a", expect=EXPECT_EXIT_FAIL) # c no longer last man
-        time.sleep(0.1)   # pause for c to find out hes no longer last.
-        c.kill()          # a is now last man
-        time.sleep(0.1)   # pause for a to find out hes last.
-        a.kill()          # really last, should be clean.
-        # b & c should be dirty
-        b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL)
-        self.assert_dirty_store(b)
-        c = cluster.start("c", wait=False, expect=EXPECT_EXIT_FAIL)
-        self.assert_dirty_store(c)
-        # a should be clean
-        a = cluster.start("a")
-        self.assertEqual(a.get_message("q").content, "x")
+        retry(lambda: c.store_state() == "dirty") 
+        c.kill()                        # a is now last man
+        retry(lambda: a.store_state() == "clean") 
+        a.kill()
+        self.assertEqual(a.store_state(), "clean")
+        self.assertEqual(b.store_state(), "dirty")
+        self.assertEqual(c.store_state(), "dirty")
 
     def test_restart_clean(self):
         """Verify that we can re-start brokers one by one in a
@@ -426,7 +416,6 @@ class StoreTests(BrokerTest):
         a.send_message("q", Message("x", durable=True))
         a.send_message("q", Message("y", durable=True))
         a.kill()
-        time.sleep(0.1)   # pause for b to write status.
         b.kill()
         a = cluster.start("a")
         self.assertEqual(c.get_message("q").content, "x")

Modified: qpid/trunk/qpid/cpp/src/tests/testagent.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/testagent.mk?rev=930055&r1=930054&r2=930055&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/testagent.mk (original)
+++ qpid/trunk/qpid/cpp/src/tests/testagent.mk Thu Apr  1 17:51:04 2010
@@ -36,7 +36,7 @@ TESTAGENT_GEN_SRC=									\
 
 $(TESTAGENT_GEN_SRC): testagent_gen.timestamp
 
-testagent_gen.timestamp: testagent.xml
+testagent_gen.timestamp: testagent.xml $(mgen_generator)
 	$(QMF_GEN) -o testagent_gen/qmf $(srcdir)/testagent.xml
 	touch testagent_gen.timestamp
 

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=930055&r1=930054&r2=930055&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Thu Apr  1 17:51:04 2010
@@ -373,6 +373,13 @@ class Broker(Popen):
         try: self.connect().close()
         except: raise RethrownException("Broker %s failed ready test"%self.name)
 
+    def store_state(self):
+        uuids = open(os.path.join(self.datadir, "cluster", "store.status")).readlines()
+        null_uuid="00000000-0000-0000-0000-000000000000\n"
+        if uuids[0] == null_uuid: return "empty"
+        if uuids[1] == null_uuid: return "dirty"
+        return "clean"
+        
 class Cluster:
     """A cluster of brokers in a test."""
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org