You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2017/02/09 00:36:27 UTC

[5/5] mesos git commit: Inject resource allocation info in agent for received tasks.

Inject resource allocation info in agent for received tasks.

Per MESOS-7077, the agent needs to inject allocation info into the
resources of tasks and executors coming from an old master, in order
to maintain the invariant that all task and executor resources on
the agent have their allocation info set.

Review: https://reviews.apache.org/r/56422


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8362e2f7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8362e2f7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8362e2f7

Branch: refs/heads/master
Commit: 8362e2f7b22bc929fa4ebace304ec533368dc71b
Parents: 55ca107
Author: Benjamin Mahler <bm...@apache.org>
Authored: Tue Feb 7 18:28:57 2017 -0800
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Feb 8 16:36:02 2017 -0800

----------------------------------------------------------------------
 src/slave/slave.cpp       | 70 +++++++++++++++++++++++++++++++++++++++---
 src/slave/slave.hpp       |  2 +-
 src/tests/slave_tests.cpp | 32 +++++++++++--------
 3 files changed, 86 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8362e2f7/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 92564ff..d9c37b0 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1561,7 +1561,7 @@ void Slave::runTask(
 
 void Slave::run(
     const FrameworkInfo& frameworkInfo,
-    const ExecutorInfo& executorInfo,
+    ExecutorInfo executorInfo,
     Option<TaskInfo> task,
     Option<TaskGroupInfo> taskGroup,
     const UPID& pid)
@@ -1569,6 +1569,48 @@ void Slave::run(
   CHECK_NE(task.isSome(), taskGroup.isSome())
     << "Either task or task group should be set but not both";
 
+  auto injectAllocationInfo = [](
+      RepeatedPtrField<Resource>* resources,
+      const FrameworkInfo& frameworkInfo) {
+    set<string> roles = protobuf::framework::getRoles(frameworkInfo);
+
+    foreach (Resource& resource, *resources) {
+      if (!resource.has_allocation_info()) {
+        if (roles.size() != 1) {
+          LOG(FATAL) << "Missing 'Resource.AllocationInfo' for resources"
+                     << " allocated to MULTI_ROLE framework"
+                     << " '" << frameworkInfo.name() << "'";
+        }
+
+        resource.mutable_allocation_info()->set_role(*roles.begin());
+      }
+    }
+  };
+
+  injectAllocationInfo(executorInfo.mutable_resources(), frameworkInfo);
+
+  if (task.isSome()) {
+    injectAllocationInfo(task->mutable_resources(), frameworkInfo);
+
+    if (task->has_executor()) {
+      injectAllocationInfo(
+          task->mutable_executor()->mutable_resources(),
+          frameworkInfo);
+    }
+  }
+
+  if (taskGroup.isSome()) {
+    foreach (TaskInfo& task, *taskGroup->mutable_tasks()) {
+      injectAllocationInfo(task.mutable_resources(), frameworkInfo);
+
+      if (task.has_executor()) {
+        injectAllocationInfo(
+            task.mutable_executor()->mutable_resources(),
+            frameworkInfo);
+      }
+    }
+  }
+
   vector<TaskInfo> tasks;
   if (task.isSome()) {
     tasks.push_back(task.get());
@@ -4548,15 +4590,33 @@ ExecutorInfo Slave::getExecutorInfo(
 
   // Add an allowance for the command executor. This does lead to a
   // small overcommit of resources.
+  //
   // TODO(vinod): If a task is using revocable resources, mark the
   // corresponding executor resource (e.g., cpus) to be also
   // revocable. Currently, it is OK because the containerizer is
   // given task + executor resources on task launch resulting in
   // the container being correctly marked as revocable.
-  executor.mutable_resources()->MergeFrom(
-      Resources::parse(
-        "cpus:" + stringify(DEFAULT_EXECUTOR_CPUS) + ";" +
-        "mem:" + stringify(DEFAULT_EXECUTOR_MEM.megabytes())).get());
+  Resources executorOverhead = Resources::parse(
+      "cpus:" + stringify(DEFAULT_EXECUTOR_CPUS) + ";" +
+      "mem:" + stringify(DEFAULT_EXECUTOR_MEM.megabytes())).get();
+
+  // Inject the task's allocation role into the executor's resources.
+  Option<string> role = None();
+  foreach (const Resource& resource, task.resources()) {
+    CHECK(resource.has_allocation_info());
+
+    if (role.isNone()) {
+      role = resource.allocation_info().role();
+    }
+
+    CHECK_EQ(role.get(), resource.allocation_info().role());
+  }
+
+  CHECK_SOME(role);
+
+  executorOverhead.allocate(role.get());
+
+  executor.mutable_resources()->CopyFrom(executorOverhead);
 
   return executor;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/8362e2f7/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 75dde73..3b0aea4 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -143,7 +143,7 @@ public:
 
   void run(
       const FrameworkInfo& frameworkInfo,
-      const ExecutorInfo& executorInfo,
+      ExecutorInfo executorInfo,
       Option<TaskInfo> task,
       Option<TaskGroupInfo> taskGroup,
       const process::UPID& pid);

http://git-wip-us.apache.org/repos/asf/mesos/blob/8362e2f7/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 7f357a6..b6f7682 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -776,13 +776,15 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, GetExecutorInfo)
   frameworkInfo.mutable_id()->CopyFrom(frameworkId);
 
   // Launch a task with the command executor.
+  Resources taskResources = Resources::parse("cpus:0.1;mem:32").get();
+  taskResources.allocate(frameworkInfo.role());
+
   TaskInfo task;
   task.set_name("task");
   task.mutable_task_id()->set_value("1");
   task.mutable_slave_id()->set_value(
       "20141010-221431-251662764-60288-32120-0001");
-  task.mutable_resources()->MergeFrom(
-      Resources::parse("cpus:0.1;mem:32").get());
+  task.mutable_resources()->MergeFrom(taskResources);
 
   CommandInfo command;
   command.set_shell(false);
@@ -828,15 +830,21 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, GetExecutorInfoForTaskWithContainer)
 
   MockSlave slave(CreateSlaveFlags(), &detector, &containerizer);
 
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.mutable_id()->set_value(
+      "20141010-221431-251662764-60288-12345-0000");
+
   // Launch a task with the command executor and ContainerInfo with
   // NetworkInfo.
+  Resources taskResources = Resources::parse("cpus:0.1;mem:32").get();
+  taskResources.allocate(frameworkInfo.role());
+
   TaskInfo task;
   task.set_name("task");
   task.mutable_task_id()->set_value("1");
   task.mutable_slave_id()->set_value(
       "20141010-221431-251662764-60288-12345-0001");
-  task.mutable_resources()->MergeFrom(
-      Resources::parse("cpus:0.1;mem:32").get());
+  task.mutable_resources()->MergeFrom(taskResources);
 
   CommandInfo command;
   command.set_shell(false);
@@ -853,9 +861,6 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, GetExecutorInfoForTaskWithContainer)
   network->add_ip_addresses()->set_ip_address("4.3.2.1");
   network->add_groups("public");
 
-  FrameworkInfo frameworkInfo;
-  frameworkInfo.mutable_id()->set_value(
-      "20141010-221431-251662764-60288-12345-0000");
   const ExecutorInfo& executor = slave.getExecutorInfo(frameworkInfo, task);
 
   // Now assert that the executor has both the command and ContainerInfo
@@ -902,6 +907,13 @@ TEST_F(SlaveTest, ROOT_LaunchTaskInfoWithContainerInfo)
   StandaloneMasterDetector detector;
   MockSlave slave(flags, &detector, containerizer.get());
 
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.mutable_id()->set_value(
+      "20141010-221431-251662764-60288-12345-0000");
+
+  Resources taskResources = Resources::parse("cpus:0.1;mem:32").get();
+  taskResources.allocate(frameworkInfo.role());
+
   // Launch a task with the command executor and ContainerInfo with
   // NetworkInfo.
   TaskInfo task;
@@ -909,8 +921,7 @@ TEST_F(SlaveTest, ROOT_LaunchTaskInfoWithContainerInfo)
   task.mutable_task_id()->set_value("1");
   task.mutable_slave_id()->set_value(
       "20141010-221431-251662764-60288-12345-0001");
-  task.mutable_resources()->MergeFrom(
-      Resources::parse("cpus:0.1;mem:32").get());
+  task.mutable_resources()->MergeFrom(taskResources);
 
   CommandInfo command;
   command.set_shell(false);
@@ -930,9 +941,6 @@ TEST_F(SlaveTest, ROOT_LaunchTaskInfoWithContainerInfo)
   network->add_ip_addresses()->set_ip_address("4.3.2.1");
   network->add_groups("public");
 
-  FrameworkInfo frameworkInfo;
-  frameworkInfo.mutable_id()->set_value(
-      "20141010-221431-251662764-60288-12345-0000");
   const ExecutorInfo& executor = slave.getExecutorInfo(frameworkInfo, task);
 
   Try<string> sandbox = environment->mkdtemp();