You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/10/14 19:00:06 UTC

[2/5] mesos git commit: Filled missing executor info in tasks when `LAUNCH_GROUP`.

Filled missing executor info in tasks when `LAUNCH_GROUP`.

This fixes a navigation error in the Web UI, which uses the executor id
of a task to locate the corresponding sandbox directory. If the task's
executor id is empty, the Web UI falls back to using the task id as the
executor id. That works for tasks launched by `CommandExecutor`, because
their executor id equals the task id. However, for tasks launched by
`DefaultExecutor`, the executor id is chosen by the framework and may
differ from the task id. So we need to fill in the `ExecutorInfo` of each
`TaskInfo` on `LAUNCH_GROUP` to prevent the Web UI from using an
incorrect executor id when searching for the sandbox directory.

Review: https://reviews.apache.org/r/52470/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d2da824c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d2da824c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d2da824c

Branch: refs/heads/master
Commit: d2da824cfbfc4242ea4962d763da9726faf7aaca
Parents: ebe7ea5
Author: haosdent huang <ha...@gmail.com>
Authored: Fri Oct 14 11:59:32 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Oct 14 11:59:32 2016 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto             |   5 +-
 include/mesos/v1/mesos.proto          |   5 +-
 src/master/master.cpp                 |  25 +++++--
 src/master/master.hpp                 |   2 +-
 src/master/validation.cpp             |  15 +++-
 src/tests/default_executor_tests.cpp  | 112 +++++++++++++++++++++++++++++
 src/tests/master_validation_tests.cpp |  20 ++++--
 7 files changed, 166 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d2da824c/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 05988d4..d071431 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -1436,8 +1436,9 @@ message TaskInfo {
  * allow the group to be launched "atomically".
  *
  * NOTES:
- * 1) `TaskInfo.executor` must not be set.
- * 2) `NetworkInfo` must not be set inside task's `ContainerInfo`.
+ * 1) `NetworkInfo` must not be set inside task's `ContainerInfo`.
+ * 2) `TaskInfo.executor` need not be set. If set, it must match
+ *    `LaunchGroup.executor`.
  */
 message TaskGroupInfo {
   repeated TaskInfo tasks = 1;

http://git-wip-us.apache.org/repos/asf/mesos/blob/d2da824c/include/mesos/v1/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index 08a536c..74761a0 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -1435,8 +1435,9 @@ message TaskInfo {
  * allow the group to be launched "atomically".
  *
  * NOTES:
- * 1) `TaskInfo.executor` must not be set.
- * 2) `NetworkInfo` must not be set inside task's `ContainerInfo`.
+ * 1) `NetworkInfo` must not be set inside task's `ContainerInfo`.
+ * 2) `TaskInfo.executor` need not be set. If set, it must match
+ *    `LaunchGroup.executor`.
  */
 message TaskGroupInfo {
   repeated TaskInfo tasks = 1;

http://git-wip-us.apache.org/repos/asf/mesos/blob/d2da824c/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 7ef8987..3c6b18e 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3405,13 +3405,15 @@ Resources Master::addTask(
 
 void Master::accept(
     Framework* framework,
-    const scheduler::Call::Accept& accept)
+    scheduler::Call::Accept accept)
 {
   CHECK_NOTNULL(framework);
 
-  foreach (const Offer::Operation& operation, accept.operations()) {
-    if (operation.type() == Offer::Operation::LAUNCH) {
-      if (operation.launch().task_infos().size() > 0) {
+  for (int i = 0; i < accept.operations_size(); ++i) {
+    Offer::Operation* operation = accept.mutable_operations(i);
+
+    if (operation->type() == Offer::Operation::LAUNCH) {
+      if (operation->launch().task_infos().size() > 0) {
         ++metrics->messages_launch_tasks;
       } else {
         ++metrics->messages_decline_offers;
@@ -3419,6 +3421,21 @@ void Master::accept(
                      << " in ACCEPT call for framework " << framework->id()
                      << " as the launch operation specified no tasks";
       }
+    } else if (operation->type() == Offer::Operation::LAUNCH_GROUP) {
+      const ExecutorInfo& executor = operation->launch_group().executor();
+
+      TaskGroupInfo* taskGroup =
+        operation->mutable_launch_group()->mutable_task_group();
+
+      // Mutate `TaskInfo` to include `ExecutorInfo` to make it easy
+      // for operator API and WebUI to get access to the corresponding
+      // executor for tasks in the task group.
+      for (int j = 0; j < taskGroup->tasks().size(); ++j) {
+        TaskInfo* task = taskGroup->mutable_tasks(j);
+        if (!task->has_executor()) {
+          task->mutable_executor()->CopyFrom(executor);
+        }
+      }
     }
 
     // TODO(jieyu): Add metrics for non launch operations.

http://git-wip-us.apache.org/repos/asf/mesos/blob/d2da824c/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 43518b9..881f0d6 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -911,7 +911,7 @@ private:
 
   void accept(
       Framework* framework,
-      const scheduler::Call::Accept& accept);
+      scheduler::Call::Accept accept);
 
   void _accept(
       const FrameworkID& frameworkId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/d2da824c/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 480a94b..f690a9e 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -1011,8 +1011,8 @@ Option<Error> validateTask(
 
   // Now do `TaskGroup` specific validation.
 
-  if (task.has_executor()) {
-    return Error("'TaskInfo.executor' must not be set");
+  if (!task.has_executor()) {
+    return Error("'TaskInfo.executor' must be set");
   }
 
   if (task.has_container()) {
@@ -1087,6 +1087,17 @@ Option<Error> validateExecutor(
     return Error("Docker ContainerInfo is not supported on the executor");
   }
 
+  // Validate that any `ExecutorInfo` set on a task matches `executor`.
+
+  foreach (const TaskInfo& task, taskGroup.tasks()) {
+    if (task.has_executor() && task.executor() != executor) {
+      return Error(
+          "The `ExecutorInfo` of "
+          "task '" + stringify(task.task_id()) + "' is different from "
+          "executor '" + stringify(executor.executor_id()) + "'");
+    }
+  }
+
   const Resources& executorResources = executor.resources();
 
   // Validate minimal cpus and memory resources of executor.

http://git-wip-us.apache.org/repos/asf/mesos/blob/d2da824c/src/tests/default_executor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp
index 9e0fd67..dc002c6 100644
--- a/src/tests/default_executor_tests.cpp
+++ b/src/tests/default_executor_tests.cpp
@@ -541,6 +541,118 @@ TEST_F(DefaultExecutorTest, KillTaskGroupOnTaskFailure)
   ASSERT_EQ(expectedTaskStates, taskStates);
 }
 
+
+// Verifies that a task group whose task sets `TaskInfo.executor` to the
+// same `ExecutorInfo` as `LaunchGroup.executor` is accepted and runs.
+TEST_P(DefaultExecutorTest, ROOT_TaskUsesExecutor)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+
+  Resources resources =
+    Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+
+  ExecutorInfo executorInfo;
+  executorInfo.set_type(ExecutorInfo::DEFAULT);
+
+  executorInfo.mutable_executor_id()->CopyFrom(DEFAULT_EXECUTOR_ID);
+  executorInfo.mutable_resources()->CopyFrom(resources);
+
+  // Disable AuthN on the agent.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.authenticate_http_readwrite = false;
+  flags.containerizers = GetParam();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected));
+
+  scheduler::TestV1Mesos mesos(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(evolve(frameworkInfo));
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  // Update `executorInfo` with the subscribed `frameworkId`.
+  executorInfo.mutable_framework_id()->CopyFrom(devolve(frameworkId));
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0, offers->offers().size()); // `offers(0)` is read below.
+
+  Future<v1::scheduler::Event::Update> update;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  const v1::Offer& offer = offers->offers(0);
+  const SlaveID slaveId = devolve(offer.agent_id());
+
+  v1::TaskInfo taskInfo =
+    evolve(createTask(slaveId, resources, "sleep 1000"));
+
+  taskInfo.mutable_executor()->CopyFrom(evolve(executorInfo));
+
+  v1::TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(taskInfo);
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
+
+    v1::Offer::Operation::LaunchGroup* launchGroup =
+      operation->mutable_launch_group();
+
+    launchGroup->mutable_executor()->CopyFrom(evolve(executorInfo));
+    launchGroup->mutable_task_group()->CopyFrom(taskGroup);
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(update);
+
+  ASSERT_EQ(TASK_RUNNING, update->status().state());
+  EXPECT_EQ(taskInfo.task_id(), update->status().task_id());
+  EXPECT_TRUE(update->status().has_timestamp());
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/d2da824c/src/tests/master_validation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp
index 0f8d33b..da43f99 100644
--- a/src/tests/master_validation_tests.cpp
+++ b/src/tests/master_validation_tests.cpp
@@ -2105,9 +2105,9 @@ TEST_F(TaskGroupValidationTest, TaskUsesNetworkInfo)
 }
 
 
-// Ensures that a task in a task group with an executor
+// Ensures that a task in a task group with a different executor
 // is rejected during `TaskGroupInfo` validation.
-TEST_F(TaskGroupValidationTest, TaskUsesExecutor)
+TEST_F(TaskGroupValidationTest, TaskUsesDifferentExecutor)
 {
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
@@ -2116,9 +2116,12 @@ TEST_F(TaskGroupValidationTest, TaskUsesExecutor)
   Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
   ASSERT_SOME(slave);
 
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.mutable_id()->set_value("Test_Framework");
+
   MockScheduler sched;
   MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
 
   EXPECT_CALL(sched, registered(&driver, _, _))
     .Times(1);
@@ -2137,23 +2140,25 @@ TEST_F(TaskGroupValidationTest, TaskUsesExecutor)
   Resources resources = Resources::parse("cpus:1;mem:512;disk:32").get();
 
   ExecutorInfo executor(DEFAULT_EXECUTOR_INFO);
+  executor.mutable_framework_id()->CopyFrom(frameworkInfo.id());
   executor.set_type(ExecutorInfo::CUSTOM);
   executor.mutable_resources()->CopyFrom(resources);
 
-  // Create an invalid task that has executor.
+  // Create an invalid task that has a different executor.
   TaskInfo task1;
   task1.set_name("1");
   task1.mutable_task_id()->set_value("1");
   task1.mutable_slave_id()->MergeFrom(offer.slave_id());
   task1.mutable_resources()->MergeFrom(resources);
   task1.mutable_executor()->MergeFrom(executor);
+  task1.mutable_executor()->set_type(ExecutorInfo::DEFAULT);
 
   // Create a valid task.
   TaskInfo task2;
   task2.set_name("2");
   task2.mutable_task_id()->set_value("2");
   task2.mutable_slave_id()->MergeFrom(offer.slave_id());
-  task1.mutable_resources()->MergeFrom(resources);
+  task2.mutable_resources()->MergeFrom(resources);
 
   TaskGroupInfo taskGroup;
   taskGroup.add_tasks()->CopyFrom(task1);
@@ -2180,8 +2185,9 @@ TEST_F(TaskGroupValidationTest, TaskUsesExecutor)
   EXPECT_EQ(task1.task_id(), task1Status->task_id());
   EXPECT_EQ(TASK_ERROR, task1Status->state());
   EXPECT_EQ(TaskStatus::REASON_TASK_GROUP_INVALID, task1Status->reason());
-  EXPECT_EQ("Task '1' is invalid: 'TaskInfo.executor' must not be set",
-            task1Status->message());
+  EXPECT_EQ(
+      "The `ExecutorInfo` of task '1' is different from executor 'default'",
+      task1Status->message());
 
   AWAIT_READY(task2Status);
   EXPECT_EQ(task2.task_id(), task2Status->task_id());