You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2016/08/24 21:33:52 UTC

[1/2] mesos git commit: Added a test to ensure the master handles launching task groups.

Repository: mesos
Updated Branches:
  refs/heads/master 3674c58ad -> 09429d96c


Added a test to ensure the master handles launching task groups.

For now this test ensures the message is sent to the agent
in the successful launch path. More tests will be added to
test the all-or-nothing paths (killed, invalid, unauthorized).

Review: https://reviews.apache.org/r/51321


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/09429d96
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/09429d96
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/09429d96

Branch: refs/heads/master
Commit: 09429d96c50cc5e9adc96f94d3138a9b245520d8
Parents: bf3957f
Author: Benjamin Mahler <bm...@apache.org>
Authored: Mon Aug 22 20:45:33 2016 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 24 14:33:39 2016 -0700

----------------------------------------------------------------------
 src/tests/scheduler_tests.cpp | 123 +++++++++++++++++++++++++++++++++++++
 1 file changed, 123 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/09429d96/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index ccd0f2f..931c185 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -432,6 +432,129 @@ TEST_P(SchedulerTest, TaskRunning)
 }
 
 
+// Ensures that a task group can be successfully launched
+// on the `DEFAULT` executor.
+//
+// TODO(bmahler): We currently only test the master-side
+// of task group handling, since the rest is unimplemented.
+TEST_P(SchedulerTest, TaskGroupRunning)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+  ASSERT_SOME(slave);
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected));
+
+  ContentType contentType = GetParam();
+
+  scheduler::TestV1Mesos mesos(master.get()->pid, contentType, scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+
+  Future<RunTaskGroupMessage> runTaskGroupMessage =
+    FUTURE_PROTOBUF(RunTaskGroupMessage(), master.get()->pid, slave.get()->pid);
+
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::ExecutorInfo executor;
+  executor.set_type(v1::ExecutorInfo::DEFAULT);
+  executor.mutable_executor_id()->set_value("E");
+  executor.mutable_framework_id()->CopyFrom(subscribed->framework_id());
+  executor.mutable_resources()->CopyFrom(resources);
+
+  v1::TaskInfo task1;
+  task1.set_name("1");
+  task1.mutable_task_id()->set_value("1");
+  task1.mutable_agent_id()->CopyFrom(
+      offers->offers(0).agent_id());
+  task1.mutable_resources()->CopyFrom(resources);
+
+  v1::TaskInfo task2;
+  task2.set_name("2");
+  task2.mutable_task_id()->set_value("2");
+  task2.mutable_agent_id()->CopyFrom(
+      offers->offers(0).agent_id());
+  task2.mutable_resources()->CopyFrom(resources);
+
+  v1::TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(task1);
+  taskGroup.add_tasks()->CopyFrom(task2);
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offers->offers(0).id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
+
+    v1::Offer::Operation::LaunchGroup* launchGroup =
+      operation->mutable_launch_group();
+
+    launchGroup->mutable_executor()->CopyFrom(executor);
+    launchGroup->mutable_task_group()->CopyFrom(taskGroup);
+
+    mesos.send(call);
+  }
+
+  // TODO(bmahler): For now we only ensure that the message is
+  // sent to the agent, since the agent-side of task groups is
+  // not yet implemented.
+
+  AWAIT_READY(runTaskGroupMessage);
+
+  EXPECT_EQ(devolve(frameworkId), runTaskGroupMessage->framework().id());
+
+  EXPECT_EQ(devolve(executor.executor_id()),
+            runTaskGroupMessage->executor().executor_id());
+
+  ASSERT_EQ(2, runTaskGroupMessage->task_group().tasks().size());
+  EXPECT_EQ(devolve(task1.task_id()),
+            runTaskGroupMessage->task_group().tasks(0).task_id());
+  EXPECT_EQ(devolve(task2.task_id()),
+            runTaskGroupMessage->task_group().tasks(1).task_id());
+}
+
+
 TEST_P(SchedulerTest, ReconcileTask)
 {
   Try<Owned<cluster::Master>> master = StartMaster();


[2/2] mesos git commit: Implemented the LaunchGroup Offer::Operation in the master.

Posted by bm...@apache.org.
Implemented the LaunchGroup Offer::Operation in the master.

This operation is all-or-nothing, in that all tasks must be
launched together. If the operation fails, all tasks will
fail with TASK_ERROR and the appropriate GROUP reason.
If a task was killed before delivery to the executor, all
tasks are killed.

Review: https://reviews.apache.org/r/51320


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bf3957f4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bf3957f4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bf3957f4

Branch: refs/heads/master
Commit: bf3957f4b534a04a0f76456f76c4e28bdee4e76d
Parents: 3674c58
Author: Benjamin Mahler <bm...@apache.org>
Authored: Mon Aug 22 20:38:50 2016 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 24 14:33:39 2016 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto    |   2 +
 include/mesos/v1/mesos.proto |   2 +
 src/master/master.cpp        | 226 +++++++++++++++++++++++++++++++++++---
 src/messages/messages.proto  |  15 +++
 4 files changed, 230 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bf3957f4/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 53b6547..a93db55 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -1536,6 +1536,8 @@ message TaskStatus {
     REASON_SLAVE_REMOVED = 11;
     REASON_SLAVE_RESTARTED = 12;
     REASON_SLAVE_UNKNOWN = 13;
+    REASON_TASK_GROUP_INVALID = 25;
+    REASON_TASK_GROUP_UNAUTHORIZED = 26;
     REASON_TASK_INVALID = 14;
     REASON_TASK_UNAUTHORIZED = 15;
     REASON_TASK_UNKNOWN = 16;

http://git-wip-us.apache.org/repos/asf/mesos/blob/bf3957f4/include/mesos/v1/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index f6b59e1..4a7e998 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -1535,6 +1535,8 @@ message TaskStatus {
     REASON_AGENT_REMOVED = 11;
     REASON_AGENT_RESTARTED = 12;
     REASON_AGENT_UNKNOWN = 13;
+    REASON_TASK_GROUP_INVALID = 25;
+    REASON_TASK_GROUP_UNAUTHORIZED = 26;
     REASON_TASK_INVALID = 14;
     REASON_TASK_UNAUTHORIZED = 15;
     REASON_TASK_UNKNOWN = 16;

http://git-wip-us.apache.org/repos/asf/mesos/blob/bf3957f4/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 910293a..c300e1f 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -22,6 +22,7 @@
 #include <iomanip>
 #include <list>
 #include <memory>
+#include <set>
 #include <sstream>
 
 #include <mesos/module.hpp>
@@ -89,7 +90,10 @@
 
 #include "watcher/whitelist_watcher.hpp"
 
+using google::protobuf::RepeatedPtrField;
+
 using std::list;
+using std::set;
 using std::shared_ptr;
 using std::string;
 using std::vector;
@@ -3310,11 +3314,21 @@ void Master::accept(
                  << "': " << error.get().message;
 
     foreach (const Offer::Operation& operation, accept.operations()) {
-      if (operation.type() != Offer::Operation::LAUNCH) {
+      if (operation.type() != Offer::Operation::LAUNCH &&
+          operation.type() != Offer::Operation::LAUNCH_GROUP) {
         continue;
       }
 
-      foreach (const TaskInfo& task, operation.launch().task_infos()) {
+      const RepeatedPtrField<TaskInfo>& tasks = [&]() {
+        if (operation.type() == Offer::Operation::LAUNCH) {
+          return operation.launch().task_infos();
+        } else if (operation.type() == Offer::Operation::LAUNCH_GROUP) {
+          return operation.launch_group().task_group().tasks();
+        }
+        UNREACHABLE();
+      }();
+
+      foreach (const TaskInfo& task, tasks) {
         const StatusUpdate& update = protobuf::createStatusUpdate(
             framework->id(),
             task.slave_id(),
@@ -3349,10 +3363,20 @@ void Master::accept(
   list<Future<bool>> futures;
   foreach (const Offer::Operation& operation, accept.operations()) {
     switch (operation.type()) {
-      case Offer::Operation::LAUNCH: {
+      case Offer::Operation::LAUNCH:
+      case Offer::Operation::LAUNCH_GROUP: {
+        const RepeatedPtrField<TaskInfo>& tasks = [&]() {
+          if (operation.type() == Offer::Operation::LAUNCH) {
+            return operation.launch().task_infos();
+          } else if (operation.type() == Offer::Operation::LAUNCH_GROUP) {
+            return operation.launch_group().task_group().tasks();
+          }
+          UNREACHABLE();
+        }();
+
         // Authorize the tasks. A task is in 'framework->pendingTasks'
         // before it is authorized.
-        foreach (const TaskInfo& task, operation.launch().task_infos()) {
+        foreach (const TaskInfo& task, tasks) {
           futures.push_back(authorizeTask(task, framework));
 
           // Add to pending tasks.
@@ -3371,12 +3395,6 @@ void Master::accept(
         break;
       }
 
-      case Offer::Operation::LAUNCH_GROUP : {
-        // TODO(vinod): Implement this.
-        LOG(WARNING) << "Ignoring unimplemented LAUNCH_GROUP operation";
-        break;
-      }
-
       // NOTE: When handling RESERVE and UNRESERVE operations, authorization
       // will proceed even if no principal is specified, although currently
       // resources cannot be reserved or unreserved unless a principal is
@@ -3485,11 +3503,21 @@ void Master::_accept(
 
   if (slave == nullptr || !slave->connected) {
     foreach (const Offer::Operation& operation, accept.operations()) {
-      if (operation.type() != Offer::Operation::LAUNCH) {
+      if (operation.type() != Offer::Operation::LAUNCH &&
+          operation.type() != Offer::Operation::LAUNCH_GROUP) {
         continue;
       }
 
-      foreach (const TaskInfo& task, operation.launch().task_infos()) {
+      const RepeatedPtrField<TaskInfo>& tasks = [&]() {
+        if (operation.type() == Offer::Operation::LAUNCH) {
+          return operation.launch().task_infos();
+        } else {
+          CHECK_EQ(Offer::Operation::LAUNCH_GROUP, operation.type());
+          return operation.launch_group().task_group().tasks();
+        }
+      }();
+
+      foreach (const TaskInfo& task, tasks) {
         const TaskStatus::Reason reason =
             slave == nullptr ? TaskStatus::REASON_SLAVE_REMOVED
                           : TaskStatus::REASON_SLAVE_DISCONNECTED;
@@ -3885,9 +3913,177 @@ void Master::_accept(
         break;
       }
 
-      case Offer::Operation::LAUNCH_GROUP : {
-        // TODO(vinod): Implement this.
-        LOG(WARNING) << "Ignoring unimplemented LAUNCH_GROUP operation";
+      case Offer::Operation::LAUNCH_GROUP: {
+        // We must ensure that the entire group can be launched. This
+        // means all tasks in the group must be authorized and valid.
+        // If any tasks in the group have been killed in the interim
+        // we must kill the entire group.
+        const ExecutorInfo& executor = operation.launch_group().executor();
+        const TaskGroupInfo& taskGroup = operation.launch_group().task_group();
+
+        // Note that we do not fill in the `ExecutorInfo.framework_id`
+        // since we do not have to support backwards compatibility like
+        // in the `Launch` operation case.
+
+        // TODO(bmahler): Consider injecting some default (cpus, mem, disk)
+        // resources when the framework omits the executor resources.
+
+        // See if there are any validation or authorization errors.
+        // Note that we'll only report the first error we encounter
+        // for the group.
+        Option<Error> error =
+          validation::task::group::validate(
+              taskGroup, executor, framework, slave, _offeredResources);
+
+        Option<TaskStatus::Reason> reason = None();
+
+        if (error.isSome()) {
+          reason = TaskStatus::REASON_TASK_GROUP_INVALID;
+        } else {
+          foreach (const TaskInfo& task, taskGroup.tasks()) {
+            Future<bool> authorization = authorizations.front();
+            authorizations.pop_front();
+
+            CHECK(!authorization.isDiscarded());
+
+            if (authorization.isFailed()) {
+              error = Error("Failed to authorize task"
+                            " '" + stringify(task.task_id()) + "'"
+                            ": " + authorization.failure());
+
+              reason = TaskStatus::REASON_TASK_GROUP_UNAUTHORIZED;
+
+              break;
+            }
+
+            if (!authorization.get()) {
+              string user = framework->info.user(); // Default user.
+              if (task.has_command() && task.command().has_user()) {
+                user = task.command().user();
+              }
+
+              error = Error("Task '" + stringify(task.task_id()) + "'"
+                            " is not authorized to launch as"
+                            " user '" + user + "'");
+
+              reason = TaskStatus::REASON_TASK_GROUP_UNAUTHORIZED;
+
+              break;
+            }
+          }
+        }
+
+        if (error.isSome()) {
+          CHECK_SOME(reason);
+
+          // NOTE: If some of these invalid or unauthorized tasks were
+          // killed already, here we end up sending a TASK_ERROR after
+          // having already sent TASK_KILLED.
+          foreach (const TaskInfo& task, taskGroup.tasks()) {
+            const StatusUpdate& update = protobuf::createStatusUpdate(
+                framework->id(),
+                task.slave_id(),
+                task.task_id(),
+                TASK_ERROR,
+                TaskStatus::SOURCE_MASTER,
+                None(),
+                error->message,
+                reason.get());
+
+            metrics->tasks_error++;
+
+            metrics->incrementTasksStates(
+                TASK_ERROR, TaskStatus::SOURCE_MASTER, reason.get());
+
+            forward(update, UPID(), framework);
+          }
+
+          break;
+        }
+
+        // Remove all the tasks from being pending. If any of the tasks
+        // have been killed in the interim, we will send TASK_KILLED
+        // for all other tasks in the group, since a TaskGroup must
+        // be delivered in its entirety.
+        hashset<TaskID> killed;
+        foreach (const TaskInfo& task, taskGroup.tasks()) {
+          bool pending = framework->pendingTasks.contains(task.task_id());
+          framework->pendingTasks.erase(task.task_id());
+
+          if (!pending) {
+            killed.insert(task.task_id());
+          }
+        }
+
+        // If task(s) were killed, send TASK_KILLED for
+        // all of the remaining tasks.
+        //
+        // TODO(bmahler): Do this killing when processing
+        // the `Kill` call, rather than doing it here.
+        if (!killed.empty()) {
+          foreach (const TaskInfo& task, taskGroup.tasks()) {
+            if (!killed.contains(task.task_id())) {
+              const StatusUpdate& update = protobuf::createStatusUpdate(
+                  framework->id(),
+                  task.slave_id(),
+                  task.task_id(),
+                  TASK_KILLED,
+                  TaskStatus::SOURCE_MASTER,
+                  None(),
+                  "A task within the task group was killed before"
+                  " delivery to the agent");
+
+              metrics->tasks_killed++;
+
+              // TODO(bmahler): Increment the task state source metric;
+              // we currently cannot because each source
+              // requires a reason.
+
+              forward(update, UPID(), framework);
+            }
+          }
+
+          break;
+        }
+
+        // Now launch the task group!
+        RunTaskGroupMessage message;
+        message.mutable_framework()->CopyFrom(framework->info);
+        message.mutable_executor()->CopyFrom(executor);
+        message.mutable_task_group()->CopyFrom(taskGroup);
+
+        set<TaskID> taskIds;
+        Resources totalResources;
+
+        for (int i = 0; i < message.task_group().tasks().size(); ++i) {
+          TaskInfo* task = message.mutable_task_group()->mutable_tasks(i);
+
+          taskIds.insert(task->task_id());
+          totalResources += task->resources();
+
+          const Resources consumed = addTask(*task, framework, slave);
+
+          CHECK(_offeredResources.contains(consumed))
+            << _offeredResources << " does not contain " << consumed;
+
+          _offeredResources -= consumed;
+
+          if (HookManager::hooksAvailable()) {
+            // Set labels retrieved from label-decorator hooks.
+            task->mutable_labels()->CopyFrom(
+                HookManager::masterLaunchTaskLabelDecorator(
+                    *task,
+                    framework->info,
+                    slave->info));
+          }
+        }
+
+        LOG(INFO) << "Launching task group " << stringify(taskIds)
+                  << " of framework " << *framework
+                  << " with resources " << totalResources
+                  << " on agent " << *slave;
+
+        send(slave->pid, message);
         break;
       }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bf3957f4/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 7b5e24f..17bb352 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -301,6 +301,21 @@ message RunTaskMessage {
 
 
 /**
+ * This message either notifies an existing executor to run a task
+ * group, or starts a new executor and runs the task group. This
+ * message is sent when scheduler::Call::Accept is sent with
+ * Offer::Operation::LaunchGroup.
+ *
+ * See executor::Event::LaunchGroup.
+ */
+message RunTaskGroupMessage {
+  required FrameworkInfo framework = 1;
+  required ExecutorInfo executor = 2;
+  required TaskGroupInfo task_group = 3;
+}
+
+
+/**
  * Kills a specific task.
  *
  * See scheduler::Call::Kill and executor::Event::Kill.