You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jp...@apache.org on 2018/05/03 16:16:37 UTC

[8/8] mesos git commit: Tested default executor support of `max_completion_time`.

Tested default executor support of `max_completion_time`.

Review: https://reviews.apache.org/r/66293/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0dcf5c7f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0dcf5c7f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0dcf5c7f

Branch: refs/heads/master
Commit: 0dcf5c7f0a013f67fff1fb012d1f45455f394708
Parents: a2ce5e4
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu May 3 08:35:03 2018 -0700
Committer: James Peach <jp...@apache.org>
Committed: Thu May 3 08:35:03 2018 -0700

----------------------------------------------------------------------
 src/tests/default_executor_tests.cpp | 223 ++++++++++++++++++++++++++++++
 1 file changed, 223 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0dcf5c7f/src/tests/default_executor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp
index bf849c4..cd7e263 100644
--- a/src/tests/default_executor_tests.cpp
+++ b/src/tests/default_executor_tests.cpp
@@ -1705,6 +1705,229 @@ TEST_P(DefaultExecutorTest, SigkillExecutor)
 }
 
 
+// This test verifies that the default executor terminates the entire group
+// when some task exceeded its `max_completion_time`.
+TEST_P(DefaultExecutorTest, MaxCompletionTime)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      v1::DEFAULT_EXECUTOR_ID,
+      None(),
+      resources,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  // Task 1 will finish before its max_completion_time.
+  v1::TaskInfo taskInfo1 = v1::createTask(agentId, resources, "exit 0");
+
+  taskInfo1.mutable_max_completion_time()->set_nanoseconds(Seconds(2).ns());
+
+  // Task 2 will trigger its max_completion_time.
+  v1::TaskInfo taskInfo2 =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
+
+  taskInfo2.mutable_max_completion_time()->set_nanoseconds(Seconds(2).ns());
+
+  // Task 3 has no max_completion_time.
+  v1::TaskInfo taskInfo3 =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
+
+  Future<v1::scheduler::Event::Update> startingUpdate1;
+  Future<v1::scheduler::Event::Update> runningUpdate1;
+  Future<v1::scheduler::Event::Update> finishedUpdate1;
+
+  testing::Sequence task1;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(task1)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&startingUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(task1)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1),
+          TaskStatusUpdateStateEq(v1::TASK_FINISHED))))
+    .InSequence(task1)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&finishedUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  Future<v1::scheduler::Event::Update> startingUpdate2;
+  Future<v1::scheduler::Event::Update> runningUpdate2;
+  Future<v1::scheduler::Event::Update> failedUpdate2;
+
+  testing::Sequence task2;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(task2)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&startingUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(task2)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2),
+          TaskStatusUpdateStateEq(v1::TASK_FAILED))))
+    .InSequence(task2)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&failedUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  Future<v1::scheduler::Event::Update> startingUpdate3;
+  Future<v1::scheduler::Event::Update> runningUpdate3;
+  Future<v1::scheduler::Event::Update> killedUpdate3;
+
+  testing::Sequence task3;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo3),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(task3)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&startingUpdate3),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo3),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(task3)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate3),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo3),
+          TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+    .InSequence(task3)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&killedUpdate3),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  Future<v1::scheduler::Event::Failure> executorFailure;
+  EXPECT_CALL(*scheduler, failure(_, _))
+    .WillOnce(FutureArg<1>(&executorFailure));
+
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH_GROUP(
+              executorInfo, v1::createTaskGroupInfo(
+                  {taskInfo1, taskInfo2, taskInfo3}))}));
+
+  AWAIT_READY(startingUpdate1);
+  AWAIT_READY(runningUpdate1);
+  AWAIT_READY(finishedUpdate1);
+
+  AWAIT_READY(startingUpdate2);
+  AWAIT_READY(runningUpdate2);
+  AWAIT_READY(failedUpdate2);
+
+  EXPECT_EQ(
+      v1::TaskStatus::REASON_MAX_COMPLETION_TIME_REACHED,
+      failedUpdate2->status().reason());
+
+  AWAIT_READY(startingUpdate3);
+  AWAIT_READY(runningUpdate3);
+  AWAIT_READY(killedUpdate3);
+
+  EXPECT_NE(
+      v1::TaskStatus::REASON_MAX_COMPLETION_TIME_REACHED,
+      killedUpdate3->status().reason());
+
+  // The executor should commit suicide after the task is killed.
+  AWAIT_READY(executorFailure);
+
+  // Even though the task failed, the executor should exit gracefully.
+  ASSERT_TRUE(executorFailure->has_status());
+  ASSERT_EQ(0, executorFailure->status());
+}
+
 // TODO(qianzhang): Kill policy helpers are not yet enabled on Windows. See
 // MESOS-8168.
 #ifndef __WINDOWS__