You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/08/13 21:49:53 UTC

[09/23] git commit: Fixed slave to properly handle duplicate terminal updates for the same task.

Fixed slave to properly handle duplicate terminal updates for the
same task.

Review: https://reviews.apache.org/r/11798


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cfb83a4f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cfb83a4f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cfb83a4f

Branch: refs/heads/vinod/0.13.0
Commit: cfb83a4f98c67c3fd74ab4375f474989763d0b2e
Parents: bc2a09d
Author: Vinod Kone <vi...@twitter.com>
Authored: Mon Jun 10 22:44:42 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 11 12:01:44 2013 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp                       |  25 ++-
 src/tests/status_update_manager_tests.cpp | 202 +++++++++++++++++++++++++
 2 files changed, 220 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cfb83a4f/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index b5b7e0e..fd96de9 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1687,6 +1687,8 @@ void Slave::reregisterExecutorTimeout()
 // 2) When slave generates task updates (e.g LOST/KILLED/FAILED).
 void Slave::statusUpdate(const StatusUpdate& update)
 {
+  LOG(INFO) << "Handling status update " << update;
+
   CHECK(state == RECOVERING || state == DISCONNECTED ||
         state == RUNNING || state == TERMINATING)
     << state;
@@ -1724,7 +1726,8 @@ void Slave::statusUpdate(const StatusUpdate& update)
     // generated by the slave when the executor is unknown to it,
     // e.g., killTask(), _runTask().
     // TODO(vinod): Revisit these semantics when we disallow updates
-    // sent by executors that are unknown to the slave.
+    // sent by executors that are unknown to the slave (e.g., when
+    // slave recovery is always enabled).
     statusUpdateManager->update(update, info.id())
       .onAny(defer(self(), &Slave::_statusUpdate, params::_1, update, None()));
 
@@ -1737,15 +1740,17 @@ void Slave::statusUpdate(const StatusUpdate& update)
         executor->state == Executor::TERMINATED)
     << executor->state;
 
-  VLOG(1) << "Handling status update " << update;
-
   stats.tasks[update.status().state()]++;
   stats.validStatusUpdates++;
 
   executor->updateTaskState(status.task_id(), status.state());
 
-  // Handle the task appropriately if it's terminated.
-  if (protobuf::isTerminalState(status.state())) {
+  // Handle the task appropriately if it is terminated.
+  // TODO(vinod): Revisit these semantics when we disallow duplicate
+  // terminal updates (e.g., when slave recovery is always enabled).
+  if (protobuf::isTerminalState(status.state()) &&
+      (executor->queuedTasks.contains(status.task_id()) ||
+       executor->launchedTasks.contains(status.task_id()))) {
     executor->terminateTask(status.task_id(), status.state());
 
     // Tell the isolator to update the resources.
@@ -2967,7 +2972,8 @@ void Executor::terminateTask(
 
 void Executor::completeTask(const TaskID& taskId)
 {
-  CHECK(terminatedTasks.contains(taskId)) << "Unknown task " << taskId;
+  CHECK(terminatedTasks.contains(taskId))
+    << "Failed to find terminated task " << taskId;
 
   Task* task = terminatedTasks[taskId];
   completedTasks.push_back(*task);
@@ -3018,7 +3024,12 @@ void Executor::recoverTask(const TaskState& state)
     updateTaskState(state.id, update.status().state());
 
     // Terminate the task if it received a terminal update.
-    if (protobuf::isTerminalState(update.status().state())) {
+    // We ignore duplicate terminal updates by checking if
+    // the task is present in launchedTasks.
+    // TODO(vinod): Revisit these semantics when we disallow duplicate
+    // terminal updates (e.g., when slave recovery is always enabled).
+    if (protobuf::isTerminalState(update.status().state()) &&
+        launchedTasks.contains(state.id)) {
       terminateTask(state.id, update.status().state());
 
       // If the terminal update has been acknowledged, remove it.

http://git-wip-us.apache.org/repos/asf/mesos/blob/cfb83a4f/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index 61ccfcc..4239532 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -464,3 +464,205 @@ TEST_F(StatusUpdateManagerTest, IgnoreUnexpectedStatusUpdateAck)
 
   Shutdown();
 }
+
+
+// Verifies how the slave and status update manager handle a
+// duplicate terminal status update for a task when the second
+// update is received before the ACK for the first update. The
+// proper behavior here is for the status update manager to drop
+// the duplicate update (the scheduler only sees TASK_FINISHED).
+TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateBeforeAck)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.checkpoint = true;
+
+  Try<PID<Slave> > slave = StartSlave(&exec, flags);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+  frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  // Have the executor send a terminal update (TASK_FINISHED) right away.
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  // Drop the first ACK to the slave, leaving TASK_FINISHED unacknowledged.
+  Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage =
+    DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get());
+
+  Clock::pause();
+
+  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_FINISHED, status.get().state());
+
+  AWAIT_READY(statusUpdateAcknowledgementMessage);
+
+  Future<Nothing> _statusUpdate =
+    FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdate);
+
+  // Now send a second terminal update (TASK_KILLED) for the same task.
+  TaskStatus status2 = status.get();
+  status2.set_state(TASK_KILLED);
+  execDriver->sendStatusUpdate(status2);
+
+  // At this point the status update manager has enqueued
+  // TASK_FINISHED and TASK_KILLED updates.
+  AWAIT_READY(_statusUpdate);
+
+  // After we advance the clock, the scheduler should receive
+  // the retried TASK_FINISHED update and acknowledge it. The
+  // TASK_KILLED update should be dropped by the status update
+  // manager, as the stream is already terminated.
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL);
+  Clock::settle();
+
+  // Ensure the scheduler receives the retried TASK_FINISHED update.
+  AWAIT_READY(update);
+  EXPECT_EQ(TASK_FINISHED, update.get().state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// Verifies how the slave and status update manager handle a
+// duplicate terminal status update for a task when the second
+// update is received after the ACK for the first update. The
+// proper behavior here is for the status update manager to
+// forward the duplicate update (TASK_KILLED) to the scheduler.
+TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateAfterAck)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.checkpoint = true;
+
+  Try<PID<Slave> > slave = StartSlave(&exec, flags);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+  frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  // Have the executor send a terminal update (TASK_FINISHED) right away.
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement);
+
+  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_FINISHED, status.get().state());
+
+  AWAIT_READY(_statusUpdateAcknowledgement);
+
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  Future<Nothing> _statusUpdateAcknowledgement2 =
+    FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement);
+
+  Clock::pause();
+
+  // Now send a second terminal update (TASK_KILLED) for the same task.
+  TaskStatus status2 = status.get();
+  status2.set_state(TASK_KILLED);
+  execDriver->sendStatusUpdate(status2);
+
+  // Ensure the scheduler receives the duplicate TASK_KILLED update.
+  AWAIT_READY(update);
+  EXPECT_EQ(TASK_KILLED, update.get().state());
+
+  // Ensure the slave properly handles the ACK for TASK_KILLED.
+  // Clock::settle() ensures that the slave successfully
+  // executes Slave::_statusUpdateAcknowledgement().
+  AWAIT_READY(_statusUpdateAcknowledgement2);
+  Clock::settle();
+
+  Clock::resume();
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}