You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2017/02/17 20:25:38 UTC

mesos git commit: Fixed a bug where an executor was not able to use reserved resources.

Repository: mesos
Updated Branches:
  refs/heads/master 788c7dcec -> 267d719c7


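Fixed a bug where an executor was not able to use reserved resources.

The agent was not unallocating the executor's resources before checking
whether they were contained in the (unallocated) checkpointed resources
on the agent, so that containment check could fail for an executor
using reserved resources.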

Review: https://reviews.apache.org/r/56778/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/267d719c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/267d719c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/267d719c

Branch: refs/heads/master
Commit: 267d719c7a8308e1a1b98c73f5091dbb7708c444
Parents: 788c7dc
Author: Anand Mazumdar <an...@apache.org>
Authored: Fri Feb 17 12:15:54 2017 -0800
Committer: Anand Mazumdar <an...@apache.org>
Committed: Fri Feb 17 12:25:30 2017 -0800

----------------------------------------------------------------------
 src/slave/slave.cpp                  |  17 +++--
 src/tests/default_executor_tests.cpp | 116 ++++++++++++++++++++++++++++++
 2 files changed, 126 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/267d719c/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index ebba8e1..7564e8d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1904,6 +1904,12 @@ void Slave::_run(
     return;
   }
 
+  auto unallocated = [](const Resources& resources) {
+    Resources result = resources;
+    result.unallocate();
+    return result;
+  };
+
   // NOTE: If the task/task group or executor uses resources that are
   // checkpointed on the slave (e.g. persistent volumes), we should
   // already know about it. If the slave doesn't know about them (e.g.
@@ -1913,12 +1919,6 @@ void Slave::_run(
   // out of order.
   bool kill = false;
   foreach (const TaskInfo& _task, tasks) {
-    auto unallocated = [](const Resources& resources) {
-      Resources result = resources;
-      result.unallocate();
-      return result;
-    };
-
     // We must unallocate the resources to check whether they are
     // contained in the unallocated total checkpointed resources.
     Resources checkpointedTaskResources =
@@ -1971,8 +1971,11 @@ void Slave::_run(
   }
 
   CHECK_EQ(kill, false);
+
+  // Refer to the comment above when looping across tasks on
+  // why we need to unallocate resources.
   Resources checkpointedExecutorResources =
-    Resources(executorInfo.resources()).filter(needCheckpointing);
+    unallocated(executorInfo.resources()).filter(needCheckpointing);
 
   foreach (const Resource& resource, checkpointedExecutorResources) {
     if (!checkpointedResources.contains(resource)) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/267d719c/src/tests/default_executor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp
index ffb69e9..eaf6394 100644
--- a/src/tests/default_executor_tests.cpp
+++ b/src/tests/default_executor_tests.cpp
@@ -1305,6 +1305,122 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask)
   ASSERT_EQ(0, executorFailure->status());
 }
 
+
+// This test verifies that the default executor can be
+// launched using reserved resources.
+TEST_P(DefaultExecutorTest, ReservedResources)
+{
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role("role");
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Resources unreserved =
+    Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  Resources reserved = unreserved.flatten(
+      frameworkInfo.role(),
+      createReservationInfo(frameworkInfo.principal())).get();
+
+  ExecutorInfo executorInfo;
+  executorInfo.set_type(ExecutorInfo::DEFAULT);
+
+  executorInfo.mutable_executor_id()->CopyFrom(DEFAULT_EXECUTOR_ID);
+  executorInfo.mutable_resources()->CopyFrom(reserved);
+
+  // Disable AuthN on the agent.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.authenticate_http_readwrite = false;
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(evolve(frameworkInfo));
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  // Update `executorInfo` with the subscribed `frameworkId`.
+  executorInfo.mutable_framework_id()->CopyFrom(devolve(frameworkId));
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+
+  Future<v1::scheduler::Event::Update> runningUpdate;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&runningUpdate));
+
+  const v1::Offer& offer = offers->offers(0);
+  const SlaveID slaveId = devolve(offer.agent_id());
+
+  // Launch the task using unreserved resources.
+  v1::TaskInfo taskInfo =
+    evolve(createTask(slaveId, unreserved, SLEEP_COMMAND(1000)));
+
+  v1::TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(taskInfo);
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    accept->add_operations()->CopyFrom(v1::RESERVE(evolve(reserved)));
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
+
+    v1::Offer::Operation::LaunchGroup* launchGroup =
+      operation->mutable_launch_group();
+
+    launchGroup->mutable_executor()->CopyFrom(evolve(executorInfo));
+    launchGroup->mutable_task_group()->CopyFrom(taskGroup);
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(runningUpdate);
+  ASSERT_EQ(TASK_RUNNING, runningUpdate->status().state());
+  ASSERT_EQ(taskInfo.task_id(), runningUpdate->status().task_id());
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {