You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/04/25 20:09:16 UTC

svn commit: r1330450 - in /incubator/mesos/trunk/src: log/coordinator.cpp log/coordinator.hpp log/log.hpp log/replica.cpp tests/log_tests.cpp

Author: benh
Date: Wed Apr 25 18:09:15 2012
New Revision: 1330450

URL: http://svn.apache.org/viewvc?rev=1330450&view=rev
Log:
Fixes a bug where a coordinator tried to fill positions that other replicas had already learned were truncated (note that the added test failed before the rest of the patch was applied).

Modified:
    incubator/mesos/trunk/src/log/coordinator.cpp
    incubator/mesos/trunk/src/log/coordinator.hpp
    incubator/mesos/trunk/src/log/log.hpp
    incubator/mesos/trunk/src/log/replica.cpp
    incubator/mesos/trunk/src/tests/log_tests.cpp

Modified: incubator/mesos/trunk/src/log/coordinator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/coordinator.cpp?rev=1330450&r1=1330449&r2=1330450&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/coordinator.cpp (original)
+++ incubator/mesos/trunk/src/log/coordinator.cpp Wed Apr 25 18:09:15 2012
@@ -373,6 +373,9 @@ Result<uint64_t> Coordinator::commit(con
     message.mutable_action()->set_learned(true);
   }
 
+  LOG(INFO) << "Telling other replicas of learned action at position "
+            << action.position();
+
   remotecast(message);
 
   return action.position();
@@ -428,7 +431,18 @@ Result<Action> Coordinator::fill(uint64_
       if (response.has_action()) {
         CHECK(response.action().position() == position);
         if (response.action().has_learned() && response.action().learned()) {
-          // Received a learned action, try and commit locally.
+          // Received a learned action, try and commit locally. Note
+          // that there is no checking that we get the _same_ learned
+          // action in the event we get multiple responses with
+          // learned actions, we just take the "first". In fact, there
+          // is a specific instance in which learned actions will NOT
+          // be the same! In this instance, one replica may return
+          // that the action is a learned no-op because it knows the
+          // position has been truncated while another replica (that
+          // hasn't learned the truncation yet) might return the
+          // actual action at this position. Picking either action is
+          // _correct_, since eventually we know this position will be
+          // truncated. Fun!
           Result<uint64_t> result = commit(response.action());
           if (result.isError()) {
             return Result<Action>::error(result.error());

Modified: incubator/mesos/trunk/src/log/coordinator.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/coordinator.hpp?rev=1330450&r1=1330449&r2=1330450&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/coordinator.hpp (original)
+++ incubator/mesos/trunk/src/log/coordinator.hpp Wed Apr 25 18:09:15 2012
@@ -99,7 +99,7 @@ private:
 
   bool elected; // True if this coordinator has been elected.
 
-  int quorum; // Quorum size.
+  const int quorum; // Quorum size.
 
   Replica* replica; // Local log replica.
 

Modified: incubator/mesos/trunk/src/log/log.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/log.hpp?rev=1330450&r1=1330449&r2=1330450&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/log.hpp (original)
+++ incubator/mesos/trunk/src/log/log.hpp Wed Apr 25 18:09:15 2012
@@ -203,6 +203,8 @@ public:
 
     quorum = _quorum;
 
+    LOG(INFO) << "Creating a new log replica";
+
     replica = new Replica(path);
 
     group = new zookeeper::Group(servers, timeout, znode, auth);
@@ -339,8 +341,6 @@ Log::Writer::Writer(Log* log, const seco
   : coordinator(log->quorum, log->replica, log->network),
     error(Option<std::string>::none())
 {
-  LOG(INFO) << "Number of retries: " << retries;
-
   do {
     Result<uint64_t> result = coordinator.elect(Timeout(timeout.value));
     if (result.isNone()) {

Modified: incubator/mesos/trunk/src/log/replica.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/replica.cpp?rev=1330450&r1=1330449&r2=1330450&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/replica.cpp (original)
+++ incubator/mesos/trunk/src/log/replica.cpp Wed Apr 25 18:09:15 2012
@@ -209,6 +209,9 @@ Try<State> LevelDBStorage::recover(const
   CHECK(leveldb::BytewiseComparator()->Compare(ten, two) > 0);
   CHECK(leveldb::BytewiseComparator()->Compare(ten, ten) == 0);
 
+  Timer timer;
+  timer.start();
+
   leveldb::Status status = leveldb::DB::Open(options, path, &db);
 
   if (!status.ok()) {
@@ -216,9 +219,17 @@ Try<State> LevelDBStorage::recover(const
     return Try<State>::error(status.ToString());
   }
 
+  LOG(INFO) << "Opened db in "
+	    << timer.elapsed().millis() << " milliseconds";
+
+  timer.start(); // Restart the timer.
+
   // TODO(benh): Conditionally compact to avoid long recovery times?
   db->CompactRange(NULL, NULL);
 
+  LOG(INFO) << "Compacted db in "
+	    << timer.elapsed().millis() << " milliseconds";
+
   State state;
   state.coordinator = 0;
   state.begin = 0;
@@ -229,11 +240,26 @@ Try<State> LevelDBStorage::recover(const
   // records and confirming that they are all indeed of type
   // Record::Action.
 
+  timer.start(); // Restart the timer.
+
   leveldb::Iterator* iterator = db->NewIterator(leveldb::ReadOptions());
 
+  LOG(INFO) << "Created db iterator in " << timer.elapsed().millis()
+            << " milliseconds";
+
+  timer.start(); // Restart the timer.
+
   iterator->SeekToFirst();
 
+  LOG(INFO) << "Seeked to beginning of db in "
+	    << timer.elapsed().millis() << " milliseconds";
+
+  timer.start(); // Restart the timer.
+
+  uint64_t keys = 0;
+
   while (iterator->Valid()) {
+    keys++;
     const leveldb::Slice& slice = iterator->value();
 
     google::protobuf::io::ArrayInputStream stream(slice.data(), slice.size());
@@ -277,6 +303,9 @@ Try<State> LevelDBStorage::recover(const
     iterator->Next();
   }
 
+  LOG(INFO) << "Iterated through " << keys << " keys in the db in "
+	    << timer.elapsed().millis() << " milliseconds";
+
   // Determine the first position still in leveldb so during a
   // truncation we can attempt to delete all positions from the first
   // position up to the truncate position. Note that this is not the
@@ -675,6 +704,33 @@ void ReplicaProcess::promise(const Promi
     LOG(INFO) << "Replica received explicit promise request for "
               << request.id() << " for position " << request.position();

+    // If the position has been truncated, tell the coordinator that
+    // it's a learned no-op. This can happen when a replica has missed
+    // some truncates and its coordinator tries to fill some
+    // truncated positions on election. A learned no-op is safe since
+    // the coordinator should eventually learn that this position was
+    // actually truncated. The action must be _learned_ so that the
+    // coordinator doesn't attempt to run a full Paxos round which
+    // will never succeed because this replica will not permit the
+    // write (because ReplicaProcess::write "ignores" writes on
+    // truncated positions).
+    if (request.position() < begin) {
+      Action action;
+      action.set_position(request.position());
+      action.set_promised(coordinator); // Use the last coordinator.
+      action.set_performed(coordinator); // Use the last coordinator.
+      action.set_learned(true);
+      action.set_type(Action::NOP);
+      action.mutable_nop()->MergeFrom(Action::Nop());
+
+      PromiseResponse response;
+      response.set_okay(true);
+      response.set_id(request.id());
+      response.mutable_action()->MergeFrom(action);
+      reply(response);
+      return; // Done; don't fall through to the regular read/promise below.
+    }
+
     // Need to get the action for the specified position.
     Result<Action> result = read(request.position());

@@ -722,6 +778,8 @@ void ReplicaProcess::promise(const Promi
               << request.id();

     if (request.id() <= coordinator) { // Only make an implicit promise once!
+      LOG(INFO) << "Replica denying promise request for "
+                << request.id();
       PromiseResponse response;
       response.set_okay(false);
       response.set_id(request.id());

Modified: incubator/mesos/trunk/src/tests/log_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/log_tests.cpp?rev=1330450&r1=1330449&r2=1330450&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/log_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/log_tests.cpp Wed Apr 25 18:09:15 2012
@@ -1034,6 +1034,99 @@ TEST(CoordinatorTest, TruncateNotLearned
 }


+TEST(CoordinatorTest, TruncateLearnedFill)
+{
+  // Regression test: a newly elected coordinator must be able to fill
+  // positions that another replica has already learned were truncated.
+  //
+  // coord1 (replicas 1 and 2) appends positions 1-10 and then truncates
+  // everything before position 7; the truncate itself is written at
+  // position 11. coord2 is then elected over replica 2 (which presumably
+  // learned the truncation from coord1) and a brand-new, empty replica 3.
+  const std::string path1 = utils::os::getcwd() + "/.log1";
+  const std::string path2 = utils::os::getcwd() + "/.log2";
+  const std::string path3 = utils::os::getcwd() + "/.log3";
+
+  utils::os::rmdir(path1);
+  utils::os::rmdir(path2);
+  utils::os::rmdir(path3);
+
+  Replica replica1(path1);
+  Replica replica2(path2);
+
+  Network network1;
+
+  network1.add(replica1.pid());
+  network1.add(replica2.pid());
+
+  Coordinator coord1(2, &replica1, &network1);
+
+  {
+    Result<uint64_t> result = coord1.elect(Timeout(1.0));
+    ASSERT_TRUE(result.isSome());
+    EXPECT_EQ(0u, result.get());
+  }
+
+  for (uint64_t position = 1; position <= 10; position++) {
+    Result<uint64_t> result =
+      coord1.append(utils::stringify(position), Timeout(1.0));
+    ASSERT_TRUE(result.isSome());
+    EXPECT_EQ(position, result.get());
+  }
+
+  {
+    Result<uint64_t> result = coord1.truncate(7, Timeout(1.0));
+    ASSERT_TRUE(result.isSome());
+    EXPECT_EQ(11u, result.get());
+  }
+
+  Replica replica3(path3);
+
+  Network network2;
+
+  network2.add(replica2.pid());
+  network2.add(replica3.pid());
+
+  Coordinator coord2(2, &replica3, &network2);
+
+  // The first election attempt is expected to come back none (presumably
+  // replica 2 denies coord2's first promise request because it already
+  // promised coord1 -- TODO confirm); the retry must succeed.
+  {
+    Result<uint64_t> result = coord2.elect(Timeout(1.0));
+    ASSERT_TRUE(result.isNone());
+    result = coord2.elect(Timeout(1.0));
+    ASSERT_TRUE(result.isSome());
+    EXPECT_EQ(11u, result.get());
+  }
+
+  // Reading from position 6 on replica 3 must fail: it was truncated ...
+  {
+    Future<std::list<Action> > actions = replica3.read(6, 10);
+    ASSERT_TRUE(actions.await(2.0));
+    ASSERT_TRUE(actions.isFailed());
+    EXPECT_EQ("Bad read range (truncated position)", actions.failure());
+  }
+
+  // ... while positions 7-10 must hold the original appends, not no-ops.
+  {
+    Future<std::list<Action> > actions = replica3.read(7, 10);
+    ASSERT_TRUE(actions.await(2.0));
+    ASSERT_TRUE(actions.isReady());
+    EXPECT_EQ(4u, actions.get().size());
+    foreach (const Action& action, actions.get()) {
+      ASSERT_TRUE(action.has_type());
+      ASSERT_EQ(Action::APPEND, action.type());
+      EXPECT_EQ(utils::stringify(action.position()), action.append().bytes());
+    }
+  }
+
+  utils::os::rmdir(path1);
+  utils::os::rmdir(path2);
+  utils::os::rmdir(path3);
+}
+
+
 TEST(LogTest, WriteRead)
 {
   const std::string path1 = utils::os::getcwd() + "/.log1";