You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2017/11/21 17:41:55 UTC

[1/2] mesos git commit: Fixed a task status update race in default executor tests.

Repository: mesos
Updated Branches:
  refs/heads/master 84365a140 -> 9ee4da2b5


Fixed a task status update race in default executor tests.

Previously, in the tests `DefaultExecutorTest.KillMultipleTasks` and
`DefaultExecutorTest.KillTaskGroupOnTaskFailure`, when launching a
task group which has multiple tasks, we expected the scheduler to
receive all the TASK_STARTING status updates before receiving any
TASK_RUNNING status updates. However, this is not guaranteed; e.g.,
it is possible for the scheduler to receive TASK_RUNNING for the
first task before receiving TASK_STARTING for the second task.

So in this patch, we use a separate `testing::Sequence` per task to
enforce the order of status updates (e.g., TASK_STARTING before
TASK_RUNNING) within each task, while placing no constraint on the
order of updates between tasks.

The following 3 tests had their own solutions for handling this
issue; in this patch, I updated them to use the above solution:
  `DefaultExecutorTest.KillTask`
  `DefaultExecutorTest.CommitSuicideOnKillTask`
  `DefaultExecutorTest.ROOT_ContainerStatusForTask`

Review: https://reviews.apache.org/r/63577/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e6c6ab85
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e6c6ab85
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e6c6ab85

Branch: refs/heads/master
Commit: e6c6ab856d4f4829cbe71b60e93bb2c1dc6b44bc
Parents: 84365a1
Author: Qian Zhang <zh...@gmail.com>
Authored: Tue Nov 21 18:39:17 2017 +0100
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Tue Nov 21 18:39:17 2017 +0100

----------------------------------------------------------------------
 src/tests/default_executor_tests.cpp | 652 +++++++++++++++++-------------
 src/tests/mesos.hpp                  |  14 +
 2 files changed, 394 insertions(+), 272 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e6c6ab85/src/tests/default_executor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp
index 6515021..04d9e1b 100644
--- a/src/tests/default_executor_tests.cpp
+++ b/src/tests/default_executor_tests.cpp
@@ -71,6 +71,7 @@ using std::string;
 using std::vector;
 
 using testing::_;
+using testing::AllOf;
 using testing::DoAll;
 using testing::Return;
 using testing::WithParamInterface;
@@ -296,28 +297,80 @@ TEST_P(DefaultExecutorTest, KillTask)
   v1::TaskInfo taskInfo2 =
     v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
 
-  const hashset<v1::TaskID> tasks1{taskInfo1.task_id(), taskInfo2.task_id()};
-
   Future<v1::scheduler::Event::Update> startingUpdate1;
-  Future<v1::scheduler::Event::Update> startingOrRunningUpdate1;
-  Future<v1::scheduler::Event::Update> startingOrRunningUpdate2;
   Future<v1::scheduler::Event::Update> runningUpdate1;
-  EXPECT_CALL(*scheduler, update(_, _))
+  Future<v1::scheduler::Event::Update> killedUpdate1;
+
+  testing::Sequence task1;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(task1)
     .WillOnce(
         DoAll(
             FutureArg<1>(&startingUpdate1),
-            v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(task1)
     .WillOnce(
         DoAll(
-            FutureArg<1>(&startingOrRunningUpdate1),
-            v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+            FutureArg<1>(&runningUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1),
+          TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+    .InSequence(task1)
     .WillOnce(
         DoAll(
-            FutureArg<1>(&startingOrRunningUpdate2),
-            v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+            FutureArg<1>(&killedUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  Future<v1::scheduler::Event::Update> startingUpdate2;
+  Future<v1::scheduler::Event::Update> runningUpdate2;
+  Future<v1::scheduler::Event::Update> killedUpdate2;
+
+  testing::Sequence task2;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(task2)
     .WillOnce(
         DoAll(
-            FutureArg<1>(&runningUpdate1),
+            FutureArg<1>(&startingUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(task2)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2),
+          TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+    .InSequence(task2)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&killedUpdate2),
             v1::scheduler::SendAcknowledge(frameworkId, agentId)));
 
   Future<v1::scheduler::Event::Offers> offers2;
@@ -329,6 +382,7 @@ TEST_P(DefaultExecutorTest, KillTask)
     v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP(
         executorInfo,
         v1::createTaskGroupInfo({taskInfo1, taskInfo2}));
+
     Call call = v1::createCallAccept(frameworkId, offer1, {launchGroup});
 
     // Set a 0s filter to immediately get another offer to launch
@@ -339,20 +393,10 @@ TEST_P(DefaultExecutorTest, KillTask)
   }
 
   AWAIT_READY(startingUpdate1);
-  ASSERT_EQ(TASK_STARTING, startingUpdate1->status().state());
-
   AWAIT_READY(runningUpdate1);
-  ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state());
-
-  // When running a task, TASK_RUNNING updates for the tasks in a
-  // task group can be received in any order.
-  const hashset<v1::TaskID> tasksRunning{
-    startingUpdate1->status().task_id(),
-    startingOrRunningUpdate1->status().task_id(),
-    startingOrRunningUpdate2->status().task_id(),
-    runningUpdate1->status().task_id()};
 
-  ASSERT_EQ(tasks1, tasksRunning);
+  AWAIT_READY(startingUpdate2);
+  AWAIT_READY(runningUpdate2);
 
   AWAIT_READY(offers2);
   const v1::Offer& offer2 = offers2->offers(0);
@@ -362,15 +406,41 @@ TEST_P(DefaultExecutorTest, KillTask)
 
   Future<v1::scheduler::Event::Update> startingUpdate3;
   Future<v1::scheduler::Event::Update> runningUpdate3;
-  EXPECT_CALL(*scheduler, update(_, _))
+  Future<v1::scheduler::Event::Update> killedUpdate3;
+
+  testing::Sequence task3;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo3),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(task3)
     .WillOnce(
         DoAll(
             FutureArg<1>(&startingUpdate3),
-            v1::scheduler::SendAcknowledge(frameworkId, offer2.agent_id())))
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo3),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(task3)
     .WillOnce(
         DoAll(
             FutureArg<1>(&runningUpdate3),
-            v1::scheduler::SendAcknowledge(frameworkId, offer2.agent_id())));
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo3),
+          TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+    .InSequence(task3)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&killedUpdate3),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
 
   // Launch the second task group.
   mesos.send(
@@ -381,56 +451,32 @@ TEST_P(DefaultExecutorTest, KillTask)
               executorInfo, v1::createTaskGroupInfo({taskInfo3}))}));
 
   AWAIT_READY(startingUpdate3);
-  ASSERT_EQ(TASK_STARTING, startingUpdate3->status().state());
-  ASSERT_EQ(taskInfo3.task_id(), startingUpdate3->status().task_id());
-
   AWAIT_READY(runningUpdate3);
-  ASSERT_EQ(TASK_RUNNING, runningUpdate3->status().state());
-  ASSERT_EQ(taskInfo3.task_id(), runningUpdate3->status().task_id());
-
-  Future<v1::scheduler::Event::Update> killedUpdate1;
-  Future<v1::scheduler::Event::Update> killedUpdate2;
-  EXPECT_CALL(*scheduler, update(_, _))
-    .WillOnce(FutureArg<1>(&killedUpdate1))
-    .WillOnce(FutureArg<1>(&killedUpdate2));
 
   Future<v1::scheduler::Event::Failure> executorFailure;
   EXPECT_CALL(*scheduler, failure(_, _))
     .WillOnce(FutureArg<1>(&executorFailure));
 
+  ASSERT_TRUE(killedUpdate1.isPending());
+  ASSERT_TRUE(killedUpdate2.isPending());
+  ASSERT_TRUE(killedUpdate3.isPending());
+
   // Now kill a task in the first task group.
   mesos.send(v1::createCallKill(frameworkId, taskInfo1.task_id()));
 
-  // All the tasks in the first task group should be killed.
-
+  // Only the tasks in the first group were killed.
   AWAIT_READY(killedUpdate1);
-  ASSERT_EQ(TASK_KILLED, killedUpdate1->status().state());
-
   AWAIT_READY(killedUpdate2);
-  ASSERT_EQ(TASK_KILLED, killedUpdate2->status().state());
-
-  // When killing a task, TASK_KILLED updates for the tasks in a task
-  // group can be received in any order.
-  const hashset<v1::TaskID> tasksKilled{
-    killedUpdate1->status().task_id(),
-    killedUpdate2->status().task_id()};
-
-  ASSERT_EQ(tasks1, tasksKilled);
+  ASSERT_TRUE(killedUpdate3.isPending());
 
   // The executor should still be alive after the first task
   // group has been killed.
   ASSERT_TRUE(executorFailure.isPending());
 
-  Future<v1::scheduler::Event::Update> killedUpdate3;
-  EXPECT_CALL(*scheduler, update(_, _))
-    .WillOnce(FutureArg<1>(&killedUpdate3));
-
   // Now kill the only task present in the second task group.
   mesos.send(v1::createCallKill(frameworkId, taskInfo3.task_id()));
 
   AWAIT_READY(killedUpdate3);
-  ASSERT_EQ(TASK_KILLED, killedUpdate3->status().state());
-  ASSERT_EQ(taskInfo3.task_id(), killedUpdate3->status().task_id());
 
   // The executor should commit suicide after all the tasks have been
   // killed.
@@ -505,30 +551,82 @@ TEST_P(DefaultExecutorTest, KillMultipleTasks)
   v1::TaskInfo taskInfo2 =
     v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
 
-  const hashset<v1::TaskID> tasks{taskInfo1.task_id(), taskInfo2.task_id()};
-
   Future<v1::scheduler::Event::Update> startingUpdate1;
-  Future<v1::scheduler::Event::Update> startingUpdate2;
   Future<v1::scheduler::Event::Update> runningUpdate1;
-  Future<v1::scheduler::Event::Update> runningUpdate2;
-  EXPECT_CALL(*scheduler, update(_, _))
+  Future<v1::scheduler::Event::Update> killedUpdate1;
+
+  testing::Sequence task1;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(task1)
     .WillOnce(
         DoAll(
             FutureArg<1>(&startingUpdate1),
-            v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(task1)
     .WillOnce(
         DoAll(
-            FutureArg<1>(&startingUpdate2),
-            v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+            FutureArg<1>(&runningUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1),
+          TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+    .InSequence(task1)
     .WillOnce(
         DoAll(
-            FutureArg<1>(&runningUpdate1),
-            v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+            FutureArg<1>(&killedUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  Future<v1::scheduler::Event::Update> startingUpdate2;
+  Future<v1::scheduler::Event::Update> runningUpdate2;
+  Future<v1::scheduler::Event::Update> killedUpdate2;
+
+  testing::Sequence task2;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(task2)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&startingUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(task2)
     .WillOnce(
         DoAll(
             FutureArg<1>(&runningUpdate2),
             v1::scheduler::SendAcknowledge(frameworkId, agentId)));
 
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2),
+          TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+    .InSequence(task2)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&killedUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
   mesos.send(
       v1::createCallAccept(
           frameworkId,
@@ -537,30 +635,10 @@ TEST_P(DefaultExecutorTest, KillMultipleTasks)
               executorInfo, v1::createTaskGroupInfo({taskInfo1, taskInfo2}))}));
 
   AWAIT_READY(startingUpdate1);
-  ASSERT_EQ(TASK_STARTING, startingUpdate1->status().state());
-
-  AWAIT_READY(startingUpdate2);
-  ASSERT_EQ(TASK_STARTING, startingUpdate2->status().state());
-
   AWAIT_READY(runningUpdate1);
-  ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state());
 
+  AWAIT_READY(startingUpdate2);
   AWAIT_READY(runningUpdate2);
-  ASSERT_EQ(TASK_RUNNING, runningUpdate2->status().state());
-
-  // When running a task, TASK_RUNNING updates for the tasks in a
-  // task group can be received in any order.
-  const hashset<v1::TaskID> tasksRunning{
-    runningUpdate1->status().task_id(),
-    runningUpdate2->status().task_id()};
-
-  ASSERT_EQ(tasks, tasksRunning);
-
-  Future<v1::scheduler::Event::Update> killedUpdate1;
-  Future<v1::scheduler::Event::Update> killedUpdate2;
-  EXPECT_CALL(*scheduler, update(_, _))
-    .WillOnce(FutureArg<1>(&killedUpdate1))
-    .WillOnce(FutureArg<1>(&killedUpdate2));
 
   // Now kill all tasks in the task group.
   mesos.send(v1::createCallKill(frameworkId, taskInfo1.task_id()));
@@ -568,18 +646,7 @@ TEST_P(DefaultExecutorTest, KillMultipleTasks)
 
   // All the tasks in the task group should be killed.
   AWAIT_READY(killedUpdate1);
-  ASSERT_EQ(TASK_KILLED, killedUpdate1->status().state());
-
   AWAIT_READY(killedUpdate2);
-  ASSERT_EQ(TASK_KILLED, killedUpdate2->status().state());
-
-  // When killing a task, TASK_KILLED updates for the tasks in a task
-  // group can be received in any order.
-  const hashset<v1::TaskID> tasksKilled{
-    killedUpdate1->status().task_id(),
-    killedUpdate2->status().task_id()};
-
-  ASSERT_EQ(tasks, tasksKilled);
 }
 
 
@@ -644,17 +711,81 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure)
   v1::TaskInfo taskInfo2 =
     v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
 
-  const hashset<v1::TaskID> tasks{taskInfo1.task_id(), taskInfo2.task_id()};
-
   Future<v1::scheduler::Event::Update> startingUpdate1;
-  Future<v1::scheduler::Event::Update> startingUpdate2;
   Future<v1::scheduler::Event::Update> runningUpdate1;
+  Future<v1::scheduler::Event::Update> failedUpdate1;
+
+  testing::Sequence task1;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(task1)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&startingUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(task1)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1),
+          TaskStatusUpdateStateEq(v1::TASK_FAILED))))
+    .InSequence(task1)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&failedUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  Future<v1::scheduler::Event::Update> startingUpdate2;
   Future<v1::scheduler::Event::Update> runningUpdate2;
-  EXPECT_CALL(*scheduler, update(_, _))
-    .WillOnce(FutureArg<1>(&startingUpdate1))
-    .WillOnce(FutureArg<1>(&startingUpdate2))
-    .WillOnce(FutureArg<1>(&runningUpdate1))
-    .WillOnce(FutureArg<1>(&runningUpdate2));
+  Future<v1::scheduler::Event::Update> killedUpdate2;
+
+  testing::Sequence task2;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(task2)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&startingUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(task2)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2),
+          TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+    .InSequence(task2)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&killedUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
 
   mesos.send(
       v1::createCallAccept(
@@ -664,60 +795,12 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure)
               executorInfo, v1::createTaskGroupInfo({taskInfo1, taskInfo2}))}));
 
   AWAIT_READY(startingUpdate1);
-  ASSERT_EQ(TASK_STARTING, startingUpdate1->status().state());
-
-  AWAIT_READY(startingUpdate2);
-  ASSERT_EQ(TASK_STARTING, startingUpdate2->status().state());
-
-  mesos.send(
-      v1::createCallAcknowledge(frameworkId, agentId, startingUpdate1.get()));
-  mesos.send(
-      v1::createCallAcknowledge(frameworkId, agentId, startingUpdate2.get()));
-
   AWAIT_READY(runningUpdate1);
-  ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state());
+  AWAIT_READY(failedUpdate1);
 
+  AWAIT_READY(startingUpdate2);
   AWAIT_READY(runningUpdate2);
-  ASSERT_EQ(TASK_RUNNING, runningUpdate2->status().state());
-
-  // When running a task, TASK_RUNNING updates for the tasks in a task
-  // group can be received in any order.
-  const hashset<v1::TaskID> tasksRunning{
-    runningUpdate1->status().task_id(),
-    runningUpdate2->status().task_id()};
-
-  ASSERT_EQ(tasks, tasksRunning);
-
-  Future<v1::scheduler::Event::Update> update1;
-  Future<v1::scheduler::Event::Update> update2;
-  EXPECT_CALL(*scheduler, update(_, _))
-    .WillOnce(FutureArg<1>(&update1))
-    .WillOnce(FutureArg<1>(&update2));
-
-  // Acknowledge the TASK_RUNNING updates to receive the next updates.
-  mesos.send(
-      v1::createCallAcknowledge(frameworkId, agentId, runningUpdate1.get()));
-  mesos.send(
-      v1::createCallAcknowledge(frameworkId, agentId, runningUpdate2.get()));
-
-  // Updates for the tasks in a task group can be received in any order.
-  set<pair<v1::TaskID, v1::TaskState>> taskStates;
-
-  taskStates.insert({taskInfo1.task_id(), v1::TASK_FAILED});
-  taskStates.insert({taskInfo2.task_id(), v1::TASK_KILLED});
-
-  AWAIT_READY(update1);
-  AWAIT_READY(update2);
-
-  set<std::pair<v1::TaskID, v1::TaskState>> expectedTaskStates;
-
-  expectedTaskStates.insert(
-      {update1->status().task_id(), update1->status().state()});
-
-  expectedTaskStates.insert(
-      {update2->status().task_id(), update2->status().state()});
-
-  ASSERT_EQ(expectedTaskStates, taskStates);
+  AWAIT_READY(killedUpdate2);
 }
 
 
@@ -858,30 +941,62 @@ TEST_P(DefaultExecutorTest, ROOT_ContainerStatusForTask)
   const v1::Offer& offer = offers->offers(0);
   const v1::AgentID& agentId = offer.agent_id();
 
-  v1::TaskInfo task1 = v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
+  v1::TaskInfo taskInfo1 =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
 
-  v1::TaskInfo task2 = v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
+  v1::TaskInfo taskInfo2 =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
 
-  Future<Event::Update> updateStarting1;
-  Future<Event::Update> updateStartingOrRunning1;
-  Future<Event::Update> updateStartingOrRunning2;
-  Future<Event::Update> updateRunning2;
-  EXPECT_CALL(*scheduler, update(_, _))
+  Future<v1::scheduler::Event::Update> startingUpdate1;
+  Future<v1::scheduler::Event::Update> runningUpdate1;
+
+  testing::Sequence task1;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(task1)
     .WillOnce(
         DoAll(
-            FutureArg<1>(&updateStarting1),
-            v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+            FutureArg<1>(&startingUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(task1)
     .WillOnce(
         DoAll(
-            FutureArg<1>(&updateStartingOrRunning1),
-            v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+            FutureArg<1>(&runningUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  Future<v1::scheduler::Event::Update> startingUpdate2;
+  Future<v1::scheduler::Event::Update> runningUpdate2;
+
+  testing::Sequence task2;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(task2)
     .WillOnce(
         DoAll(
-            FutureArg<1>(&updateStartingOrRunning2),
-            v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+            FutureArg<1>(&startingUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(task2)
     .WillOnce(
         DoAll(
-            FutureArg<1>(&updateRunning2),
+            FutureArg<1>(&runningUpdate2),
             v1::scheduler::SendAcknowledge(frameworkId, agentId)));
 
   mesos.send(
@@ -889,32 +1004,19 @@ TEST_P(DefaultExecutorTest, ROOT_ContainerStatusForTask)
           frameworkId,
           offer,
           {v1::LAUNCH_GROUP(
-              executorInfo, v1::createTaskGroupInfo({task1, task2}))}));
-
-
-  AWAIT_READY(updateStarting1);
-  AWAIT_READY(updateStartingOrRunning1);
-  AWAIT_READY(updateStartingOrRunning2);
-  AWAIT_READY(updateRunning2);
-
-  ASSERT_EQ(TASK_STARTING, updateStarting1->status().state());
-  ASSERT_EQ(TASK_RUNNING, updateRunning2->status().state());
+              executorInfo, v1::createTaskGroupInfo({taskInfo1, taskInfo2}))}));
 
-  // Select the two TASK_RUNNING updates from the first four updates
-  Event::Update update1 = updateStartingOrRunning1.get();
-  if (update1.status().state() == v1::TASK_STARTING) {
-    update1 = updateStartingOrRunning2.get();
-  }
-  Event::Update update2 = updateRunning2.get();
+  AWAIT_READY(startingUpdate1);
+  AWAIT_READY(runningUpdate1);
 
-  ASSERT_EQ(TASK_RUNNING, update1.status().state());
-  ASSERT_EQ(TASK_RUNNING, update2.status().state());
+  AWAIT_READY(startingUpdate2);
+  AWAIT_READY(runningUpdate2);
 
-  ASSERT_TRUE(update1.status().has_container_status());
-  ASSERT_TRUE(update2.status().has_container_status());
+  ASSERT_TRUE(runningUpdate1->status().has_container_status());
+  ASSERT_TRUE(runningUpdate2->status().has_container_status());
 
-  v1::ContainerStatus status1 = update1.status().container_status();
-  v1::ContainerStatus status2 = update2.status().container_status();
+  v1::ContainerStatus status1 = runningUpdate1->status().container_status();
+  v1::ContainerStatus status2 = runningUpdate2->status().container_status();
 
   ASSERT_TRUE(status1.has_container_id());
   ASSERT_TRUE(status2.has_container_id());
@@ -1086,27 +1188,86 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask)
   // The first task finishes successfully while the second
   // task is explicitly killed later.
 
-  v1::TaskInfo task1 = v1::createTask(agentId, resources, "exit 0");
+  v1::TaskInfo taskInfo1 = v1::createTask(agentId, resources, "exit 0");
 
-  v1::TaskInfo task2 = v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
+  v1::TaskInfo taskInfo2 =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
 
-  // We expect two TASK_STARTING, two TASK_RUNNING, and one TASK_FINISHED
-  // updates.
-  vector<Future<v1::scheduler::Event::Update>> updates(5);
+  Future<v1::scheduler::Event::Update> startingUpdate1;
+  Future<v1::scheduler::Event::Update> runningUpdate1;
+  Future<v1::scheduler::Event::Update> finishedUpdate1;
+
+  testing::Sequence task1;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(task1)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&startingUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
 
-  {
-    // This variable doesn't have to be used explicitly. We need it so that the
-    // futures are satisfied in the order in which the updates are received.
-    testing::InSequence inSequence;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(task1)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
 
-    foreach (Future<v1::scheduler::Event::Update>& update, updates) {
-      EXPECT_CALL(*scheduler, update(_, _))
-        .WillOnce(
-            DoAll(
-                FutureArg<1>(&update),
-                v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-    }
-  }
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1),
+          TaskStatusUpdateStateEq(v1::TASK_FINISHED))))
+    .InSequence(task1)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&finishedUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  Future<v1::scheduler::Event::Update> startingUpdate2;
+  Future<v1::scheduler::Event::Update> runningUpdate2;
+  Future<v1::scheduler::Event::Update> killedUpdate2;
+
+  testing::Sequence task2;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(task2)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&startingUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(task2)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2),
+          TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+    .InSequence(task2)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&killedUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
 
   Future<v1::scheduler::Event::Failure> executorFailure;
   EXPECT_CALL(*scheduler, failure(_, _))
@@ -1117,77 +1278,24 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask)
           frameworkId,
           offer,
           {v1::LAUNCH_GROUP(
-              executorInfo, v1::createTaskGroupInfo({task1, task2}))}));
-
-  enum class Stage
-  {
-    INITIAL,
-    STARTING,
-    RUNNING,
-    FINISHED
-  };
-
-  hashmap<v1::TaskID, Stage> taskStages;
-  taskStages[task1.task_id()] = Stage::INITIAL;
-  taskStages[task2.task_id()] = Stage::INITIAL;
-
-  foreach (Future<v1::scheduler::Event::Update>& update, updates) {
-    AWAIT_READY(update);
-
-    const v1::TaskStatus& taskStatus = update->status();
-
-    Option<Stage> taskStage = taskStages.get(taskStatus.task_id());
-    ASSERT_SOME(taskStage);
-
-    switch (taskStage.get()) {
-      case Stage::INITIAL: {
-        ASSERT_EQ(TASK_STARTING, taskStatus.state());
-
-        taskStages[taskStatus.task_id()] = Stage::STARTING;
-
-        break;
-      }
-      case Stage::STARTING: {
-        ASSERT_EQ(TASK_RUNNING, taskStatus.state());
-
-        taskStages[taskStatus.task_id()] = Stage::RUNNING;
-
-        break;
-      }
-      case Stage::RUNNING: {
-        ASSERT_EQ(TASK_FINISHED, taskStatus.state());
+              executorInfo, v1::createTaskGroupInfo({taskInfo1, taskInfo2}))}));
 
-        taskStages[taskStatus.task_id()] = Stage::FINISHED;
+  AWAIT_READY(startingUpdate1);
+  AWAIT_READY(runningUpdate1);
+  AWAIT_READY(finishedUpdate1);
 
-        break;
-      }
-      case Stage::FINISHED: {
-        FAIL() << "Unexpected task update: " << update->DebugString();
-        break;
-      }
-    }
-  }
+  AWAIT_READY(startingUpdate2);
+  AWAIT_READY(runningUpdate2);
 
-  // `task1` should have finished, `task2` should still be running.
-  ASSERT_EQ(Stage::FINISHED, taskStages[task1.task_id()]);
-  ASSERT_EQ(Stage::RUNNING, taskStages[task2.task_id()]);
+  ASSERT_TRUE(killedUpdate2.isPending());
 
   // The executor should still be alive after task1 has finished successfully.
   ASSERT_TRUE(executorFailure.isPending());
 
-  Future<v1::scheduler::Event::Update> killedUpdate;
-  EXPECT_CALL(*scheduler, update(_, _))
-    .WillOnce(
-        DoAll(
-            FutureArg<1>(&killedUpdate),
-            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
-
   // Now kill the second task in the task group.
-  mesos.send(v1::createCallKill(frameworkId, task2.task_id()));
+  mesos.send(v1::createCallKill(frameworkId, taskInfo2.task_id()));
 
-  AWAIT_READY(killedUpdate);
-  ASSERT_EQ(TASK_KILLED, killedUpdate->status().state());
-  ASSERT_EQ(task2.task_id(), killedUpdate->status().task_id());
+  AWAIT_READY(killedUpdate2);
 
   // The executor should commit suicide after the task is killed.
   AWAIT_READY(executorFailure);

http://git-wip-us.apache.org/repos/asf/mesos/blob/e6c6ab85/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 345b883..d8ca9a3 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -3064,6 +3064,20 @@ void ExpectNoFutureUnionHttpProtobufs(
 MATCHER_P(TaskStatusEq, task, "") { return arg.task_id() == task.task_id(); }
 
 
+// This matcher is used to match the task id of `Event.update.status` message.
+MATCHER_P(TaskStatusUpdateTaskIdEq, taskInfo, "")
+{
+  return arg.status().task_id() == taskInfo.task_id();
+}
+
+
+// This matcher is used to match the state of `Event.update.status` message.
+MATCHER_P(TaskStatusUpdateStateEq, taskState, "")
+{
+  return arg.status().state() == taskState;
+}
+
+
 struct ParamExecutorType
 {
 public:


[2/2] mesos git commit: Renamed `TaskStatusEq()` to `TaskStatusTaskIdEq()`.

Posted by al...@apache.org.
Renamed `TaskStatusEq()` to `TaskStatusTaskIdEq()`.

Review: https://reviews.apache.org/r/63943/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9ee4da2b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9ee4da2b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9ee4da2b

Branch: refs/heads/master
Commit: 9ee4da2b55cb9c2c67fc5a80078675d0e3da46ab
Parents: e6c6ab8
Author: Qian Zhang <zh...@gmail.com>
Authored: Tue Nov 21 18:39:29 2017 +0100
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Tue Nov 21 18:39:29 2017 +0100

----------------------------------------------------------------------
 src/tests/master_tests.cpp |  4 ++--
 src/tests/mesos.hpp        | 12 +++++-------
 2 files changed, 7 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9ee4da2b/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 139125b..01f45a9 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -5783,7 +5783,7 @@ TEST_F(MasterTest, MasterFailoverLongLivedExecutor)
     .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
 
   Future<TaskStatus> status1;
-  EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusEq(task1)))
+  EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusTaskIdEq(task1)))
     .WillOnce(FutureArg<1>(&status1))
     .WillRepeatedly(Return());
 
@@ -5815,7 +5815,7 @@ TEST_F(MasterTest, MasterFailoverLongLivedExecutor)
   task2.mutable_task_id()->set_value("2");
 
   Future<TaskStatus> status2;
-  EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusEq(task2)))
+  EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusTaskIdEq(task2)))
     .WillOnce(FutureArg<1>(&status2))
     .WillRepeatedly(Return());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9ee4da2b/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index d8ca9a3..30b8ac5 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -3055,13 +3055,11 @@ void ExpectNoFutureUnionHttpProtobufs(
 }
 
 
-// This matcher is used to match the task ids of TaskStatus messages.
-// Suppose we set up N futures for LaunchTasks and N futures for StatusUpdates.
-// (This is a common pattern). We get into a situation where all StatusUpdates
-// are satisfied before the LaunchTasks if the master re-sends StatusUpdates.
-// We use this matcher to only satisfy the StatusUpdate future if the
-// StatusUpdate came from the corresponding task.
-MATCHER_P(TaskStatusEq, task, "") { return arg.task_id() == task.task_id(); }
+// This matcher is used to match the task id of `TaskStatus` message.
+MATCHER_P(TaskStatusTaskIdEq, taskInfo, "")
+{
+  return arg.task_id() == taskInfo.task_id();
+}
 
 
 // This matcher is used to match the task id of `Event.update.status` message.