You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2017/02/15 22:39:03 UTC

mesos git commit: Fixed a crash on the default executor due to a failed invariant check.

Repository: mesos
Updated Branches:
  refs/heads/master 199d3941b -> 7ae60706a


Fixed a crash on the default executor due to a failed invariant check.

There was an incorrect assumption when killing a task group that
all its other tasks would still be active. It's possible that
some of them terminated successfully earlier.

Review: https://reviews.apache.org/r/56716


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7ae60706
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7ae60706
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7ae60706

Branch: refs/heads/master
Commit: 7ae60706a60357d21df4b16debd86f7f5ed808dd
Parents: 199d394
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Feb 14 18:07:21 2017 -0800
Committer: Anand Mazumdar <an...@apache.org>
Committed: Wed Feb 15 14:38:42 2017 -0800

----------------------------------------------------------------------
 src/launcher/default_executor.cpp    |   8 +-
 src/tests/default_executor_tests.cpp | 208 ++++++++++++++++++++++++++++++
 2 files changed, 213 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7ae60706/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index e63cf15..d97324b 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -679,6 +679,7 @@ protected:
     // Shutdown the executor if all the active child containers have terminated.
     if (containers.empty()) {
       __shutdown();
+      return;
     }
 
     // Ignore if the executor is already in the process of shutting down.
@@ -712,13 +713,14 @@ protected:
       container->killingTaskGroup = true;
       foreach (const TaskInfo& task, container->taskGroup.tasks()) {
         const TaskID& taskId = task.task_id();
-        if (taskId == container->taskId) {
+
+        // Ignore if it's the same task that triggered this callback or
+        // if the task is no longer active.
+        if (taskId == container->taskId || !containers.contains(taskId)) {
           continue;
         }
 
-        CHECK(containers.contains(taskId));
         Owned<Container> container_ = containers.at(taskId);
-
         container_->killingTaskGroup = true;
 
         kill(container_);

http://git-wip-us.apache.org/repos/asf/mesos/blob/7ae60706/src/tests/default_executor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp
index 1afab82..ffb69e9 100644
--- a/src/tests/default_executor_tests.cpp
+++ b/src/tests/default_executor_tests.cpp
@@ -1097,6 +1097,214 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorTest, CommitSuicideOnTaskFailure)
   ASSERT_EQ(0, executorFailure->status());
 }
 
+
+// This test verifies that the default executor does not commit suicide
+// with a non-zero exit code after killing a task from a task group when
+// one of its tasks finished successfully earlier (See MESOS-7129).
+TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Resources resources =
+    Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+
+  ExecutorInfo executorInfo;
+  executorInfo.set_type(ExecutorInfo::DEFAULT);
+
+  executorInfo.mutable_executor_id()->CopyFrom(DEFAULT_EXECUTOR_ID);
+  executorInfo.mutable_resources()->CopyFrom(resources);
+
+  // Disable AuthN on the agent.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.authenticate_http_readwrite = false;
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(evolve(frameworkInfo));
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  // Update `executorInfo` with the subscribed `frameworkId`.
+  executorInfo.mutable_framework_id()->CopyFrom(devolve(frameworkId));
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+
+  Future<v1::scheduler::Event::Update> runningUpdate1;
+  Future<v1::scheduler::Event::Update> runningUpdate2;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&runningUpdate1))
+    .WillOnce(FutureArg<1>(&runningUpdate2));
+
+  Future<v1::scheduler::Event::Failure> executorFailure;
+  EXPECT_CALL(*scheduler, failure(_, _))
+    .WillOnce(FutureArg<1>(&executorFailure));
+
+  const v1::Offer& offer = offers->offers(0);
+  const SlaveID slaveId = devolve(offer.agent_id());
+
+  // The first task finishes successfully while the second
+  // task is explicitly killed later.
+
+  v1::TaskInfo taskInfo1 =
+    evolve(createTask(slaveId, resources, "exit 0"));
+
+  v1::TaskInfo taskInfo2 =
+    evolve(createTask(slaveId, resources, SLEEP_COMMAND(1000)));
+
+  v1::TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(taskInfo1);
+  taskGroup.add_tasks()->CopyFrom(taskInfo2);
+
+  const hashset<v1::TaskID> tasks{taskInfo1.task_id(), taskInfo2.task_id()};
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
+
+    v1::Offer::Operation::LaunchGroup* launchGroup =
+      operation->mutable_launch_group();
+
+    launchGroup->mutable_executor()->CopyFrom(evolve(executorInfo));
+    launchGroup->mutable_task_group()->CopyFrom(taskGroup);
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(runningUpdate1);
+  ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state());
+
+  AWAIT_READY(runningUpdate2);
+  ASSERT_EQ(TASK_RUNNING, runningUpdate2->status().state());
+
+  // When running a task, TASK_RUNNING updates for the tasks in a
+  // task group can be received in any order.
+  const hashset<v1::TaskID> tasksRunning{
+    runningUpdate1->status().task_id(),
+    runningUpdate2->status().task_id()};
+
+  ASSERT_EQ(tasks, tasksRunning);
+
+  Future<v1::scheduler::Event::Update> finishedUpdate;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&finishedUpdate));
+
+  // Acknowledge the TASK_RUNNING updates to receive the next updates.
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACKNOWLEDGE);
+
+    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+
+    acknowledge->mutable_task_id()->CopyFrom(
+        runningUpdate1->status().task_id());
+
+    acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
+    acknowledge->set_uuid(runningUpdate1->status().uuid());
+
+    mesos.send(call);
+  }
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACKNOWLEDGE);
+
+    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+
+    acknowledge->mutable_task_id()->CopyFrom(
+        runningUpdate2->status().task_id());
+
+    acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
+    acknowledge->set_uuid(runningUpdate2->status().uuid());
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(finishedUpdate);
+  ASSERT_EQ(TASK_FINISHED, finishedUpdate->status().state());
+  ASSERT_EQ(taskInfo1.task_id(), finishedUpdate->status().task_id());
+
+  // The executor should still be alive after the task
+  // has finished successfully.
+  ASSERT_TRUE(executorFailure.isPending());
+
+  Future<v1::scheduler::Event::Update> killedUpdate;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&killedUpdate));
+
+  // Now kill the second task in the task group.
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::KILL);
+
+    Call::Kill* kill = call.mutable_kill();
+    kill->mutable_task_id()->CopyFrom(taskInfo2.task_id());
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(killedUpdate);
+  ASSERT_EQ(TASK_KILLED, killedUpdate->status().state());
+  ASSERT_EQ(taskInfo2.task_id(), killedUpdate->status().task_id());
+
+  // The executor should commit suicide after the task is killed.
+  AWAIT_READY(executorFailure);
+
+  // Even though the task failed, the executor should exit gracefully.
+  ASSERT_TRUE(executorFailure->has_status());
+  ASSERT_EQ(0, executorFailure->status());
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {