You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2018/09/05 23:54:16 UTC

[mesos] 05/06: Added tests for task metadata GC using the default executor.

This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch 1.7.x
in repository

commit c03df894d9f58d6b632e9f56fa1bf0a1b6152bf3
Author: Joseph Wu <>
AuthorDate: Tue Jul 24 13:59:07 2018 -0700

    Added tests for task metadata GC using the default executor.
    This adds two tests that launch a long-lived and short-lived task
    on the same executor.  The tests expect the short-lived task's
    metadata and sandbox to be garbage collected.
    One test restarts the agent before GC to ensure recovery will
    schedule completed tasks for GC too.
    One existing test is modified due to the extra GC event from
    task metadata.
 src/tests/gc_tests.cpp             | 463 +++++++++++++++++++++++++++++++++++++
 src/tests/slave_recovery_tests.cpp |  13 +-
 2 files changed, 472 insertions(+), 4 deletions(-)

diff --git a/src/tests/gc_tests.cpp b/src/tests/gc_tests.cpp
index 4f288cf..55cba06 100644
--- a/src/tests/gc_tests.cpp
+++ b/src/tests/gc_tests.cpp
@@ -25,6 +25,11 @@
 #include <mesos/resources.hpp>
 #include <mesos/scheduler.hpp>
+#include <mesos/v1/executor.hpp>
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/scheduler.hpp>
+#include <process/collect.hpp>
 #include <process/dispatch.hpp>
 #include <process/future.hpp>
 #include <process/gmock.hpp>
@@ -84,7 +89,9 @@ using std::string;
 using std::vector;
 using testing::_;
+using testing::AllOf;
 using testing::AtMost;
+using testing::DoAll;
 using testing::Return;
 using testing::SaveArg;
@@ -646,6 +653,462 @@ TEST_F(GarbageCollectorIntegrationTest, ExitedExecutor)
+// This test verifies that task metadata and sandboxes are scheduled for GC
+// when a task finishes, but the executor is still running.
+TEST_F(GarbageCollectorIntegrationTest, LongLivedDefaultExecutor)
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+  // We need this for the agent's work directory and GC policy.
+  slave::Flags flags = CreateSlaveFlags();
+  // Turn on GC of nested container sandboxes by default.
+  flags.gc_non_executor_container_sandboxes = true;
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+  // Enable checkpointing, otherwise there will be no metadata to GC.
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      None(),
+      resources,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
+  // We launch two tasks for this test:
+  //   * One will be a long-lived task to keep the executor alive.
+  //   * One will be a short-lived task to exercise task metadata/sandbox GC.
+  v1::TaskInfo longLivedTaskInfo =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
+  v1::TaskInfo shortLivedTaskInfo =
+    v1::createTask(agentId, resources, "exit 0");
+  // There should be a total of 5 updates:
+  //   * TASK_STARTING/RUNNING from the long-lived task,
+  //   * TASK_STARTING/RUNNING/FINISHED from the short-lived task.
+  testing::Sequence longTask;
+  Future<v1::scheduler::Event::Update> longStartingUpdate;
+  Future<v1::scheduler::Event::Update> longRunningUpdate;
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(longLivedTaskInfo),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(longTask)
+    .WillOnce(DoAll(
+        FutureArg<1>(&longStartingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(longLivedTaskInfo),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(longTask)
+    .WillOnce(DoAll(
+        FutureArg<1>(&longRunningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+  testing::Sequence shortTask;
+  Future<v1::scheduler::Event::Update> shortStartingUpdate;
+  Future<v1::scheduler::Event::Update> shortRunningUpdate;
+  Future<v1::scheduler::Event::Update> shortFinishedUpdate;
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(shortLivedTaskInfo),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(shortTask)
+    .WillOnce(DoAll(
+        FutureArg<1>(&shortStartingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(shortLivedTaskInfo),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(shortTask)
+    .WillOnce(DoAll(
+        FutureArg<1>(&shortRunningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(shortLivedTaskInfo),
+          TaskStatusUpdateStateEq(v1::TASK_FINISHED))))
+    .InSequence(shortTask)
+    .WillOnce(DoAll(
+        FutureArg<1>(&shortFinishedUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+  // There should be two directories scheduled for GC:
+  // the short-lived task's metadata and sandbox.
+  vector<Future<Nothing>> schedules = {
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule),
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule)
+  };
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {
+            v1::LAUNCH_GROUP(
+                executorInfo, v1::createTaskGroupInfo({longLivedTaskInfo})),
+            v1::LAUNCH_GROUP(
+                executorInfo, v1::createTaskGroupInfo({shortLivedTaskInfo}))
+          }));
+  AWAIT_READY(collect(schedules));
+  AWAIT_READY(longStartingUpdate);
+  AWAIT_READY(longRunningUpdate);
+  AWAIT_READY(shortStartingUpdate);
+  AWAIT_READY(shortRunningUpdate);
+  AWAIT_READY(shortFinishedUpdate);
+  // Check that the short-lived task's metadata and sandbox exist.
+  string shortLivedTaskPath = slave::paths::getTaskPath(
+      slave::paths::getMetaRootDir(flags.work_dir),
+      devolve(agentId),
+      devolve(frameworkId),
+      devolve(executorInfo.executor_id()),
+      devolve(
+          shortStartingUpdate->status()
+            .container_status().container_id().parent()),
+      devolve(shortLivedTaskInfo.task_id()));
+  ASSERT_TRUE(os::exists(shortLivedTaskPath));
+  string shortLivedSandboxPath = path::join(
+      slave::paths::getExecutorRunPath(
+          flags.work_dir,
+            devolve(agentId),
+            devolve(frameworkId),
+            devolve(executorInfo.executor_id()),
+            devolve(
+                shortStartingUpdate->status()
+                  .container_status().container_id().parent())),
+      "containers",
+      shortStartingUpdate->status().container_status().container_id().value());
+  ASSERT_TRUE(os::exists(shortLivedSandboxPath));
+  // Check another metadata directory that should only be GC'd after the
+  // executor exits.
+  string executorMetaPath = slave::paths::getExecutorPath(
+      slave::paths::getMetaRootDir(flags.work_dir),
+      devolve(agentId),
+      devolve(frameworkId),
+      devolve(executorInfo.executor_id()));
+  ASSERT_TRUE(os::exists(executorMetaPath));
+  // Trigger garbage collection on the short-lived task's directories
+  // and check that those are properly deleted.
+  Clock::pause();
+  Clock::advance(flags.gc_delay);
+  Clock::settle();
+  Clock::resume();
+  ASSERT_FALSE(os::exists(shortLivedTaskPath));
+  ASSERT_FALSE(os::exists(shortLivedSandboxPath));
+  ASSERT_TRUE(os::exists(executorMetaPath));
+  // Kill the remaining task and trigger garbage collection again.
+  Future<v1::scheduler::Event::Update> killedUpdate;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(DoAll(
+        FutureArg<1>(&killedUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+  // Since this is the last executor belonging to the framework, we expect
+  // multiple directories to be scheduled for GC:
+  //   * Task, Executor container, Executor, and Framework metadata directories.
+  //   * Executor sandbox and run directories.
+  //   * Framework work directory.
+  schedules = {
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule),
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule),
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule),
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule),
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule),
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule),
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule)
+  };
+  mesos.send(v1::createCallKill(frameworkId, longLivedTaskInfo.task_id()));
+  AWAIT_READY(killedUpdate);
+  EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
+  EXPECT_EQ(longLivedTaskInfo.task_id(), killedUpdate->status().task_id());
+  AWAIT_READY(collect(schedules));
+  // Trigger GC and then check one of the directories above.
+  Clock::pause();
+  Clock::advance(flags.gc_delay);
+  Clock::settle();
+  Clock::resume();
+  ASSERT_FALSE(os::exists(executorMetaPath));
+// This test verifies that task metadata and sandboxes are scheduled for GC
+// when a task finishes, but the executor is still running. This version of
+// the test restarts the agent to ensure recovered tasks are also scheduled
+// for GC.
+TEST_F(GarbageCollectorIntegrationTest, LongLivedDefaultExecutorRestart)
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+  // We need this for the agent's work directory and GC policy.
+  slave::Flags flags = CreateSlaveFlags();
+  // Turn on GC of nested container sandboxes by default.
+  flags.gc_non_executor_container_sandboxes = true;
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+  // Enable checkpointing, otherwise there will be no metadata to GC.
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      None(),
+      resources,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
+  // We launch two tasks for this test:
+  //   * One will be a long-lived task to keep the executor alive.
+  //   * One will be a short-lived task to exercise task metadata/sandbox GC.
+  v1::TaskInfo longLivedTaskInfo =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
+  v1::TaskInfo shortLivedTaskInfo =
+    v1::createTask(agentId, resources, "exit 0");
+  // There should be a total of 5 updates:
+  //   * TASK_STARTING/RUNNING from the long-lived task,
+  //   * TASK_STARTING/RUNNING/FINISHED from the short-lived task.
+  testing::Sequence longTask;
+  Future<v1::scheduler::Event::Update> longStartingUpdate;
+  Future<v1::scheduler::Event::Update> longRunningUpdate;
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(longLivedTaskInfo),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(longTask)
+    .WillOnce(DoAll(
+        FutureArg<1>(&longStartingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(longLivedTaskInfo),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(longTask)
+    .WillOnce(DoAll(
+        FutureArg<1>(&longRunningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+  testing::Sequence shortTask;
+  Future<v1::scheduler::Event::Update> shortStartingUpdate;
+  Future<v1::scheduler::Event::Update> shortRunningUpdate;
+  Future<v1::scheduler::Event::Update> shortFinishedUpdate;
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(shortLivedTaskInfo),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(shortTask)
+    .WillOnce(DoAll(
+        FutureArg<1>(&shortStartingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(shortLivedTaskInfo),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(shortTask)
+    .WillOnce(DoAll(
+        FutureArg<1>(&shortRunningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(shortLivedTaskInfo),
+          TaskStatusUpdateStateEq(v1::TASK_FINISHED))))
+    .InSequence(shortTask)
+    .WillOnce(DoAll(
+        FutureArg<1>(&shortFinishedUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+  // There should be two directories scheduled for GC:
+  // the short-lived task's metadata and sandbox.
+  vector<Future<Nothing>> schedules = {
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule),
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule)
+  };
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {
+            v1::LAUNCH_GROUP(
+                executorInfo, v1::createTaskGroupInfo({longLivedTaskInfo})),
+            v1::LAUNCH_GROUP(
+                executorInfo, v1::createTaskGroupInfo({shortLivedTaskInfo}))
+          }));
+  AWAIT_READY(collect(schedules));
+  AWAIT_READY(longStartingUpdate);
+  AWAIT_READY(longRunningUpdate);
+  AWAIT_READY(shortStartingUpdate);
+  AWAIT_READY(shortRunningUpdate);
+  AWAIT_READY(shortFinishedUpdate);
+  // Check that the short-lived task's metadata and sandbox exist.
+  string shortLivedTaskPath = slave::paths::getTaskPath(
+      slave::paths::getMetaRootDir(flags.work_dir),
+      devolve(agentId),
+      devolve(frameworkId),
+      devolve(executorInfo.executor_id()),
+      devolve(
+          shortStartingUpdate->status()
+            .container_status().container_id().parent()),
+      devolve(shortLivedTaskInfo.task_id()));
+  ASSERT_TRUE(os::exists(shortLivedTaskPath));
+  string shortLivedSandboxPath = path::join(
+      slave::paths::getExecutorRunPath(
+          flags.work_dir,
+            devolve(agentId),
+            devolve(frameworkId),
+            devolve(executorInfo.executor_id()),
+            devolve(
+                shortStartingUpdate->status()
+                  .container_status().container_id().parent())),
+      "containers",
+      shortStartingUpdate->status().container_status().container_id().value());
+  ASSERT_TRUE(os::exists(shortLivedSandboxPath));
+  // Restart the agent to wipe out any scheduled GC.
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+  slave.get()->terminate();
+  // The agent should reregister once recovery is complete, which also means
+  // that any finished tasks metadata/sandboxes should be rescheduled for GC.
+  slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+  AWAIT_READY(slaveReregisteredMessage);
+  // Trigger garbage collection on the short-lived task's directories
+  // and check that those are properly deleted.
+  Clock::pause();
+  Clock::advance(flags.gc_delay);
+  Clock::settle();
+  Clock::resume();
+  ASSERT_FALSE(os::exists(shortLivedTaskPath));
+  ASSERT_FALSE(os::exists(shortLivedSandboxPath));
 TEST_F(GarbageCollectorIntegrationTest, DiskUsage)
   Try<Owned<cluster::Master>> master = StartMaster();
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 9710518..5842ccf 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -33,6 +33,7 @@
 #include <mesos/scheduler/scheduler.hpp>
+#include <process/collect.hpp>
 #include <process/dispatch.hpp>
 #include <process/gmock.hpp>
 #include <process/gtest.hpp>
@@ -1864,8 +1865,13 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
   Future<RegisterExecutorMessage> registerExecutor =
     FUTURE_PROTOBUF(RegisterExecutorMessage(), _, _);
-  Future<Nothing> schedule = FUTURE_DISPATCH(
-      _, &GarbageCollectorProcess::schedule);
+  // We use 'gc.schedule' as a proxy for the cleanup of the executor.
+  // The first event will correspond with the finished task, whereas
+  // the second is associated with the exited executor.
+  vector<Future<Nothing>> schedules = {
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule),
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule)
+  };
   driver.launchTasks(offers1.get()[0].id(), {task});
@@ -1873,8 +1879,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
   ExecutorID executorId = registerExecutor->executor_id();
-  // We use 'gc.schedule' as a proxy for the cleanup of the executor.
-  AWAIT_READY(schedule);
+  AWAIT_READY(collect(schedules));