You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2016/10/08 04:17:17 UTC

[3/4] mesos git commit: Fixed agent to correctly handle kill task of unregistered executor.

Fixed agent to correctly handle kill task of unregistered executor.

This change brings to HTTP based executors the same logic we had
introduced for driver based executors (MESOS-5380): the agent shuts
an executor down if all of its tasks have been killed before the
executor could initially register. Note that executors still need to
terminate themselves if they notice a disconnection from the agent
before they could subscribe with it, or if they do not receive a
launch event within a timeout. See MESOS-5385 for more details.

Review: https://reviews.apache.org/r/52651/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e6ab3431
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e6ab3431
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e6ab3431

Branch: refs/heads/master
Commit: e6ab3431100eaf90725eac24af9a1a99994877bc
Parents: a910c75
Author: Anand Mazumdar <an...@apache.org>
Authored: Fri Oct 7 20:02:41 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Fri Oct 7 20:02:41 2016 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp       |  20 ++++++
 src/tests/slave_tests.cpp | 154 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 174 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e6ab3431/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index d30001b..119fb36 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3082,6 +3082,26 @@ void Slave::subscribe(
         CHECK_SOME(os::touch(path));
       }
 
+      // Here, we kill the executor if it no longer has any task or task group
+      // to run (e.g., framework sent a `killTask()`). This is a workaround for
+      // those executors (e.g., command executor, default executor) that do not
+      // have proper self-terminating logic for when they haven't received a
+      // task or task group within a timeout.
+      if (state != RECOVERING &&
+          executor->queuedTasks.empty() &&
+          executor->queuedTaskGroups.empty()) {
+        CHECK(executor->launchedTasks.empty())
+            << " Newly registered executor '" << executor->id
+            << "' has launched tasks";
+
+        LOG(WARNING) << "Shutting down the executor " << *executor
+                     << " because it has no tasks to run";
+
+        _shutdownExecutor(framework, executor);
+
+        return;
+      }
+
       // Tell executor it's registered and give it any queued tasks
       // or task groups.
       executor::Event event;

http://git-wip-us.apache.org/repos/asf/mesos/blob/e6ab3431/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 3471314..bb6216f 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -3319,6 +3319,160 @@ TEST_F(SlaveTest, KillTaskUnregisteredExecutor)
 }
 
 
+// This test ensures that if a task of an HTTP based executor is killed
+// before the executor subscribes with the agent, the scheduler receives
+// `TASK_KILLED` updates and the executor is shut down upon subscribing.
+TEST_F(SlaveTest, KillTaskUnregisteredHTTPExecutor)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+  auto executor = std::make_shared<MockV1HTTPExecutor>();
+
+  Resources resources =
+    Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  ExecutorInfo executorInfo;
+  executorInfo.set_type(ExecutorInfo::DEFAULT);
+
+  executorInfo.mutable_executor_id()->CopyFrom(DEFAULT_EXECUTOR_ID);
+  executorInfo.mutable_resources()->CopyFrom(resources);
+
+  const ExecutorID& executorId = executorInfo.executor_id();
+  TestContainerizer containerizer(executorId, executor);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
+  ASSERT_SOME(slave);
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected));
+
+  scheduler::TestV1Mesos mesos(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  // Update `executorInfo` with the subscribed `frameworkId`.
+  executorInfo.mutable_framework_id()->CopyFrom(devolve(frameworkId));
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0, offers->offers().size()); // `offers(0)` is read below.
+
+  const v1::Offer& offer = offers->offers(0);
+  const SlaveID slaveId = devolve(offer.agent_id());
+
+  Future<v1::executor::Mesos*> executorLib;
+  EXPECT_CALL(*executor, connected(_))
+    .WillOnce(FutureArg<0>(&executorLib));
+
+  v1::TaskInfo task1 =
+    evolve(createTask(slaveId, resources, ""));
+
+  v1::TaskInfo task2 =
+    evolve(createTask(slaveId, resources, ""));
+
+  v1::TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(task1);
+  taskGroup.add_tasks()->CopyFrom(task2);
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
+
+    v1::Offer::Operation::LaunchGroup* launchGroup =
+      operation->mutable_launch_group();
+
+    launchGroup->mutable_executor()->CopyFrom(evolve(executorInfo));
+    launchGroup->mutable_task_group()->CopyFrom(taskGroup);
+
+    mesos.send(call);
+  }
+
+  // Wait for the executor to be launched and then kill one task of the
+  // group before the executor subscribes; two kill updates are expected.
+  AWAIT_READY(executorLib);
+
+  Future<v1::scheduler::Event::Update> update1;
+  Future<v1::scheduler::Event::Update> update2;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&update1))
+    .WillOnce(FutureArg<1>(&update2));
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::KILL);
+
+    Call::Kill* kill = call.mutable_kill();
+    kill->mutable_task_id()->CopyFrom(task1.task_id());
+    kill->mutable_agent_id()->CopyFrom(offer.agent_id());
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(update1);
+  AWAIT_READY(update2);
+
+  ASSERT_EQ(v1::TASK_KILLED, update1->status().state());
+  ASSERT_EQ(v1::TASK_KILLED, update2->status().state());
+
+  Future<Nothing> shutdown;
+  EXPECT_CALL(*executor, shutdown(_))
+    .WillOnce(FutureSatisfy(&shutdown));
+
+  // With no tasks left to run, the executor should receive the shutdown
+  // event upon subscribing with the agent.
+  {
+    v1::executor::Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.mutable_executor_id()->CopyFrom(evolve(executorId));
+
+    call.set_type(v1::executor::Call::SUBSCRIBE);
+
+    call.mutable_subscribe();
+
+    executorLib.get()->send(call);
+  }
+
+  AWAIT_READY(shutdown);
+}
+
+
 // This test verifies that when a slave re-registers with the master
 // it correctly includes the latest and status update task states.
 TEST_F(SlaveTest, ReregisterWithStatusUpdateTaskState)