You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/04/01 19:51:04 UTC
svn commit: r930055 - in /qpid/trunk/qpid: cpp/src/qpid/cluster/
cpp/src/qpid/framing/ cpp/src/tests/ python/qpid/
Author: aconway
Date: Thu Apr 1 17:51:04 2010
New Revision: 930055
URL: http://svn.apache.org/viewvc?rev=930055&view=rev
Log:
Update cluster store status with a single atomic write() operation.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h
qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.cpp
qpid/trunk/qpid/cpp/src/tests/StoreStatus.cpp
qpid/trunk/qpid/cpp/src/tests/Uuid.cpp
qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
qpid/trunk/qpid/cpp/src/tests/testagent.mk
qpid/trunk/qpid/python/qpid/brokertest.py
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=930055&r1=930054&r2=930055&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Apr 1 17:51:04 2010
@@ -297,8 +297,7 @@ Cluster::Cluster(const ClusterSettings&
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
store.load();
- if (store.getClusterId())
- clusterId = store.getClusterId(); // Use stored ID if there is one.
+ clusterId = store.getClusterId();
QPID_LOG(notice, "Cluster store state: " << store)
}
cpg.join(name);
@@ -626,7 +625,6 @@ void Cluster::initMapCompleted(Lock& l)
QPID_LOG(info, *this << " not active for links.");
}
setClusterId(initMap.getClusterId(), l);
- if (store.hasStore()) store.dirty(clusterId);
if (initMap.isUpdateNeeded()) { // Joining established cluster.
broker.setRecovery(false); // Ditch my current store.
@@ -919,7 +917,7 @@ void Cluster::updateOutError(const std::
void Cluster ::shutdown(const MemberId& , const Uuid& id, Lock& l) {
QPID_LOG(notice, *this << " cluster shut down by administrator.");
- if (store.hasStore()) store.clean(Uuid(id));
+ if (store.hasStore()) store.clean(id);
leave(l);
}
@@ -967,9 +965,16 @@ void Cluster::memberUpdate(Lock& l) {
if (store.hasStore()) {
// Mark store clean if I am the only broker, dirty otherwise.
if (size == 1 ) {
- if (!store.isClean()) store.clean(Uuid(true));
- } else {
- if (!store.isDirty()) store.dirty(clusterId);
+ if (store.getState() != STORE_STATE_CLEAN_STORE) {
+ QPID_LOG(notice, "Sole member of cluster, marking store clean.");
+ store.clean(Uuid(true));
+ }
+ }
+ else {
+ if (store.getState() != STORE_STATE_DIRTY_STORE) {
+ QPID_LOG(notice, "No longer sole cluster member, marking store dirty.");
+ store.dirty();
+ }
}
}
@@ -1034,6 +1039,7 @@ broker::Broker& Cluster::getBroker() con
void Cluster::setClusterId(const Uuid& uuid, Lock&) {
clusterId = uuid;
+ if (store.hasStore()) store.setClusterId(uuid);
if (mgmtObject) {
stringstream stream;
stream << self;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp?rev=930055&r1=930054&r2=930055&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp Thu Apr 1 17:51:04 2010
@@ -25,7 +25,9 @@
#include <boost/filesystem/path.hpp>
#include <boost/filesystem/fstream.hpp>
#include <boost/filesystem/operations.hpp>
+#include <boost/scoped_array.hpp>
#include <fstream>
+#include <sstream>
namespace qpid {
namespace cluster {
@@ -42,53 +44,29 @@ StoreStatus::StoreStatus(const std::stri
namespace {
const char* SUBDIR="cluster";
-const char* CLUSTER_ID_FILE="cluster.uuid";
-const char* SHUTDOWN_ID_FILE="shutdown.uuid";
+const char* STORE_STATUS="store.status";
-void throw_exceptions(ios& ios) {
- // Have stream throw an exception on error.
- ios.exceptions(std::ios::badbit | std::ios::failbit);
-}
-
-Uuid loadUuid(const fs::path& path) {
- Uuid ret;
- if (exists(path)) {
- fs::ifstream i(path);
- try {
- throw_exceptions(i);
- i >> ret;
- } catch (const std::exception& e) {
- QPID_LOG(error, "Cant load UUID from " << path.string() << ": " << e.what());
- throw;
- }
- }
- return ret;
-}
-
-void saveUuid(const fs::path& path, const Uuid& uuid) {
- fs::ofstream o(path);
- try {
- throw_exceptions(o);
- o << uuid;
- } catch (const std::exception& e) {
- QPID_LOG(error, "Cant save UUID to " << path.string() << ": " << e.what());
- throw;
- }
-}
-
-framing::SequenceNumber loadSeqNum(const fs::path& path) {
- uint32_t n = 0;
- if (exists(path)) {
- fs::ifstream i(path);
- try {
- throw_exceptions(i);
- i >> n;
- } catch (const std::exception& e) {
- QPID_LOG(error, "Cant load sequence number from " << path.string() << ": " << e.what());
- throw;
- }
- }
- return framing::SequenceNumber(n);
+string readFile(const fs::path& path) { // Return the entire contents of the file at path as a string.
+ fs::ifstream is;
+ is.exceptions(std::ios::badbit | std::ios::failbit); // Mask is set before open() so a failed open throws too.
+ is.open(path);
+ // get length of file:
+ is.seekg (0, ios::end);
+ size_t length = is.tellg();
+ is.seekg (0, ios::beg);
+ // load data
+ boost::scoped_array<char> buffer(new char[length]); // Temporary buffer, released when it goes out of scope.
+ is.read(buffer.get(), length); // A short read sets failbit and therefore throws.
+ is.close();
+ return string(buffer.get(), length); // Explicit length: the data is not assumed to be NUL-terminated.
+}
+
+void writeFile(const fs::path& path, const string& data) { // Write data to the file at path.
+ fs::ofstream os;
+ os.exceptions(std::ios::badbit | std::ios::failbit); // Open, write and close failures are reported by exception.
+ os.open(path); // NOTE(review): default open mode presumably truncates an existing file first - confirm a crash before write() is acceptable.
+ os.write(data.data(), data.size()); // Whole payload handed over in one call.
+ os.close(); // Explicit close so a flush error surfaces here rather than in the destructor.
}
} // namespace
@@ -98,14 +76,25 @@ void StoreStatus::load() {
if (dataDir.empty()) {
throw Exception(QPID_MSG("No data-dir: When a store is loaded together with clustering, --data-dir must be specified."));
}
- fs::path dir = fs::path(dataDir, fs::native)/SUBDIR;
try {
+ fs::path dir = fs::path(dataDir, fs::native)/SUBDIR;
create_directory(dir);
- clusterId = loadUuid(dir/CLUSTER_ID_FILE);
- shutdownId = loadUuid(dir/SHUTDOWN_ID_FILE);
- if (clusterId && shutdownId) state = STORE_STATE_CLEAN_STORE;
- else if (clusterId) state = STORE_STATE_DIRTY_STORE;
- else state = STORE_STATE_EMPTY_STORE;
+ fs::path file = dir/STORE_STATUS;
+ if (fs::exists(file)) {
+ string data = readFile(file);
+ istringstream is(data);
+ is.exceptions(std::ios::badbit | std::ios::failbit);
+ is >> ws >> clusterId >> ws >> shutdownId;
+ if (!clusterId)
+ throw Exception(QPID_MSG("Invalid cluster store state, no cluster-id"));
+ if (shutdownId) state = STORE_STATE_CLEAN_STORE;
+ else state = STORE_STATE_DIRTY_STORE;
+ }
+ else { // Starting from empty store
+ clusterId = Uuid(true);
+ save();
+ state = STORE_STATE_EMPTY_STORE;
+ }
}
catch (const std::exception&e) {
throw Exception(QPID_MSG("Cannot load cluster store status: " << e.what()));
@@ -114,13 +103,13 @@ void StoreStatus::load() {
void StoreStatus::save() {
if (dataDir.empty()) return;
- fs::path dir = fs::path(dataDir, fs::native)/SUBDIR;
try {
- create_directory(dir);
- saveUuid(dir/CLUSTER_ID_FILE, clusterId);
- saveUuid(dir/SHUTDOWN_ID_FILE, shutdownId);
+ ostringstream os;
+ os << clusterId << endl << shutdownId << endl;
+ fs::path file = fs::path(dataDir, fs::native)/SUBDIR/STORE_STATUS;
+ writeFile(file, os.str());
}
- catch (const std::exception&e) {
+ catch (const std::exception& e) {
throw Exception(QPID_MSG("Cannot save cluster store status: " << e.what()));
}
}
@@ -129,20 +118,27 @@ bool StoreStatus::hasStore() const {
return state != framing::cluster::STORE_STATE_NO_STORE;
}
-void StoreStatus::dirty(const Uuid& clusterId_) {
- if (!hasStore()) return;
- assert(clusterId_);
- clusterId = clusterId_;
- shutdownId = Uuid();
+void StoreStatus::dirty() {
+ assert(hasStore());
+ if (shutdownId) {
+ shutdownId = Uuid();
+ save();
+ }
state = STORE_STATE_DIRTY_STORE;
- save();
}
void StoreStatus::clean(const Uuid& shutdownId_) {
- if (!hasStore()) return;
+ assert(hasStore());
assert(shutdownId_);
+ if (shutdownId_ != shutdownId) {
+ shutdownId = shutdownId_;
+ save();
+ }
state = STORE_STATE_CLEAN_STORE;
- shutdownId = shutdownId_;
+}
+
+void StoreStatus::setClusterId(const Uuid& clusterId_) { // Record clusterId_ as this store's cluster id and persist it.
+ clusterId = clusterId_;
save(); // Saves unconditionally, even if the id is unchanged.
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h?rev=930055&r1=930054&r2=930055&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h Thu Apr 1 17:51:04 2010
@@ -42,21 +42,19 @@ class StoreStatus
StoreStatus(const std::string& dir);
framing::cluster::StoreState getState() const { return state; }
- bool isClean() { return state == framing::cluster::STORE_STATE_CLEAN_STORE; }
- bool isDirty() { return state == framing::cluster::STORE_STATE_DIRTY_STORE; }
const Uuid& getClusterId() const { return clusterId; }
+ void setClusterId(const Uuid&);
const Uuid& getShutdownId() const { return shutdownId; }
- void dirty(const Uuid& clusterId); // Mark the store in use by clusterId.
- void clean(const Uuid& shutdownId); // Mark the store clean at shutdownId
-
void load();
- void save();
-
+ void dirty(); // Mark the store in use.
+ void clean(const Uuid& shutdownId); // Mark the store clean.
bool hasStore() const;
private:
+ void save();
+
framing::cluster::StoreState state;
Uuid clusterId, shutdownId;
std::string dataDir;
Modified: qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.cpp?rev=930055&r1=930054&r2=930055&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.cpp Thu Apr 1 17:51:04 2010
@@ -81,8 +81,10 @@ ostream& operator<<(ostream& out, Uuid u
istream& operator>>(istream& in, Uuid& uuid) {
char unparsed[UNPARSED_SIZE + 1] = {0};
in.get(unparsed, sizeof(unparsed));
- if (uuid_parse(unparsed, uuid.c_array()) != 0)
- in.setstate(ios::failbit);
+ if (!in.fail()) {
+ if (uuid_parse(unparsed, uuid.c_array()) != 0)
+ in.setstate(ios::failbit);
+ }
return in;
}
Modified: qpid/trunk/qpid/cpp/src/tests/StoreStatus.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/StoreStatus.cpp?rev=930055&r1=930054&r2=930055&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/StoreStatus.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/StoreStatus.cpp Thu Apr 1 17:51:04 2010
@@ -65,7 +65,8 @@ QPID_AUTO_TEST_CASE(testSaveLoadDirty) {
Uuid clusterId = Uuid(true);
StoreStatus ss(TEST_DIR);
ss.load();
- ss.dirty(clusterId);
+ ss.setClusterId(clusterId);
+ ss.dirty();
BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_DIRTY_STORE);
StoreStatus ss2(TEST_DIR);
@@ -81,7 +82,7 @@ QPID_AUTO_TEST_CASE(testSaveLoadClean) {
Uuid shutdownId = Uuid(true);
StoreStatus ss(TEST_DIR);
ss.load();
- ss.dirty(clusterId);
+ ss.setClusterId(clusterId);
ss.clean(shutdownId);
BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_CLEAN_STORE);
@@ -99,9 +100,10 @@ QPID_AUTO_TEST_CASE(testMarkDirty) {
Uuid shutdownId = Uuid(true);
StoreStatus ss(TEST_DIR);
ss.load();
- ss.dirty(clusterId);
+ ss.setClusterId(clusterId);
+ ss.dirty();
ss.clean(shutdownId);
- ss.dirty(clusterId);
+ ss.dirty();
StoreStatus ss2(TEST_DIR);
ss2.load();
Modified: qpid/trunk/qpid/cpp/src/tests/Uuid.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Uuid.cpp?rev=930055&r1=930054&r2=930055&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Uuid.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/Uuid.cpp Thu Apr 1 17:51:04 2010
@@ -50,6 +50,7 @@ QPID_AUTO_TEST_CASE(testUuidCtor) {
boost::array<uint8_t, 16> sample = {{'\x1b', '\x4e', '\x28', '\xba', '\x2f', '\xa1', '\x11', '\xd2', '\x88', '\x3f', '\xb9', '\xa7', '\x61', '\xbd', '\xe3', '\xfb'}};
const string sampleStr("1b4e28ba-2fa1-11d2-883f-b9a761bde3fb");
+const string zeroStr("00000000-0000-0000-0000-000000000000");
QPID_AUTO_TEST_CASE(testUuidIstream) {
Uuid uuid;
@@ -57,6 +58,12 @@ QPID_AUTO_TEST_CASE(testUuidIstream) {
in >> uuid;
BOOST_CHECK(!in.fail());
BOOST_CHECK(uuid == sample);
+
+ istringstream is(zeroStr);
+ Uuid zero;
+ is >> zero;
+ BOOST_CHECK(!in.fail());
+ BOOST_CHECK_EQUAL(zero, Uuid());
}
QPID_AUTO_TEST_CASE(testUuidOstream) {
@@ -65,6 +72,23 @@ QPID_AUTO_TEST_CASE(testUuidOstream) {
out << uuid;
BOOST_CHECK(out.good());
BOOST_CHECK_EQUAL(out.str(), sampleStr);
+
+ ostringstream os;
+ os << Uuid();
+ BOOST_CHECK(out.good());
+ BOOST_CHECK_EQUAL(os.str(), zeroStr);
+}
+
+QPID_AUTO_TEST_CASE(testUuidIOstream) {
+ // Round trip: write two UUIDs separated by a newline, then parse both back.
+ Uuid a(true), b(true);
+ ostringstream os;
+ os << a << endl << b;
+ BOOST_CHECK(os.good()); // Check the writer right after writing.
+ Uuid aa, bb;
+ istringstream is(os.str());
+ is >> aa >> ws >> bb;
+ BOOST_CHECK(!is.fail()); // Check the reader: this is the stream the parse ran on.
+ BOOST_CHECK_EQUAL(a, aa);
+ BOOST_CHECK_EQUAL(b, bb);
}
QPID_AUTO_TEST_CASE(testUuidEncodeDecode) {
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=930055&r1=930054&r2=930055&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Thu Apr 1 17:51:04 2010
@@ -310,7 +310,6 @@ class StoreTests(BrokerTest):
c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=True)
a.send_message("q", Message("4", durable=True))
a.kill()
- time.sleep(0.1) # pause for b to write status.
b.kill()
self.assertEqual(c.get_message("q").content, "4")
c.send_message("q", Message("clean", durable=True))
@@ -356,11 +355,6 @@ class StoreTests(BrokerTest):
self.assertRaises(Exception, lambda: a.ready())
self.assertRaises(Exception, lambda: b.ready())
- def assert_dirty_store(self, broker):
- assert retry(lambda: os.path.exists(broker.log)), "Missing log file %s"%broker.log
- msg = re.compile("critical.*no clean store")
- assert retry(lambda: msg.search(readfile(broker.log))), "Expected dirty store message in %s"%broker.log
-
def test_solo_store_clean(self):
# A single node cluster should always leave a clean store.
cluster = self.cluster(0, self.args())
@@ -378,29 +372,25 @@ class StoreTests(BrokerTest):
# store.
cluster = self.cluster(0, self.args())
a = cluster.start("a", expect=EXPECT_EXIT_FAIL)
+ self.assertEqual(a.store_state(), "clean")
b = cluster.start("b", expect=EXPECT_EXIT_FAIL)
c = cluster.start("c", expect=EXPECT_EXIT_FAIL)
+ self.assertEqual(b.store_state(), "dirty")
+ self.assertEqual(c.store_state(), "dirty")
+ retry(lambda: a.store_state() == "dirty")
+
a.send_message("q", Message("x", durable=True))
a.kill()
- # FIXME aconway 2010-03-29: this test has too many sleeps.
- # Need to tighten up status persistence to be more atomic and less
- # prone to interruption.
- time.sleep(0.1) # pause for b to update status.
- b.kill() # c is last man
- time.sleep(0.1) # pause for c to find out hes last.
+ b.kill() # c is last man, will mark store clean
+ retry(lambda: c.store_state() == "clean")
a = cluster.start("a", expect=EXPECT_EXIT_FAIL) # c no longer last man
- time.sleep(0.1) # pause for c to find out hes no longer last.
- c.kill() # a is now last man
- time.sleep(0.1) # pause for a to find out hes last.
- a.kill() # really last, should be clean.
- # b & c should be dirty
- b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL)
- self.assert_dirty_store(b)
- c = cluster.start("c", wait=False, expect=EXPECT_EXIT_FAIL)
- self.assert_dirty_store(c)
- # a should be clean
- a = cluster.start("a")
- self.assertEqual(a.get_message("q").content, "x")
+ retry(lambda: c.store_state() == "dirty")
+ c.kill() # a is now last man
+ retry(lambda: a.store_state() == "clean")
+ a.kill()
+ self.assertEqual(a.store_state(), "clean")
+ self.assertEqual(b.store_state(), "dirty")
+ self.assertEqual(c.store_state(), "dirty")
def test_restart_clean(self):
"""Verify that we can re-start brokers one by one in a
@@ -426,7 +416,6 @@ class StoreTests(BrokerTest):
a.send_message("q", Message("x", durable=True))
a.send_message("q", Message("y", durable=True))
a.kill()
- time.sleep(0.1) # pause for b to write status.
b.kill()
a = cluster.start("a")
self.assertEqual(c.get_message("q").content, "x")
Modified: qpid/trunk/qpid/cpp/src/tests/testagent.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/testagent.mk?rev=930055&r1=930054&r2=930055&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/testagent.mk (original)
+++ qpid/trunk/qpid/cpp/src/tests/testagent.mk Thu Apr 1 17:51:04 2010
@@ -36,7 +36,7 @@ TESTAGENT_GEN_SRC= \
$(TESTAGENT_GEN_SRC): testagent_gen.timestamp
-testagent_gen.timestamp: testagent.xml
+testagent_gen.timestamp: testagent.xml $(mgen_generator)
$(QMF_GEN) -o testagent_gen/qmf $(srcdir)/testagent.xml
touch testagent_gen.timestamp
Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=930055&r1=930054&r2=930055&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Thu Apr 1 17:51:04 2010
@@ -373,6 +373,13 @@ class Broker(Popen):
try: self.connect().close()
except: raise RethrownException("Broker %s failed ready test"%self.name)
+ def store_state(self):
+     """Return "empty", "dirty" or "clean" as recorded in this broker's
+     cluster/store.status file, or "unknown" if the file does not hold two
+     complete lines (e.g. it was read while being rewritten)."""
+     path = os.path.join(self.datadir, "cluster", "store.status")
+     f = open(path)
+     try: uuids = f.readlines()
+     finally: f.close() # Callers poll this repeatedly; don't leak a descriptor per call.
+     # Line 1 is the cluster-id, line 2 the shutdown-id, each newline-terminated.
+     if len(uuids) < 2 or not uuids[1].endswith("\n"): return "unknown"
+     null_uuid="00000000-0000-0000-0000-000000000000\n"
+     if uuids[0] == null_uuid: return "empty"
+     if uuids[1] == null_uuid: return "dirty"
+     return "clean"
+
class Cluster:
"""A cluster of brokers in a test."""
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org