You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jp...@apache.org on 2018/04/20 16:22:10 UTC

[1/5] mesos git commit: Refactored the executor launch path.

Repository: mesos
Updated Branches:
  refs/heads/master 70ce35bb7 -> 4366d5510


Refactored the executor launch path.

Refactored the executor launch path to make the conditional
logic clearer. After this there is just one place that adds and
launches a new executor and we will subsequently be able to add
error handling there.

Review: https://reviews.apache.org/r/66704/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b848b09e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b848b09e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b848b09e

Branch: refs/heads/master
Commit: b848b09e1fc769e9524c11ab6135f585649441bc
Parents: 70ce35b
Author: James Peach <jp...@apache.org>
Authored: Fri Apr 20 08:56:36 2018 -0700
Committer: James Peach <jp...@apache.org>
Committed: Fri Apr 20 08:56:36 2018 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 220 +++++++++++++++++++++++------------------------
 1 file changed, 109 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b848b09e/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index e5d6c3f..6885124 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2891,136 +2891,134 @@ void Slave::__run(
 
   Executor* executor = framework->getExecutor(executorId);
 
-  if (launchExecutor.isNone()) {
-    // This is the legacy case where the master did not set the
-    // `launch_executor` flag. Executor will be launched if there is none.
-    if (executor == nullptr) {
-      executor = doLaunchExecutor();
-    }
-  } else {
+  // If launchExecutor is NONE, this is the legacy case where the master
+  // did not set the `launch_executor` flag. Executor will be launched if
+  // there is none.
+
+  if (launchExecutor.isSome()) {
     if (taskGroup.isNone() && task->has_command()) {
       // We are dealing with command task; a new command executor will be
       // launched.
       CHECK(executor == nullptr);
-      executor = doLaunchExecutor();
     } else {
       // Master set the `launch_executor` flag and this is not a command task.
-      if (launchExecutor.get()) {
-        // Master requests launching a new executor.
-        if (executor == nullptr) {
-          executor = doLaunchExecutor();
-        } else {
-          // Master requests launching executor but an executor still exits
-          // on the agent. In this case we will drop tasks. This could happen if
-          // the executor is already terminated on the agent (and agent has sent
-          // out the `ExitedExecutorMessage` and it was received by the master)
-          // but the agent is still waiting for all the status updates to be
-          // acked before removing the executor struct.
-
-          // We report TASK_DROPPED to the framework because the task was
-          // never launched. For non-partition-aware frameworks, we report
-          // TASK_LOST for backward compatibility.
-          mesos::TaskState taskState = TASK_DROPPED;
-          if (!protobuf::frameworkHasCapability(
-              frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
-            taskState = TASK_LOST;
-          }
-
-          foreach (const TaskInfo& _task, tasks) {
-            const StatusUpdate update = protobuf::createStatusUpdate(
-                frameworkId,
-                info.id(),
-                _task.task_id(),
-                taskState,
-                TaskStatus::SOURCE_SLAVE,
-                id::UUID::random(),
-                "Master wants to launch executor, but there already exits one",
-                TaskStatus::REASON_EXECUTOR_TERMINATED,
-                executorId);
-
-            statusUpdate(update, UPID());
-          }
+      if (launchExecutor.get() && executor != nullptr) {
+        // Master requests launching executor but an executor still exits
+        // on the agent. In this case we will drop tasks. This could happen if
+        // the executor is already terminated on the agent (and agent has sent
+        // out the `ExitedExecutorMessage` and it was received by the master)
+        // but the agent is still waiting for all the status updates to be
+        // acked before removing the executor struct.
+
+        // We report TASK_DROPPED to the framework because the task was
+        // never launched. For non-partition-aware frameworks, we report
+        // TASK_LOST for backward compatibility.
+        mesos::TaskState taskState = TASK_DROPPED;
+        if (!protobuf::frameworkHasCapability(
+            frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
+          taskState = TASK_LOST;
+        }
 
-          // Master expects a new executor to be launched for this task(s).
-          // To keep the master executor entries updated, the agent needs to
-          // send `ExitedExecutorMessage` even though no executor launched.
-          if (executor->state == Executor::TERMINATED) {
-            sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
-          } else {
-            // This could happen if the following sequence of events happen:
-            //
-            //  (1) Master sends `runTaskMessage` to agent with
-            //      `launch_executor = true`;
-            //
-            //  (2) Before the agent got the `runTaskMessage`, it reconnects and
-            //      reconciles with the master. Master then removes the executor
-            //      entry it asked the agent to launch in step (1);
-            //
-            //  (3) Agent got the `runTaskMessage` sent in step (1), launches
-            //      the task and the executor (that the master does not know
-            //      about).
-            //
-            //  (4) Master now sends another `runTaskMessage` for the same
-            //      executor id with `launch_executor = true`.
-            //
-            // The agent ends up with a lingering executor that the master does
-            // not know about. We will shutdown the executor.
-            //
-            // TODO(mzhu): This could be avoided if the agent can
-            // tell whether the master's message was sent before or after the
-            // reconnection and discard the message in the former case.
-            //
-            // TODO(mzhu): Master needs to do proper executor reconciliation
-            // with the agent to avoid this from happening.
-            _shutdownExecutor(framework, executor);
-          }
+        foreach (const TaskInfo& _task, tasks) {
+          const StatusUpdate update = protobuf::createStatusUpdate(
+              frameworkId,
+              info.id(),
+              _task.task_id(),
+              taskState,
+              TaskStatus::SOURCE_SLAVE,
+              id::UUID::random(),
+              "Master wants to launch executor, but there already exits one",
+              TaskStatus::REASON_EXECUTOR_TERMINATED,
+              executorId);
 
-          return;
+          statusUpdate(update, UPID());
         }
-      } else {
-        // Master does not want to launch executor.
-        if (executor == nullptr) {
-          // Master wants no new executor launched and there is none running on
-          // the agent. This could happen if the task expects some previous
-          // tasks to launch the executor. However, the earlier task got killed
-          // or dropped hence did not launch the executor but the master doesn't
-          // know about it yet because the `ExitedExecutorMessage` is still in
-          // flight. In this case, we will drop the task.
+
+        // Master expects a new executor to be launched for this task(s).
+        // To keep the master executor entries updated, the agent needs to
+        // send `ExitedExecutorMessage` even though no executor launched.
+        if (executor->state == Executor::TERMINATED) {
+          sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+        } else {
+          // This could happen if the following sequence of events happen:
           //
-          // We report TASK_DROPPED to the framework because the task was
-          // never launched. For non-partition-aware frameworks, we report
-          // TASK_LOST for backward compatibility.
-          mesos::TaskState taskState = TASK_DROPPED;
-          if (!protobuf::frameworkHasCapability(
-              frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
-            taskState = TASK_LOST;
-          }
+          //  (1) Master sends `runTaskMessage` to agent with
+          //      `launch_executor = true`;
+          //
+          //  (2) Before the agent got the `runTaskMessage`, it reconnects and
+          //      reconciles with the master. Master then removes the executor
+          //      entry it asked the agent to launch in step (1);
+          //
+          //  (3) Agent got the `runTaskMessage` sent in step (1), launches
+          //      the task and the executor (that the master does not know
+          //      about).
+          //
+          //  (4) Master now sends another `runTaskMessage` for the same
+          //      executor id with `launch_executor = true`.
+          //
+          // The agent ends up with a lingering executor that the master does
+          // not know about. We will shutdown the executor.
+          //
+          // TODO(mzhu): This could be avoided if the agent can
+          // tell whether the master's message was sent before or after the
+          // reconnection and discard the message in the former case.
+          //
+          // TODO(mzhu): Master needs to do proper executor reconciliation
+          // with the agent to avoid this from happening.
+          _shutdownExecutor(framework, executor);
+        }
 
-          foreach (const TaskInfo& _task, tasks) {
-            const StatusUpdate update = protobuf::createStatusUpdate(
-                frameworkId,
-                info.id(),
-                _task.task_id(),
-                taskState,
-                TaskStatus::SOURCE_SLAVE,
-                id::UUID::random(),
-                "No executor is expected to launch and there is none running",
-                TaskStatus::REASON_EXECUTOR_TERMINATED,
-                executorId);
+        return;
+      }
 
-            statusUpdate(update, UPID());
-          }
+      if (!launchExecutor.get() && executor == nullptr) {
+        // Master wants no new executor launched and there is none running on
+        // the agent. This could happen if the task expects some previous
+        // tasks to launch the executor. However, the earlier task got killed
+        // or dropped hence did not launch the executor but the master doesn't
+        // know about it yet because the `ExitedExecutorMessage` is still in
+        // flight. In this case, we will drop the task.
+        //
+        // We report TASK_DROPPED to the framework because the task was
+        // never launched. For non-partition-aware frameworks, we report
+        // TASK_LOST for backward compatibility.
+        mesos::TaskState taskState = TASK_DROPPED;
+        if (!protobuf::frameworkHasCapability(
+            frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
+          taskState = TASK_LOST;
+        }
 
-          // We do not send `ExitedExecutorMessage` here because the expectation
-          // is that there is already one on the fly to master. If the message
-          // gets dropped, we will hopefully reconcile with the master later.
+        foreach (const TaskInfo& _task, tasks) {
+          const StatusUpdate update = protobuf::createStatusUpdate(
+              frameworkId,
+              info.id(),
+              _task.task_id(),
+              taskState,
+              TaskStatus::SOURCE_SLAVE,
+              id::UUID::random(),
+              "No executor is expected to launch and there is none running",
+              TaskStatus::REASON_EXECUTOR_TERMINATED,
+              executorId);
 
-          return;
+          statusUpdate(update, UPID());
         }
+
+        // We do not send `ExitedExecutorMessage` here because the expectation
+        // is that there is already one on the fly to master. If the message
+        // gets dropped, we will hopefully reconcile with the master later.
+
+        return;
       }
     }
   }
 
+  // Either the master explicitly requests launching a new executor
+  // or we are in the legacy case of launching one if there wasn't
+  // one already. Either way, let's launch executor now.
+  if (executor == nullptr) {
+    executor = doLaunchExecutor();
+  }
+
   CHECK_NOTNULL(executor);
 
   switch (executor->state) {


[4/5] mesos git commit: Added a test for launching a task as an unknown user.

Posted by jp...@apache.org.
Added a test for launching a task as an unknown user.

Added a test that launches a task as an unknown user and verifies
that an appropriate status update is generated by the agent.

Review: https://reviews.apache.org/r/66707/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7132365d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7132365d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7132365d

Branch: refs/heads/master
Commit: 7132365d4c294128764bd3f71fc4510e6239d550
Parents: 6e4440d
Author: James Peach <jp...@apache.org>
Authored: Fri Apr 20 08:57:08 2018 -0700
Committer: James Peach <jp...@apache.org>
Committed: Fri Apr 20 08:57:08 2018 -0700

----------------------------------------------------------------------
 src/tests/slave_tests.cpp | 87 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 87 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7132365d/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 04f7aca..4cda395 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -1265,6 +1265,93 @@ TEST_F(SlaveTest, DISABLED_ROOT_RunTaskWithCommandInfoWithUser)
 #endif // __WINDOWS__
 
 
+// This test verifies that the agent gracefully drops tasks when
+// a scheduler launches as a user that is not present on the agent.
+TEST_F_TEMP_DISABLED_ON_WINDOWS(
+    SlaveTest, ROOT_RunTaskWithCommandInfoWithInvalidUser)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  // Enable `switch_user` so the agent is forced to
+  // evaluate the provided user name.
+  flags.switch_user = true;
+
+  Fetcher fetcher(flags);
+
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, false, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+  Owned<MesosContainerizer> containerizer(_containerizer.get());
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), containerizer.get());
+
+  ASSERT_SOME(slave);
+
+  // Enable partition awareness so that we can expect `TASK_DROPPED`
+  // rather than `TASK_LOST`.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+
+  const string taskUser = id::UUID::random().toString();
+
+  // Create a command that would trivially succeed if only
+  // the user was valid.
+  CommandInfo command;
+  command.set_user(taskUser);
+  command.set_shell(true);
+  command.set_value("true");
+
+  TaskInfo task = createTask(
+      offers->front().slave_id(),
+      offers->front().resources(),
+      command);
+
+  Future<TaskStatus> statusDropped;
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusDropped));
+
+  driver.launchTasks(offers->front().id(), {task});
+
+  AWAIT_READY(statusDropped);
+  EXPECT_EQ(TASK_DROPPED, statusDropped->state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, statusDropped->source());
+
+  // Since we expect the task to fail because the task user didn't
+  // exist, it's reasonable to check that the user was mentioned in
+  // the status message.
+  EXPECT_TRUE(strings::contains(statusDropped->message(), taskUser))
+    << statusDropped->message();
+
+  driver.stop();
+  driver.join();
+}
+
+
 // This test ensures that a status update acknowledgement from a
 // non-leading master is ignored.
 TEST_F(SlaveTest, IgnoreNonLeaderStatusUpdateAcknowledgement)


[2/5] mesos git commit: Propagated executor sandbox creation errors.

Posted by jp...@apache.org.
Propagated executor sandbox creation errors.

Rather than crashing if the agent fails to create the executor
directory, propagate the error to the caller so that it can
handle it appropriately.

Review: https://reviews.apache.org/r/66705/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/24773d48
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/24773d48
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/24773d48

Branch: refs/heads/master
Commit: 24773d4842dee3d0a59ecb4b3ca65dc4ba02c94d
Parents: b848b09
Author: James Peach <jp...@apache.org>
Authored: Fri Apr 20 08:56:49 2018 -0700
Committer: James Peach <jp...@apache.org>
Committed: Fri Apr 20 08:56:49 2018 -0700

----------------------------------------------------------------------
 src/slave/paths.cpp       | 19 +++++++++++--------
 src/slave/paths.hpp       |  2 +-
 src/slave/slave.cpp       | 13 +++++++++----
 src/tests/paths_tests.cpp |  4 ++--
 4 files changed, 23 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/24773d48/src/slave/paths.cpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.cpp b/src/slave/paths.cpp
index 690bfe3..ed0b127 100644
--- a/src/slave/paths.cpp
+++ b/src/slave/paths.cpp
@@ -722,7 +722,7 @@ string getPersistentVolumePath(
 }
 
 
-string createExecutorDirectory(
+Try<string> createExecutorDirectory(
     const string& rootDir,
     const SlaveID& slaveId,
     const FrameworkID& frameworkId,
@@ -749,9 +749,11 @@ string createExecutorDirectory(
   }
 
   Try<Nothing> mkdir = createSandboxDirectory(directory, user);
-
-  CHECK_SOME(mkdir)
-    << "Failed to create executor directory '" << directory << "'";
+  if (mkdir.isError()) {
+    return Error(
+        "Failed to create executor directory '" + directory + "': " +
+        mkdir.error());
+  }
 
   // Remove the previous "latest" symlink.
   const string latest =
@@ -764,10 +766,11 @@ string createExecutorDirectory(
 
   // Symlink the new executor directory to "latest".
   Try<Nothing> symlink = ::fs::symlink(directory, latest);
-
-  CHECK_SOME(symlink)
-    << "Failed to symlink directory '" << directory
-    << "' to '" << latest << "'";
+  if (symlink.isError()) {
+    return Error(
+        "Failed to symlink '" + directory + "' to '" + latest + "': " +
+        symlink.error());
+  }
 
   return directory;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/24773d48/src/slave/paths.hpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.hpp b/src/slave/paths.hpp
index fe5ab9e..0158964 100644
--- a/src/slave/paths.hpp
+++ b/src/slave/paths.hpp
@@ -392,7 +392,7 @@ std::string getPersistentVolumePath(
     const Resource& resource);
 
 
-std::string createExecutorDirectory(
+Try<std::string> createExecutorDirectory(
     const std::string& rootDir,
     const SlaveID& slaveId,
     const FrameworkID& frameworkId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/24773d48/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 6885124..fab31fd 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -8872,6 +8872,7 @@ Executor* Framework::addExecutor(const ExecutorInfo& executorInfo)
   containerId.set_value(id::UUID::random().toString());
 
   Option<string> user = None();
+
 #ifndef __WINDOWS__
   if (slave->flags.switch_user) {
     // The command (either in form of task or executor command) can
@@ -8891,7 +8892,7 @@ Executor* Framework::addExecutor(const ExecutorInfo& executorInfo)
 #endif // __WINDOWS__
 
   // Create a directory for the executor.
-  const string directory = paths::createExecutorDirectory(
+  Try<string> directory = paths::createExecutorDirectory(
       slave->flags.work_dir,
       slave->info.id(),
       id(),
@@ -8899,12 +8900,14 @@ Executor* Framework::addExecutor(const ExecutorInfo& executorInfo)
       containerId,
       user);
 
+  CHECK_SOME(directory);
+
   Executor* executor = new Executor(
       slave,
       id(),
       executorInfo,
       containerId,
-      directory,
+      directory.get(),
       user,
       info.checkpoint());
 
@@ -8920,7 +8923,7 @@ Executor* Framework::addExecutor(const ExecutorInfo& executorInfo)
   LOG(INFO) << "Launching executor '" << executorInfo.executor_id()
             << "' of framework " << id()
             << " with resources " << executorInfo.resources()
-            << " in work directory '" << directory << "'";
+            << " in work directory '" << directory.get() << "'";
 
   const ExecutorID& executorId = executorInfo.executor_id();
   FrameworkID frameworkId = id();
@@ -9610,8 +9613,10 @@ void Executor::checkpointExecutor()
 
   // Create the meta executor directory.
   // NOTE: This creates the 'latest' symlink in the meta directory.
-  paths::createExecutorDirectory(
+  Try<string> mkdir = paths::createExecutorDirectory(
       slave->metaDir, slave->info.id(), frameworkId, id, containerId);
+
+  CHECK_SOME(mkdir);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/24773d48/src/tests/paths_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/paths_tests.cpp b/src/tests/paths_tests.cpp
index dc765ed..3fa8f52 100644
--- a/src/tests/paths_tests.cpp
+++ b/src/tests/paths_tests.cpp
@@ -84,7 +84,7 @@ protected:
 
 TEST_F(PathsTest, CreateExecutorDirectory)
 {
-  const string& result = paths::createExecutorDirectory(
+  Try<string> result = paths::createExecutorDirectory(
       rootDir, slaveId, frameworkId, executorId, containerId);
 
   // Expected directory layout.
@@ -99,7 +99,7 @@ TEST_F(PathsTest, CreateExecutorDirectory)
       "runs",
       containerId.value());
 
-  ASSERT_EQ(dir, result);
+  ASSERT_SOME_EQ(dir, result);
 }
 
 


[3/5] mesos git commit: Handled failing to create the executor sandbox.

Posted by jp...@apache.org.
Handled failing to create the executor sandbox.

When the agent adds a new executor, creating the sandbox might
fail (most commonly because the requested task user is not present
on the agent). Rather than crashing the agent with a `CHECK`,
we report a `TASK_DROPPED` status update. This makes the behavior
more consistent with the nested containers API, which also reports
an error in this case.

Review: https://reviews.apache.org/r/66706/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6e4440d8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6e4440d8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6e4440d8

Branch: refs/heads/master
Commit: 6e4440d8ea045877e51338b620182cecd471818f
Parents: 24773d4
Author: James Peach <jp...@apache.org>
Authored: Fri Apr 20 08:56:58 2018 -0700
Committer: James Peach <jp...@apache.org>
Committed: Fri Apr 20 08:56:58 2018 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 94 ++++++++++++++++++++++++++++++++++--------------
 src/slave/slave.hpp |  2 +-
 2 files changed, 69 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6e4440d8/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index fab31fd..455e3cc 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2866,29 +2866,6 @@ void Slave::__run(
   LOG(INFO) << "Launching " << taskOrTaskGroup(task, taskGroup)
             << " for framework " << frameworkId;
 
-  auto doLaunchExecutor = [&]() {
-    Executor* executor = framework->addExecutor(executorInfo);
-
-    if (secretGenerator) {
-      generateSecret(framework->id(), executor->id, executor->containerId)
-        .onAny(defer(
-              self(),
-              &Self::launchExecutor,
-              lambda::_1,
-              frameworkId,
-              executorId,
-              taskGroup.isNone() ? task.get() : Option<TaskInfo>::none()));
-    } else {
-      Slave::launchExecutor(
-          None(),
-          frameworkId,
-          executorId,
-          taskGroup.isNone() ? task.get() : Option<TaskInfo>::none());
-    }
-
-    return executor;
-  };
-
   Executor* executor = framework->getExecutor(executorId);
 
   // If launchExecutor is NONE, this is the legacy case where the master
@@ -3016,7 +2993,70 @@ void Slave::__run(
   // or we are in the legacy case of launching one if there wasn't
   // one already. Either way, let's launch executor now.
   if (executor == nullptr) {
-    executor = doLaunchExecutor();
+    Try<Executor*> added = framework->addExecutor(executorInfo);
+
+    if (added.isError()) {
+      CHECK(framework->getExecutor(executorId) == nullptr);
+
+      mesos::TaskState taskState = TASK_DROPPED;
+      if (!protobuf::frameworkHasCapability(
+          frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
+        taskState = TASK_LOST;
+      }
+
+      foreach (const TaskInfo& _task, tasks) {
+        const StatusUpdate update = protobuf::createStatusUpdate(
+            frameworkId,
+            info.id(),
+            _task.task_id(),
+            taskState,
+            TaskStatus::SOURCE_SLAVE,
+            id::UUID::random(),
+            added.error(),
+            TaskStatus::REASON_EXECUTOR_TERMINATED,
+            executorId);
+
+        statusUpdate(update, UPID());
+      }
+
+      // Refer to the comment after 'framework->removePendingTask' above
+      // for why we need this.
+      if (framework->idle()) {
+        removeFramework(framework);
+      }
+
+      if (launchExecutor.isSome() && launchExecutor.get()) {
+        // Master expects a new executor to be launched for this task(s).
+        // To keep the master executor entries updated, the agent needs to send
+        // `ExitedExecutorMessage` even though no executor launched.
+        sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+
+        // See the declaration of `taskLaunchSequences` regarding its lifecycle
+        // management.
+        framework->taskLaunchSequences.erase(executorInfo.executor_id());
+      }
+
+      return;
+    }
+
+    executor = added.get();
+
+    if (secretGenerator) {
+      generateSecret(framework->id(), executor->id, executor->containerId)
+        .onAny(defer(
+              self(),
+              &Self::launchExecutor,
+              lambda::_1,
+              frameworkId,
+              executorId,
+              taskGroup.isNone() ? task.get() : Option<TaskInfo>::none()));
+    } else {
+      Slave::launchExecutor(
+          None(),
+          frameworkId,
+          executorId,
+          taskGroup.isNone() ? task.get() : Option<TaskInfo>::none());
+    }
   }
 
   CHECK_NOTNULL(executor);
@@ -8855,7 +8895,7 @@ void Framework::checkpointFramework() const
 }
 
 
-Executor* Framework::addExecutor(const ExecutorInfo& executorInfo)
+Try<Executor*> Framework::addExecutor(const ExecutorInfo& executorInfo)
 {
   // Verify that Resource.AllocationInfo is set, if coming
   // from a MULTI_ROLE master this will be set, otherwise
@@ -8900,7 +8940,9 @@ Executor* Framework::addExecutor(const ExecutorInfo& executorInfo)
       containerId,
       user);
 
-  CHECK_SOME(directory);
+  if (directory.isError()) {
+    return Error(directory.error());
+  }
 
   Executor* executor = new Executor(
       slave,

http://git-wip-us.apache.org/repos/asf/mesos/blob/6e4440d8/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index d00c7b2..c35996b 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -1079,7 +1079,7 @@ public:
 
   const FrameworkID id() const { return info.id(); }
 
-  Executor* addExecutor(const ExecutorInfo& executorInfo);
+  Try<Executor*> addExecutor(const ExecutorInfo& executorInfo);
   Executor* getExecutor(const ExecutorID& executorId) const;
   Executor* getExecutor(const TaskID& taskId) const;
 


[5/5] mesos git commit: Refactored sending a TASK_DROPPED status update.

Posted by jp...@apache.org.
Refactored sending a TASK_DROPPED status update.

Refactored how the agent sends a TASK_DROPPED status update to
remove code duplication. This also makes the agent consistently
send the executor ID in all these status updates.

Review: https://reviews.apache.org/r/66708/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4366d551
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4366d551
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4366d551

Branch: refs/heads/master
Commit: 4366d55104b467cb2767d60128e1ee713708433e
Parents: 7132365
Author: James Peach <jp...@apache.org>
Authored: Fri Apr 20 08:57:23 2018 -0700
Committer: James Peach <jp...@apache.org>
Committed: Fri Apr 20 08:57:23 2018 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 184 +++++++++++++----------------------------------
 1 file changed, 48 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4366d551/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 455e3cc..9d2d192 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2536,6 +2536,34 @@ void Slave::__run(
 
   const ExecutorID& executorId = executorInfo.executor_id();
 
+  // We report TASK_DROPPED to the framework because the task was
+  // never launched. For non-partition-aware frameworks, we report
+  // TASK_LOST for backward compatibility.
+  auto sendTaskDroppedUpdate =
+    [&](TaskStatus::Reason reason, const string& message) {
+      mesos::TaskState taskState = TASK_DROPPED;
+
+      if (!protobuf::frameworkHasCapability(
+              frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
+        taskState = TASK_LOST;
+      }
+
+      foreach (const TaskInfo& _task, tasks) {
+        const StatusUpdate update = protobuf::createStatusUpdate(
+            frameworkId,
+            info.id(),
+            _task.task_id(),
+            taskState,
+            TaskStatus::SOURCE_SLAVE,
+            id::UUID::random(),
+            message,
+            reason,
+            executorId);
+
+        statusUpdate(update, UPID());
+      }
+    };
+
   // We don't send a status update here because a terminating
   // framework cannot send acknowledgements.
   if (framework->state == Framework::TERMINATING) {
@@ -2654,29 +2682,9 @@ void Slave::__run(
   }
 
   if (kill) {
-    // We report TASK_DROPPED to the framework because the task was
-    // never launched. For non-partition-aware frameworks, we report
-    // TASK_LOST for backward compatibility.
-    mesos::TaskState taskState = TASK_DROPPED;
-    if (!protobuf::frameworkHasCapability(
-            frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
-      taskState = TASK_LOST;
-    }
-
-    foreach (const TaskInfo& _task, tasks) {
-      const StatusUpdate update = protobuf::createStatusUpdate(
-          frameworkId,
-          info.id(),
-          _task.task_id(),
-          taskState,
-          TaskStatus::SOURCE_SLAVE,
-          id::UUID::random(),
-          "Tasks assumes outdated resource state",
-          TaskStatus::REASON_INVALID_OFFERS,
-          executorId);
-
-      statusUpdate(update, UPID());
-    }
+    sendTaskDroppedUpdate(
+        TaskStatus::REASON_INVALID_OFFERS,
+        "Task assumes outdated resource state");
 
     // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.
@@ -2732,29 +2740,10 @@ void Slave::__run(
   }
 
   if (kill) {
-    // We report TASK_DROPPED to the framework because the task was
-    // never launched. For non-partition-aware frameworks, we report
-    // TASK_LOST for backward compatibility.
-    mesos::TaskState taskState = TASK_DROPPED;
-    if (!protobuf::frameworkHasCapability(
-            frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
-      taskState = TASK_LOST;
-    }
-
-    foreach (const TaskInfo& _task, tasks) {
-      const StatusUpdate update = protobuf::createStatusUpdate(
-          frameworkId,
-          info.id(),
-          _task.task_id(),
-          taskState,
-          TaskStatus::SOURCE_SLAVE,
-          id::UUID::random(),
-          "The checkpointed resources being used by the task or task group are "
-          "unknown to the agent",
-          TaskStatus::REASON_RESOURCES_UNKNOWN);
-
-      statusUpdate(update, UPID());
-    }
+    sendTaskDroppedUpdate(
+        TaskStatus::REASON_RESOURCES_UNKNOWN,
+        "The checkpointed resources being used by the task or task group are "
+        "unknown to the agent");
 
     // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.
@@ -2795,30 +2784,10 @@ void Slave::__run(
   }
 
   if (kill) {
-    // We report TASK_DROPPED to the framework because the task was
-    // never launched. For non-partition-aware frameworks, we report
-    // TASK_LOST for backward compatibility.
-    mesos::TaskState taskState = TASK_DROPPED;
-    if (!protobuf::frameworkHasCapability(
-            frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
-      taskState = TASK_LOST;
-    }
-
-    foreach (const TaskInfo& _task, tasks) {
-      const StatusUpdate update = protobuf::createStatusUpdate(
-          frameworkId,
-          info.id(),
-          _task.task_id(),
-          taskState,
-          TaskStatus::SOURCE_SLAVE,
-          id::UUID::random(),
-          "The checkpointed resources being used by the executor are unknown "
-          "to the agent",
-          TaskStatus::REASON_RESOURCES_UNKNOWN,
-          executorId);
-
-      statusUpdate(update, UPID());
-    }
+    sendTaskDroppedUpdate(
+        TaskStatus::REASON_RESOURCES_UNKNOWN,
+        "The checkpointed resources being used by the executor are unknown "
+        "to the agent");
 
     // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.
@@ -2887,29 +2856,9 @@ void Slave::__run(
         // but the agent is still waiting for all the status updates to be
         // acked before removing the executor struct.
 
-        // We report TASK_DROPPED to the framework because the task was
-        // never launched. For non-partition-aware frameworks, we report
-        // TASK_LOST for backward compatibility.
-        mesos::TaskState taskState = TASK_DROPPED;
-        if (!protobuf::frameworkHasCapability(
-            frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
-          taskState = TASK_LOST;
-        }
-
-        foreach (const TaskInfo& _task, tasks) {
-          const StatusUpdate update = protobuf::createStatusUpdate(
-              frameworkId,
-              info.id(),
-              _task.task_id(),
-              taskState,
-              TaskStatus::SOURCE_SLAVE,
-              id::UUID::random(),
-              "Master wants to launch executor, but there already exits one",
-              TaskStatus::REASON_EXECUTOR_TERMINATED,
-              executorId);
-
-          statusUpdate(update, UPID());
-        }
+        sendTaskDroppedUpdate(
+            TaskStatus::REASON_EXECUTOR_TERMINATED,
+            "Master wants to launch executor, but one already exists");
 
         // Master expects a new executor to be launched for this task(s).
         // To keep the master executor entries updated, the agent needs to
@@ -2955,30 +2904,10 @@ void Slave::__run(
         // or dropped hence did not launch the executor but the master doesn't
         // know about it yet because the `ExitedExecutorMessage` is still in
         // flight. In this case, we will drop the task.
-        //
-        // We report TASK_DROPPED to the framework because the task was
-        // never launched. For non-partition-aware frameworks, we report
-        // TASK_LOST for backward compatibility.
-        mesos::TaskState taskState = TASK_DROPPED;
-        if (!protobuf::frameworkHasCapability(
-            frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
-          taskState = TASK_LOST;
-        }
 
-        foreach (const TaskInfo& _task, tasks) {
-          const StatusUpdate update = protobuf::createStatusUpdate(
-              frameworkId,
-              info.id(),
-              _task.task_id(),
-              taskState,
-              TaskStatus::SOURCE_SLAVE,
-              id::UUID::random(),
-              "No executor is expected to launch and there is none running",
-              TaskStatus::REASON_EXECUTOR_TERMINATED,
-              executorId);
-
-          statusUpdate(update, UPID());
-        }
+        sendTaskDroppedUpdate(
+            TaskStatus::REASON_EXECUTOR_TERMINATED,
+            "No executor is expected to launch and there is none running");
 
         // We do not send `ExitedExecutorMessage` here because the expectation
         // is that there is already one on the fly to master. If the message
@@ -2998,26 +2927,9 @@ void Slave::__run(
     if (added.isError()) {
       CHECK(framework->getExecutor(executorId) == nullptr);
 
-      mesos::TaskState taskState = TASK_DROPPED;
-      if (!protobuf::frameworkHasCapability(
-          frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
-        taskState = TASK_LOST;
-      }
-
-      foreach (const TaskInfo& _task, tasks) {
-        const StatusUpdate update = protobuf::createStatusUpdate(
-            frameworkId,
-            info.id(),
-            _task.task_id(),
-            taskState,
-            TaskStatus::SOURCE_SLAVE,
-            id::UUID::random(),
-            added.error(),
-            TaskStatus::REASON_EXECUTOR_TERMINATED,
-            executorId);
-
-        statusUpdate(update, UPID());
-      }
+      sendTaskDroppedUpdate(
+          TaskStatus::REASON_EXECUTOR_TERMINATED,
+          added.error());
 
       // Refer to the comment after 'framework->removePendingTask' above
       // for why we need this.