You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2018/11/14 21:03:34 UTC

[mesos] branch master updated: Fixed flaky agent reconfiguration test.

This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new 025451d  Fixed flaky agent reconfiguration test.
025451d is described below

commit 025451dbd0b5fe15928cc008d3c1c6e844f1666b
Author: Benno Evers <be...@mesosphere.com>
AuthorDate: Wed Nov 14 12:15:58 2018 -0800

    Fixed flaky agent reconfiguration test.
    
    Removed some flakiness from the test
    SlaveRecoveryTest.AgentReconfigurationWithRunningTask
    by setting the `refuse_seconds` offer filter to 0 (instead of
    relying on the default of five seconds) and by pausing
    the clock during the test.
    
    Review: https://reviews.apache.org/r/69273/
---
 src/tests/slave_recovery_tests.cpp | 67 ++++++++++++++++++++++++++------------
 1 file changed, 47 insertions(+), 20 deletions(-)

diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 0fef5aa..5ee64a4 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -4719,8 +4719,11 @@ TYPED_TEST(SlaveRecoveryTest, RestartBeforeContainerizerLaunch)
 // offers.
 TYPED_TEST(SlaveRecoveryTest, AgentReconfigurationWithRunningTask)
 {
+  Clock::pause();
+
   // Start a master.
-  Try<Owned<cluster::Master>> master = this->StartMaster();
+  master::Flags masterFlags = this->CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = this->StartMaster(masterFlags);
   ASSERT_SOME(master);
 
   // Start a framework.
@@ -4735,35 +4738,37 @@ TYPED_TEST(SlaveRecoveryTest, AgentReconfigurationWithRunningTask)
 
   Future<vector<Offer>> offers1;
   EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(FutureArg<1>(&offers1))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
+    .WillOnce(FutureArg<1>(&offers1));
 
   driver.start();
 
   // Start a slave.
-  slave::Flags flags = this->CreateSlaveFlags();
+  slave::Flags slaveFlags = this->CreateSlaveFlags();
 
   // NOTE: These tests will start with "zero" memory, and the default Windows
   // isolators will enforce this, causing the task to crash. The default POSIX
   // isolators don't actually perform isolation, and so this does not occur.
   // However, these tests are not testing isolation, they're testing resource
   // accounting, so we can just use "no" isolators.
-  flags.isolation = "";
-  flags.resources = "cpus:5;mem:0;disk:0;ports:0";
+  slaveFlags.isolation = "";
+  slaveFlags.resources = "cpus:5;mem:0;disk:0;ports:0";
 
-  Fetcher fetcher(flags);
+  Fetcher fetcher(slaveFlags);
+
+  Try<TypeParam*> _containerizer = TypeParam::create(
+      slaveFlags, true, &fetcher);
 
-  Try<TypeParam*> _containerizer = TypeParam::create(flags, true, &fetcher);
   ASSERT_SOME(_containerizer);
   Owned<slave::Containerizer> containerizer(_containerizer.get());
 
   Owned<MasterDetector> detector = master.get()->createDetector();
   Try<Owned<cluster::Slave>> slave =
-    this->StartSlave(detector.get(), containerizer.get(), flags);
+    this->StartSlave(detector.get(), containerizer.get(), slaveFlags);
 
   ASSERT_SOME(slave);
 
   // Start a long-running task on the slave.
+  Clock::advance(masterFlags.allocation_interval);
   AWAIT_READY(offers1);
   ASSERT_FALSE(offers1->empty());
 
@@ -4783,7 +4788,13 @@ TYPED_TEST(SlaveRecoveryTest, AgentReconfigurationWithRunningTask)
     .WillOnce(FutureArg<1>(&statusRunning))
     .WillOnce(FutureArg<1>(&statusKilled));
 
-  driver.launchTasks(offers1.get()[0].id(), {task});
+  // Explicitly set the `refuse_seconds` offer filter to 0,
+  // because the default five seconds can be enough to cause
+  // timeouts when awaiting the subsequent offer on heavily
+  // loaded systems. (MESOS-9358)
+  Filters filters;
+  filters.set_refuse_seconds(0);
+  driver.launchTasks(offers1.get()[0].id(), {task}, filters);
 
   AWAIT_READY(statusStarting);
   AWAIT_READY(statusRunning);
@@ -4791,9 +4802,9 @@ TYPED_TEST(SlaveRecoveryTest, AgentReconfigurationWithRunningTask)
   // Grab one of the offers while the task is running.
   Future<vector<Offer>> offers2;
   EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(FutureArg<1>(&offers2))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
+    .WillOnce(FutureArg<1>(&offers2));
 
+  Clock::advance(masterFlags.allocation_interval);
   AWAIT_READY(offers2);
   ASSERT_FALSE(offers2->empty());
 
@@ -4803,29 +4814,44 @@ TYPED_TEST(SlaveRecoveryTest, AgentReconfigurationWithRunningTask)
 
   driver.declineOffer(offers2.get()[0].id());
 
+
   // Restart the slave with increased resources.
   slave.get()->terminate();
-  flags.reconfiguration_policy = "additive";
-  flags.resources = "cpus:10;mem:512;disk:0;ports:0";
+  slaveFlags.reconfiguration_policy = "additive";
+  slaveFlags.resources = "cpus:10;mem:512;disk:0;ports:0";
+
+  // Remove the registration delay to circumvent a delay in the recovery
+  // logic. This will allow the agent to register with the master after
+  // detecting the master, even when the clock is paused.
+  slaveFlags.registration_backoff_factor = Seconds(0);
 
   // Restart the slave with a new containerizer.
-  _containerizer = TypeParam::create(flags, true, &fetcher);
+  _containerizer = TypeParam::create(slaveFlags, true, &fetcher);
   ASSERT_SOME(_containerizer);
   containerizer.reset(_containerizer.get());
 
+  Future<ReregisterExecutorMessage> reregisterExecutorMessage =
+    FUTURE_PROTOBUF(ReregisterExecutorMessage(), _, _);
+
   Future<SlaveReregisteredMessage> slaveReregistered =
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
   // Grab one of the offers after the slave was restarted.
   Future<vector<Offer>> offers3;
   EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(FutureArg<1>(&offers3))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
+    .WillOnce(FutureArg<1>(&offers3));
 
-  slave = this->StartSlave(detector.get(), containerizer.get(), flags);
+  slave = this->StartSlave(detector.get(), containerizer.get(), slaveFlags);
   ASSERT_SOME(slave);
+
+  // Wait for the executor and then skip forward to trigger the executor
+  // reregistration timeout, which resumes the agent recovery.
+  AWAIT_READY(reregisterExecutorMessage);
+  Clock::advance(slaveFlags.executor_reregistration_timeout);
+
   AWAIT_READY(slaveReregistered);
 
+  Clock::advance(masterFlags.allocation_interval);
   AWAIT_READY(offers3);
   EXPECT_EQ(
       offers3.get()[0].resources(),
@@ -4842,9 +4868,10 @@ TYPED_TEST(SlaveRecoveryTest, AgentReconfigurationWithRunningTask)
   // Grab one of the offers after the task was killed.
   Future<vector<Offer>> offers4;
   EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(FutureArg<1>(&offers4))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
+    .WillOnce(FutureArg<1>(&offers4));
 
+  Clock::advance(masterFlags.allocation_interval);
+  AWAIT_READY(offers4);
   EXPECT_EQ(
       offers4.get()[0].resources(),
       allocatedResources(Resources::parse("cpus:10;mem:512").get(), "*"));