You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2017/02/15 22:39:03 UTC
mesos git commit: Fixed a crash on the default executor due to a
failed invariant check.
Repository: mesos
Updated Branches:
refs/heads/master 199d3941b -> 7ae60706a
Fixed a crash on the default executor due to a failed invariant check.
There was an incorrect assumption when killing a task group that
all its other tasks would still be active. It's possible that
some of them terminated successfully earlier.
Review: https://reviews.apache.org/r/56716
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7ae60706
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7ae60706
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7ae60706
Branch: refs/heads/master
Commit: 7ae60706a60357d21df4b16debd86f7f5ed808dd
Parents: 199d394
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Feb 14 18:07:21 2017 -0800
Committer: Anand Mazumdar <an...@apache.org>
Committed: Wed Feb 15 14:38:42 2017 -0800
----------------------------------------------------------------------
src/launcher/default_executor.cpp | 8 +-
src/tests/default_executor_tests.cpp | 208 ++++++++++++++++++++++++++++++
2 files changed, 213 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/7ae60706/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index e63cf15..d97324b 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -679,6 +679,7 @@ protected:
// Shutdown the executor if all the active child containers have terminated.
if (containers.empty()) {
__shutdown();
+ return;
}
// Ignore if the executor is already in the process of shutting down.
@@ -712,13 +713,14 @@ protected:
container->killingTaskGroup = true;
foreach (const TaskInfo& task, container->taskGroup.tasks()) {
const TaskID& taskId = task.task_id();
- if (taskId == container->taskId) {
+
+ // Ignore if it's the same task that triggered this callback or
+ // if the task is no longer active.
+ if (taskId == container->taskId || !containers.contains(taskId)) {
continue;
}
- CHECK(containers.contains(taskId));
Owned<Container> container_ = containers.at(taskId);
-
container_->killingTaskGroup = true;
kill(container_);
http://git-wip-us.apache.org/repos/asf/mesos/blob/7ae60706/src/tests/default_executor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp
index 1afab82..ffb69e9 100644
--- a/src/tests/default_executor_tests.cpp
+++ b/src/tests/default_executor_tests.cpp
@@ -1097,6 +1097,214 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorTest, CommitSuicideOnTaskFailure)
ASSERT_EQ(0, executorFailure->status());
}
+
+// This test verifies that the default executor does not commit suicide
+// with a non-zero exit code after killing a task from a task group when
+// one of its tasks finished successfully earlier (See MESOS-7129).
+TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+ Resources resources =
+ Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+
+ ExecutorInfo executorInfo;
+ executorInfo.set_type(ExecutorInfo::DEFAULT);
+
+ executorInfo.mutable_executor_id()->CopyFrom(DEFAULT_EXECUTOR_ID);
+ executorInfo.mutable_resources()->CopyFrom(resources);
+
+ // Disable AuthN on the agent.
+ slave::Flags flags = CreateSlaveFlags();
+ flags.authenticate_http_readwrite = false;
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+ ASSERT_SOME(slave);
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected));
+
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ ContentType::PROTOBUF,
+ scheduler);
+
+ AWAIT_READY(connected);
+
+ Future<v1::scheduler::Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ Future<v1::scheduler::Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return());
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(evolve(frameworkInfo));
+
+ mesos.send(call);
+ }
+
+ AWAIT_READY(subscribed);
+
+ v1::FrameworkID frameworkId(subscribed->framework_id());
+
+ // Update `executorInfo` with the subscribed `frameworkId`.
+ executorInfo.mutable_framework_id()->CopyFrom(devolve(frameworkId));
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0, offers->offers().size());
+
+ Future<v1::scheduler::Event::Update> runningUpdate1;
+ Future<v1::scheduler::Event::Update> runningUpdate2;
+ EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&runningUpdate1))
+ .WillOnce(FutureArg<1>(&runningUpdate2));
+
+ Future<v1::scheduler::Event::Failure> executorFailure;
+ EXPECT_CALL(*scheduler, failure(_, _))
+ .WillOnce(FutureArg<1>(&executorFailure));
+
+ const v1::Offer& offer = offers->offers(0);
+ const SlaveID slaveId = devolve(offer.agent_id());
+
+ // The first task finishes successfully while the second
+ // task is explicitly killed later.
+
+ v1::TaskInfo taskInfo1 =
+ evolve(createTask(slaveId, resources, "exit 0"));
+
+ v1::TaskInfo taskInfo2 =
+ evolve(createTask(slaveId, resources, SLEEP_COMMAND(1000)));
+
+ v1::TaskGroupInfo taskGroup;
+ taskGroup.add_tasks()->CopyFrom(taskInfo1);
+ taskGroup.add_tasks()->CopyFrom(taskInfo2);
+
+ const hashset<v1::TaskID> tasks{taskInfo1.task_id(), taskInfo2.task_id()};
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::ACCEPT);
+
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(offer.id());
+
+ v1::Offer::Operation* operation = accept->add_operations();
+ operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
+
+ v1::Offer::Operation::LaunchGroup* launchGroup =
+ operation->mutable_launch_group();
+
+ launchGroup->mutable_executor()->CopyFrom(evolve(executorInfo));
+ launchGroup->mutable_task_group()->CopyFrom(taskGroup);
+
+ mesos.send(call);
+ }
+
+ AWAIT_READY(runningUpdate1);
+ ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state());
+
+ AWAIT_READY(runningUpdate2);
+ ASSERT_EQ(TASK_RUNNING, runningUpdate2->status().state());
+
+ // When running a task, TASK_RUNNING updates for the tasks in a
+ // task group can be received in any order.
+ const hashset<v1::TaskID> tasksRunning{
+ runningUpdate1->status().task_id(),
+ runningUpdate2->status().task_id()};
+
+ ASSERT_EQ(tasks, tasksRunning);
+
+ Future<v1::scheduler::Event::Update> finishedUpdate;
+ EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&finishedUpdate));
+
+ // Acknowledge the TASK_RUNNING updates to receive the next updates.
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::ACKNOWLEDGE);
+
+ Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+
+ acknowledge->mutable_task_id()->CopyFrom(
+ runningUpdate1->status().task_id());
+
+ acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
+ acknowledge->set_uuid(runningUpdate1->status().uuid());
+
+ mesos.send(call);
+ }
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::ACKNOWLEDGE);
+
+ Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+
+ acknowledge->mutable_task_id()->CopyFrom(
+ runningUpdate2->status().task_id());
+
+ acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
+ acknowledge->set_uuid(runningUpdate2->status().uuid());
+
+ mesos.send(call);
+ }
+
+ AWAIT_READY(finishedUpdate);
+ ASSERT_EQ(TASK_FINISHED, finishedUpdate->status().state());
+ ASSERT_EQ(taskInfo1.task_id(), finishedUpdate->status().task_id());
+
+ // The executor should still be alive after the task
+ // has finished successfully.
+ ASSERT_TRUE(executorFailure.isPending());
+
+ Future<v1::scheduler::Event::Update> killedUpdate;
+ EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&killedUpdate));
+
+ // Now kill the second task in the task group.
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::KILL);
+
+ Call::Kill* kill = call.mutable_kill();
+ kill->mutable_task_id()->CopyFrom(taskInfo2.task_id());
+
+ mesos.send(call);
+ }
+
+ AWAIT_READY(killedUpdate);
+ ASSERT_EQ(TASK_KILLED, killedUpdate->status().state());
+ ASSERT_EQ(taskInfo2.task_id(), killedUpdate->status().task_id());
+
+ // The executor should commit suicide after the task is killed.
+ AWAIT_READY(executorFailure);
+
+ // Even though the task failed, the executor should exit gracefully.
+ ASSERT_TRUE(executorFailure->has_status());
+ ASSERT_EQ(0, executorFailure->status());
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {