You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2016/10/04 01:57:17 UTC

[1/3] mesos git commit: Added 'at' to LinkedHashMap.

Repository: mesos
Updated Branches:
  refs/heads/1.0.x 6e503351a -> 837bb4ecc


Added 'at' to LinkedHashMap.

Review: https://reviews.apache.org/r/51403


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a333a2ed
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a333a2ed
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a333a2ed

Branch: refs/heads/1.0.x
Commit: a333a2edcf633c2075aad32660cb6222691b1931
Parents: 6e50335
Author: Benjamin Mahler <bm...@apache.org>
Authored: Wed Aug 24 18:03:54 2016 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Mon Oct 3 18:40:16 2016 -0700

----------------------------------------------------------------------
 3rdparty/stout/include/stout/linkedhashmap.hpp | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a333a2ed/3rdparty/stout/include/stout/linkedhashmap.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/stout/include/stout/linkedhashmap.hpp b/3rdparty/stout/include/stout/linkedhashmap.hpp
index 6e06260..ef1578e 100644
--- a/3rdparty/stout/include/stout/linkedhashmap.hpp
+++ b/3rdparty/stout/include/stout/linkedhashmap.hpp
@@ -49,6 +49,16 @@ public:
     return None();
   }
 
+  Value& at(const Key& key)
+  {
+    return values_.at(key).first;
+  }
+
+  const Value& at(const Key& key) const
+  {
+    return values_.at(key).first;
+  }
+
   bool contains(const Key& key) const
   {
     return values_.contains(key);


[3/3] mesos git commit: Backported MESOS-6026 to 1.0.2.

Posted by jo...@apache.org.
Backported MESOS-6026 to 1.0.2.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/837bb4ec
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/837bb4ec
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/837bb4ec

Branch: refs/heads/1.0.x
Commit: 837bb4ecc0424b4cd6c7996bd6752ec00b437322
Parents: d1b4b72
Author: Joseph Wu <jo...@apache.org>
Authored: Mon Oct 3 18:45:36 2016 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Mon Oct 3 18:45:36 2016 -0700

----------------------------------------------------------------------
 CHANGELOG | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/837bb4ec/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index c254d2c..ca9192d 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -16,7 +16,8 @@ All Issues:
     * [MESOS-6233] - Master CHECK fails during recovery while relinking to other masters.
     * [MESOS-6234] - Potential socket leak during Zookeeper network changes.
     * [MESOS-6246] - Libprocess links will not generate an ExitedEvent if the socket creation fails.
-    * [MESOS-6299] - Master doesn't remove task from pending when it is invalid 
+    * [MESOS-6299] - Master doesn't remove task from pending when it is invalid.
+    * [MESOS-6026] - Tasks mistakenly marked as FAILED due to race b/w sendExecutorTerminatedStatusUpdate() and _statusUpdate().
 
 
 Release Notes - Mesos - Version 1.0.1


[2/3] mesos git commit: Updated the agent to reject multiple terminal status updates.

Posted by jo...@apache.org.
Updated the agent to reject multiple terminal status updates.

Currently the agent will allow multiple terminal status updates when
updating the state of a task. This allows the status update invariants
to be violated! This patch disallows multiple terminal updates.

Review: https://reviews.apache.org/r/51404


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d1b4b72d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d1b4b72d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d1b4b72d

Branch: refs/heads/1.0.x
Commit: d1b4b72dd14ab23ed731b8cf46edd8cd5d5b0058
Parents: a333a2e
Author: Benjamin Mahler <bm...@apache.org>
Authored: Fri Aug 12 19:44:43 2016 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Mon Oct 3 18:41:26 2016 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp                       | 168 ++++++++++++++-----------
 src/slave/slave.hpp                       |   3 +-
 src/tests/slave_tests.cpp                 | 111 ++++++++++++++++
 src/tests/status_update_manager_tests.cpp | 108 ----------------
 4 files changed, 205 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d1b4b72d/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 47364b7..09be083 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3427,18 +3427,26 @@ void Slave::_statusUpdate(
   // task is sent only after the acknowledgement for the previous one
   // is received, which could take a long time if the framework is
   // backed up or is down.
-  executor->updateTaskState(status);
-
-  // Handle the task appropriately if it is terminated.
-  // TODO(vinod): Revisit these semantics when we disallow duplicate
-  // terminal updates (e.g., when slave recovery is always enabled).
-  if (protobuf::isTerminalState(status.state()) &&
-      (executor->queuedTasks.contains(status.task_id()) ||
-       executor->launchedTasks.contains(status.task_id()))) {
-    executor->terminateTask(status.task_id(), status);
-
-    // Wait until the container's resources have been updated before
-    // sending the status update.
+  Try<Nothing> updated = executor->updateTaskState(status);
+
+  // If we fail to update the task state, drop the update. Note that
+  // we have to acknowledge the executor so that it does not retry.
+  if (updated.isError()) {
+    LOG(ERROR) << "Failed to update state of task '" << status.task_id() << "'"
+               << " to " << status.state() << ": " << updated.error();
+
+    // NOTE: This may lead to out-of-order acknowledgements since
+    // other update acknowledgements may be waiting for the
+    // containerizer or status update manager.
+    ___statusUpdate(Nothing(), update, pid);
+    return;
+  }
+
+  if (protobuf::isTerminalState(status.state())) {
+    // If the task terminated, wait until the container's resources
+    // have been updated before sending the status update. Note that
+    // duplicate terminal updates are not possible here because they
+    // lead to an error from `Executor::updateTaskState`.
     containerizer->update(executor->containerId, executor->resources)
       .onAny(defer(self(),
                    &Slave::__statusUpdate,
@@ -6023,49 +6031,6 @@ Task* Executor::addTask(const TaskInfo& task)
 }
 
 
-void Executor::terminateTask(
-    const TaskID& taskId,
-    const mesos::TaskStatus& status)
-{
-  VLOG(1) << "Terminating task " << taskId;
-
-  Task* task = nullptr;
-  // Remove the task if it's queued.
-  if (queuedTasks.contains(taskId)) {
-    task = new Task(
-        protobuf::createTask(queuedTasks[taskId], status.state(), frameworkId));
-    queuedTasks.erase(taskId);
-  } else if (launchedTasks.contains(taskId)) {
-    // Update the resources if it's been launched.
-    task = launchedTasks[taskId];
-    resources -= task->resources();
-    launchedTasks.erase(taskId);
-  }
-
-  switch (status.state()) {
-    case TASK_FINISHED:
-      ++slave->metrics.tasks_finished;
-      break;
-    case TASK_FAILED:
-      ++slave->metrics.tasks_failed;
-      break;
-    case TASK_KILLED:
-      ++slave->metrics.tasks_killed;
-      break;
-    case TASK_LOST:
-      ++slave->metrics.tasks_lost;
-      break;
-    default:
-      LOG(WARNING) << "Unhandled task state " << status.state()
-                   << " on completion.";
-      break;
-  }
-
-  // TODO(dhamon): Update source/reason metrics.
-  terminatedTasks[taskId] = CHECK_NOTNULL(task);
-}
-
-
 void Executor::completeTask(const TaskID& taskId)
 {
   VLOG(1) << "Completing task " << taskId;
@@ -6135,21 +6100,28 @@ void Executor::recoverTask(const TaskState& state)
 
   // Read updates to get the latest state of the task.
   foreach (const StatusUpdate& update, state.updates) {
-    updateTaskState(update.status());
-
-    // Terminate the task if it received a terminal update.
-    // We ignore duplicate terminal updates by checking if
-    // the task is present in launchedTasks.
-    // TODO(vinod): Revisit these semantics when we disallow duplicate
-    // terminal updates (e.g., when slave recovery is always enabled).
-    if (protobuf::isTerminalState(update.status().state()) &&
-        launchedTasks.contains(state.id)) {
-      terminateTask(state.id, update.status());
+    Try<Nothing> updated = updateTaskState(update.status());
+
+    // TODO(bmahler): We only log this error because we used to
+    // allow multiple terminal updates and so we may encounter
+    // this when recovering an old executor. We can hard-CHECK
+    // this 6 months from 1.1.0.
+    if (updated.isError()) {
+      LOG(ERROR) << "Failed to update state of recovered task"
+                 << " '" << state.id << "' to " << update.status().state()
+                 << ": " << updated.error();
+
+      // The only case that should be possible here is when the
+      // task had multiple terminal updates persisted.
+      continue;
+    }
 
+    // Complete the task if it is terminal and
+    // the update has been acknowledged.
+    if (protobuf::isTerminalState(update.status().state())) {
       CHECK(update.has_uuid())
         << "Expecting updates without 'uuid' to have been rejected";
 
-      // If the terminal update has been acknowledged, remove it.
       if (state.acks.contains(UUID::fromBytes(update.uuid()).get())) {
         completeTask(state.id);
       }
@@ -6159,18 +6131,64 @@ void Executor::recoverTask(const TaskState& state)
 }
 
 
-void Executor::updateTaskState(const TaskStatus& status)
+Try<Nothing> Executor::updateTaskState(const TaskStatus& status)
 {
-  if (launchedTasks.contains(status.task_id())) {
-    Task* task = launchedTasks[status.task_id()];
-    // TODO(brenden): Consider wiping the `data` and `message` fields?
-    if (task->statuses_size() > 0 &&
-        task->statuses(task->statuses_size() - 1).state() == status.state()) {
-      task->mutable_statuses()->RemoveLast();
+  bool terminal = protobuf::isTerminalState(status.state());
+
+  const TaskID& taskId = status.task_id();
+  Task* task = nullptr;
+
+  if (queuedTasks.contains(taskId)) {
+    if (terminal) {
+      task = new Task(protobuf::createTask(
+          queuedTasks.at(taskId),
+          status.state(),
+          frameworkId));
+      queuedTasks.erase(taskId);
+      terminatedTasks[taskId] = task;
+    } else {
+      return Error("Cannot send non-terminal update for queued task");
+    }
+  } else if (launchedTasks.contains(taskId)) {
+    task = launchedTasks.at(status.task_id());
+
+    if (terminal) {
+      resources -= task->resources(); // Release the resources.
+      launchedTasks.erase(taskId);
+      terminatedTasks[taskId] = task;
+    }
+  } else if (terminatedTasks.contains(taskId)) {
+    return Error("Task is already terminated with state"
+                 " " + stringify(terminatedTasks.at(taskId)->state()));
+  } else {
+    return Error("Task is unknown");
+  }
+
+  CHECK_NOTNULL(task);
+
+  // TODO(brenden): Consider wiping the `data` and `message` fields?
+  if (task->statuses_size() > 0 &&
+      task->statuses(task->statuses_size() - 1).state() == status.state()) {
+    task->mutable_statuses()->RemoveLast();
+  }
+  task->add_statuses()->CopyFrom(status);
+  task->set_state(status.state());
+
+  // TODO(bmahler): This only increments the metrics when the update
+  // can be handled. Should we always increment the metrics?
+  if (terminal) {
+    switch (status.state()) {
+      case TASK_FINISHED: ++slave->metrics.tasks_finished; break;
+      case TASK_FAILED:   ++slave->metrics.tasks_failed;   break;
+      case TASK_KILLED:   ++slave->metrics.tasks_killed;   break;
+      case TASK_LOST:     ++slave->metrics.tasks_lost;     break;
+      default:
+        LOG(ERROR) << "Unexpected terminal task state " << status.state();
+        break;
     }
-    task->add_statuses()->CopyFrom(status);
-    task->set_state(status.state());
   }
+
+  return Nothing();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d1b4b72d/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index a8952f0..95223ec 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -738,12 +738,11 @@ struct Executor
   ~Executor();
 
   Task* addTask(const TaskInfo& task);
-  void terminateTask(const TaskID& taskId, const mesos::TaskStatus& status);
   void completeTask(const TaskID& taskId);
   void checkpointExecutor();
   void checkpointTask(const TaskInfo& task);
   void recoverTask(const state::TaskState& state);
-  void updateTaskState(const TaskStatus& status);
+  Try<Nothing> updateTaskState(const TaskStatus& status);
 
   // Returns true if there are any queued/launched/terminated tasks.
   bool incompleteTasks();

http://git-wip-us.apache.org/repos/asf/mesos/blob/d1b4b72d/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index b9fa85d..ea8f403 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -166,6 +166,117 @@ TEST_F(SlaveTest, Shutdown)
 }
 
 
+// This test verifies that the slave rejects duplicate terminal
+// status updates for tasks before the first terminal update is
+// acknowledged.
+TEST_F(SlaveTest, DuplicateTerminalUpdateBeforeAck)
+{
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers->size());
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  // Send a terminal update right away.
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  // Drop the first ACK from the scheduler to the slave.
+  Future<StatusUpdateAcknowledgementMessage> statusUpdateAckMessage =
+    DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get()->pid);
+
+  Future<Nothing> ___statusUpdate =
+    FUTURE_DISPATCH(slave.get()->pid, &Slave::___statusUpdate);
+
+  TaskInfo task;
+  task.set_name("test-task");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers->at(0).slave_id());
+  task.mutable_resources()->MergeFrom(offers->at(0).resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  driver.launchTasks(offers->at(0).id(), {task});
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_FINISHED, status->state());
+
+  AWAIT_READY(statusUpdateAckMessage);
+
+  // At this point the status update manager has enqueued
+  // TASK_FINISHED update.
+  AWAIT_READY(___statusUpdate);
+
+  Future<Nothing> _statusUpdate2 =
+    FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdate);
+
+  // Now send a TASK_KILLED update for the same task.
+  TaskStatus status2 = status.get();
+  status2.set_state(TASK_KILLED);
+  execDriver->sendStatusUpdate(status2);
+
+  // At this point the slave has handled the TASK_KILLED update.
+  AWAIT_READY(_statusUpdate2);
+
+  // After we advance the clock, the scheduler should receive
+  // the retried TASK_FINISHED update and acknowledge it.
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  // Ensure the scheduler receives TASK_FINISHED.
+  AWAIT_READY(update);
+  EXPECT_EQ(TASK_FINISHED, update->state());
+
+  // Settle the clock to ensure that TASK_KILLED is not sent.
+  Clock::settle();
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+}
+
+
 TEST_F(SlaveTest, ShutdownUnregisteredExecutor)
 {
   Try<Owned<cluster::Master>> master = StartMaster();

http://git-wip-us.apache.org/repos/asf/mesos/blob/d1b4b72d/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index f5313a1..4b67700 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -458,114 +458,6 @@ TEST_F(StatusUpdateManagerTest, IgnoreUnexpectedStatusUpdateAck)
 
 // This test verifies that the slave and status update manager
 // properly handle duplicate terminal status updates, when the
-// second update is received before the ACK for the first update.
-// The proper behavior here is for the status update manager to
-// drop the duplicate update.
-TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateBeforeAck)
-{
-  Try<Owned<cluster::Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-  TestContainerizer containerizer(&exec);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
-  ASSERT_SOME(slave);
-
-  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
-
-  FrameworkID frameworkId;
-  EXPECT_CALL(sched, registered(_, _, _))
-    .WillOnce(SaveArg<1>(&frameworkId));
-
-  Future<vector<Offer> > offers;
-  EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  EXPECT_NE(0u, offers.get().size());
-
-  ExecutorDriver* execDriver;
-  EXPECT_CALL(exec, registered(_, _, _, _))
-    .WillOnce(SaveArg<0>(&execDriver));
-
-  // Send a terminal update right away.
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
-
-  Future<TaskStatus> status;
-  EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&status));
-
-  // Drop the first ACK from the scheduler to the slave.
-  Future<StatusUpdateAcknowledgementMessage> statusUpdateAckMessage =
-    DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get()->pid);
-
-  Future<Nothing> ___statusUpdate =
-    FUTURE_DISPATCH(slave.get()->pid, &Slave::___statusUpdate);
-
-  Clock::pause();
-
-  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
-
-  AWAIT_READY(status);
-
-  EXPECT_EQ(TASK_FINISHED, status.get().state());
-
-  AWAIT_READY(statusUpdateAckMessage);
-
-  // At this point the status update manager has enqueued
-  // TASK_FINISHED update.
-  AWAIT_READY(___statusUpdate);
-
-  Future<Nothing> ___statusUpdate2 =
-    FUTURE_DISPATCH(slave.get()->pid, &Slave::___statusUpdate);
-
-  // Now send a TASK_KILLED update for the same task.
-  TaskStatus status2 = status.get();
-  status2.set_state(TASK_KILLED);
-  execDriver->sendStatusUpdate(status2);
-
-  // At this point the status update manager has enqueued
-  // TASK_FINISHED and TASK_KILLED updates.
-  AWAIT_READY(___statusUpdate2);
-
-  // After we advance the clock, the scheduler should receive
-  // the retried TASK_FINISHED update and acknowledge it. The
-  // TASK_KILLED update should be dropped by the status update
-  // manager, as the stream is already terminated.
-  Future<TaskStatus> update;
-  EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&update));
-
-  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
-  Clock::settle();
-
-  // Ensure the scheduler receives TASK_FINISHED.
-  AWAIT_READY(update);
-  EXPECT_EQ(TASK_FINISHED, update.get().state());
-
-  EXPECT_CALL(exec, shutdown(_))
-    .Times(AtMost(1));
-
-  Clock::resume();
-
-  driver.stop();
-  driver.join();
-}
-
-
-// This test verifies that the slave and status update manager
-// properly handle duplicate terminal status updates, when the
 // second update is received after the ACK for the first update.
 // The proper behavior here is for the status update manager to
 // forward the duplicate update to the scheduler.