You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2017/11/29 23:15:43 UTC

mesos git commit: Do not kill non-partition-aware tasks upon agent reregistration.

Repository: mesos
Updated Branches:
  refs/heads/master 5bfdc3bec -> 9d0cadc82


Do not kill non-partition-aware tasks upon agent reregistration.

The master used to send a ShutdownFrameworkMessage to the agent
to kill the tasks from non-partition-aware framework but this
misuse of ShutdownFrameworkMessage has led to problematic behavior
in MESOS-7215. This commit fixes the problem by not killing the
tasks for non-partition-aware frameworks when an unreachable agent
re-registers. Instead, the master just recovers these tasks.

Review: https://reviews.apache.org/r/61473/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9d0cadc8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9d0cadc8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9d0cadc8

Branch: refs/heads/master
Commit: 9d0cadc82210a78cf632913808c496fd0ade713e
Parents: 5bfdc3b
Author: Megha Sharma <ms...@apple.com>
Authored: Wed Nov 29 14:24:13 2017 -0800
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Wed Nov 29 15:15:29 2017 -0800

----------------------------------------------------------------------
 include/mesos/mesos.proto     | 14 +++---
 include/mesos/v1/mesos.proto  | 14 +++---
 src/master/http.cpp           | 31 +++++++++++-
 src/master/master.cpp         | 99 +++++++++++---------------------------
 src/master/master.hpp         | 31 ++++++------
 src/tests/partition_tests.cpp | 43 ++++++-----------
 6 files changed, 105 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9d0cadc8/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index b1ebfe2..524665a 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -336,14 +336,16 @@ message FrameworkInfo {
       // Frameworks that enable this capability can define how they
       // would like to handle partitioned tasks. Frameworks will
       // receive TASK_UNREACHABLE for tasks on agents that are
-      // partitioned from the master. If/when a partitioned agent
-      // reregisters, tasks on the agent that were started by
-      // PARTITION_AWARE frameworks will not killed.
+      // partitioned from the master.
       //
       // Without this capability, frameworks will receive TASK_LOST
-      // for tasks on partitioned agents; such tasks will be killed by
-      // Mesos when the agent reregisters (unless the master has
-      // failed over).
+      // for tasks on partitioned agents.
+      // NOTE: Prior to Mesos 1.5, such tasks will be killed by Mesos
+      // when the agent reregisters (unless the master has failed over).
+      // However due to the lack of benefit in maintaining different
+      // behaviors depending on whether the master has failed over
+      // (see MESOS-7215), as of 1.5, Mesos will not kill these
+      // tasks in either case.
       PARTITION_AWARE = 5;
 
       // This expresses the ability for the framework to be

http://git-wip-us.apache.org/repos/asf/mesos/blob/9d0cadc8/include/mesos/v1/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index d535eb4..0ba6032 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -334,14 +334,16 @@ message FrameworkInfo {
       // Frameworks that enable this capability can define how they
       // would like to handle partitioned tasks. Frameworks will
       // receive TASK_UNREACHABLE for tasks on agents that are
-      // partitioned from the master. If/when a partitioned agent
-      // reregisters, tasks on the agent that were started by
-      // PARTITION_AWARE frameworks will not killed.
+      // partitioned from the master.
       //
       // Without this capability, frameworks will receive TASK_LOST
-      // for tasks on partitioned agents; such tasks will be killed by
-      // Mesos when the agent reregisters (unless the master has
-      // failed over).
+      // for tasks on partitioned agents.
+      // NOTE: Prior to Mesos 1.5, such tasks will be killed by Mesos
+      // when the agent reregisters (unless the master has failed over).
+      // However due to the lack of benefit in maintaining different
+      // behaviors depending on whether the master has failed over
+      // (see MESOS-7215), as of 1.5, Mesos will not kill these
+      // tasks in either case.
       PARTITION_AWARE = 5;
 
       // This expresses the ability for the framework to be

http://git-wip-us.apache.org/repos/asf/mesos/blob/9d0cadc8/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 9dcdcbe..d1138a2 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -328,6 +328,12 @@ struct FullFrameworkWriter {
 
     writer->field("unreachable_tasks", [this](JSON::ArrayWriter* writer) {
       foreachvalue (const Owned<Task>& task, framework_->unreachableTasks) {
+        // There could be TASK_LOST tasks in this map. See comment for
+        // `unreachableTasks`.
+        if (task.get()->state() != TASK_UNREACHABLE) {
+          continue;
+        }
+
         // Skip unauthorized tasks.
         if (!authorizeTask_->accept(*task, framework_->info)) {
           continue;
@@ -346,6 +352,22 @@ struct FullFrameworkWriter {
 
         writer->element(*task);
       }
+
+      // Unreachable tasks belonging to a non-partition-aware framework
+      // have been stored as TASK_LOST for backward compatibility so we
+      // should export them as completed tasks.
+      foreachvalue (const Owned<Task>& task, framework_->unreachableTasks) {
+        if (task.get()->state() != TASK_LOST) {
+          continue;
+        }
+
+        // Skip unauthorized tasks.
+        if (!authorizeTask_->accept(*task, framework_->info)) {
+          continue;
+        }
+
+        writer->element(*task);
+      }
     });
 
     // Model all of the offers associated with a framework.
@@ -4221,7 +4243,14 @@ mesos::master::Response::GetTasks Master::Http::_getTasks(
         continue;
       }
 
-      getTasks.add_unreachable_tasks()->CopyFrom(*task);
+      if (task.get()->state() == TASK_UNREACHABLE) {
+        getTasks.add_unreachable_tasks()->CopyFrom(*task);
+      } else {
+        // Unreachable tasks belonging to a non-partition-aware framework
+        // have been stored as TASK_LOST for backward compatibility so we
+        // should export them as completed tasks.
+        getTasks.add_completed_tasks()->CopyFrom(*task);
+      }
     }
 
     // Completed tasks.

http://git-wip-us.apache.org/repos/asf/mesos/blob/9d0cadc8/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 7bcdb74..1876d7d 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6788,32 +6788,9 @@ void Master::__reregisterSlave(
     }
   }
 
-  // Check if this master was the one that removed the reregistering
-  // agent from the cluster originally. This is false if the master
-  // has failed over since the agent was removed, for example.
-  //
-  // TODO(neilc): Since `removed` is a cache, we might mistakenly
-  // think the master has failed over and neglect to shutdown
-  // non-partition-aware frameworks on reregistering agents.
-  bool slaveWasRemoved = slaves.removed.get(slaveInfo.id()).isSome();
-
-  // Decide how to handle the tasks running on the agent:
-  //
-  // (a) If the master has not failed over since the agent was marked
-  // unreachable, only partition-aware tasks are re-added to the
+  // All tasks except the ones from completed frameworks are re-added to the
   // master (those tasks were previously marked "unreachable", so they
-  // should be removed from that collection). Any non-partition-aware
-  // frameworks running on the agent are shutdown. We already marked
-  // such tasks "completed" when the agent was marked unreachable, so
-  // no further cleanup for non-partition-aware tasks is required.
-  //
-  // In addition, we also filter any tasks whose frameworks have
-  // completed. As in case (a), such frameworks will be shutdown and
-  // their tasks have already been marked "completed".
-  //
-  // (b) If the master has failed over, all tasks are re-added to the
-  // master. The master shouldn't have any record of the tasks running
-  // on the agent, so no further cleanup is required.
+  // should be removed from that collection).
   vector<Task> recoveredTasks;
   foreach (const Task& task, tasks) {
     const FrameworkID& frameworkId = task.framework_id();
@@ -6824,18 +6801,11 @@ void Master::__reregisterSlave(
       continue;
     }
 
-    // Always re-add partition-aware tasks.
-    if (partitionAwareFrameworks.contains(frameworkId)) {
-      recoveredTasks.push_back(task);
+    recoveredTasks.push_back(task);
 
-      Framework* framework = getFramework(frameworkId);
-      if (framework != nullptr) {
-        framework->unreachableTasks.erase(task.task_id());
-      }
-    } else if (!slaveWasRemoved) {
-      // Only re-add non-partition-aware tasks if the master has
-      // failed over since the agent was marked unreachable.
-      recoveredTasks.push_back(task);
+    Framework* framework = getFramework(frameworkId);
+    if (framework != nullptr) {
+      framework->unreachableTasks.erase(task.task_id());
     }
   }
 
@@ -6878,35 +6848,15 @@ void Master::__reregisterSlave(
   LOG(INFO) << "Re-registered agent " << *slave
             << " with " << Resources(slave->info.resources());
 
-  // Determine which frameworks on the slave to shutdown, if any. This
-  // happens in two cases:
-  //
-  // (1) If this master marked the slave unreachable (i.e., master has
-  // not failed over), we shutdown any non-partition-aware frameworks
-  // running on the slave. This matches the Mesos <= 1.0 "non-strict"
-  // registry semantics.
-  //
-  // (2) Any framework that is completed at the master but still
-  // running at the slave is shutdown. This can occur if the framework
-  // was removed when the slave was partitioned. NOTE: This is just a
+  // Any framework that is completed at the master but still running
+  // at the slave is shutdown. This can occur if the framework was
+  // removed when the slave was partitioned. NOTE: This is just a
   // short-term hack because information about completed frameworks is
   // lost when the master fails over. Also, we only store a limited
   // number of completed frameworks. A proper fix likely involves
   // storing framework information in the registry (MESOS-1719).
   foreach (const FrameworkInfo& framework, frameworks) {
-    if (slaveWasRemoved && !partitionAwareFrameworks.contains(framework.id())) {
-      LOG(INFO) << "Shutting down framework " << framework.id()
-                << " at re-registered agent " << *slave
-                << " because the framework is not partition-aware";
-
-      ShutdownFrameworkMessage message;
-      message.mutable_framework_id()->MergeFrom(framework.id());
-      send(slave->pid, message);
-
-      // The framework's tasks should not be stored in the master's
-      // in-memory state, because they were not re-added above.
-      CHECK(!slave->tasks.contains(framework.id()));
-    } else if (isCompletedFramework(framework.id())) {
+    if (isCompletedFramework(framework.id())) {
       LOG(INFO) << "Shutting down framework " << framework.id()
                 << " at re-registered agent " << *slave
                 << " because the framework has been shutdown at the master";
@@ -9369,15 +9319,16 @@ void Master::__removeSlave(
     TaskState newTaskState = TASK_UNREACHABLE;
     TaskStatus::Reason newTaskReason = TaskStatus::REASON_SLAVE_REMOVED;
 
+    // Needed to convey task unreachability because we lose this
+    // information from the task state if `TASK_LOST` is used.
+    bool unreachable = true;
+
     if (!framework->capabilities.partitionAware) {
       newTaskState = TASK_LOST;
-    } else {
-      if (unreachableTime.isSome()) {
-        newTaskState = TASK_UNREACHABLE;
-      } else {
-        newTaskState = TASK_GONE_BY_OPERATOR;
-        newTaskReason = TaskStatus::REASON_SLAVE_REMOVED_BY_OPERATOR;
-      }
+    } else if (unreachableTime.isNone()) {
+      unreachable = false;
+      newTaskState = TASK_GONE_BY_OPERATOR;
+      newTaskReason = TaskStatus::REASON_SLAVE_REMOVED_BY_OPERATOR;
     }
 
     foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
@@ -9399,7 +9350,7 @@ void Master::__removeSlave(
           unreachableTime.isSome() ? unreachableTime : None());
 
       updateTask(task, update);
-      removeTask(task);
+      removeTask(task, unreachable);
 
       if (!framework->connected()) {
         LOG(WARNING) << "Dropping update " << update
@@ -9602,7 +9553,7 @@ void Master::updateTask(Task* task, const StatusUpdate& update)
 }
 
 
-void Master::removeTask(Task* task)
+void Master::removeTask(Task* task, bool unreachable)
 {
   CHECK_NOTNULL(task);
 
@@ -9611,6 +9562,8 @@ void Master::removeTask(Task* task)
   CHECK_NOTNULL(slave);
 
   if (!isRemovable(task->state())) {
+    CHECK(!unreachable) << task->task_id();
+
     // Note that we convert to `Resources` for output as it's faster than
     // logging raw protobuf data. Conversion is safe, as resources have
     // already passed validation.
@@ -9640,7 +9593,7 @@ void Master::removeTask(Task* task)
   // Remove from framework.
   Framework* framework = getFramework(task->framework_id());
   if (framework != nullptr) { // A framework might not be re-registered yet.
-    framework->removeTask(task);
+    framework->removeTask(task, unreachable);
   }
 
   // Remove from slave.
@@ -10371,7 +10324,11 @@ double Master::_tasks_unreachable()
   double count = 0.0;
 
   foreachvalue (Framework* framework, frameworks.registered) {
-    count += framework->unreachableTasks.size();
+    foreachvalue (const Owned<Task>& task, framework->unreachableTasks) {
+      if (task.get()->state() == TASK_UNREACHABLE) {
+        count++;
+      }
+    }
   }
 
   return count;

http://git-wip-us.apache.org/repos/asf/mesos/blob/9d0cadc8/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 1c6a86f..3c86463 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -877,8 +877,11 @@ protected:
   // terminal.
   void updateTask(Task* task, const StatusUpdate& update);
 
-  // Removes the task.
-  void removeTask(Task* task);
+  // Removes the task. `unreachable` indicates whether the task is removed due
+  // to being unreachable. Note that we cannot rely on the task state because
+  // it may not reflect unreachability due to being set to TASK_LOST for
+  // backwards compatibility.
+  void removeTask(Task* task, bool unreachable = false);
 
   // Remove an executor and recover its resources.
   void removeExecutor(
@@ -2326,14 +2329,15 @@ struct Framework
 
   void addUnreachableTask(const Task& task)
   {
-    CHECK(protobuf::frameworkHasCapability(
-              info, FrameworkInfo::Capability::PARTITION_AWARE));
-
     // TODO(adam-mesos): Check if unreachable task already exists.
     unreachableTasks.set(task.task_id(), process::Owned<Task>(new Task(task)));
   }
 
-  void removeTask(Task* task)
+  // Removes the task. `unreachable` indicates whether the task is removed due
+  // to being unreachable. Note that we cannot rely on the task state because
+  // it may not reflect unreachability due to being set to TASK_LOST for
+  // backwards compatibility.
+  void removeTask(Task* task, bool unreachable)
   {
     CHECK(tasks.contains(task->task_id()))
       << "Unknown task " << task->task_id()
@@ -2343,7 +2347,9 @@ struct Framework
       recoverResources(task);
     }
 
-    if (task->state() == TASK_UNREACHABLE) {
+    if (unreachable) {
+      CHECK(task->state() == TASK_UNREACHABLE || task->state() == TASK_LOST)
+        << task->state();
       addUnreachableTask(*task);
     } else {
       addCompletedTask(*task);
@@ -2853,15 +2859,12 @@ struct Framework
   // fixed-size cache to avoid consuming too much memory. We use
   // boost::circular_buffer rather than BoundedHashMap because there
   // can be multiple completed tasks with the same task ID.
-  //
-  // NOTE: When an agent is marked unreachable, non-partition-aware
-  // tasks are marked TASK_LOST and stored here; partition-aware tasks
-  // are marked TASK_UNREACHABLE and stored in `unreachableTasks`.
   boost::circular_buffer<process::Owned<Task>> completedTasks;
 
-  // Partition-aware tasks running on agents that have been marked
-  // unreachable. We only keep a fixed-size cache to avoid consuming
-  // too much memory.
+  // When an agent is marked unreachable, tasks running on it are stored
+  // here. We only keep a fixed-size cache to avoid consuming too much memory.
+  // NOTE: Non-partition-aware unreachable tasks in this map are marked
+  // TASK_LOST instead of TASK_UNREACHABLE for backward compatibility.
   BoundedHashMap<TaskID, process::Owned<Task>> unreachableTasks;
 
   hashset<Offer*> offers; // Active offers for framework.

http://git-wip-us.apache.org/repos/asf/mesos/blob/9d0cadc8/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index 067529a..31ebfe1 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -535,8 +535,8 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlavePartitionAware)
 
 
 // This test checks that a slave can reregister with the master after
-// a partition, and that non-PARTITION_AWARE tasks running on the
-// slave are shutdown.
+// a partition, and that non-PARTITION_AWARE tasks are still running on the
+// slave.
 TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlaveNotPartitionAware)
 {
   Clock::pause();
@@ -718,8 +718,8 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlaveNotPartitionAware)
   Clock::advance(agentFlags.registration_backoff_factor);
   AWAIT_READY(slaveReregistered);
 
-  // Perform explicit reconciliation. The task should not be running
-  // (TASK_LOST) because the framework is not PARTITION_AWARE.
+  // Perform explicit reconciliation. The task should be running
+  // for the non-PARTITION_AWARE framework.
   TaskStatus status;
   status.mutable_task_id()->CopyFrom(task.task_id());
   status.mutable_slave_id()->CopyFrom(slaveId);
@@ -732,16 +732,15 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlaveNotPartitionAware)
   driver.reconcileTasks({status});
 
   AWAIT_READY(reconcileUpdate);
-  EXPECT_EQ(TASK_LOST, reconcileUpdate->state());
+  EXPECT_EQ(TASK_RUNNING, reconcileUpdate->state());
   EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate->reason());
   EXPECT_FALSE(reconcileUpdate->has_unreachable_time());
 
   Clock::resume();
 
   // Check the master's "/state" endpoint. The "tasks" and
-  // "unreachable_tasks" fields should be empty; there should be a
-  // single completed task (we report LOST tasks as "completed" for
-  // backward compatibility).
+  // "unreachable_tasks" fields should be empty and there should be
+  // no completed tasks.
   {
     Future<Response> response = process::http::get(
         master.get()->pid,
@@ -765,7 +764,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlaveNotPartitionAware)
 
     JSON::Array runningTasks = framework.values["tasks"].as<JSON::Array>();
 
-    EXPECT_TRUE(runningTasks.values.empty());
+    EXPECT_EQ(1u, runningTasks.values.size());
 
     JSON::Array unreachableTasks =
       framework.values["unreachable_tasks"].as<JSON::Array>();
@@ -775,14 +774,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlaveNotPartitionAware)
     JSON::Array completedTasks =
       framework.values["completed_tasks"].as<JSON::Array>();
 
-    JSON::Object completedTask =
-      completedTasks.values.front().as<JSON::Object>();
-
-    EXPECT_EQ(
-        task.task_id(), completedTask.values["id"].as<JSON::String>().value);
-    EXPECT_EQ(
-        "TASK_LOST",
-        completedTask.values["state"].as<JSON::String>().value);
+    EXPECT_TRUE(completedTasks.values.empty());
   }
 
   // Check the master's "/state-summary" endpoint.
@@ -805,9 +797,9 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlaveNotPartitionAware)
 
     JSON::Object framework = frameworks.values.front().as<JSON::Object>();
 
-    EXPECT_EQ(0, framework.values["TASK_RUNNING"].as<JSON::Number>());
+    EXPECT_EQ(1, framework.values["TASK_RUNNING"].as<JSON::Number>());
     EXPECT_EQ(0, framework.values["TASK_UNREACHABLE"].as<JSON::Number>());
-    EXPECT_EQ(1, framework.values["TASK_LOST"].as<JSON::Number>());
+    EXPECT_EQ(0, framework.values["TASK_LOST"].as<JSON::Number>());
   }
 
   {
@@ -2097,19 +2089,15 @@ TEST_F(PartitionTest, TaskCompletedOnPartitionedAgent)
   Future<SlaveReregisteredMessage> slaveReregistered = FUTURE_PROTOBUF(
       SlaveReregisteredMessage(), master.get()->pid, slave.get()->pid);
 
-  Future<Nothing> execShutdown;
-  EXPECT_CALL(exec, shutdown(_))
-    .WillOnce(FutureSatisfy(&execShutdown));
-
   Future<TaskStatus> finishedStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&finishedStatus));
+    .WillOnce(FutureArg<1>(&finishedStatus))
+    .WillRepeatedly(Return()); // The agent may resend status updates.
 
   detector.appoint(master.get()->pid);
 
   Clock::advance(agentFlags.registration_backoff_factor);
   AWAIT_READY(slaveReregistered);
-  AWAIT_READY(execShutdown);
 
   AWAIT_READY(finishedStatus);
   EXPECT_EQ(TASK_FINISHED, finishedStatus->state());
@@ -2175,13 +2163,10 @@ TEST_F(PartitionTest, TaskCompletedOnPartitionedAgent)
     JSON::Object completedTask =
       completedTasks.values.front().as<JSON::Object>();
 
-    // TODO(neilc): It might be better to report TASK_FINISHED here. We
-    // report TASK_LOST currently because the master doesn't update
-    // the state of tasks it has already marked as completed.
     EXPECT_EQ(
         task.task_id(), completedTask.values["id"].as<JSON::String>().value);
     EXPECT_EQ(
-        "TASK_LOST",
+        "TASK_FINISHED",
         completedTask.values["state"].as<JSON::String>().value);
   }