You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2019/07/15 18:50:30 UTC

[mesos] 10/14: Added recovery of agent drain information.

This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 54fb43e7fb5fb1884e8f0ba087ae3db8cfc8d498
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Mon Jul 15 10:26:02 2019 -0700

    Added recovery of agent drain information.
    
    With this patch the agent will, after executor reregistration has
    finished, replay any active drain information so that remaining tasks
    are drained as well. We need to wait until executors have had a chance
    to reregister, so that they are not terminated should we send a kill
    task request before the executor has registered.
    
    Review: https://reviews.apache.org/r/70907/
---
 src/slave/slave.cpp       |  12 +++++
 src/slave/state.cpp       |  16 +++++++
 src/slave/state.hpp       |   3 ++
 src/tests/slave_tests.cpp | 117 ++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 148 insertions(+)

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 19b4769..3878ab8 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -5630,6 +5630,16 @@ void Slave::reregisterExecutorTimeout()
     }
   }
 
+  // Replay any active draining.
+  if (drainConfig.isSome()) {
+    DrainSlaveMessage drainSlaveMessage;
+    *drainSlaveMessage.mutable_config() = *drainConfig;
+
+    LOG(INFO) << "Replaying in-process agent draining";
+
+    drain(self(), std::move(drainSlaveMessage));
+  }
+
   // Signal the end of recovery.
   // TODO(greggomann): Allow the agent to complete recovery before the executor
   // re-registration timeout has elapsed. See MESOS-7539
@@ -7512,6 +7522,8 @@ Future<Nothing> Slave::recover(const Try<state::State>& state)
     // we can not reuse the id, we will either crash or erase it again.
     info.mutable_id()->CopyFrom(slaveState->info->id());
 
+    drainConfig = slaveState->drainConfig;
+
     // Check for SlaveInfo compatibility.
     Try<Nothing> _compatible =
       compatible(slaveState->info.get(), info);
diff --git a/src/slave/state.cpp b/src/slave/state.cpp
index e0a850e..cd3fac7 100644
--- a/src/slave/state.cpp
+++ b/src/slave/state.cpp
@@ -203,6 +203,22 @@ Try<SlaveState> SlaveState::recover(
     state.errors += framework->errors;
   }
 
+  // Recover any drain state.
+  const string drainConfigPath = paths::getDrainConfigPath(rootDir, slaveId);
+  if (os::exists(drainConfigPath)) {
+    Result<DrainConfig> drainConfig = state::read<DrainConfig>(drainConfigPath);
+    if (drainConfig.isError()) {
+      string message = "Failed to read agent state file '"
+                       + drainConfigPath + "': " + drainConfig.error();
+
+      LOG(WARNING) << message;
+      state.errors++;
+    }
+    if (drainConfig.isSome()) {
+      state.drainConfig = *drainConfig;
+    }
+  }
+
   const string resourceStatePath = paths::getResourceStatePath(rootDir);
   if (os::exists(resourceStatePath)) {
     Result<ResourceState> resourceState =
diff --git a/src/slave/state.hpp b/src/slave/state.hpp
index 45836e5..6d6ae01 100644
--- a/src/slave/state.hpp
+++ b/src/slave/state.hpp
@@ -391,6 +391,9 @@ struct SlaveState
   // state didn't support checkpointing operations.
   Option<std::vector<Operation>> operations;
 
+  // The drain state of the agent, if any.
+  Option<DrainConfig> drainConfig;
+
   unsigned int errors;
 };
 
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 147967d..5f8e53c 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -12216,6 +12216,123 @@ TEST_F(SlaveTest, DrainAgentKillsPendingTask)
   EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
 }
 
+
+// This test verifies that if the agent recovers while it is in
+// draining state, any remaining tasks are killed after the restart.
+TEST_F(SlaveTest, CheckpointedDrainInfo)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+  MockExecutor exec(executorId);
+  TestContainerizer containerizer(&exec);
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(&detector, &containerizer, slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger the agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  // Start a framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+
+  SlaveID slaveId = offers.get()[0].slave_id();
+  TaskInfo task = createTask(
+      slaveId,
+      Resources::parse("cpus:1;mem:32").get(),
+      SLEEP_COMMAND(1000),
+      executorId);
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  constexpr int GRACE_PERIOD_NANOS = 1000000;
+  DurationInfo maxGracePeriod;
+  maxGracePeriod.set_nanoseconds(GRACE_PERIOD_NANOS);
+
+  DrainConfig drainConfig;
+  drainConfig.set_mark_gone(true);
+  drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod);
+
+  DrainSlaveMessage drainSlaveMessage;
+  drainSlaveMessage.mutable_config()->CopyFrom(drainConfig);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> statusRunning;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning));
+
+  AWAIT_READY(statusRunning);
+  ASSERT_EQ(TaskState::TASK_RUNNING, statusRunning->state());
+
+  // We expect a request to kill the task when the drain request is initially
+  // received. The executor ignores the request and reregisters after agent
+  // restart.
+  Future<Nothing> killTask1;
+  EXPECT_CALL(exec, killTask(_, _))
+    .WillOnce(FutureSatisfy(&killTask1));
+
+  process::post(master.get()->pid, slave.get()->pid, drainSlaveMessage);
+
+  AWAIT_READY(killTask1);
+
+  Future<Nothing> reregistered;
+  EXPECT_CALL(exec, reregistered(_, _))
+    .WillOnce(DoAll(
+        Invoke(&exec, &MockExecutor::reregistered),
+        FutureSatisfy(&reregistered)))
+    .WillRepeatedly(DoDefault());
+
+  // Once the agent has finished recovering executors it should send
+  // another task kill request to the executor.
+  Future<Nothing> killTask2;
+  EXPECT_CALL(exec, killTask(_, _))
+    .WillOnce(FutureSatisfy(&killTask2));
+
+  // Restart the agent.
+  slave.get()->terminate();
+  slave = StartSlave(&detector, &containerizer, slaveFlags);
+
+  AWAIT_READY(reregistered);
+
+  // Advance the clock to finish the executor reregistration phase.
+  Clock::advance(slaveFlags.executor_reregistration_timeout);
+
+  AWAIT_READY(killTask2);
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {