You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2017/03/25 19:06:44 UTC

[08/14] mesos git commit: Updated 'SchedulerTest.TaskGroupRunning'.

Updated 'SchedulerTest.TaskGroupRunning'.

This patch updates the test `SchedulerTest.TaskGroupRunning`
to confirm that the agent-side code responsible for launching
task groups works correctly: the test now waits for `TASK_RUNNING`
and `TASK_FINISHED` status updates for both tasks in the group.
Previously, this test only verified that the `RunTaskGroupMessage`
was sent to the agent.

Review: https://reviews.apache.org/r/57807/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4050eb2c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4050eb2c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4050eb2c

Branch: refs/heads/master
Commit: 4050eb2c9d24ebc68fbf2efdc3e0601e50e1aacd
Parents: 2f6bb1f
Author: Greg Mann <gr...@mesosphere.io>
Authored: Sat Mar 25 12:05:22 2017 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Sat Mar 25 12:05:22 2017 -0700

----------------------------------------------------------------------
 src/tests/scheduler_tests.cpp | 94 ++++++++++++++++++++++++++++++++++----
 1 file changed, 85 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4050eb2c/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 65259b4..0f5d9ad 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -38,6 +38,7 @@
 
 #include <process/metrics/metrics.hpp>
 
+#include <stout/hashset.hpp>
 #include <stout/lambda.hpp>
 #include <stout/try.hpp>
 
@@ -529,9 +530,6 @@ TEST_P(SchedulerTest, TaskRunning)
 
 // Ensures that a task group can be successfully launched
 // on the `DEFAULT` executor.
-//
-// TODO(bmahler): We currently only test the master-side
-// of task group handling, since the rest is unimplemented.
 TEST_P(SchedulerTest, TaskGroupRunning)
 {
   Try<Owned<cluster::Master>> master = StartMaster();
@@ -539,8 +537,16 @@ TEST_P(SchedulerTest, TaskGroupRunning)
 
   auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
 
+  slave::Flags flags = CreateSlaveFlags();
+#ifndef USE_SSL_SOCKET
+  // Executor authentication currently has SSL as a dependency, so we cannot
+  // require executors to authenticate with the agent operator API if Mesos
+  // was not built with SSL support.
+  flags.authenticate_http_readwrite = false;
+#endif // USE_SSL_SOCKET
+
   Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
   ASSERT_SOME(slave);
 
   Future<Nothing> connected;
@@ -565,7 +571,8 @@ TEST_P(SchedulerTest, TaskGroupRunning)
 
   Future<Event::Offers> offers;
   EXPECT_CALL(*scheduler, offers(_, _))
-    .WillOnce(FutureArg<1>(&offers));
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
 
   {
     Call call;
@@ -602,6 +609,7 @@ TEST_P(SchedulerTest, TaskGroupRunning)
   task1.mutable_agent_id()->CopyFrom(
       offers->offers(0).agent_id());
   task1.mutable_resources()->CopyFrom(resources);
+  task1.mutable_command()->set_value("exit 0");
 
   v1::TaskInfo task2;
   task2.set_name("2");
@@ -609,11 +617,25 @@ TEST_P(SchedulerTest, TaskGroupRunning)
   task2.mutable_agent_id()->CopyFrom(
       offers->offers(0).agent_id());
   task2.mutable_resources()->CopyFrom(resources);
+  task2.mutable_command()->set_value("exit 0");
 
   v1::TaskGroupInfo taskGroup;
   taskGroup.add_tasks()->CopyFrom(task1);
   taskGroup.add_tasks()->CopyFrom(task2);
 
+  Future<Event::Update> runningUpdate1;
+  Future<Event::Update> runningUpdate2;
+  Future<Event::Update> finishedUpdate1;
+  Future<Event::Update> finishedUpdate2;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&runningUpdate1))
+    .WillOnce(FutureArg<1>(&runningUpdate2))
+    .WillOnce(FutureArg<1>(&finishedUpdate1))
+    .WillOnce(FutureArg<1>(&finishedUpdate2));
+
+  EXPECT_CALL(*scheduler, failure(_, _))
+    .Times(AtMost(1));
+
   {
     Call call;
     call.mutable_framework_id()->CopyFrom(frameworkId);
@@ -634,10 +656,6 @@ TEST_P(SchedulerTest, TaskGroupRunning)
     mesos.send(call);
   }
 
-  // TODO(bmahler): For now we only ensure that the message is
-  // sent to the agent, since the agent-side of task groups is
-  // not yet implemented.
-
   AWAIT_READY(runTaskGroupMessage);
 
   EXPECT_EQ(devolve(frameworkId), runTaskGroupMessage->framework().id());
@@ -650,6 +668,64 @@ TEST_P(SchedulerTest, TaskGroupRunning)
             runTaskGroupMessage->task_group().tasks(0).task_id());
   EXPECT_EQ(devolve(task2.task_id()),
             runTaskGroupMessage->task_group().tasks(1).task_id());
+
+  AWAIT_READY(runningUpdate1);
+  ASSERT_EQ(v1::TASK_RUNNING, runningUpdate1->status().state());
+
+  AWAIT_READY(runningUpdate2);
+  ASSERT_EQ(v1::TASK_RUNNING, runningUpdate2->status().state());
+
+  const hashset<v1::TaskID> tasks{task1.task_id(), task2.task_id()};
+
+  // TASK_RUNNING updates for the tasks in a
+  // task group can be received in any order.
+  const hashset<v1::TaskID> tasksRunning{
+    runningUpdate1->status().task_id(),
+    runningUpdate2->status().task_id()};
+
+  ASSERT_EQ(tasks, tasksRunning);
+
+  // Acknowledge the TASK_RUNNING updates so
+  // that subsequent updates can be received.
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACKNOWLEDGE);
+
+    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+    acknowledge->mutable_task_id()->CopyFrom(
+        runningUpdate1->status().task_id());
+    acknowledge->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id());
+    acknowledge->set_uuid(runningUpdate1->status().uuid());
+
+    mesos.send(call);
+  }
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACKNOWLEDGE);
+
+    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+    acknowledge->mutable_task_id()->CopyFrom(
+        runningUpdate2->status().task_id());
+    acknowledge->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id());
+    acknowledge->set_uuid(runningUpdate2->status().uuid());
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(finishedUpdate1);
+  EXPECT_EQ(v1::TASK_FINISHED, finishedUpdate1->status().state());
+
+  AWAIT_READY(finishedUpdate2);
+  EXPECT_EQ(v1::TASK_FINISHED, finishedUpdate2->status().state());
+
+  const hashset<v1::TaskID> tasksFinished{
+    finishedUpdate1->status().task_id(),
+    finishedUpdate2->status().task_id()};
+
+  EXPECT_EQ(tasks, tasksFinished);
 }