You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2017/07/13 05:07:09 UTC

mesos git commit: Added tests to ensure slave recovery post reboot.

Repository: mesos
Updated Branches:
  refs/heads/master cd6495e67 -> 188109b63


Added tests to ensure slave recovery post reboot.

Added tests to verify that, after a reboot, the agent state is recovered
and the agent ID is retained when recovery finishes without errors, and
that the agent is recovered as a new agent when recovery fails due to an
agent info mismatch.

Review: https://reviews.apache.org/r/56895/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/188109b6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/188109b6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/188109b6

Branch: refs/heads/master
Commit: 188109b63ea9cc0cdfe1fd616c744cb10dbb4a57
Parents: cd6495e
Author: Megha Sharma <ms...@apple.com>
Authored: Wed Jul 12 22:03:37 2017 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Wed Jul 12 22:03:37 2017 -0700

----------------------------------------------------------------------
 src/tests/slave_recovery_tests.cpp | 164 +++++++++++++++++++++++++++-----
 1 file changed, 142 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/188109b6/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 1cd248a..9ba6f60 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -2516,14 +2516,13 @@ TYPED_TEST(SlaveRecoveryTest, KillTask)
 
 
 // When the slave is down we modify the BOOT_ID_FILE to simulate a
-// reboot. The subsequent run of the slave should not recover.
+// reboot. The subsequent run of the slave should recover.
 TYPED_TEST(SlaveRecoveryTest, Reboot)
 {
   Try<Owned<cluster::Master>> master = this->StartMaster();
   ASSERT_SOME(master);
 
   slave::Flags flags = this->CreateSlaveFlags();
-  flags.strict = false;
 
   Fetcher fetcher(flags);
 
@@ -2589,18 +2588,18 @@ TYPED_TEST(SlaveRecoveryTest, Reboot)
   AWAIT_READY(containers);
   ASSERT_EQ(1u, containers->size());
 
-  ContainerID containerId = *containers->begin();
+  ContainerID containerId1 = *containers->begin();
 
   slave.get()->terminate();
 
   // Get the executor's pid so we can reap it to properly simulate a
   // reboot.
   string pidPath = paths::getForkedPidPath(
-        paths::getMetaRootDir(flags.work_dir),
-        slaveId1,
-        frameworkId,
-        executorId,
-        containerId);
+      paths::getMetaRootDir(flags.work_dir),
+      slaveId1,
+      frameworkId,
+      executorId,
+      containerId1);
 
   Try<string> read = os::read(pidPath);
   ASSERT_SOME(read);
@@ -2620,8 +2619,8 @@ TYPED_TEST(SlaveRecoveryTest, Reboot)
       paths::getBootIdPath(paths::getMetaRootDir(flags.work_dir)),
       "rebooted! ;)"));
 
-  Future<SlaveRegisteredMessage> slaveRegistered =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+  Future<ReregisterSlaveMessage> reregisterSlaveMessage =
+    FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _);
 
   // Restart the slave (use same flags) with a new containerizer.
   _containerizer = TypeParam::create(flags, true, &fetcher);
@@ -2633,27 +2632,148 @@ TYPED_TEST(SlaveRecoveryTest, Reboot)
     .WillOnce(FutureArg<1>(&offers2))
     .WillRepeatedly(Return()); // Ignore subsequent offers.
 
-  slave = this->StartSlave(detector.get(), containerizer.get(), flags);
-  ASSERT_SOME(slave);
+  // Recover the state.
+  Result<slave::state::State> recoverState =
+    slave::state::recover(paths::getMetaRootDir(flags.work_dir), true);
 
-  AWAIT_READY(slaveRegistered);
+  ASSERT_SOME(recoverState);
+  ASSERT_SOME(recoverState->slave);
 
-  SlaveID slaveId2 = slaveRegistered->slave_id();
+  slave::state::SlaveState state = recoverState->slave.get();
 
-  EXPECT_NE(slaveId1, slaveId2);
+  slave = this->StartSlave(detector.get(), containerizer.get(), flags);
+  ASSERT_SOME(slave);
+  EXPECT_EQ(slaveId1, state.id);
+
+  AWAIT_READY(reregisterSlaveMessage);
 
   // Make sure all slave resources are reoffered.
   AWAIT_READY(offers2);
   EXPECT_EQ(Resources(offers1.get()[0].resources()),
             Resources(offers2.get()[0].resources()));
 
-  // The old agent ID is not removed (MESOS-5396).
-  JSON::Object stats = Metrics();
-  EXPECT_EQ(0, stats.values["master/tasks_lost"]);
-  EXPECT_EQ(0, stats.values["master/slave_unreachable_scheduled"]);
-  EXPECT_EQ(0, stats.values["master/slave_unreachable_completed"]);
-  EXPECT_EQ(0, stats.values["master/slave_removals"]);
-  EXPECT_EQ(0, stats.values["master/slave_removals/reason_registered"]);
+  SlaveID slaveId2 = offers2.get()[0].slave_id();
+  EXPECT_EQ(slaveId1, slaveId2);
+
+  ASSERT_TRUE(state.frameworks.contains(frameworkId));
+
+  EXPECT_TRUE(state.frameworks[frameworkId].executors.contains(executorId));
+
+  slave::state::ExecutorState executorState =
+    state.frameworks[frameworkId].executors[executorId];
+
+  const Option<ContainerID>& containerId2 = executorState.latest;
+  EXPECT_SOME_EQ(containerId1, containerId2);
+
+  EXPECT_TRUE(executorState.runs.contains(containerId2.get()));
+
+  EXPECT_SOME_EQ(
+      executorPid,
+      executorState
+        .runs[containerId2.get()]
+        .libprocessPid);
+
+  EXPECT_TRUE(executorState
+                .runs[containerId2.get()]
+                .tasks.contains(task.task_id()));
+
+  driver.stop();
+  driver.join();
+}
+
+
+// When the agent is down we modify the BOOT_ID_FILE to simulate a
+// reboot and change the resources flag so that the recovered agent's
+// info no longer matches the info of the restarted agent. The agent
+// should then register as a new agent.
+TYPED_TEST(SlaveRecoveryTest, RebootWithSlaveInfoMismatch)
+{
+  Try<Owned<cluster::Master>> master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = this->CreateSlaveFlags();
+  flags.resources = "cpus:8;mem:4096;disk:2048";
+
+  Fetcher fetcher(flags);
+
+  Try<TypeParam*> _containerizer = TypeParam::create(flags, true, &fetcher);
+  ASSERT_SOME(_containerizer);
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    this->StartSlave(detector.get(), containerizer.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  // Capture offer in order to get the agent ID.
+  Future<vector<Offer>> offers1;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers1))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers1);
+
+  SlaveID slaveId1 = offers1.get()[0].slave_id();
+
+  EXPECT_CALL(sched, offerRescinded(_, _))
+    .Times(AtMost(1));
+
+  slave.get()->terminate();
+
+  // Modify the boot ID to simulate a reboot.
+  ASSERT_SOME(os::write(
+      paths::getBootIdPath(paths::getMetaRootDir(flags.work_dir)),
+      "rebooted! ;)"));
+
+  Future<RegisterSlaveMessage> registerSlaveMessage =
+    FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
+
+  // Change agent's resources to cause agent Info mismatch.
+  flags.resources = "cpus:4;mem:2048;disk:2048";
+
+  _containerizer = TypeParam::create(flags, true, &fetcher);
+  ASSERT_SOME(_containerizer);
+  containerizer.reset(_containerizer.get());
+
+  // Recover the state.
+  Result<slave::state::State> recoverState =
+    slave::state::recover(paths::getMetaRootDir(flags.work_dir), true);
+
+  // Capture offer in order to get the new agent ID.
+  Future<vector<Offer>> offers2;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  ASSERT_SOME(recoverState);
+  EXPECT_SOME(recoverState->slave);
+
+  slave = this->StartSlave(detector.get(), containerizer.get(), flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(registerSlaveMessage);
+
+  AWAIT_READY(offers2);
+
+  ASSERT_EQ(1u, offers2->size());
+  Offer offer2 = offers2.get()[0];
+
+  SlaveID slaveId2 = offers2.get()[0].slave_id();
+
+  EXPECT_NE(slaveId1, slaveId2);
 
   driver.stop();
   driver.join();