You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2017/08/31 00:32:19 UTC

[2/2] mesos git commit: Added tests to ensure that tasks can access their parent's volumes.

Added tests to ensure that tasks can access their parent's volumes.

These tests verify that sibling tasks can share a persistent volume
owned by their parent executor using 'sandbox_path' volumes.

Review: https://reviews.apache.org/r/61921/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/be7d2ca4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/be7d2ca4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/be7d2ca4

Branch: refs/heads/master
Commit: be7d2ca48a765c247b644fb54d142602ac487d61
Parents: 065d2a8
Author: Gastón Kleiman <ga...@mesosphere.io>
Authored: Wed Aug 30 17:20:38 2017 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Aug 30 17:21:57 2017 -0700

----------------------------------------------------------------------
 src/tests/default_executor_tests.cpp | 433 ++++++++++++++++++++++++++++++
 1 file changed, 433 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/be7d2ca4/src/tests/default_executor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp
index afe0afa..138c428 100644
--- a/src/tests/default_executor_tests.cpp
+++ b/src/tests/default_executor_tests.cpp
@@ -1859,6 +1859,439 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
   EXPECT_SOME_EQ("abc\n", os::read(filePath));
 }
 
+
+// This test verifies that sibling tasks in the same task group can share a
+// persistent volume owned by their parent executor via 'sandbox_path' volumes.
+TEST_P_TEMP_DISABLED_ON_WINDOWS(
+    PersistentVolumeDefaultExecutor, ROOT_TasksSharingViaSandboxVolumes)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.launcher = param.launcher;
+  flags.isolation = param.isolation;
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role(DEFAULT_TEST_ROLE);
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(DoAll(v1::scheduler::SendSubscribe(frameworkInfo),
+                    FutureSatisfy(&connected)));
+
+  Future<Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  v1::Resources individualResources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get()
+      .pushReservation(
+          v1::createDynamicReservationInfo(
+              frameworkInfo.role(), frameworkInfo.principal()));
+
+  v1::Resources totalResources =
+    v1::Resources::parse("cpus:0.3;mem:96;disk:96").get()
+      .pushReservation(
+          v1::createDynamicReservationInfo(
+              frameworkInfo.role(), frameworkInfo.principal()));
+
+  v1::Resource executorVolume = v1::createPersistentVolume(
+      Megabytes(1),
+      frameworkInfo.role(),
+      "executor",
+      "executor_volume_path",
+      frameworkInfo.principal(),
+      None(),
+      frameworkInfo.principal());
+
+  v1::Resources executorResources =
+    individualResources.apply(v1::CREATE(executorVolume)).get();
+
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      v1::DEFAULT_EXECUTOR_ID.value(),
+      None(),
+      None(),
+      v1::ExecutorInfo::DEFAULT);
+
+  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+  executorInfo.mutable_resources()->CopyFrom(executorResources);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+
+  // Create a "producer" task that writes a file into a 'sandbox_path' volume
+  // of type PARENT, and a "consumer" task that polls the same volume until
+  // that file exists. Both tasks use the same `ContainerInfo` defined below.
+  //
+  // The test will only succeed if the task volume's source path is set to the
+  // path of the executor's persistent volume.
+
+  // TODO(gilbert): Refactor the following code once the helper to create a
+  // 'sandbox_path' volume is supported.
+
+  mesos::v1::Volume taskVolume;
+  taskVolume.set_mode(mesos::v1::Volume::RW);
+  taskVolume.set_container_path("task_volume_path");
+
+  mesos::v1::Volume::Source* source = taskVolume.mutable_source();
+  source->set_type(mesos::v1::Volume::Source::SANDBOX_PATH);
+
+  mesos::v1::Volume::Source::SandboxPath* sandboxPath =
+    source->mutable_sandbox_path();
+
+  sandboxPath->set_type(mesos::v1::Volume::Source::SandboxPath::PARENT);
+  sandboxPath->set_path("executor_volume_path");
+
+  mesos::v1::ContainerInfo containerInfo;
+  containerInfo.set_type(mesos::v1::ContainerInfo::MESOS);
+  containerInfo.add_volumes()->CopyFrom(taskVolume);
+
+  // A "producer" task that writes "abc" into a file under the path at which
+  // the task's 'sandbox_path' volume is mounted ("task_volume_path").
+  v1::TaskInfo producerInfo = v1::createTask(
+      offer.agent_id(),
+      individualResources,
+      "echo abc > task_volume_path/file",
+      None(),
+      "producer",
+      "producer");
+  producerInfo.mutable_container()->CopyFrom(containerInfo);
+
+  // A "consumer" task that polls once per second until the file exists under
+  // its own 'sandbox_path' volume mount, and then exits successfully.
+  v1::TaskInfo consumerInfo = v1::createTask(
+      offer.agent_id(),
+      individualResources,
+      "while [ ! -f task_volume_path/file ]; do sleep 1; done\ntrue",
+      None(),
+      "consumer",
+      "consumer");
+  consumerInfo.mutable_container()->CopyFrom(containerInfo);
+
+  vector<Future<v1::scheduler::Event::Update>> updates(4);
+
+  {
+    // This variable doesn't have to be used explicitly. We need it so that the
+    // futures are satisfied in the order in which the updates are received.
+    testing::InSequence inSequence;
+
+    foreach (Future<v1::scheduler::Event::Update>& update, updates) {
+      EXPECT_CALL(*scheduler, update(_, _))
+        .WillOnce(
+            DoAll(
+                FutureArg<1>(&update),
+                v1::scheduler::SendAcknowledge(frameworkId, offer.agent_id())));
+    }
+  }
+
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::RESERVE(totalResources),
+           v1::CREATE(executorVolume),
+           v1::LAUNCH_GROUP(
+               executorInfo,
+               v1::createTaskGroupInfo({producerInfo, consumerInfo}))}));
+
+  // Four updates are expected (two per task). We track each task separately
+  // to verify that it transitions from TASK_RUNNING to TASK_FINISHED.
+
+  enum class Stage
+  {
+    INITIAL,
+    RUNNING,
+    FINISHED
+  };
+
+  hashmap<v1::TaskID, Stage> taskStages;
+  taskStages[producerInfo.task_id()] = Stage::INITIAL;
+  taskStages[consumerInfo.task_id()] = Stage::INITIAL;
+
+  foreach (Future<v1::scheduler::Event::Update>& update, updates) {
+    AWAIT_READY(update);
+
+    const v1::TaskStatus& taskStatus = update->status();
+
+    Option<Stage> taskStage = taskStages.get(taskStatus.task_id());
+    ASSERT_SOME(taskStage);
+
+    switch (taskStage.get()) {
+      case Stage::INITIAL: {
+        ASSERT_EQ(TASK_RUNNING, taskStatus.state());
+
+        taskStages[taskStatus.task_id()] = Stage::RUNNING;
+
+        break;
+      }
+      case Stage::RUNNING: {
+        ASSERT_EQ(TASK_FINISHED, taskStatus.state());
+
+        taskStages[taskStatus.task_id()] = Stage::FINISHED;
+
+        break;
+      }
+      case Stage::FINISHED: {
+        FAIL() << "Unexpected task update: " << update->DebugString();
+        break;
+      }
+    }
+  }
+
+  string volumePath = slave::paths::getPersistentVolumePath(
+      flags.work_dir, devolve(executorVolume));
+
+  string filePath = path::join(volumePath, "file");
+
+  // Ensure that the producer task was able to write to the persistent volume.
+  EXPECT_SOME_EQ("abc\n", os::read(filePath));
+}
+
+
+// This test verifies that sibling tasks in different task groups can share a
+// persistent volume owned by their parent executor via 'sandbox_path' volumes.
+TEST_P_TEMP_DISABLED_ON_WINDOWS(
+    PersistentVolumeDefaultExecutor, ROOT_TaskGroupsSharingViaSandboxVolumes)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.launcher = param.launcher;
+  flags.isolation = param.isolation;
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role(DEFAULT_TEST_ROLE);
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(DoAll(v1::scheduler::SendSubscribe(frameworkInfo),
+                    FutureSatisfy(&connected)));
+
+  Future<Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  v1::Resources individualResources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get()
+      .pushReservation(
+          v1::createDynamicReservationInfo(
+              frameworkInfo.role(), frameworkInfo.principal()));
+
+  v1::Resources totalResources =
+    v1::Resources::parse("cpus:0.3;mem:96;disk:96").get()
+      .pushReservation(
+          v1::createDynamicReservationInfo(
+              frameworkInfo.role(), frameworkInfo.principal()));
+
+  v1::Resource executorVolume = v1::createPersistentVolume(
+      Megabytes(1),
+      frameworkInfo.role(),
+      "executor",
+      "executor_volume_path",
+      frameworkInfo.principal(),
+      None(),
+      frameworkInfo.principal());
+
+  v1::Resources executorResources =
+    individualResources.apply(v1::CREATE(executorVolume)).get();
+
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      v1::DEFAULT_EXECUTOR_ID.value(),
+      None(),
+      None(),
+      v1::ExecutorInfo::DEFAULT);
+
+  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+  executorInfo.mutable_resources()->CopyFrom(executorResources);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+
+  // Create a "producer" task that writes a file into a 'sandbox_path' volume
+  // of type PARENT, and a "consumer" task that polls the same volume until
+  // that file exists. Both tasks use the same `ContainerInfo` defined below.
+  //
+  // The test will only succeed if the task volume's source path is set to the
+  // path of the executor's persistent volume.
+
+  // TODO(gilbert): Refactor the following code once the helper to create a
+  // 'sandbox_path' volume is supported.
+
+  mesos::v1::Volume taskVolume;
+  taskVolume.set_mode(mesos::v1::Volume::RW);
+  taskVolume.set_container_path("task_volume_path");
+
+  mesos::v1::Volume::Source* source = taskVolume.mutable_source();
+  source->set_type(mesos::v1::Volume::Source::SANDBOX_PATH);
+
+  mesos::v1::Volume::Source::SandboxPath* sandboxPath =
+    source->mutable_sandbox_path();
+
+  sandboxPath->set_type(mesos::v1::Volume::Source::SandboxPath::PARENT);
+  sandboxPath->set_path("executor_volume_path");
+
+  mesos::v1::ContainerInfo containerInfo;
+  containerInfo.set_type(mesos::v1::ContainerInfo::MESOS);
+  containerInfo.add_volumes()->CopyFrom(taskVolume);
+
+  // A "producer" task that writes "abc" into a file under the path at which
+  // the task's 'sandbox_path' volume is mounted ("task_volume_path").
+  v1::TaskInfo producerInfo = v1::createTask(
+      offer.agent_id(),
+      individualResources,
+      "echo abc > task_volume_path/file",
+      None(),
+      "producer",
+      "producer");
+  producerInfo.mutable_container()->CopyFrom(containerInfo);
+
+  // A "consumer" task that polls once per second until the file exists under
+  // its own 'sandbox_path' volume mount, and then exits successfully.
+  v1::TaskInfo consumerInfo = v1::createTask(
+      offer.agent_id(),
+      individualResources,
+      "while [ ! -f task_volume_path/file ]; do sleep 1; done\ntrue",
+      None(),
+      "consumer",
+      "consumer");
+  consumerInfo.mutable_container()->CopyFrom(containerInfo);
+
+  vector<Future<v1::scheduler::Event::Update>> updates(4);
+
+  {
+    // This variable doesn't have to be used explicitly. We need it so that the
+    // futures are satisfied in the order in which the updates are received.
+    testing::InSequence inSequence;
+
+    foreach (Future<v1::scheduler::Event::Update>& update, updates) {
+      EXPECT_CALL(*scheduler, update(_, _))
+        .WillOnce(
+            DoAll(
+                FutureArg<1>(&update),
+                v1::scheduler::SendAcknowledge(frameworkId, offer.agent_id())));
+    }
+  }
+
+  // Reserve the resources, create the executor's volume, and launch each task
+  // in its own task group (two LAUNCH_GROUP operations, same executor).
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::RESERVE(totalResources),
+           v1::CREATE(executorVolume),
+           v1::LAUNCH_GROUP(
+               executorInfo, v1::createTaskGroupInfo({producerInfo})),
+           v1::LAUNCH_GROUP(
+               executorInfo, v1::createTaskGroupInfo({consumerInfo}))}));
+
+  // Four updates are expected (two per task). We track each task separately
+  // to verify that it transitions from TASK_RUNNING to TASK_FINISHED.
+
+  enum class Stage
+  {
+    INITIAL,
+    RUNNING,
+    FINISHED
+  };
+
+  hashmap<v1::TaskID, Stage> taskStages;
+  taskStages[producerInfo.task_id()] = Stage::INITIAL;
+  taskStages[consumerInfo.task_id()] = Stage::INITIAL;
+
+  foreach (Future<v1::scheduler::Event::Update>& update, updates) {
+    AWAIT_READY(update);
+
+    const v1::TaskStatus& taskStatus = update->status();
+
+    Option<Stage> taskStage = taskStages.get(taskStatus.task_id());
+    ASSERT_SOME(taskStage);
+
+    switch (taskStage.get()) {
+      case Stage::INITIAL: {
+        ASSERT_EQ(TASK_RUNNING, taskStatus.state());
+
+        taskStages[taskStatus.task_id()] = Stage::RUNNING;
+
+        break;
+      }
+      case Stage::RUNNING: {
+        ASSERT_EQ(TASK_FINISHED, taskStatus.state());
+
+        taskStages[taskStatus.task_id()] = Stage::FINISHED;
+
+        break;
+      }
+      case Stage::FINISHED: {
+        FAIL() << "Unexpected task update: " << update->DebugString();
+        break;
+      }
+    }
+  }
+
+  string volumePath = slave::paths::getPersistentVolumePath(
+      flags.work_dir, devolve(executorVolume));
+
+  string filePath = path::join(volumePath, "file");
+
+  // Ensure that the producer task was able to write to the persistent volume.
+  EXPECT_SOME_EQ("abc\n", os::read(filePath));
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {