You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2016/08/25 02:59:56 UTC

[2/2] mesos git commit: Updated the agent to reject multiple terminal status updates.

Updated the agent to reject multiple terminal status updates.

Currently the agent will allow multiple terminal status updates when
updating the state of a task. This allows the status update invariants
to be violated! This patch disallows multiple terminal updates.

Review: https://reviews.apache.org/r/51404


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2f019d80
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2f019d80
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2f019d80

Branch: refs/heads/master
Commit: 2f019d801d9a9458e10bb41f176622a280415a90
Parents: 8bb12e8
Author: Benjamin Mahler <bm...@apache.org>
Authored: Fri Aug 12 19:44:43 2016 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 24 19:55:19 2016 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp                       | 168 ++++++++++++++-----------
 src/slave/slave.hpp                       |   3 +-
 src/tests/slave_tests.cpp                 | 111 ++++++++++++++++
 src/tests/status_update_manager_tests.cpp | 108 ----------------
 4 files changed, 205 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2f019d80/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index c686a97..5d162d0 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3465,18 +3465,26 @@ void Slave::_statusUpdate(
   // task is sent only after the acknowledgement for the previous one
   // is received, which could take a long time if the framework is
   // backed up or is down.
-  executor->updateTaskState(status);
-
-  // Handle the task appropriately if it is terminated.
-  // TODO(vinod): Revisit these semantics when we disallow duplicate
-  // terminal updates (e.g., when slave recovery is always enabled).
-  if (protobuf::isTerminalState(status.state()) &&
-      (executor->queuedTasks.contains(status.task_id()) ||
-       executor->launchedTasks.contains(status.task_id()))) {
-    executor->terminateTask(status.task_id(), status);
-
-    // Wait until the container's resources have been updated before
-    // sending the status update.
+  Try<Nothing> updated = executor->updateTaskState(status);
+
+  // If we fail to update the task state, drop the update. Note that
+  // we have to acknowledge the executor so that it does not retry.
+  if (updated.isError()) {
+    LOG(ERROR) << "Failed to update state of task '" << status.task_id() << "'"
+               << " to " << status.state() << ": " << updated.error();
+
+    // NOTE: This may lead to out-of-order acknowledgements since
+    // other update acknowledgements may be waiting for the
+    // containerizer or status update manager.
+    ___statusUpdate(Nothing(), update, pid);
+    return;
+  }
+
+  if (protobuf::isTerminalState(status.state())) {
+    // If the task terminated, wait until the container's resources
+    // have been updated before sending the status update. Note that
+    // duplicate terminal updates are not possible here because they
+    // lead to an error from `Executor::updateTaskState`.
     containerizer->update(executor->containerId, executor->resources)
       .onAny(defer(self(),
                    &Slave::__statusUpdate,
@@ -6108,49 +6116,6 @@ Task* Executor::addTask(const TaskInfo& task)
 }
 
 
-void Executor::terminateTask(
-    const TaskID& taskId,
-    const mesos::TaskStatus& status)
-{
-  VLOG(1) << "Terminating task " << taskId;
-
-  Task* task = nullptr;
-  // Remove the task if it's queued.
-  if (queuedTasks.contains(taskId)) {
-    task = new Task(
-        protobuf::createTask(queuedTasks[taskId], status.state(), frameworkId));
-    queuedTasks.erase(taskId);
-  } else if (launchedTasks.contains(taskId)) {
-    // Update the resources if it's been launched.
-    task = launchedTasks[taskId];
-    resources -= task->resources();
-    launchedTasks.erase(taskId);
-  }
-
-  switch (status.state()) {
-    case TASK_FINISHED:
-      ++slave->metrics.tasks_finished;
-      break;
-    case TASK_FAILED:
-      ++slave->metrics.tasks_failed;
-      break;
-    case TASK_KILLED:
-      ++slave->metrics.tasks_killed;
-      break;
-    case TASK_LOST:
-      ++slave->metrics.tasks_lost;
-      break;
-    default:
-      LOG(WARNING) << "Unhandled task state " << status.state()
-                   << " on completion.";
-      break;
-  }
-
-  // TODO(dhamon): Update source/reason metrics.
-  terminatedTasks[taskId] = CHECK_NOTNULL(task);
-}
-
-
 void Executor::completeTask(const TaskID& taskId)
 {
   VLOG(1) << "Completing task " << taskId;
@@ -6220,21 +6185,28 @@ void Executor::recoverTask(const TaskState& state)
 
   // Read updates to get the latest state of the task.
   foreach (const StatusUpdate& update, state.updates) {
-    updateTaskState(update.status());
-
-    // Terminate the task if it received a terminal update.
-    // We ignore duplicate terminal updates by checking if
-    // the task is present in launchedTasks.
-    // TODO(vinod): Revisit these semantics when we disallow duplicate
-    // terminal updates (e.g., when slave recovery is always enabled).
-    if (protobuf::isTerminalState(update.status().state()) &&
-        launchedTasks.contains(state.id)) {
-      terminateTask(state.id, update.status());
+    Try<Nothing> updated = updateTaskState(update.status());
+
+    // TODO(bmahler): We only log this error because we used to
+    // allow multiple terminal updates and so we may encounter
+    // this when recovering an old executor. We can hard-CHECK
+    // this 6 months from 1.1.0.
+    if (updated.isError()) {
+      LOG(ERROR) << "Failed to update state of recovered task"
+                 << " '" << state.id << "' to " << update.status().state()
+                 << ": " << updated.error();
+
+      // The only case that should be possible here is when the
+      // task had multiple terminal updates persisted.
+      continue;
+    }
 
+    // Complete the task if it is terminal and
+    // the update has been acknowledged.
+    if (protobuf::isTerminalState(update.status().state())) {
       CHECK(update.has_uuid())
         << "Expecting updates without 'uuid' to have been rejected";
 
-      // If the terminal update has been acknowledged, remove it.
       if (state.acks.contains(UUID::fromBytes(update.uuid()).get())) {
         completeTask(state.id);
       }
@@ -6244,18 +6216,64 @@ void Executor::recoverTask(const TaskState& state)
 }
 
 
-void Executor::updateTaskState(const TaskStatus& status)
+Try<Nothing> Executor::updateTaskState(const TaskStatus& status)
 {
-  if (launchedTasks.contains(status.task_id())) {
-    Task* task = launchedTasks[status.task_id()];
-    // TODO(brenden): Consider wiping the `data` and `message` fields?
-    if (task->statuses_size() > 0 &&
-        task->statuses(task->statuses_size() - 1).state() == status.state()) {
-      task->mutable_statuses()->RemoveLast();
+  bool terminal = protobuf::isTerminalState(status.state());
+
+  const TaskID& taskId = status.task_id();
+  Task* task = nullptr;
+
+  if (queuedTasks.contains(taskId)) {
+    if (terminal) {
+      task = new Task(protobuf::createTask(
+          queuedTasks.at(taskId),
+          status.state(),
+          frameworkId));
+      queuedTasks.erase(taskId);
+      terminatedTasks[taskId] = task;
+    } else {
+      return Error("Cannot send non-terminal update for queued task");
+    }
+  } else if (launchedTasks.contains(taskId)) {
+    task = launchedTasks.at(status.task_id());
+
+    if (terminal) {
+      resources -= task->resources(); // Release the resources.
+      launchedTasks.erase(taskId);
+      terminatedTasks[taskId] = task;
+    }
+  } else if (terminatedTasks.contains(taskId)) {
+    return Error("Task is already terminated with state"
+                 " " + stringify(terminatedTasks.at(taskId)->state()));
+  } else {
+    return Error("Task is unknown");
+  }
+
+  CHECK_NOTNULL(task);
+
+  // TODO(brenden): Consider wiping the `data` and `message` fields?
+  if (task->statuses_size() > 0 &&
+      task->statuses(task->statuses_size() - 1).state() == status.state()) {
+    task->mutable_statuses()->RemoveLast();
+  }
+  task->add_statuses()->CopyFrom(status);
+  task->set_state(status.state());
+
+  // TODO(bmahler): This only increments the state when the update
+  // can be handled. Should we always increment the state?
+  if (terminal) {
+    switch (status.state()) {
+      case TASK_FINISHED: ++slave->metrics.tasks_finished; break;
+      case TASK_FAILED:   ++slave->metrics.tasks_failed;   break;
+      case TASK_KILLED:   ++slave->metrics.tasks_killed;   break;
+      case TASK_LOST:     ++slave->metrics.tasks_lost;     break;
+      default:
+        LOG(ERROR) << "Unexpected terminal task state " << status.state();
+        break;
     }
-    task->add_statuses()->CopyFrom(status);
-    task->set_state(status.state());
   }
+
+  return Nothing();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2f019d80/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 7ca9923..4add4c0 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -787,12 +787,11 @@ struct Executor
   ~Executor();
 
   Task* addTask(const TaskInfo& task);
-  void terminateTask(const TaskID& taskId, const mesos::TaskStatus& status);
   void completeTask(const TaskID& taskId);
   void checkpointExecutor();
   void checkpointTask(const TaskInfo& task);
   void recoverTask(const state::TaskState& state);
-  void updateTaskState(const TaskStatus& status);
+  Try<Nothing> updateTaskState(const TaskStatus& status);
 
   // Returns true if there are any queued/launched/terminated tasks.
   bool incompleteTasks();

http://git-wip-us.apache.org/repos/asf/mesos/blob/2f019d80/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index dcf8454..84ee37c 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -166,6 +166,117 @@ TEST_F(SlaveTest, Shutdown)
 }
 
 
+// This test verifies that the slave rejects duplicate terminal
+// status updates for tasks before the first terminal update is
+// acknowledged.
+TEST_F(SlaveTest, DuplicateTerminalUpdateBeforeAck)
+{
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers->size());
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  // Send a terminal update right away.
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  // Drop the first ACK from the scheduler to the slave.
+  Future<StatusUpdateAcknowledgementMessage> statusUpdateAckMessage =
+    DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get()->pid);
+
+  Future<Nothing> ___statusUpdate =
+    FUTURE_DISPATCH(slave.get()->pid, &Slave::___statusUpdate);
+
+  TaskInfo task;
+  task.set_name("test-task");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers->at(0).slave_id());
+  task.mutable_resources()->MergeFrom(offers->at(0).resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  driver.launchTasks(offers->at(0).id(), {task});
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_FINISHED, status->state());
+
+  AWAIT_READY(statusUpdateAckMessage);
+
+  // At this point the status update manager has enqueued
+  // TASK_FINISHED update.
+  AWAIT_READY(___statusUpdate);
+
+  Future<Nothing> _statusUpdate2 =
+    FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdate);
+
+  // Now send a TASK_KILLED update for the same task.
+  TaskStatus status2 = status.get();
+  status2.set_state(TASK_KILLED);
+  execDriver->sendStatusUpdate(status2);
+
+  // At this point the slave has handled the TASK_KILLED update.
+  AWAIT_READY(_statusUpdate2);
+
+  // After we advance the clock, the scheduler should receive
+  // the retried TASK_FINISHED update and acknowledge it.
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  // Ensure the scheduler receives TASK_FINISHED.
+  AWAIT_READY(update);
+  EXPECT_EQ(TASK_FINISHED, update->state());
+
+  // Settle the clock to ensure that TASK_KILLED is not sent.
+  Clock::settle();
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+}
+
+
 TEST_F(SlaveTest, ShutdownUnregisteredExecutor)
 {
   Try<Owned<cluster::Master>> master = StartMaster();

http://git-wip-us.apache.org/repos/asf/mesos/blob/2f019d80/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index 7b6fe31..38d8913 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -458,114 +458,6 @@ TEST_F(StatusUpdateManagerTest, IgnoreUnexpectedStatusUpdateAck)
 
 // This test verifies that the slave and status update manager
 // properly handle duplicate terminal status updates, when the
-// second update is received before the ACK for the first update.
-// The proper behavior here is for the status update manager to
-// drop the duplicate update.
-TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateBeforeAck)
-{
-  Try<Owned<cluster::Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
-  ASSERT_SOME(slave);
-
-  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
-
-  FrameworkID frameworkId;
-  EXPECT_CALL(sched, registered(_, _, _))
-    .WillOnce(SaveArg<1>(&frameworkId));
-
-  Future<vector<Offer>> offers;
-  EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  EXPECT_NE(0u, offers.get().size());
-
-  ExecutorDriver* execDriver;
-  EXPECT_CALL(exec, registered(_, _, _, _))
-    .WillOnce(SaveArg<0>(&execDriver));
-
-  // Send a terminal update right away.
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
-
-  Future<TaskStatus> status;
-  EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&status));
-
-  // Drop the first ACK from the scheduler to the slave.
-  Future<StatusUpdateAcknowledgementMessage> statusUpdateAckMessage =
-    DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get()->pid);
-
-  Future<Nothing> ___statusUpdate =
-    FUTURE_DISPATCH(slave.get()->pid, &Slave::___statusUpdate);
-
-  Clock::pause();
-
-  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
-
-  AWAIT_READY(status);
-
-  EXPECT_EQ(TASK_FINISHED, status.get().state());
-
-  AWAIT_READY(statusUpdateAckMessage);
-
-  // At this point the status update manager has enqueued
-  // TASK_FINISHED update.
-  AWAIT_READY(___statusUpdate);
-
-  Future<Nothing> ___statusUpdate2 =
-    FUTURE_DISPATCH(slave.get()->pid, &Slave::___statusUpdate);
-
-  // Now send a TASK_KILLED update for the same task.
-  TaskStatus status2 = status.get();
-  status2.set_state(TASK_KILLED);
-  execDriver->sendStatusUpdate(status2);
-
-  // At this point the status update manager has enqueued
-  // TASK_FINISHED and TASK_KILLED updates.
-  AWAIT_READY(___statusUpdate2);
-
-  // After we advance the clock, the scheduler should receive
-  // the retried TASK_FINISHED update and acknowledge it. The
-  // TASK_KILLED update should be dropped by the status update
-  // manager, as the stream is already terminated.
-  Future<TaskStatus> update;
-  EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&update));
-
-  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
-  Clock::settle();
-
-  // Ensure the scheduler receives TASK_FINISHED.
-  AWAIT_READY(update);
-  EXPECT_EQ(TASK_FINISHED, update.get().state());
-
-  EXPECT_CALL(exec, shutdown(_))
-    .Times(AtMost(1));
-
-  Clock::resume();
-
-  driver.stop();
-  driver.join();
-}
-
-
-// This test verifies that the slave and status update manager
-// properly handle duplicate terminal status updates, when the
 // second update is received after the ACK for the first update.
 // The proper behavior here is for the status update manager to
 // forward the duplicate update to the scheduler.