You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2016/08/25 02:59:55 UTC

[1/2] mesos git commit: Added 'at' to LinkedHashMap.

Repository: mesos
Updated Branches:
  refs/heads/master b5dd33397 -> 2f019d801


Added 'at' to LinkedHashMap.

Review: https://reviews.apache.org/r/51403


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8bb12e81
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8bb12e81
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8bb12e81

Branch: refs/heads/master
Commit: 8bb12e81b938ad691ebc9092345b426c7ab25515
Parents: b5dd333
Author: Benjamin Mahler <bm...@apache.org>
Authored: Wed Aug 24 18:03:54 2016 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 24 19:54:25 2016 -0700

----------------------------------------------------------------------
 3rdparty/stout/include/stout/linkedhashmap.hpp | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8bb12e81/3rdparty/stout/include/stout/linkedhashmap.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/stout/include/stout/linkedhashmap.hpp b/3rdparty/stout/include/stout/linkedhashmap.hpp
index 38acdc8..f48cc59 100644
--- a/3rdparty/stout/include/stout/linkedhashmap.hpp
+++ b/3rdparty/stout/include/stout/linkedhashmap.hpp
@@ -49,6 +49,16 @@ public:
     return None();
   }
 
+  Value& at(const Key& key)
+  {
+    return values_.at(key).first;
+  }
+
+  const Value& at(const Key& key) const
+  {
+    return values_.at(key).first;
+  }
+
   bool contains(const Key& key) const
   {
     return values_.contains(key);


[2/2] mesos git commit: Updated the agent to reject multiple terminal status updates.

Posted by bm...@apache.org.
Updated the agent to reject multiple terminal status updates.

Currently the agent will allow multiple terminal status updates when
updating the state of a task. This allows the status update invariants
to be violated! This patch disallows multiple terminal updates.

Review: https://reviews.apache.org/r/51404


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2f019d80
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2f019d80
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2f019d80

Branch: refs/heads/master
Commit: 2f019d801d9a9458e10bb41f176622a280415a90
Parents: 8bb12e8
Author: Benjamin Mahler <bm...@apache.org>
Authored: Fri Aug 12 19:44:43 2016 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 24 19:55:19 2016 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp                       | 168 ++++++++++++++-----------
 src/slave/slave.hpp                       |   3 +-
 src/tests/slave_tests.cpp                 | 111 ++++++++++++++++
 src/tests/status_update_manager_tests.cpp | 108 ----------------
 4 files changed, 205 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2f019d80/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index c686a97..5d162d0 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3465,18 +3465,26 @@ void Slave::_statusUpdate(
   // task is sent only after the acknowledgement for the previous one
   // is received, which could take a long time if the framework is
   // backed up or is down.
-  executor->updateTaskState(status);
-
-  // Handle the task appropriately if it is terminated.
-  // TODO(vinod): Revisit these semantics when we disallow duplicate
-  // terminal updates (e.g., when slave recovery is always enabled).
-  if (protobuf::isTerminalState(status.state()) &&
-      (executor->queuedTasks.contains(status.task_id()) ||
-       executor->launchedTasks.contains(status.task_id()))) {
-    executor->terminateTask(status.task_id(), status);
-
-    // Wait until the container's resources have been updated before
-    // sending the status update.
+  Try<Nothing> updated = executor->updateTaskState(status);
+
+  // If we fail to update the task state, drop the update. Note that
+  // we have to acknowledge the executor so that it does not retry.
+  if (updated.isError()) {
+    LOG(ERROR) << "Failed to update state of task '" << status.task_id() << "'"
+               << " to " << status.state() << ": " << updated.error();
+
+    // NOTE: This may lead to out-of-order acknowledgements since
+    // other update acknowledgements may be waiting for the
+    // containerizer or status update manager.
+    ___statusUpdate(Nothing(), update, pid);
+    return;
+  }
+
+  if (protobuf::isTerminalState(status.state())) {
+    // If the task terminated, wait until the container's resources
+    // have been updated before sending the status update. Note that
+    // duplicate terminal updates are not possible here because they
+    // lead to an error from `Executor::updateTaskState`.
     containerizer->update(executor->containerId, executor->resources)
       .onAny(defer(self(),
                    &Slave::__statusUpdate,
@@ -6108,49 +6116,6 @@ Task* Executor::addTask(const TaskInfo& task)
 }
 
 
-void Executor::terminateTask(
-    const TaskID& taskId,
-    const mesos::TaskStatus& status)
-{
-  VLOG(1) << "Terminating task " << taskId;
-
-  Task* task = nullptr;
-  // Remove the task if it's queued.
-  if (queuedTasks.contains(taskId)) {
-    task = new Task(
-        protobuf::createTask(queuedTasks[taskId], status.state(), frameworkId));
-    queuedTasks.erase(taskId);
-  } else if (launchedTasks.contains(taskId)) {
-    // Update the resources if it's been launched.
-    task = launchedTasks[taskId];
-    resources -= task->resources();
-    launchedTasks.erase(taskId);
-  }
-
-  switch (status.state()) {
-    case TASK_FINISHED:
-      ++slave->metrics.tasks_finished;
-      break;
-    case TASK_FAILED:
-      ++slave->metrics.tasks_failed;
-      break;
-    case TASK_KILLED:
-      ++slave->metrics.tasks_killed;
-      break;
-    case TASK_LOST:
-      ++slave->metrics.tasks_lost;
-      break;
-    default:
-      LOG(WARNING) << "Unhandled task state " << status.state()
-                   << " on completion.";
-      break;
-  }
-
-  // TODO(dhamon): Update source/reason metrics.
-  terminatedTasks[taskId] = CHECK_NOTNULL(task);
-}
-
-
 void Executor::completeTask(const TaskID& taskId)
 {
   VLOG(1) << "Completing task " << taskId;
@@ -6220,21 +6185,28 @@ void Executor::recoverTask(const TaskState& state)
 
   // Read updates to get the latest state of the task.
   foreach (const StatusUpdate& update, state.updates) {
-    updateTaskState(update.status());
-
-    // Terminate the task if it received a terminal update.
-    // We ignore duplicate terminal updates by checking if
-    // the task is present in launchedTasks.
-    // TODO(vinod): Revisit these semantics when we disallow duplicate
-    // terminal updates (e.g., when slave recovery is always enabled).
-    if (protobuf::isTerminalState(update.status().state()) &&
-        launchedTasks.contains(state.id)) {
-      terminateTask(state.id, update.status());
+    Try<Nothing> updated = updateTaskState(update.status());
+
+    // TODO(bmahler): We only log this error because we used to
+    // allow multiple terminal updates and so we may encounter
+    // this when recovering an old executor. We can hard-CHECK
+    // this 6 months from 1.1.0.
+    if (updated.isError()) {
+      LOG(ERROR) << "Failed to update state of recovered task"
+                 << " '" << state.id << "' to " << update.status().state()
+                 << ": " << updated.error();
+
+      // The only case that should be possible here is when the
+      // task had multiple terminal updates persisted.
+      continue;
+    }
 
+    // Complete the task if it is terminal and
+    // the update has been acknowledged.
+    if (protobuf::isTerminalState(update.status().state())) {
       CHECK(update.has_uuid())
         << "Expecting updates without 'uuid' to have been rejected";
 
-      // If the terminal update has been acknowledged, remove it.
       if (state.acks.contains(UUID::fromBytes(update.uuid()).get())) {
         completeTask(state.id);
       }
@@ -6244,18 +6216,64 @@ void Executor::recoverTask(const TaskState& state)
 }
 
 
-void Executor::updateTaskState(const TaskStatus& status)
+Try<Nothing> Executor::updateTaskState(const TaskStatus& status)
 {
-  if (launchedTasks.contains(status.task_id())) {
-    Task* task = launchedTasks[status.task_id()];
-    // TODO(brenden): Consider wiping the `data` and `message` fields?
-    if (task->statuses_size() > 0 &&
-        task->statuses(task->statuses_size() - 1).state() == status.state()) {
-      task->mutable_statuses()->RemoveLast();
+  bool terminal = protobuf::isTerminalState(status.state());
+
+  const TaskID& taskId = status.task_id();
+  Task* task = nullptr;
+
+  if (queuedTasks.contains(taskId)) {
+    if (terminal) {
+      task = new Task(protobuf::createTask(
+          queuedTasks.at(taskId),
+          status.state(),
+          frameworkId));
+      queuedTasks.erase(taskId);
+      terminatedTasks[taskId] = task;
+    } else {
+      return Error("Cannot send non-terminal update for queued task");
+    }
+  } else if (launchedTasks.contains(taskId)) {
+    task = launchedTasks.at(status.task_id());
+
+    if (terminal) {
+      resources -= task->resources(); // Release the resources.
+      launchedTasks.erase(taskId);
+      terminatedTasks[taskId] = task;
+    }
+  } else if (terminatedTasks.contains(taskId)) {
+    return Error("Task is already terminated with state"
+                 " " + stringify(terminatedTasks.at(taskId)->state()));
+  } else {
+    return Error("Task is unknown");
+  }
+
+  CHECK_NOTNULL(task);
+
+  // TODO(brenden): Consider wiping the `data` and `message` fields?
+  if (task->statuses_size() > 0 &&
+      task->statuses(task->statuses_size() - 1).state() == status.state()) {
+    task->mutable_statuses()->RemoveLast();
+  }
+  task->add_statuses()->CopyFrom(status);
+  task->set_state(status.state());
+
+  // TODO(bmahler): This only increments the metrics when the update
+  // can be handled. Should we always increment the metrics?
+  if (terminal) {
+    switch (status.state()) {
+      case TASK_FINISHED: ++slave->metrics.tasks_finished; break;
+      case TASK_FAILED:   ++slave->metrics.tasks_failed;   break;
+      case TASK_KILLED:   ++slave->metrics.tasks_killed;   break;
+      case TASK_LOST:     ++slave->metrics.tasks_lost;     break;
+      default:
+        LOG(ERROR) << "Unexpected terminal task state " << status.state();
+        break;
     }
-    task->add_statuses()->CopyFrom(status);
-    task->set_state(status.state());
   }
+
+  return Nothing();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2f019d80/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 7ca9923..4add4c0 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -787,12 +787,11 @@ struct Executor
   ~Executor();
 
   Task* addTask(const TaskInfo& task);
-  void terminateTask(const TaskID& taskId, const mesos::TaskStatus& status);
   void completeTask(const TaskID& taskId);
   void checkpointExecutor();
   void checkpointTask(const TaskInfo& task);
   void recoverTask(const state::TaskState& state);
-  void updateTaskState(const TaskStatus& status);
+  Try<Nothing> updateTaskState(const TaskStatus& status);
 
   // Returns true if there are any queued/launched/terminated tasks.
   bool incompleteTasks();

http://git-wip-us.apache.org/repos/asf/mesos/blob/2f019d80/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index dcf8454..84ee37c 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -166,6 +166,117 @@ TEST_F(SlaveTest, Shutdown)
 }
 
 
+// This test verifies that the slave rejects duplicate terminal
+// status updates for tasks before the first terminal update is
+// acknowledged.
+TEST_F(SlaveTest, DuplicateTerminalUpdateBeforeAck)
+{
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers->size());
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  // Send a terminal update right away.
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  // Drop the first ACK from the scheduler to the slave.
+  Future<StatusUpdateAcknowledgementMessage> statusUpdateAckMessage =
+    DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get()->pid);
+
+  Future<Nothing> ___statusUpdate =
+    FUTURE_DISPATCH(slave.get()->pid, &Slave::___statusUpdate);
+
+  TaskInfo task;
+  task.set_name("test-task");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers->at(0).slave_id());
+  task.mutable_resources()->MergeFrom(offers->at(0).resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  driver.launchTasks(offers->at(0).id(), {task});
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_FINISHED, status->state());
+
+  AWAIT_READY(statusUpdateAckMessage);
+
+  // At this point the status update manager has enqueued
+  // TASK_FINISHED update.
+  AWAIT_READY(___statusUpdate);
+
+  Future<Nothing> _statusUpdate2 =
+    FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdate);
+
+  // Now send a TASK_KILLED update for the same task.
+  TaskStatus status2 = status.get();
+  status2.set_state(TASK_KILLED);
+  execDriver->sendStatusUpdate(status2);
+
+  // At this point the slave has handled the TASK_KILLED update.
+  AWAIT_READY(_statusUpdate2);
+
+  // After we advance the clock, the scheduler should receive
+  // the retried TASK_FINISHED update and acknowledge it.
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  // Ensure the scheduler receives TASK_FINISHED.
+  AWAIT_READY(update);
+  EXPECT_EQ(TASK_FINISHED, update->state());
+
+  // Settle the clock to ensure that TASK_KILLED is not sent.
+  Clock::settle();
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+}
+
+
 TEST_F(SlaveTest, ShutdownUnregisteredExecutor)
 {
   Try<Owned<cluster::Master>> master = StartMaster();

http://git-wip-us.apache.org/repos/asf/mesos/blob/2f019d80/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index 7b6fe31..38d8913 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -458,114 +458,6 @@ TEST_F(StatusUpdateManagerTest, IgnoreUnexpectedStatusUpdateAck)
 
 // This test verifies that the slave and status update manager
 // properly handle duplicate terminal status updates, when the
-// second update is received before the ACK for the first update.
-// The proper behavior here is for the status update manager to
-// drop the duplicate update.
-TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateBeforeAck)
-{
-  Try<Owned<cluster::Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
-  ASSERT_SOME(slave);
-
-  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
-
-  FrameworkID frameworkId;
-  EXPECT_CALL(sched, registered(_, _, _))
-    .WillOnce(SaveArg<1>(&frameworkId));
-
-  Future<vector<Offer>> offers;
-  EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  EXPECT_NE(0u, offers.get().size());
-
-  ExecutorDriver* execDriver;
-  EXPECT_CALL(exec, registered(_, _, _, _))
-    .WillOnce(SaveArg<0>(&execDriver));
-
-  // Send a terminal update right away.
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
-
-  Future<TaskStatus> status;
-  EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&status));
-
-  // Drop the first ACK from the scheduler to the slave.
-  Future<StatusUpdateAcknowledgementMessage> statusUpdateAckMessage =
-    DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get()->pid);
-
-  Future<Nothing> ___statusUpdate =
-    FUTURE_DISPATCH(slave.get()->pid, &Slave::___statusUpdate);
-
-  Clock::pause();
-
-  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
-
-  AWAIT_READY(status);
-
-  EXPECT_EQ(TASK_FINISHED, status.get().state());
-
-  AWAIT_READY(statusUpdateAckMessage);
-
-  // At this point the status update manager has enqueued
-  // TASK_FINISHED update.
-  AWAIT_READY(___statusUpdate);
-
-  Future<Nothing> ___statusUpdate2 =
-    FUTURE_DISPATCH(slave.get()->pid, &Slave::___statusUpdate);
-
-  // Now send a TASK_KILLED update for the same task.
-  TaskStatus status2 = status.get();
-  status2.set_state(TASK_KILLED);
-  execDriver->sendStatusUpdate(status2);
-
-  // At this point the status update manager has enqueued
-  // TASK_FINISHED and TASK_KILLED updates.
-  AWAIT_READY(___statusUpdate2);
-
-  // After we advance the clock, the scheduler should receive
-  // the retried TASK_FINISHED update and acknowledge it. The
-  // TASK_KILLED update should be dropped by the status update
-  // manager, as the stream is already terminated.
-  Future<TaskStatus> update;
-  EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&update));
-
-  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
-  Clock::settle();
-
-  // Ensure the scheduler receives TASK_FINISHED.
-  AWAIT_READY(update);
-  EXPECT_EQ(TASK_FINISHED, update.get().state());
-
-  EXPECT_CALL(exec, shutdown(_))
-    .Times(AtMost(1));
-
-  Clock::resume();
-
-  driver.stop();
-  driver.join();
-}
-
-
-// This test verifies that the slave and status update manager
-// properly handle duplicate terminal status updates, when the
 // second update is received after the ACK for the first update.
 // The proper behavior here is for the status update manager to
 // forward the duplicate update to the scheduler.