You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/04/25 20:09:16 UTC
svn commit: r1330450 - in /incubator/mesos/trunk/src: log/coordinator.cpp
log/coordinator.hpp log/log.hpp log/replica.cpp tests/log_tests.cpp
Author: benh
Date: Wed Apr 25 18:09:15 2012
New Revision: 1330450
URL: http://svn.apache.org/viewvc?rev=1330450&view=rev
Log:
Fixes a bug when a coordinator tried to fill positions that other replicas had already learned were truncated (note that the test added failed before the rest of the patch got applied).
Modified:
incubator/mesos/trunk/src/log/coordinator.cpp
incubator/mesos/trunk/src/log/coordinator.hpp
incubator/mesos/trunk/src/log/log.hpp
incubator/mesos/trunk/src/log/replica.cpp
incubator/mesos/trunk/src/tests/log_tests.cpp
Modified: incubator/mesos/trunk/src/log/coordinator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/coordinator.cpp?rev=1330450&r1=1330449&r2=1330450&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/coordinator.cpp (original)
+++ incubator/mesos/trunk/src/log/coordinator.cpp Wed Apr 25 18:09:15 2012
@@ -373,6 +373,9 @@ Result<uint64_t> Coordinator::commit(con
message.mutable_action()->set_learned(true);
}
+ LOG(INFO) << "Telling other replicas of learned action at position "
+ << action.position();
+
remotecast(message);
return action.position();
@@ -428,7 +431,18 @@ Result<Action> Coordinator::fill(uint64_
if (response.has_action()) {
CHECK(response.action().position() == position);
if (response.action().has_learned() && response.action().learned()) {
- // Received a learned action, try and commit locally.
+ // Received a learned action, try and commit locally. Note
+ // that there is no checking that we get the _same_ learned
+ // action in the event we get multiple responses with
+ // learned actions, we just take the "first". In fact, there
+ // is a specific instance in which learned actions will NOT
+ // be the same! In this instance, one replica may return
+ // that the action is a learned no-op because it knows the
+ // position has been truncated while another replica (that
+ // hasn't learned the truncation yet) might return the
+ // actual action at this position. Picking either action is
+ // _correct_, since eventually we know this position will be
+ // truncated. Fun!
Result<uint64_t> result = commit(response.action());
if (result.isError()) {
return Result<Action>::error(result.error());
Modified: incubator/mesos/trunk/src/log/coordinator.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/coordinator.hpp?rev=1330450&r1=1330449&r2=1330450&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/coordinator.hpp (original)
+++ incubator/mesos/trunk/src/log/coordinator.hpp Wed Apr 25 18:09:15 2012
@@ -99,7 +99,7 @@ private:
bool elected; // True if this coordinator has been elected.
- int quorum; // Quorum size.
+ const int quorum; // Quorum size.
Replica* replica; // Local log replica.
Modified: incubator/mesos/trunk/src/log/log.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/log.hpp?rev=1330450&r1=1330449&r2=1330450&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/log.hpp (original)
+++ incubator/mesos/trunk/src/log/log.hpp Wed Apr 25 18:09:15 2012
@@ -203,6 +203,8 @@ public:
quorum = _quorum;
+ LOG(INFO) << "Creating a new log replica";
+
replica = new Replica(path);
group = new zookeeper::Group(servers, timeout, znode, auth);
@@ -339,8 +341,6 @@ Log::Writer::Writer(Log* log, const seco
: coordinator(log->quorum, log->replica, log->network),
error(Option<std::string>::none())
{
- LOG(INFO) << "Number of retries: " << retries;
-
do {
Result<uint64_t> result = coordinator.elect(Timeout(timeout.value));
if (result.isNone()) {
Modified: incubator/mesos/trunk/src/log/replica.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/replica.cpp?rev=1330450&r1=1330449&r2=1330450&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/replica.cpp (original)
+++ incubator/mesos/trunk/src/log/replica.cpp Wed Apr 25 18:09:15 2012
@@ -209,6 +209,9 @@ Try<State> LevelDBStorage::recover(const
CHECK(leveldb::BytewiseComparator()->Compare(ten, two) > 0);
CHECK(leveldb::BytewiseComparator()->Compare(ten, ten) == 0);
+ Timer timer;
+ timer.start();
+
leveldb::Status status = leveldb::DB::Open(options, path, &db);
if (!status.ok()) {
@@ -216,9 +219,17 @@ Try<State> LevelDBStorage::recover(const
return Try<State>::error(status.ToString());
}
+ LOG(INFO) << "Opened db in "
+ << timer.elapsed().millis() << " milliseconds";
+
+ timer.start(); // Restart the timer.
+
// TODO(benh): Conditionally compact to avoid long recovery times?
db->CompactRange(NULL, NULL);
+ LOG(INFO) << "Compacted db in "
+ << timer.elapsed().millis() << " milliseconds";
+
State state;
state.coordinator = 0;
state.begin = 0;
@@ -229,11 +240,26 @@ Try<State> LevelDBStorage::recover(const
// records and confirming that they are all indeed of type
// Record::Action.
+ timer.start(); // Restart the timer.
+
leveldb::Iterator* iterator = db->NewIterator(leveldb::ReadOptions());
+ LOG(INFO) << "Created db iterator in " << timer.elapsed().millis()
+ << " milliseconds";
+
+ timer.start(); // Restart the timer.
+
iterator->SeekToFirst();
+ LOG(INFO) << "Seeked to beginning of db in "
+ << timer.elapsed().millis() << " milliseconds";
+
+ timer.start(); // Restart the timer.
+
+ uint64_t keys = 0;
+
while (iterator->Valid()) {
+ keys++;
const leveldb::Slice& slice = iterator->value();
google::protobuf::io::ArrayInputStream stream(slice.data(), slice.size());
@@ -277,6 +303,9 @@ Try<State> LevelDBStorage::recover(const
iterator->Next();
}
+ LOG(INFO) << "Iterated through " << keys << " keys in the db in "
+ << timer.elapsed().millis() << " milliseconds";
+
// Determine the first position still in leveldb so during a
// truncation we can attempt to delete all positions from the first
// position up to the truncate position. Note that this is not the
@@ -675,6 +704,32 @@ void ReplicaProcess::promise(const Promi
LOG(INFO) << "Replica received explicit promise request for "
<< request.id() << " for position " << request.position();
+ // If the position has been truncated, tell the coordinator that
+ // it's a learned no-op. This can happen when a replica has missed
+ // some truncates and its coordinator tries to fill some
+ // truncated positions on election. A learned no-op is safe since
+ // the coordinator should eventually learn that this position was
+ // actually truncated. The action must be _learned_ so that the
+ // coordinator doesn't attempt to run a full Paxos round which
+ // will never succeed because this replica will not permit the
+ // write (because ReplicaProcess::write "ignores" writes on
+ // truncated positions).
+ if (request.position() < begin) {
+ Action action;
+ action.set_position(request.position());
+ action.set_promised(coordinator); // Use the last coordinator.
+ action.set_performed(coordinator); // Use the last coordinator.
+ action.set_learned(true);
+ action.set_type(Action::NOP);
+ action.mutable_nop()->MergeFrom(Action::Nop());
+
+ PromiseResponse response;
+ response.set_okay(true);
+ response.set_id(request.id());
+ response.mutable_action()->MergeFrom(action);
+ reply(response);
+ }
+
// Need to get the action for the specified position.
Result<Action> result = read(request.position());
@@ -722,6 +777,8 @@ void ReplicaProcess::promise(const Promi
<< request.id();
if (request.id() <= coordinator) { // Only make an implicit promise once!
+ LOG(INFO) << "Replica denying promise request for "
+ << request.id();
PromiseResponse response;
response.set_okay(false);
response.set_id(request.id());
Modified: incubator/mesos/trunk/src/tests/log_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/log_tests.cpp?rev=1330450&r1=1330449&r2=1330450&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/log_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/log_tests.cpp Wed Apr 25 18:09:15 2012
@@ -1034,6 +1034,87 @@ TEST(CoordinatorTest, TruncateNotLearned
}
+TEST(CoordinatorTest, TruncateLearnedFill)
+{
+ const std::string path1 = utils::os::getcwd() + "/.log1";
+ const std::string path2 = utils::os::getcwd() + "/.log2";
+ const std::string path3 = utils::os::getcwd() + "/.log3";
+
+ utils::os::rmdir(path1);
+ utils::os::rmdir(path2);
+ utils::os::rmdir(path3);
+
+ Replica replica1(path1);
+ Replica replica2(path2);
+
+ Network network1;
+
+ network1.add(replica1.pid());
+ network1.add(replica2.pid());
+
+ Coordinator coord1(2, &replica1, &network1);
+
+ {
+ Result<uint64_t> result = coord1.elect(Timeout(1.0));
+ ASSERT_TRUE(result.isSome());
+ EXPECT_EQ(0, result.get());
+ }
+
+ for (uint64_t position = 1; position <= 10; position++) {
+ Result<uint64_t> result =
+ coord1.append(utils::stringify(position), Timeout(1.0));
+ ASSERT_TRUE(result.isSome());
+ EXPECT_EQ(position, result.get());
+ }
+
+ {
+ Result<uint64_t> result = coord1.truncate(7, Timeout(1.0));
+ ASSERT_TRUE(result.isSome());
+ EXPECT_EQ(11, result.get());
+ }
+
+ Replica replica3(path3);
+
+ Network network2;
+
+ network2.add(replica2.pid());
+ network2.add(replica3.pid());
+
+ Coordinator coord2(2, &replica3, &network2);
+
+ {
+ Result<uint64_t> result = coord2.elect(Timeout(1.0));
+ ASSERT_TRUE(result.isNone());
+ result = coord2.elect(Timeout(1.0));
+ ASSERT_TRUE(result.isSome());
+ EXPECT_EQ(11, result.get());
+ }
+
+ {
+ Future<std::list<Action> > actions = replica3.read(6, 10);
+ ASSERT_TRUE(actions.await(2.0));
+ ASSERT_TRUE(actions.isFailed());
+ EXPECT_EQ("Bad read range (truncated position)", actions.failure());
+ }
+
+ {
+ Future<std::list<Action> > actions = replica3.read(7, 10);
+ ASSERT_TRUE(actions.await(2.0));
+ ASSERT_TRUE(actions.isReady());
+ EXPECT_EQ(4, actions.get().size());
+ foreach (const Action& action, actions.get()) {
+ ASSERT_TRUE(action.has_type());
+ ASSERT_EQ(Action::APPEND, action.type());
+ EXPECT_EQ(utils::stringify(action.position()), action.append().bytes());
+ }
+ }
+
+ utils::os::rmdir(path1);
+ utils::os::rmdir(path2);
+ utils::os::rmdir(path3);
+}
+
+
TEST(LogTest, WriteRead)
{
const std::string path1 = utils::os::getcwd() + "/.log1";