Posted to commits@mesos.apache.org by ya...@apache.org on 2017/11/29 23:15:43 UTC
mesos git commit: Do not kill non-partition-aware tasks upon agent reregistration.
Repository: mesos
Updated Branches:
refs/heads/master 5bfdc3bec -> 9d0cadc82
Do not kill non-partition-aware tasks upon agent reregistration.
The master used to send a ShutdownFrameworkMessage to the agent
to kill the tasks of non-partition-aware frameworks, but this
misuse of ShutdownFrameworkMessage led to the problematic behavior
described in MESOS-7215. This commit fixes the problem by no longer
killing the tasks of non-partition-aware frameworks when an
unreachable agent reregisters. Instead, the master simply recovers
these tasks.
Review: https://reviews.apache.org/r/61473/
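For illustration, here is a minimal standalone sketch of the new recovery
path. The types and the `isCompletedFramework` callback are hypothetical
simplifications, not the actual master code:

    #include <functional>
    #include <string>
    #include <vector>

    struct Task { std::string frameworkId; };

    // After this commit, the master recovers every task whose framework
    // has not been torn down, whether or not the framework is
    // partition-aware. Previously, non-partition-aware tasks were
    // killed via ShutdownFrameworkMessage unless the master had
    // failed over.
    std::vector<Task> recoverTasks(
        const std::vector<Task>& tasks,
        const std::function<bool(const std::string&)>& isCompletedFramework)
    {
      std::vector<Task> recovered;
      for (const Task& task : tasks) {
        // Tasks of completed frameworks are still skipped; the agent
        // is told to shut those frameworks down instead.
        if (isCompletedFramework(task.frameworkId)) {
          continue;
        }
        recovered.push_back(task);
      }
      return recovered;
    }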
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9d0cadc8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9d0cadc8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9d0cadc8
Branch: refs/heads/master
Commit: 9d0cadc82210a78cf632913808c496fd0ade713e
Parents: 5bfdc3b
Author: Megha Sharma <ms...@apple.com>
Authored: Wed Nov 29 14:24:13 2017 -0800
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Wed Nov 29 15:15:29 2017 -0800
----------------------------------------------------------------------
include/mesos/mesos.proto | 14 +++---
include/mesos/v1/mesos.proto | 14 +++---
src/master/http.cpp | 31 +++++++++++-
src/master/master.cpp | 99 +++++++++++---------------------------
src/master/master.hpp | 31 ++++++------
src/tests/partition_tests.cpp | 43 ++++++-----------
6 files changed, 105 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9d0cadc8/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index b1ebfe2..524665a 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -336,14 +336,16 @@ message FrameworkInfo {
// Frameworks that enable this capability can define how they
// would like to handle partitioned tasks. Frameworks will
// receive TASK_UNREACHABLE for tasks on agents that are
- // partitioned from the master. If/when a partitioned agent
- // reregisters, tasks on the agent that were started by
- // PARTITION_AWARE frameworks will not killed.
+ // partitioned from the master.
//
// Without this capability, frameworks will receive TASK_LOST
- // for tasks on partitioned agents; such tasks will be killed by
- // Mesos when the agent reregisters (unless the master has
- // failed over).
+ // for tasks on partitioned agents.
+ // NOTE: Prior to Mesos 1.5, such tasks were killed by Mesos
+ // when the agent reregistered (unless the master had failed over).
+ // However, since there is little benefit in maintaining different
+ // behaviors depending on whether the master has failed over
+ // (see MESOS-7215), as of 1.5 Mesos does not kill these
+ // tasks in either case.
PARTITION_AWARE = 5;
// This expresses the ability for the framework to be
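For context, a framework opts in to this capability via its FrameworkInfo.
A minimal sketch using the generated C++ protobuf bindings (the framework
name here is a placeholder):

    #include <mesos/mesos.pb.h>

    mesos::FrameworkInfo makePartitionAwareFramework()
    {
      mesos::FrameworkInfo framework;
      framework.set_user("");  // Mesos fills in the current user.
      framework.set_name("example-framework");

      // With this capability the framework receives TASK_UNREACHABLE
      // (rather than TASK_LOST) for tasks on partitioned agents.
      framework.add_capabilities()->set_type(
          mesos::FrameworkInfo::Capability::PARTITION_AWARE);

      return framework;
    }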
http://git-wip-us.apache.org/repos/asf/mesos/blob/9d0cadc8/include/mesos/v1/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index d535eb4..0ba6032 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -334,14 +334,16 @@ message FrameworkInfo {
// Frameworks that enable this capability can define how they
// would like to handle partitioned tasks. Frameworks will
// receive TASK_UNREACHABLE for tasks on agents that are
- // partitioned from the master. If/when a partitioned agent
- // reregisters, tasks on the agent that were started by
- // PARTITION_AWARE frameworks will not killed.
+ // partitioned from the master.
//
// Without this capability, frameworks will receive TASK_LOST
- // for tasks on partitioned agents; such tasks will be killed by
- // Mesos when the agent reregisters (unless the master has
- // failed over).
+ // for tasks on partitioned agents.
+ // NOTE: Prior to Mesos 1.5, such tasks were killed by Mesos
+ // when the agent reregistered (unless the master had failed over).
+ // However, since there is little benefit in maintaining different
+ // behaviors depending on whether the master has failed over
+ // (see MESOS-7215), as of 1.5 Mesos does not kill these
+ // tasks in either case.
PARTITION_AWARE = 5;
// This expresses the ability for the framework to be
http://git-wip-us.apache.org/repos/asf/mesos/blob/9d0cadc8/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 9dcdcbe..d1138a2 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -328,6 +328,12 @@ struct FullFrameworkWriter {
writer->field("unreachable_tasks", [this](JSON::ArrayWriter* writer) {
foreachvalue (const Owned<Task>& task, framework_->unreachableTasks) {
+ // There could be TASK_LOST tasks in this map. See comment for
+ // `unreachableTasks`.
+ if (task.get()->state() != TASK_UNREACHABLE) {
+ continue;
+ }
+
// Skip unauthorized tasks.
if (!authorizeTask_->accept(*task, framework_->info)) {
continue;
@@ -346,6 +352,22 @@ struct FullFrameworkWriter {
writer->element(*task);
}
+
+ // Unreachable tasks belonging to a non-partition-aware framework
+ // have been stored as TASK_LOST for backward compatibility, so we
+ // export them as completed tasks.
+ foreachvalue (const Owned<Task>& task, framework_->unreachableTasks) {
+ if (task.get()->state() != TASK_LOST) {
+ continue;
+ }
+
+ // Skip unauthorized tasks.
+ if (!authorizeTask_->accept(*task, framework_->info)) {
+ continue;
+ }
+
+ writer->element(*task);
+ }
});
// Model all of the offers associated with a framework.
@@ -4221,7 +4243,14 @@ mesos::master::Response::GetTasks Master::Http::_getTasks(
continue;
}
- getTasks.add_unreachable_tasks()->CopyFrom(*task);
+ if (task.get()->state() == TASK_UNREACHABLE) {
+ getTasks.add_unreachable_tasks()->CopyFrom(*task);
+ } else {
+ // Unreachable tasks belonging to a non-partition-aware framework
+ // have been stored as TASK_LOST for backward compatibility, so we
+ // export them as completed tasks.
+ getTasks.add_completed_tasks()->CopyFrom(*task);
+ }
}
// Completed tasks.
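Conceptually, `unreachableTasks` now holds tasks in both states and the
endpoints split it on export. A simplified standalone sketch of that split,
using hypothetical types rather than the actual master classes:

    #include <string>
    #include <unordered_map>
    #include <vector>

    enum TaskState { TASK_UNREACHABLE, TASK_LOST };
    struct Task { std::string id; TaskState state; };

    // Genuinely unreachable tasks are exported as "unreachable_tasks";
    // TASK_LOST entries (non-partition-aware frameworks) are exported
    // as "completed_tasks" for backward compatibility.
    void splitForExport(
        const std::unordered_map<std::string, Task>& unreachableTasks,
        std::vector<Task>* unreachableOut,
        std::vector<Task>* completedOut)
    {
      for (const auto& entry : unreachableTasks) {
        const Task& task = entry.second;
        if (task.state == TASK_UNREACHABLE) {
          unreachableOut->push_back(task);
        } else {
          completedOut->push_back(task);
        }
      }
    }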
http://git-wip-us.apache.org/repos/asf/mesos/blob/9d0cadc8/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 7bcdb74..1876d7d 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6788,32 +6788,9 @@ void Master::__reregisterSlave(
}
}
- // Check if this master was the one that removed the reregistering
- // agent from the cluster originally. This is false if the master
- // has failed over since the agent was removed, for example.
- //
- // TODO(neilc): Since `removed` is a cache, we might mistakenly
- // think the master has failed over and neglect to shutdown
- // non-partition-aware frameworks on reregistering agents.
- bool slaveWasRemoved = slaves.removed.get(slaveInfo.id()).isSome();
-
- // Decide how to handle the tasks running on the agent:
- //
- // (a) If the master has not failed over since the agent was marked
- // unreachable, only partition-aware tasks are re-added to the
+ // All tasks except those of completed frameworks are re-added to the
// master (those tasks were previously marked "unreachable", so they
- // should be removed from that collection). Any non-partition-aware
- // frameworks running on the agent are shutdown. We already marked
- // such tasks "completed" when the agent was marked unreachable, so
- // no further cleanup for non-partition-aware tasks is required.
- //
- // In addition, we also filter any tasks whose frameworks have
- // completed. As in case (a), such frameworks will be shutdown and
- // their tasks have already been marked "completed".
- //
- // (b) If the master has failed over, all tasks are re-added to the
- // master. The master shouldn't have any record of the tasks running
- // on the agent, so no further cleanup is required.
+ // should be removed from that collection).
vector<Task> recoveredTasks;
foreach (const Task& task, tasks) {
const FrameworkID& frameworkId = task.framework_id();
@@ -6824,18 +6801,11 @@ void Master::__reregisterSlave(
continue;
}
- // Always re-add partition-aware tasks.
- if (partitionAwareFrameworks.contains(frameworkId)) {
- recoveredTasks.push_back(task);
+ recoveredTasks.push_back(task);
- Framework* framework = getFramework(frameworkId);
- if (framework != nullptr) {
- framework->unreachableTasks.erase(task.task_id());
- }
- } else if (!slaveWasRemoved) {
- // Only re-add non-partition-aware tasks if the master has
- // failed over since the agent was marked unreachable.
- recoveredTasks.push_back(task);
+ Framework* framework = getFramework(frameworkId);
+ if (framework != nullptr) {
+ framework->unreachableTasks.erase(task.task_id());
}
}
@@ -6878,35 +6848,15 @@ void Master::__reregisterSlave(
LOG(INFO) << "Re-registered agent " << *slave
<< " with " << Resources(slave->info.resources());
- // Determine which frameworks on the slave to shutdown, if any. This
- // happens in two cases:
- //
- // (1) If this master marked the slave unreachable (i.e., master has
- // not failed over), we shutdown any non-partition-aware frameworks
- // running on the slave. This matches the Mesos <= 1.0 "non-strict"
- // registry semantics.
- //
- // (2) Any framework that is completed at the master but still
- // running at the slave is shutdown. This can occur if the framework
- // was removed when the slave was partitioned. NOTE: This is just a
+ // Any framework that is completed at the master but still running
+ // at the slave is shut down. This can occur if the framework was
+ // removed when the slave was partitioned. NOTE: This is just a
// short-term hack because information about completed frameworks is
// lost when the master fails over. Also, we only store a limited
// number of completed frameworks. A proper fix likely involves
// storing framework information in the registry (MESOS-1719).
foreach (const FrameworkInfo& framework, frameworks) {
- if (slaveWasRemoved && !partitionAwareFrameworks.contains(framework.id())) {
- LOG(INFO) << "Shutting down framework " << framework.id()
- << " at re-registered agent " << *slave
- << " because the framework is not partition-aware";
-
- ShutdownFrameworkMessage message;
- message.mutable_framework_id()->MergeFrom(framework.id());
- send(slave->pid, message);
-
- // The framework's tasks should not be stored in the master's
- // in-memory state, because they were not re-added above.
- CHECK(!slave->tasks.contains(framework.id()));
- } else if (isCompletedFramework(framework.id())) {
+ if (isCompletedFramework(framework.id())) {
LOG(INFO) << "Shutting down framework " << framework.id()
<< " at re-registered agent " << *slave
<< " because the framework has been shutdown at the master";
@@ -9369,15 +9319,16 @@ void Master::__removeSlave(
TaskState newTaskState = TASK_UNREACHABLE;
TaskStatus::Reason newTaskReason = TaskStatus::REASON_SLAVE_REMOVED;
+ // Needed to convey task unreachability because we lose this
+ // information from the task state if `TASK_LOST` is used.
+ bool unreachable = true;
+
if (!framework->capabilities.partitionAware) {
newTaskState = TASK_LOST;
- } else {
- if (unreachableTime.isSome()) {
- newTaskState = TASK_UNREACHABLE;
- } else {
- newTaskState = TASK_GONE_BY_OPERATOR;
- newTaskReason = TaskStatus::REASON_SLAVE_REMOVED_BY_OPERATOR;
- }
+ } else if (unreachableTime.isNone()) {
+ unreachable = false;
+ newTaskState = TASK_GONE_BY_OPERATOR;
+ newTaskReason = TaskStatus::REASON_SLAVE_REMOVED_BY_OPERATOR;
}
foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
@@ -9399,7 +9350,7 @@ void Master::__removeSlave(
unreachableTime.isSome() ? unreachableTime : None());
updateTask(task, update);
- removeTask(task);
+ removeTask(task, unreachable);
if (!framework->connected()) {
LOG(WARNING) << "Dropping update " << update
@@ -9602,7 +9553,7 @@ void Master::updateTask(Task* task, const StatusUpdate& update)
}
-void Master::removeTask(Task* task)
+void Master::removeTask(Task* task, bool unreachable)
{
CHECK_NOTNULL(task);
@@ -9611,6 +9562,8 @@ void Master::removeTask(Task* task)
CHECK_NOTNULL(slave);
if (!isRemovable(task->state())) {
+ CHECK(!unreachable) << task->task_id();
+
// Note that we convert to `Resources` for output as it's faster than
// logging raw protobuf data. Conversion is safe, as resources have
// already passed validation.
@@ -9640,7 +9593,7 @@ void Master::removeTask(Task* task)
// Remove from framework.
Framework* framework = getFramework(task->framework_id());
if (framework != nullptr) { // A framework might not be re-registered yet.
- framework->removeTask(task);
+ framework->removeTask(task, unreachable);
}
// Remove from slave.
@@ -10371,7 +10324,11 @@ double Master::_tasks_unreachable()
double count = 0.0;
foreachvalue (Framework* framework, frameworks.registered) {
- count += framework->unreachableTasks.size();
+ foreachvalue (const Owned<Task>& task, framework->unreachableTasks) {
+ if (task.get()->state() == TASK_UNREACHABLE) {
+ count++;
+ }
+ }
}
return count;
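The net effect of the `__removeSlave()` change is easier to see as a
standalone decision helper. A hypothetical sketch (illustrative names, not
the real API) of how the task state and the new `unreachable` flag are
chosen:

    enum TaskState { TASK_LOST, TASK_UNREACHABLE, TASK_GONE_BY_OPERATOR };

    struct Outcome
    {
      TaskState state;
      bool unreachable;  // Tracked separately: TASK_LOST alone cannot
                         // convey unreachability.
    };

    Outcome selectOutcome(bool partitionAware, bool hasUnreachableTime)
    {
      if (!partitionAware) {
        // Non-partition-aware frameworks still see TASK_LOST, but the
        // task is nonetheless tracked as unreachable internally.
        return Outcome{TASK_LOST, true};
      }
      if (hasUnreachableTime) {
        return Outcome{TASK_UNREACHABLE, true};
      }
      // The agent was removed by an operator rather than partitioned.
      return Outcome{TASK_GONE_BY_OPERATOR, false};
    }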
http://git-wip-us.apache.org/repos/asf/mesos/blob/9d0cadc8/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 1c6a86f..3c86463 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -877,8 +877,11 @@ protected:
// terminal.
void updateTask(Task* task, const StatusUpdate& update);
- // Removes the task.
- void removeTask(Task* task);
+ // Removes the task. `unreachable` indicates whether the task is removed due
+ // to being unreachable. Note that we cannot rely on the task state because
+ // it may not reflect unreachability due to being set to TASK_LOST for
+ // backwards compatibility.
+ void removeTask(Task* task, bool unreachable = false);
// Remove an executor and recover its resources.
void removeExecutor(
@@ -2326,14 +2329,15 @@ struct Framework
void addUnreachableTask(const Task& task)
{
- CHECK(protobuf::frameworkHasCapability(
- info, FrameworkInfo::Capability::PARTITION_AWARE));
-
// TODO(adam-mesos): Check if unreachable task already exists.
unreachableTasks.set(task.task_id(), process::Owned<Task>(new Task(task)));
}
- void removeTask(Task* task)
+ // Removes the task. `unreachable` indicates whether the task is removed due
+ // to being unreachable. Note that we cannot rely on the task state because
+ // it may not reflect unreachability due to being set to TASK_LOST for
+ // backwards compatibility.
+ void removeTask(Task* task, bool unreachable)
{
CHECK(tasks.contains(task->task_id()))
<< "Unknown task " << task->task_id()
@@ -2343,7 +2347,9 @@ struct Framework
recoverResources(task);
}
- if (task->state() == TASK_UNREACHABLE) {
+ if (unreachable) {
+ CHECK(task->state() == TASK_UNREACHABLE || task->state() == TASK_LOST)
+ << task->state();
addUnreachableTask(*task);
} else {
addCompletedTask(*task);
@@ -2853,15 +2859,12 @@ struct Framework
// fixed-size cache to avoid consuming too much memory. We use
// boost::circular_buffer rather than BoundedHashMap because there
// can be multiple completed tasks with the same task ID.
- //
- // NOTE: When an agent is marked unreachable, non-partition-aware
- // tasks are marked TASK_LOST and stored here; partition-aware tasks
- // are marked TASK_UNREACHABLE and stored in `unreachableTasks`.
boost::circular_buffer<process::Owned<Task>> completedTasks;
- // Partition-aware tasks running on agents that have been marked
- // unreachable. We only keep a fixed-size cache to avoid consuming
- // too much memory.
+ // When an agent is marked unreachable, tasks running on it are stored
+ // here. We only keep a fixed-size cache to avoid consuming too much memory.
+ // NOTE: Non-partition-aware unreachable tasks in this map are marked
+ // TASK_LOST instead of TASK_UNREACHABLE for backward compatibility.
BoundedHashMap<TaskID, process::Owned<Task>> unreachableTasks;
hashset<Offer*> offers; // Active offers for framework.
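The reason `removeTask()` needs an explicit flag is that TASK_LOST is
ambiguous after this change. A simplified sketch of the routing in
`Framework::removeTask()`, with hypothetical types and the actual
bookkeeping elided to comments:

    #include <cassert>

    enum TaskState { TASK_LOST, TASK_UNREACHABLE, TASK_FINISHED };
    struct Task { TaskState state; };

    void removeTask(const Task& task, bool unreachable)
    {
      if (unreachable) {
        // TASK_LOST may mean "completed" or "unreachable but reported
        // as lost for backward compatibility", hence the explicit flag.
        assert(task.state == TASK_UNREACHABLE || task.state == TASK_LOST);
        // addUnreachableTask(task);  // -> bounded `unreachableTasks` map.
      } else {
        // addCompletedTask(task);    // -> `completedTasks` buffer.
      }
    }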
http://git-wip-us.apache.org/repos/asf/mesos/blob/9d0cadc8/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index 067529a..31ebfe1 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -535,8 +535,8 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlavePartitionAware)
// This test checks that a slave can reregister with the master after
-// a partition, and that non-PARTITION_AWARE tasks running on the
-// slave are shutdown.
+// a partition, and that non-PARTITION_AWARE tasks are still running on the
+// slave.
TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlaveNotPartitionAware)
{
Clock::pause();
@@ -718,8 +718,8 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlaveNotPartitionAware)
Clock::advance(agentFlags.registration_backoff_factor);
AWAIT_READY(slaveReregistered);
- // Perform explicit reconciliation. The task should not be running
- // (TASK_LOST) because the framework is not PARTITION_AWARE.
+ // Perform explicit reconciliation. The task should still be running
+ // even though the framework is not PARTITION_AWARE.
TaskStatus status;
status.mutable_task_id()->CopyFrom(task.task_id());
status.mutable_slave_id()->CopyFrom(slaveId);
@@ -732,16 +732,15 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlaveNotPartitionAware)
driver.reconcileTasks({status});
AWAIT_READY(reconcileUpdate);
- EXPECT_EQ(TASK_LOST, reconcileUpdate->state());
+ EXPECT_EQ(TASK_RUNNING, reconcileUpdate->state());
EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate->reason());
EXPECT_FALSE(reconcileUpdate->has_unreachable_time());
Clock::resume();
// Check the master's "/state" endpoint. The "tasks" and
- // "unreachable_tasks" fields should be empty; there should be a
- // single completed task (we report LOST tasks as "completed" for
- // backward compatibility).
+ // "unreachable_tasks" fields should be empty and there should be
+ // no completed tasks.
{
Future<Response> response = process::http::get(
master.get()->pid,
@@ -765,7 +764,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlaveNotPartitionAware)
JSON::Array runningTasks = framework.values["tasks"].as<JSON::Array>();
- EXPECT_TRUE(runningTasks.values.empty());
+ EXPECT_EQ(1u, runningTasks.values.size());
JSON::Array unreachableTasks =
framework.values["unreachable_tasks"].as<JSON::Array>();
@@ -775,14 +774,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlaveNotPartitionAware)
JSON::Array completedTasks =
framework.values["completed_tasks"].as<JSON::Array>();
- JSON::Object completedTask =
- completedTasks.values.front().as<JSON::Object>();
-
- EXPECT_EQ(
- task.task_id(), completedTask.values["id"].as<JSON::String>().value);
- EXPECT_EQ(
- "TASK_LOST",
- completedTask.values["state"].as<JSON::String>().value);
+ EXPECT_TRUE(completedTasks.values.empty());
}
// Check the master's "/state-summary" endpoint.
@@ -805,9 +797,9 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlaveNotPartitionAware)
JSON::Object framework = frameworks.values.front().as<JSON::Object>();
- EXPECT_EQ(0, framework.values["TASK_RUNNING"].as<JSON::Number>());
+ EXPECT_EQ(1, framework.values["TASK_RUNNING"].as<JSON::Number>());
EXPECT_EQ(0, framework.values["TASK_UNREACHABLE"].as<JSON::Number>());
- EXPECT_EQ(1, framework.values["TASK_LOST"].as<JSON::Number>());
+ EXPECT_EQ(0, framework.values["TASK_LOST"].as<JSON::Number>());
}
{
@@ -2097,19 +2089,15 @@ TEST_F(PartitionTest, TaskCompletedOnPartitionedAgent)
Future<SlaveReregisteredMessage> slaveReregistered = FUTURE_PROTOBUF(
SlaveReregisteredMessage(), master.get()->pid, slave.get()->pid);
- Future<Nothing> execShutdown;
- EXPECT_CALL(exec, shutdown(_))
- .WillOnce(FutureSatisfy(&execShutdown));
-
Future<TaskStatus> finishedStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&finishedStatus));
+ .WillOnce(FutureArg<1>(&finishedStatus))
+ .WillRepeatedly(Return()); // The agent may resend status updates.
detector.appoint(master.get()->pid);
Clock::advance(agentFlags.registration_backoff_factor);
AWAIT_READY(slaveReregistered);
- AWAIT_READY(execShutdown);
AWAIT_READY(finishedStatus);
EXPECT_EQ(TASK_FINISHED, finishedStatus->state());
@@ -2175,13 +2163,10 @@ TEST_F(PartitionTest, TaskCompletedOnPartitionedAgent)
JSON::Object completedTask =
completedTasks.values.front().as<JSON::Object>();
- // TODO(neilc): It might be better to report TASK_FINISHED here. We
- // report TASK_LOST currently because the master doesn't update
- // the state of tasks it has already marked as completed.
EXPECT_EQ(
task.task_id(), completedTask.values["id"].as<JSON::String>().value);
EXPECT_EQ(
- "TASK_LOST",
+ "TASK_FINISHED",
completedTask.values["state"].as<JSON::String>().value);
}