Posted to commits@mesos.apache.org by gr...@apache.org on 2019/07/15 18:50:20 UTC

[mesos] branch master updated (a32fd27 -> c076c8c)

This is an automated email from the ASF dual-hosted git repository.

grag pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git.


    from a32fd27  Updated 3 unit tests by changing IO switchboard to local mode.
     new 3c959eb  Added minimal agent handler for 'DrainSlaveMessage'.
     new 7d08b66  Added the DrainConfig to agent API outputs.
     new 04d25af  Added test for DrainConfig in agent API outputs.
     new ef19f29  Refactored the agent's task-killing code.
     new 27f0cd3  Updated an equality operator.
     new 3bb8287  Added kill policy to the 'Task' message.
     new e1c7985  Killed all tasks on the agent when draining.
     new 505928a  Added tests for task killing when draining the agent.
     new 1a32b31  Fixed pid checkpointing for `TestContainerizer`.
     new 54fb43e  Added recovery of agent drain information.
     new 1889268  Adjusted task status updates during draining.
     new a7044bd  Changed agent to fail task launches received during draining.
     new 654faf9  Cleared agent drain state when draining is finished.
     new c076c8c  Added test for agent to leave draining state on its own.

The 14 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 include/mesos/agent/agent.proto    |   2 +
 include/mesos/mesos.proto          |   4 +
 include/mesos/type_utils.hpp       |   8 +
 include/mesos/v1/agent/agent.proto |   2 +
 include/mesos/v1/mesos.proto       |   4 +
 src/common/protobuf_utils.cpp      |   4 +
 src/common/type_utils.cpp          |  17 +-
 src/slave/http.cpp                 |  11 +
 src/slave/paths.cpp                |   9 +
 src/slave/paths.hpp                |   6 +
 src/slave/slave.cpp                | 366 ++++++++++++++----
 src/slave/slave.hpp                |  31 +-
 src/slave/state.cpp                |  16 +
 src/slave/state.hpp                |   3 +
 src/tests/containerizer.cpp        |  12 +
 src/tests/slave_tests.cpp          | 756 +++++++++++++++++++++++++++++++++++++
 16 files changed, 1173 insertions(+), 78 deletions(-)


[mesos] 06/14: Added kill policy to the 'Task' message.

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 3bb8287378bfdbef74288212302d0f2628d51b23
Author: Greg Mann <gr...@mesosphere.io>
AuthorDate: Mon Jul 15 10:25:49 2019 -0700

    Added kill policy to the 'Task' message.
    
    Review: https://reviews.apache.org/r/70901/
---
 include/mesos/mesos.proto     | 4 ++++
 include/mesos/type_utils.hpp  | 1 +
 include/mesos/v1/mesos.proto  | 4 ++++
 src/common/protobuf_utils.cpp | 4 ++++
 src/common/type_utils.cpp     | 9 ++++++++-
 5 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index e0a2391..324f686 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -2338,6 +2338,10 @@ message Task {
 
   // TODO(greggomann): Add the task's `CheckInfo`. See MESOS-8780.
 
+  // The kill policy used for this task when it is killed. It's possible for
+  // this policy to be overridden by the scheduler when killing the task.
+  optional KillPolicy kill_policy = 16;
+
   // Specific user under which task is running.
   optional string user = 14;
 }
diff --git a/include/mesos/type_utils.hpp b/include/mesos/type_utils.hpp
index b9e6164..2fd8a62 100644
--- a/include/mesos/type_utils.hpp
+++ b/include/mesos/type_utils.hpp
@@ -63,6 +63,7 @@ bool operator==(const DiscoveryInfo& left, const DiscoveryInfo& right);
 bool operator==(const Environment& left, const Environment& right);
 bool operator==(const ExecutorInfo& left, const ExecutorInfo& right);
 bool operator==(const HealthCheck& left, const HealthCheck& right);
+bool operator==(const KillPolicy& left, const KillPolicy& right);
 bool operator==(const Label& left, const Label& right);
 bool operator==(const Labels& left, const Labels& right);
 bool operator==(const MasterInfo& left, const MasterInfo& right);
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index af29a14..aa9c525 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -2327,6 +2327,10 @@ message Task {
 
   // TODO(greggomann): Add the task's `CheckInfo`. See MESOS-8780.
 
+  // The kill policy used for this task when it is killed. It's possible for
+  // this policy to be overridden by the scheduler when killing the task.
+  optional KillPolicy kill_policy = 16;
+
   // Specific user under which task is running.
   optional string user = 14;
 }
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index 0112fcb..c91d543 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -416,6 +416,10 @@ Task createTask(
     t.mutable_health_check()->CopyFrom(task.health_check());
   }
 
+  if (task.has_kill_policy()) {
+    t.mutable_kill_policy()->CopyFrom(task.kill_policy());
+  }
+
   // Copy `user` if set.
   if (task.has_command() && task.command().has_user()) {
     t.set_user(task.command().user());
diff --git a/src/common/type_utils.cpp b/src/common/type_utils.cpp
index 16d6657..5bf7113 100644
--- a/src/common/type_utils.cpp
+++ b/src/common/type_utils.cpp
@@ -406,6 +406,12 @@ bool operator==(const HealthCheck& left, const HealthCheck& right)
 }
 
 
+bool operator==(const KillPolicy& left, const KillPolicy& right)
+{
+  return google::protobuf::util::MessageDifferencer::Equals(left, right);
+}
+
+
 bool operator==(const MasterInfo& left, const MasterInfo& right)
 {
   return left.id() == right.id() &&
@@ -583,7 +589,8 @@ bool operator==(const Task& left, const Task& right)
     left.discovery() == right.discovery() &&
     left.user() == right.user() &&
     left.container() == right.container() &&
-    left.health_check() == right.health_check();
+    left.health_check() == right.health_check() &&
+    left.kill_policy() == right.kill_policy();
 }
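
For illustration, a minimal sketch of how the new field gets populated: a
framework sets a kill policy on its `TaskInfo`, and with this patch the
policy is carried over onto the internal `Task` bookkeeping by the
`createTask()` helper in `src/common/protobuf_utils.cpp` (see the hunk
above). The surrounding setup is illustrative only and assumes the public
`<mesos/mesos.hpp>` header.

    #include <mesos/mesos.hpp>

    // A framework-side TaskInfo carrying a 5-second kill grace period.
    // After this patch, the same policy appears on the internal `Task`
    // object created from this TaskInfo.
    mesos::TaskInfo buildTaskInfo()
    {
      mesos::TaskInfo taskInfo;
      taskInfo.mutable_task_id()->set_value("task-1");
      taskInfo.mutable_kill_policy()
        ->mutable_grace_period()
        ->set_nanoseconds(5 * 1000 * 1000 * 1000LL); // 5 seconds.

      return taskInfo;
    }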
 
 


[mesos] 12/14: Changed agent to fail task launches received during draining.

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit a7044bdcd91e467173bb44263658ac5c8df08d8c
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Mon Jul 15 10:26:18 2019 -0700

    Changed agent to fail task launches received during draining.
    
    With this patch the agent now rejects task launches while draining.
    While we do not expect the master to send task launches to draining
    agents, it is still worthwhile to ensure that no new tasks can be
    launched while draining. This invariant simplifies, e.g., the handling
    of drain requests, since we know that once the agent has entered a
    draining state we only need to terminate existing tasks and no new
    tasks can appear.
    
    Review: https://reviews.apache.org/r/70958/
---
 src/slave/slave.cpp | 74 +++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 52 insertions(+), 22 deletions(-)

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 37385bd..eecd71e 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2736,17 +2736,51 @@ void Slave::__run(
     CHECK(framework->removePendingTask(_task.task_id()));
   }
 
-  // Check task invariants.
+  // Check task launch invariants.
   //
   // TODO(bbannier): Instead of copy-pasting identical code to deal
   // with cases where tasks need to be terminated, consolidate code
   // below to decouple checking from terminating.
+  Option<string> kill = None();
+
+  // Fail the launch if the agent is draining.
+  if (drainConfig.isSome()) {
+    LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
+                 << " of framework " << frameworkId
+                 << " because the agent is draining";
+
+    kill = "Task was received while agent was already draining";
+  }
+
+  if (kill.isSome()) {
+    sendTaskDroppedUpdate(TaskStatus::REASON_SLAVE_DRAINING, *kill);
+
+    // Refer to the comment after 'framework->removePendingTask' above
+    // for why we need this.
+    if (framework->idle()) {
+      removeFramework(framework);
+    }
+
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects a new executor to be launched for this task(s).
+      // To keep the master executor entries updated, the agent needs to send
+      // `ExitedExecutorMessage` even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+
+      // See the declaration of `taskLaunchSequences` regarding its lifecycle
+      // management.
+      framework->taskLaunchSequences.erase(executorInfo.executor_id());
+    }
+
+    return;
+  }
+
+  CHECK_NONE(kill);
 
   // If the master sent resource versions, perform a best-effort check
   // that they are consistent with the resources the task uses.
   //
   // TODO(bbannier): Also check executor resources.
-  bool kill = false;
   if (!resourceVersionUuids.empty()) {
     hashset<Option<ResourceProviderID>> usedResourceProviderIds;
     foreach (const TaskInfo& _task, tasks) {
@@ -2767,7 +2801,7 @@ void Slave::__run(
         CHECK(receivedResourceVersions.contains(None()));
 
         if (resourceVersion != receivedResourceVersions.at(None())) {
-          kill = true;
+          kill = "Task assumes outdated resource state";
         }
       } else {
         ResourceProvider* resourceProvider =
@@ -2776,16 +2810,14 @@ void Slave::__run(
         if (resourceProvider == nullptr ||
             resourceProvider->resourceVersion !=
               receivedResourceVersions.at(resourceProviderId.get())) {
-          kill = true;
+          kill = "Task assumes outdated resource state";
         }
       }
     }
   }
 
-  if (kill) {
-    sendTaskDroppedUpdate(
-        TaskStatus::REASON_INVALID_OFFERS,
-        "Task assumes outdated resource state");
+  if (kill.isSome()) {
+    sendTaskDroppedUpdate(TaskStatus::REASON_INVALID_OFFERS, *kill);
 
     // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.
@@ -2813,7 +2845,7 @@ void Slave::__run(
     return result;
   };
 
-  CHECK_EQ(kill, false);
+  CHECK_NONE(kill);
 
   // NOTE: If the task/task group or executor uses resources that are
   // checkpointed on the slave (e.g. persistent volumes), we should
@@ -2834,17 +2866,16 @@ void Slave::__run(
                      << " for task " << _task
                      << " of framework " << frameworkId;
 
-        kill = true;
+        kill =
+          "The checkpointed resources being used by the task or task group are "
+          "unknown to the agent";
         break;
       }
     }
   }
 
-  if (kill) {
-    sendTaskDroppedUpdate(
-        TaskStatus::REASON_RESOURCES_UNKNOWN,
-        "The checkpointed resources being used by the task or task group are "
-        "unknown to the agent");
+  if (kill.isSome()) {
+    sendTaskDroppedUpdate(TaskStatus::REASON_RESOURCES_UNKNOWN, *kill);
 
     // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.
@@ -2866,7 +2897,7 @@ void Slave::__run(
     return;
   }
 
-  CHECK_EQ(kill, false);
+  CHECK_NONE(kill);
 
   // Refer to the comment above when looping across tasks on
   // why we need to unallocate resources.
@@ -2879,16 +2910,15 @@ void Slave::__run(
                    << " for executor '" << executorId
                    << "' of framework " << frameworkId;
 
-      kill = true;
+      kill =
+        "The checkpointed resources being used by the executor are unknown "
+        "to the agent";
       break;
     }
   }
 
-  if (kill) {
-    sendTaskDroppedUpdate(
-        TaskStatus::REASON_RESOURCES_UNKNOWN,
-        "The checkpointed resources being used by the executor are unknown "
-        "to the agent");
+  if (kill.isSome()) {
+    sendTaskDroppedUpdate(TaskStatus::REASON_RESOURCES_UNKNOWN, *kill);
 
     // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.
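
As a hedged sketch of what this means for schedulers: per the handler above
(and the tests later in this series), a launch that reaches a draining agent
is not enqueued but answered with a terminal status update whose reason is
`REASON_SLAVE_DRAINING` (surfaced as `REASON_AGENT_DRAINING` in the v1 API).
A v1 scheduler could react roughly as follows; the handler name is
illustrative only.

    #include <mesos/v1/mesos.hpp>
    #include <mesos/v1/scheduler/scheduler.hpp>

    void handleUpdate(const mesos::v1::scheduler::Event::Update& update)
    {
      const mesos::v1::TaskStatus& status = update.status();

      if (status.reason() == mesos::v1::TaskStatus::REASON_AGENT_DRAINING) {
        // The task never started on the draining agent;
        // reschedule it on another agent.
      }
    }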


[mesos] 02/14: Added the DrainConfig to agent API outputs.

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 7d08b667e446840dc31538d9d40705e3d8fb12a0
Author: Greg Mann <gr...@mesosphere.io>
AuthorDate: Mon Jul 15 10:25:35 2019 -0700

    Added the DrainConfig to agent API outputs.
    
    Review: https://reviews.apache.org/r/70835/
---
 include/mesos/agent/agent.proto    |  2 ++
 include/mesos/v1/agent/agent.proto |  2 ++
 src/slave/http.cpp                 | 11 +++++++++++
 3 files changed, 15 insertions(+)

diff --git a/include/mesos/agent/agent.proto b/include/mesos/agent/agent.proto
index 83eb7bb..3cb622d 100644
--- a/include/mesos/agent/agent.proto
+++ b/include/mesos/agent/agent.proto
@@ -569,6 +569,8 @@ message Response {
   // Contains the agent's information.
   message GetAgent {
     optional SlaveInfo slave_info = 1;
+
+    optional DrainConfig drain_config = 2;
   }
 
   // Lists information about all resource providers known to the agent
diff --git a/include/mesos/v1/agent/agent.proto b/include/mesos/v1/agent/agent.proto
index f6574cb..4324ad6 100644
--- a/include/mesos/v1/agent/agent.proto
+++ b/include/mesos/v1/agent/agent.proto
@@ -569,6 +569,8 @@ message Response {
   // Contains the agent's information.
   message GetAgent {
     optional AgentInfo agent_info = 1;
+
+    optional DrainConfig drain_config = 2;
   }
 
   // Lists information about all resource providers known to the agent
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 69e6d74..321dca7 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -1331,6 +1331,12 @@ Future<Response> Http::state(
               writer->field("domain", slave->info.domain());
             }
 
+            if (slave->drainConfig.isSome()) {
+              writer->field(
+                  "drain_config",
+                  JSON::Protobuf(slave->drainConfig.get()));
+            }
+
             const Resources& totalResources = slave->totalResources;
 
             writer->field("resources", totalResources);
@@ -1842,6 +1848,11 @@ Future<Response> Http::getAgent(
 
   response.mutable_get_agent()->mutable_slave_info()->CopyFrom(slave->info);
 
+  if (slave->drainConfig.isSome()) {
+    response.mutable_get_agent()->mutable_drain_config()->CopyFrom(
+        slave->drainConfig.get());
+  }
+
   return OK(serialize(acceptType, evolve(response)),
             stringify(acceptType));
 }
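
A brief sketch of how a client might consume the new field from the v1
GET_AGENT response (obtaining the response via the agent's operator API is
omitted; the header path and function name here are assumptions for
illustration):

    #include <mesos/v1/agent/agent.hpp>

    void logDrainState(const mesos::v1::agent::Response& response)
    {
      if (response.get_agent().has_drain_config()) {
        // The agent is currently draining; inspect the configuration,
        // e.g. mark_gone() or max_grace_period().
        const mesos::v1::DrainConfig& config =
          response.get_agent().drain_config();
        (void) config;
      }
    }

The unversioned `/state` endpoint exposes the same information under the
"drain_config" key, as added in the first hunk above.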


[mesos] 01/14: Added minimal agent handler for 'DrainSlaveMessage'.

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 3c959eb769ec4a39721947e7ec173dd9eefc6af4
Author: Greg Mann <gr...@mesosphere.io>
AuthorDate: Mon Jul 15 10:25:30 2019 -0700

    Added minimal agent handler for 'DrainSlaveMessage'.
    
    This patch adds a minimal handler to the agent for the
    `DrainSlaveMessage`. This handler will later be extended
    to implement the full functionality.
    
    Review: https://reviews.apache.org/r/70834/
---
 src/slave/paths.cpp |  9 +++++++++
 src/slave/paths.hpp |  6 ++++++
 src/slave/slave.cpp | 20 ++++++++++++++++++++
 src/slave/slave.hpp |  8 ++++++++
 4 files changed, 43 insertions(+)

diff --git a/src/slave/paths.cpp b/src/slave/paths.cpp
index 1163c88..28a7cf9 100644
--- a/src/slave/paths.cpp
+++ b/src/slave/paths.cpp
@@ -55,6 +55,7 @@ namespace paths {
 // File names.
 const char BOOT_ID_FILE[] = "boot_id";
 const char SLAVE_INFO_FILE[] = "slave.info";
+const char DRAIN_CONFIG_FILE[] = "drain.config";
 const char FRAMEWORK_PID_FILE[] = "framework.pid";
 const char FRAMEWORK_INFO_FILE[] = "framework.info";
 const char LIBPROCESS_PID_FILE[] = "libprocess.pid";
@@ -658,6 +659,14 @@ string getResourcesTargetPath(
 }
 
 
+string getDrainConfigPath(
+    const string& metaDir,
+    const SlaveID& slaveId)
+{
+  return path::join(getSlavePath(metaDir, slaveId), DRAIN_CONFIG_FILE);
+}
+
+
 Try<list<string>> getPersistentVolumePaths(
     const std::string& workDir)
 {
diff --git a/src/slave/paths.hpp b/src/slave/paths.hpp
index ad76826..e077587 100644
--- a/src/slave/paths.hpp
+++ b/src/slave/paths.hpp
@@ -76,6 +76,7 @@ namespace paths {
 //   |   |   |-- latest (symlink)
 //   |   |   |-- <slave_id>
 //   |   |       |-- slave.info
+//   |   |       |-- drain.config
 //   |   |       |-- operations
 //   |   |       |   |-- <operation_uuid>
 //   |   |       |       |-- operation.updates
@@ -422,6 +423,11 @@ std::string getResourcesTargetPath(
     const std::string& rootDir);
 
 
+std::string getDrainConfigPath(
+    const std::string& metaDir,
+    const SlaveID& slaveId);
+
+
 Try<std::list<std::string>> getPersistentVolumePaths(
     const std::string& workDir);
 
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 30039b0..fc688dc 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -749,6 +749,8 @@ void Slave::initialize()
       &Slave::shutdown,
       &ShutdownMessage::message);
 
+  install<DrainSlaveMessage>(&Slave::drain);
+
   install<PingSlaveMessage>(
       &Slave::ping,
       &PingSlaveMessage::connected);
@@ -982,6 +984,24 @@ void Slave::shutdown(const UPID& from, const string& message)
 }
 
 
+void Slave::drain(
+    const UPID& from,
+    DrainSlaveMessage&& drainSlaveMessage)
+{
+  LOG(INFO)
+    << "Checkpointing DrainConfig. Previous drain config was "
+    << (drainConfig.isSome() ? stringify(drainConfig.get()) : "NONE")
+    << ", new drain config is " << drainSlaveMessage.config();
+
+  CHECK_SOME(state::checkpoint(
+      paths::getDrainConfigPath(metaDir, info.id()),
+      drainSlaveMessage.config()))
+    << "Failed to checkpoint DrainConfig";
+
+  drainConfig = drainSlaveMessage.config();
+}
+
+
 void Slave::fileAttached(
     const Future<Nothing>& result,
     const string& path,
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 6954f53..dbcceed 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -376,6 +376,10 @@ public:
       const process::UPID& from,
       const AcknowledgeOperationStatusMessage& acknowledgement);
 
+  void drain(
+      const process::UPID& from,
+      DrainSlaveMessage&& drainSlaveMessage);
+
   void executorLaunched(
       const FrameworkID& frameworkId,
       const ExecutorID& executorId,
@@ -885,6 +889,10 @@ private:
 
   // Operations that are checkpointed by the agent.
   hashmap<UUID, Operation> checkpointedOperations;
+
+  // If the agent is currently draining, contains the configuration used to
+  // drain the agent. If NONE, the agent is not currently draining.
+  Option<DrainConfig> drainConfig;
 };
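
For reference, a minimal sketch of the message this handler consumes,
mirroring the tests later in this series (in production the master sends
this message; constructing it directly is shown only for illustration):

    // Drain with a zero maximum grace period and without marking
    // the agent gone.
    DurationInfo maxGracePeriod;
    maxGracePeriod.set_nanoseconds(0);

    DrainConfig drainConfig;
    drainConfig.set_mark_gone(false);
    drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod);

    DrainSlaveMessage drainSlaveMessage;
    drainSlaveMessage.mutable_config()->CopyFrom(drainConfig);

The handler simply checkpoints `drainSlaveMessage.config()` under
`meta/slaves/<slave_id>/drain.config` and keeps it in memory; later patches
in this series act on it.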
 
 


[mesos] 08/14: Added tests for task killing when draining the agent.

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 505928a3f51555bd3e45f2fc9787fdf890b28bfb
Author: Greg Mann <gr...@mesosphere.io>
AuthorDate: Mon Jul 15 10:25:56 2019 -0700

    Added tests for task killing when draining the agent.
    
    Review: https://reviews.apache.org/r/70904/
---
 src/tests/slave_tests.cpp | 335 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 335 insertions(+)

diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 8098a1a..147967d 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -94,6 +94,8 @@
 #include "tests/resources_utils.hpp"
 #include "tests/utils.hpp"
 
+#include "tests/containerizer/mock_containerizer.hpp"
+
 using namespace mesos::internal::slave;
 
 #ifdef USE_SSL_SOCKET
@@ -11881,6 +11883,339 @@ TEST_F(SlaveTest, DrainInfoInAPIOutputs)
   }
 }
 
+
+// When an agent receives a `DrainSlaveMessage`, it should kill running tasks.
+TEST_F(SlaveTest, DrainAgentKillsRunningTask)
+{
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  Future<v1::scheduler::Event::Update> startingUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(DoAll(
+        FutureArg<1>(&startingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
+
+  v1::Offer::Operation launch = v1::LAUNCH({taskInfo});
+
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {launch}));
+
+  AWAIT_READY(startingUpdate);
+  EXPECT_EQ(v1::TASK_STARTING, startingUpdate->status().state());
+
+  AWAIT_READY(runningUpdate);
+  EXPECT_EQ(v1::TASK_RUNNING, runningUpdate->status().state());
+
+  Future<v1::scheduler::Event::Update> killedUpdate;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&killedUpdate));
+
+  // Simulate the master sending a `DrainSlaveMessage` to the agent.
+
+  // Immediately kill the task forcefully.
+  DurationInfo maxGracePeriod;
+  maxGracePeriod.set_nanoseconds(0);
+
+  DrainConfig drainConfig;
+  drainConfig.set_mark_gone(true);
+  drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod);
+
+  DrainSlaveMessage drainSlaveMessage;
+  drainSlaveMessage.mutable_config()->CopyFrom(drainConfig);
+
+  process::post(master.get()->pid, slave.get()->pid, drainSlaveMessage);
+
+  AWAIT_READY(killedUpdate);
+
+  EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
+}
+
+
+// When the agent receives a `DrainSlaveMessage`, it should kill queued tasks.
+TEST_F(SlaveTest, DrainAgentKillsQueuedTask)
+{
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  MockContainerizer mockContainerizer;
+  StandaloneMasterDetector detector(master.get()->pid);
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  EXPECT_CALL(mockContainerizer, recover(_))
+    .WillOnce(Return(Nothing()));
+
+  EXPECT_CALL(mockContainerizer, containers())
+    .WillOnce(Return(hashset<ContainerID>()));
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(
+      &detector,
+      &mockContainerizer,
+      slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
+
+  v1::Offer::Operation launch = v1::LAUNCH({taskInfo});
+
+  // Return a pending future from the containerizer when launching the executor
+  // container so that the task remains pending.
+  Promise<slave::Containerizer::LaunchResult> launchResult;
+  Future<Nothing> launched;
+  EXPECT_CALL(mockContainerizer, launch(_, _, _, _))
+    .WillOnce(DoAll(
+        FutureSatisfy(&launched),
+        Return(launchResult.future())));
+
+  EXPECT_CALL(mockContainerizer, update(_, _))
+    .WillOnce(Return(Nothing()));
+
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {launch}));
+
+  AWAIT_READY(launched);
+
+  Future<v1::scheduler::Event::Update> killedUpdate;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&killedUpdate));
+
+  // Simulate the master sending a `DrainSlaveMessage` to the agent.
+
+  // Immediately kill the task forcefully.
+  DurationInfo maxGracePeriod;
+  maxGracePeriod.set_nanoseconds(0);
+
+  DrainConfig drainConfig;
+  drainConfig.set_mark_gone(true);
+  drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod);
+
+  DrainSlaveMessage drainSlaveMessage;
+  drainSlaveMessage.mutable_config()->CopyFrom(drainConfig);
+
+  EXPECT_CALL(mockContainerizer, destroy(_))
+    .WillOnce(Return(None()));
+
+  process::post(master.get()->pid, slave.get()->pid, drainSlaveMessage);
+
+  AWAIT_READY(killedUpdate);
+
+  EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
+}
+
+
+// When the agent receives a `DrainSlaveMessage`, it should kill pending tasks.
+TEST_F(SlaveTest, DrainAgentKillsPendingTask)
+{
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  MockAuthorizer mockAuthorizer;
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(
+      &detector,
+      &mockAuthorizer,
+      slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
+
+  v1::Offer::Operation launch = v1::LAUNCH({taskInfo});
+
+  // Intercept authorization so that the task remains pending.
+  Future<Nothing> authorized;
+  Promise<bool> promise; // Never satisfied.
+  EXPECT_CALL(mockAuthorizer, authorized(_))
+    .WillOnce(DoAll(FutureSatisfy(&authorized),
+                    Return(promise.future())));
+
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {launch}));
+
+  AWAIT_READY(authorized);
+
+  Future<v1::scheduler::Event::Update> killedUpdate;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&killedUpdate));
+
+  // Simulate the master sending a `DrainSlaveMessage` to the agent.
+
+  // Immediately kill the task forcefully.
+  DurationInfo maxGracePeriod;
+  maxGracePeriod.set_nanoseconds(0);
+
+  DrainConfig drainConfig;
+  drainConfig.set_mark_gone(true);
+  drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod);
+
+  DrainSlaveMessage drainSlaveMessage;
+  drainSlaveMessage.mutable_config()->CopyFrom(drainConfig);
+
+  process::post(master.get()->pid, slave.get()->pid, drainSlaveMessage);
+
+  AWAIT_READY(killedUpdate);
+
+  EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[mesos] 07/14: Killed all tasks on the agent when draining.

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit e1c7985e96d84693f3e41d3a50da5f5ea11b6cd8
Author: Greg Mann <gr...@mesosphere.io>
AuthorDate: Mon Jul 15 10:25:51 2019 -0700

    Killed all tasks on the agent when draining.
    
    This patch updates the agent's `DrainSlaveMessage` handler
    to kill all tasks on the agent when the message is received.
    
    Review: https://reviews.apache.org/r/70903/
---
 include/mesos/type_utils.hpp |  6 +++++
 src/slave/slave.cpp          | 62 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 68 insertions(+)

diff --git a/include/mesos/type_utils.hpp b/include/mesos/type_utils.hpp
index 2fd8a62..98a2995 100644
--- a/include/mesos/type_utils.hpp
+++ b/include/mesos/type_utils.hpp
@@ -338,6 +338,12 @@ inline bool operator<(const ContainerID& left, const ContainerID& right)
 }
 
 
+inline bool operator<(const DurationInfo& left, const DurationInfo& right)
+{
+  return left.nanoseconds() < right.nanoseconds();
+}
+
+
 inline bool operator<(const ExecutorID& left, const ExecutorID& right)
 {
   return left.value() < right.value();
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 741c1f6..19b4769 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -999,6 +999,68 @@ void Slave::drain(
     << "Failed to checkpoint DrainConfig";
 
   drainConfig = drainSlaveMessage.config();
+
+  const Option<DurationInfo> maxGracePeriod =
+    drainConfig->has_max_grace_period()
+      ? drainConfig->max_grace_period()
+      : Option<DurationInfo>::none();
+
+  auto calculateKillPolicy =
+    [&](const Option<KillPolicy>& killPolicy) -> Option<KillPolicy> {
+      if (maxGracePeriod.isNone()) {
+        return None();
+      }
+
+      KillPolicy killPolicyOverride;
+      killPolicyOverride.mutable_grace_period()->CopyFrom(maxGracePeriod.get());
+
+      // Task kill policy is not set or unknown.
+      if (killPolicy.isNone() || !killPolicy->has_grace_period()) {
+        return killPolicyOverride;
+      }
+
+      // Task kill policy is greater than the override.
+      if (maxGracePeriod.get() < killPolicy->grace_period()) {
+        return killPolicyOverride;
+      }
+
+      return None();
+    };
+
+  // Frameworks may be removed within `kill()` or `killPendingTask()` below,
+  // so we must copy them and their members before looping.
+  foreachvalue (Framework* framework, utils::copy(frameworks)) {
+    typedef hashmap<TaskID, TaskInfo> TaskMap;
+    foreachvalue (const TaskMap& tasks, utils::copy(framework->pendingTasks)) {
+      foreachvalue (const TaskInfo& task, tasks) {
+        killPendingTask(framework->id(), framework, task.task_id());
+      }
+    }
+
+    foreachvalue (Executor* executor, utils::copy(framework->executors)) {
+      foreachvalue (Task* task, executor->launchedTasks) {
+        kill(framework->id(),
+             framework,
+             executor,
+             task->task_id(),
+             calculateKillPolicy(
+                task->has_kill_policy()
+                  ? task->kill_policy()
+                  : Option<KillPolicy>::none()));
+      }
+
+      foreachvalue (const TaskInfo& task, utils::copy(executor->queuedTasks)) {
+        kill(framework->id(),
+             framework,
+             executor,
+             task.task_id(),
+             calculateKillPolicy(
+                task.has_kill_policy()
+                  ? task.kill_policy()
+                  : Option<KillPolicy>::none()));
+      }
+    }
+  }
 }
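
To make the override rule in `calculateKillPolicy` above concrete, a few
illustrative cases (the drain's `max_grace_period` only replaces a task's
own kill policy when it is stricter):

    task kill policy grace period   drain max_grace_period   effective grace period
    -----------------------------   ----------------------   ----------------------
    unset                           10s                      10s (overridden)
    30s                             10s                      10s (overridden)
    5s                              10s                      5s  (task policy kept)
    any                             unset                    task policy kept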
 
 


[mesos] 14/14: Added test for agent to leave draining state on its own.

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit c076c8ce286abfb34de3d962a7ca1601d9494919
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Mon Jul 15 10:26:28 2019 -0700

    Added test for agent to leave draining state on its own.
    
    This patch adds a test which confirms that the agent leaves the draining
    state on its own once all frameworks on the agent have no more pending
    tasks and all their executors have neither launched nor queued tasks.
    
    The test uses the fact that the agent rejects task launches while
    draining.
    
    Review: https://reviews.apache.org/r/70960/
---
 src/tests/slave_tests.cpp | 188 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 188 insertions(+)

diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 95f7780..1ed59ca 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -12223,6 +12223,194 @@ TEST_F(SlaveTest, DrainAgentKillsPendingTask)
 }
 
 
+// This test validates that a draining agent fails further task launch
+// attempts to protect its internal draining invariants, and that the
+// agent leaves the draining state on its own once all tasks have
+// terminated and their status updates have been acknowledged.
+TEST_F(SlaveTest, DrainingAgentRejectLaunch)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  // Register a scheduler to launch tasks.
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers1;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers1))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers1);
+  ASSERT_FALSE(offers1->offers().empty());
+
+  v1::Offer offer = offers1->offers(0);
+  v1::AgentID agentId = offer.agent_id();
+
+  // Launch a task. When the agent is put into draining state this task will be
+  // killed, but we will leave the draining state open even after the task is
+  // killed by not acknowledging the terminal task status update.
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::TaskInfo taskInfo1 =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000), None());
+
+  // We do not acknowledge the KILLED update to control
+  // when the agent finishes draining.
+  Future<v1::scheduler::Event::Update> runningUpdate1;
+  Future<v1::scheduler::Event::Update> killedUpdate1;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(v1::scheduler::SendAcknowledge(frameworkId, agentId)) // Starting.
+    .WillOnce(DoAll(
+        v1::scheduler::SendAcknowledge(frameworkId, agentId),
+        FutureArg<1>(&runningUpdate1)))
+    .WillOnce(FutureArg<1>(&killedUpdate1));
+
+  mesos.send(
+      v1::createCallAccept(frameworkId, offer, {v1::LAUNCH({taskInfo1})}));
+
+  AWAIT_READY(runningUpdate1);
+
+  // Simulate the master sending a `DrainSlaveMessage` to the agent.
+  DurationInfo maxGracePeriod;
+  maxGracePeriod.set_nanoseconds(0);
+
+  DrainConfig drainConfig;
+  drainConfig.set_mark_gone(false);
+  drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod);
+
+  DrainSlaveMessage drainSlaveMessage;
+  drainSlaveMessage.mutable_config()->CopyFrom(drainConfig);
+
+  // Explicitly wait for the executor to be terminated.
+  Future<Nothing> executorTerminated =
+    FUTURE_DISPATCH(_, &Slave::executorTerminated);
+
+  process::post(master.get()->pid, slave.get()->pid, drainSlaveMessage);
+
+  // Wait until we have received the terminal task status update
+  // (which we did not acknowledge) before continuing. The agent will
+  // subsequently be left in a draining state.
+  AWAIT_READY(killedUpdate1);
+  ASSERT_EQ(v1::TASK_KILLED, killedUpdate1->status().state());
+
+  Future<v1::scheduler::Event::Offers> offers2;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  // Resume the clock so the containerizer can detect the terminated executor.
+  Clock::resume();
+  AWAIT_READY(executorTerminated);
+  Clock::pause();
+  Clock::settle();
+
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offers2);
+  ASSERT_FALSE(offers2->offers().empty());
+
+  offer = offers2->offers(0);
+  agentId = offer.agent_id();
+
+  // Launch another task. Since the agent is in draining
+  // state the task will be rejected by the agent.
+  Future<v1::scheduler::Event::Update> lostUpdate;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(DoAll(
+        v1::scheduler::SendAcknowledge(frameworkId, agentId),
+        FutureArg<1>(&lostUpdate)));
+
+  v1::TaskInfo taskInfo2 =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000), None());
+
+  mesos.send(
+      v1::createCallAccept(frameworkId, offer, {v1::LAUNCH({taskInfo2})}));
+
+  AWAIT_READY(lostUpdate);
+  ASSERT_EQ(taskInfo2.task_id(), lostUpdate->status().task_id());
+  ASSERT_EQ(v1::TASK_LOST, lostUpdate->status().state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_AGENT_DRAINING, lostUpdate->status().reason());
+
+  // Acknowledge the pending task status update. Once the acknowledgement has
+  // been processed the agent will leave its draining state and accept task
+  // launches again.
+  {
+    v1::scheduler::Call call;
+    call.set_type(v1::scheduler::Call::ACKNOWLEDGE);
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+
+    v1::scheduler::Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+    acknowledge->mutable_task_id()->CopyFrom(killedUpdate1->status().task_id());
+    acknowledge->mutable_agent_id()->CopyFrom(agentId);
+    acknowledge->set_uuid(killedUpdate1->status().uuid());
+
+    mesos.send(call);
+  }
+
+  Future<v1::scheduler::Event::Offers> offers3;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers3))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  // Trigger another allocation.
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offers3);
+  ASSERT_FALSE(offers3->offers().empty());
+
+  offer = offers3->offers(0);
+  agentId = offer.agent_id();
+
+  // The agent should have left its draining state and now accept task launches.
+  Future<v1::scheduler::Event::Update> runningUpdate2;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(v1::scheduler::SendAcknowledge(frameworkId, agentId)) // Starting.
+    .WillOnce(FutureArg<1>(&runningUpdate2));
+
+  mesos.send(
+      v1::createCallAccept(frameworkId, offer, {v1::LAUNCH({taskInfo2})}));
+
+  AWAIT_READY(runningUpdate2);
+  EXPECT_EQ(taskInfo2.task_id(), runningUpdate2->status().task_id());
+  EXPECT_EQ(v1::TASK_RUNNING, runningUpdate2->status().state());
+}
+
+
 // This test verifies that if the agent recovers that it is in
 // draining state any tasks after the restart are killed.
 TEST_F(SlaveTest, CheckpointedDrainInfo)


[mesos] 11/14: Adjusted task status updates during draining.

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 18892687ffd98be35fc0f2012df5aae9c99a034e
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Mon Jul 15 10:26:10 2019 -0700

    Adjusted task status updates during draining.
    
    When a task is reported as killed to the agent during active agent
    draining, we now decorate the reported status with
    `REASON_AGENT_DRAINING` unconditionally. If the draining marks the agent
    as gone via the `mark_gone` draining flag, we additionally report
    `TASK_GONE_BY_OPERATOR` instead of the original state.

    This patch leaves some ambiguity about what triggered the kill, since
    the agent-executor protocol does not transport reasons; instead, the
    reason is only inferred here after the killed task has been observed.
    This should usually be fine: due to the inherent race between, e.g., a
    user-triggered and a drain-triggered kill, a user cannot distinguish
    the racy reasons anyway.
    
    Review: https://reviews.apache.org/r/70936/
---
 src/slave/slave.cpp       | 34 ++++++++++++++++++++++++++++++++++
 src/tests/slave_tests.cpp | 44 +++++++++++++++++++++++++++++++++++---------
 2 files changed, 69 insertions(+), 9 deletions(-)

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 3878ab8..37385bd 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -5698,6 +5698,40 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
   update.mutable_status()->set_source(
       pid == UPID() ? TaskStatus::SOURCE_SLAVE : TaskStatus::SOURCE_EXECUTOR);
 
+  // If the agent is draining we provide additional
+  // information for KILLING or KILLED states.
+  if (drainConfig.isSome()) {
+    switch (update.status().state()) {
+      case TASK_STAGING:
+      case TASK_STARTING:
+      case TASK_RUNNING:
+      case TASK_FAILED:
+      case TASK_FINISHED:
+      case TASK_ERROR:
+      case TASK_LOST:
+      case TASK_DROPPED:
+      case TASK_UNREACHABLE:
+      case TASK_GONE:
+      case TASK_GONE_BY_OPERATOR:
+      case TASK_UNKNOWN: {
+        break;
+      }
+      case TASK_KILLING:
+      case TASK_KILLED: {
+        // We unconditionally overwrite any previous reason to provide a
+        // consistent signal that this task went away during draining.
+        update.mutable_status()->set_reason(TaskStatus::REASON_SLAVE_DRAINING);
+
+        // If the draining marks the agent as gone report tasks as
+        // gone by operator.
+        if (drainConfig->mark_gone()) {
+          update.mutable_status()->set_state(TASK_GONE_BY_OPERATOR);
+        }
+        break;
+      }
+    }
+  }
+
   // Set TaskStatus.executor_id if not already set; overwrite existing
   // value if already set.
   if (update.has_executor_id()) {
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 5f8e53c..95f7780 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -11989,7 +11989,9 @@ TEST_F(SlaveTest, DrainAgentKillsRunningTask)
 
   AWAIT_READY(killedUpdate);
 
-  EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
+  EXPECT_EQ(v1::TASK_GONE_BY_OPERATOR, killedUpdate->status().state());
+  EXPECT_EQ(
+      v1::TaskStatus::REASON_AGENT_DRAINING, killedUpdate->status().reason());
 }
 
 
@@ -12108,7 +12110,9 @@ TEST_F(SlaveTest, DrainAgentKillsQueuedTask)
 
   AWAIT_READY(killedUpdate);
 
-  EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
+  EXPECT_EQ(v1::TASK_GONE_BY_OPERATOR, killedUpdate->status().state());
+  EXPECT_EQ(
+      v1::TaskStatus::REASON_AGENT_DRAINING, killedUpdate->status().reason());
 }
 
 
@@ -12213,7 +12217,9 @@ TEST_F(SlaveTest, DrainAgentKillsPendingTask)
 
   AWAIT_READY(killedUpdate);
 
-  EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
+  EXPECT_EQ(v1::TASK_GONE_BY_OPERATOR, killedUpdate->status().state());
+  EXPECT_EQ(
+      v1::TaskStatus::REASON_AGENT_DRAINING, killedUpdate->status().reason());
 }
 
 
@@ -12228,6 +12234,10 @@ TEST_F(SlaveTest, CheckpointedDrainInfo)
 
   slave::Flags slaveFlags = CreateSlaveFlags();
 
+  // Make the executor reregistration timeout less than the agent's
+  // registration backoff factor to avoid resent status updates.
+  slaveFlags.executor_reregistration_timeout = Milliseconds(2);
+
   ExecutorID executorId = DEFAULT_EXECUTOR_ID;
   MockExecutor exec(executorId);
   TestContainerizer containerizer(&exec);
@@ -12253,7 +12263,9 @@ TEST_F(SlaveTest, CheckpointedDrainInfo)
   MesosSchedulerDriver driver(
       &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
 
-  EXPECT_CALL(sched, registered(_, _, _));
+  Future<Nothing> frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(FutureSatisfy(&frameworkId));
 
   Future<vector<Offer>> offers;
   EXPECT_CALL(sched, resourceOffers(_, _))
@@ -12262,6 +12274,8 @@ TEST_F(SlaveTest, CheckpointedDrainInfo)
 
   driver.start();
 
+  AWAIT_READY(frameworkId);
+
   AWAIT_READY(offers);
   ASSERT_FALSE(offers->empty());
 
@@ -12280,8 +12294,10 @@ TEST_F(SlaveTest, CheckpointedDrainInfo)
   DurationInfo maxGracePeriod;
   maxGracePeriod.set_nanoseconds(GRACE_PERIOD_NANOS);
 
+  // We do not mark the agent as gone in contrast to some other tests here to
+  // validate that we observe `TASK_KILLED` instead of `TASK_GONE_BY_OPERATOR`.
   DrainConfig drainConfig;
-  drainConfig.set_mark_gone(true);
+  drainConfig.set_mark_gone(false);
   drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod);
 
   DrainSlaveMessage drainSlaveMessage;
@@ -12317,20 +12333,30 @@ TEST_F(SlaveTest, CheckpointedDrainInfo)
 
   // Once the agent has finished recovering executors it should send
   // another task kill request to the executor.
-  Future<Nothing> killTask2;
   EXPECT_CALL(exec, killTask(_, _))
-    .WillOnce(FutureSatisfy(&killTask2));
+    .WillOnce(SendStatusUpdateFromTaskID(TASK_KILLED));
 
   // Restart the agent.
   slave.get()->terminate();
+
+  Future<TaskStatus> statusKilled;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusKilled))
+    .WillRepeatedly(Return()); // Ignore resent updates.
+
   slave = StartSlave(&detector, &containerizer, slaveFlags);
 
   AWAIT_READY(reregistered);
 
-  // Advance the clock to finish the executor reregistration phase.
+  // Advance the clock to finish the executor and agent reregistration phases.
   Clock::advance(slaveFlags.executor_reregistration_timeout);
+  Clock::settle();
 
-  AWAIT_READY(killTask2);
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(statusKilled);
+  EXPECT_EQ(TASK_KILLED, statusKilled->state());
+  EXPECT_EQ(TaskStatus::REASON_SLAVE_DRAINING, statusKilled->reason());
 }
 
 } // namespace tests {
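
In summary, a status update of `TASK_KILLING` or `TASK_KILLED` arriving
while the agent is draining is rewritten as follows (per the handler above
and the updated test expectations); all other states pass through unchanged:

    mark_gone   reported state            reported reason
    ---------   --------------            ---------------
    false       unchanged                 REASON_SLAVE_DRAINING
    true        TASK_GONE_BY_OPERATOR     REASON_SLAVE_DRAINING

In the v1 API the reason surfaces as `REASON_AGENT_DRAINING`.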


[mesos] 05/14: Updated an equality operator.

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 27f0cd3519bafaf058e8347d482475e776d494e1
Author: Greg Mann <gr...@mesosphere.io>
AuthorDate: Mon Jul 15 10:25:47 2019 -0700

    Updated an equality operator.
    
    This patch updates the equality operator for the `Task`
    message to include two missing conditions. An equality
    operator for `HealthCheck` is also added to make this
    possible.
    
    Review: https://reviews.apache.org/r/70900/
---
 include/mesos/type_utils.hpp |  1 +
 src/common/type_utils.cpp    | 10 +++++++++-
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/include/mesos/type_utils.hpp b/include/mesos/type_utils.hpp
index ed9190b..b9e6164 100644
--- a/include/mesos/type_utils.hpp
+++ b/include/mesos/type_utils.hpp
@@ -62,6 +62,7 @@ bool operator==(
 bool operator==(const DiscoveryInfo& left, const DiscoveryInfo& right);
 bool operator==(const Environment& left, const Environment& right);
 bool operator==(const ExecutorInfo& left, const ExecutorInfo& right);
+bool operator==(const HealthCheck& left, const HealthCheck& right);
 bool operator==(const Label& left, const Label& right);
 bool operator==(const Labels& left, const Labels& right);
 bool operator==(const MasterInfo& left, const MasterInfo& right);
diff --git a/src/common/type_utils.cpp b/src/common/type_utils.cpp
index a7eb0e9..16d6657 100644
--- a/src/common/type_utils.cpp
+++ b/src/common/type_utils.cpp
@@ -400,6 +400,12 @@ bool operator!=(const ExecutorInfo& left, const ExecutorInfo& right)
 }
 
 
+bool operator==(const HealthCheck& left, const HealthCheck& right)
+{
+  return google::protobuf::util::MessageDifferencer::Equals(left, right);
+}
+
+
 bool operator==(const MasterInfo& left, const MasterInfo& right)
 {
   return left.id() == right.id() &&
@@ -575,7 +581,9 @@ bool operator==(const Task& left, const Task& right)
     left.status_update_uuid() == right.status_update_uuid() &&
     left.labels() == right.labels() &&
     left.discovery() == right.discovery() &&
-    left.user() == right.user();
+    left.user() == right.user() &&
+    left.container() == right.container() &&
+    left.health_check() == right.health_check();
 }
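
A small sketch of the behavioral change (assumes the public Mesos headers;
the function is illustrative only): two `Task` messages that differ only in
`container` or `health_check` compared equal before this patch and no
longer do.

    #include <mesos/mesos.hpp>
    #include <mesos/type_utils.hpp>

    bool tasksEqual()
    {
      mesos::Task t1;
      t1.mutable_task_id()->set_value("task-1");

      mesos::Task t2 = t1;
      t2.mutable_health_check()->set_type(mesos::HealthCheck::COMMAND);

      // Returns false after this patch; before, `health_check` was ignored
      // by the Task equality operator and this returned true.
      return t1 == t2;
    }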
 
 


[mesos] 13/14: Cleared agent drain state when draining is finished.

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 654faf9244b0016f8a17623aca7812923b3a313a
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Mon Jul 15 10:26:23 2019 -0700

    Cleared agent drain state when draining is finished.
    
    Once a draining agent has neither frameworks with pending tasks nor any
    executors with queued or launched tasks, it has finished draining.
    This patch adds handling of that case, which clears both the in-memory
    and the persisted drain configuration.
    
    Review: https://reviews.apache.org/r/70959/
---
 src/slave/slave.cpp | 31 +++++++++++++++++++++++++++++++
 src/slave/slave.hpp |  4 ++++
 2 files changed, 35 insertions(+)

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index eecd71e..2477975 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -7067,6 +7067,8 @@ void Slave::removeFramework(Framework* framework)
   // Pass ownership of the framework pointer.
   completedFrameworks.set(framework->id(), Owned<Framework>(framework));
 
+  updateDrainStatus();
+
   if (state == TERMINATING && frameworks.empty()) {
     terminate(self());
   }
@@ -8944,6 +8946,8 @@ void Slave::removeOperation(Operation* operation)
 
   checkpointResourceState(
       totalResources.filter(mesos::needCheckpointing), false);
+
+  updateDrainStatus();
 }
 
 
@@ -9768,6 +9772,33 @@ void Slave::initializeResourceProviderManager(
 }
 
 
+void Slave::updateDrainStatus()
+{
+  if (drainConfig.isNone()) {
+    return;
+  }
+
+  bool drained = operations.empty() && frameworks.empty();
+
+  if (!drained) {
+    return;
+  }
+
+  LOG(INFO) << "Agent finished draining";
+
+  const string drainConfigPath = paths::getDrainConfigPath(metaDir, info.id());
+
+  Try<Nothing> rm = os::rm(drainConfigPath);
+
+  if (rm.isError()) {
+    EXIT(EXIT_FAILURE) << "Could not remove persisted drain configuration "
+                       << "'" << drainConfigPath << "': " << rm.error();
+  }
+
+  drainConfig = None();
+}
+
+
 Framework::Framework(
     Slave* _slave,
     const Flags& slaveFlags,
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 58bdd2a..58a5608 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -910,6 +910,10 @@ private:
   // If the agent is currently draining, contains the configuration used to
   // drain the agent. If NONE, the agent is not currently draining.
   Option<DrainConfig> drainConfig;
+
+  // Check whether draining is finished and possibly remove
+  // both in-memory and persisted drain configuration.
+  void updateDrainStatus();
 };
 
 


[mesos] 10/14: Added recovery of agent drain information.

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 54fb43e7fb5fb1884e8f0ba087ae3db8cfc8d498
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Mon Jul 15 10:26:02 2019 -0700

    Added recovery of agent drain information.
    
    With this patch the agent will, after executor reregistration has
    finished, replay any active drain information so that remaining tasks
    are drained as well. We need to wait until executors have had a chance
    to register, since an executor would otherwise be terminated if we sent
    it a kill-task request before it has registered.
    
    Review: https://reviews.apache.org/r/70907/
---
 src/slave/slave.cpp       |  12 +++++
 src/slave/state.cpp       |  16 +++++++
 src/slave/state.hpp       |   3 ++
 src/tests/slave_tests.cpp | 117 ++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 148 insertions(+)

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 19b4769..3878ab8 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -5630,6 +5630,16 @@ void Slave::reregisterExecutorTimeout()
     }
   }
 
+  // Replay any active draining.
+  if (drainConfig.isSome()) {
+    DrainSlaveMessage drainSlaveMessage;
+    *drainSlaveMessage.mutable_config() = *drainConfig;
+
+    LOG(INFO) << "Replaying in-process agent draining";
+
+    drain(self(), std::move(drainSlaveMessage));
+  }
+
   // Signal the end of recovery.
   // TODO(greggomann): Allow the agent to complete recovery before the executor
   // re-registration timeout has elapsed. See MESOS-7539
@@ -7512,6 +7522,8 @@ Future<Nothing> Slave::recover(const Try<state::State>& state)
     // we can not reuse the id, we will either crash or erase it again.
     info.mutable_id()->CopyFrom(slaveState->info->id());
 
+    drainConfig = slaveState->drainConfig;
+
     // Check for SlaveInfo compatibility.
     Try<Nothing> _compatible =
       compatible(slaveState->info.get(), info);
diff --git a/src/slave/state.cpp b/src/slave/state.cpp
index e0a850e..cd3fac7 100644
--- a/src/slave/state.cpp
+++ b/src/slave/state.cpp
@@ -203,6 +203,22 @@ Try<SlaveState> SlaveState::recover(
     state.errors += framework->errors;
   }
 
+  // Recover any drain state.
+  const string drainConfigPath = paths::getDrainConfigPath(rootDir, slaveId);
+  if (os::exists(drainConfigPath)) {
+    Result<DrainConfig> drainConfig = state::read<DrainConfig>(drainConfigPath);
+    if (drainConfig.isError()) {
+      string message = "Failed to read agent state file '"
+                       + drainConfigPath + "': " + drainConfig.error();
+
+      LOG(WARNING) << message;
+      state.errors++;
+    }
+    if (drainConfig.isSome()) {
+      state.drainConfig = *drainConfig;
+    }
+  }
+
   const string resourceStatePath = paths::getResourceStatePath(rootDir);
   if (os::exists(resourceStatePath)) {
     Result<ResourceState> resourceState =
diff --git a/src/slave/state.hpp b/src/slave/state.hpp
index 45836e5..6d6ae01 100644
--- a/src/slave/state.hpp
+++ b/src/slave/state.hpp
@@ -391,6 +391,9 @@ struct SlaveState
   // state didn't support checkpointing operations.
   Option<std::vector<Operation>> operations;
 
+  // The drain state of the agent, if any.
+  Option<DrainConfig> drainConfig;
+
   unsigned int errors;
 };
 
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 147967d..5f8e53c 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -12216,6 +12216,123 @@ TEST_F(SlaveTest, DrainAgentKillsPendingTask)
   EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
 }
 
+
+// This test verifies that if the agent recovers that it is in draining
+// state, any remaining tasks are killed after the restart.
+TEST_F(SlaveTest, CheckpointedDrainInfo)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+  MockExecutor exec(executorId);
+  TestContainerizer containerizer(&exec);
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(&detector, &containerizer, slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger the agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  // Start a framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+
+  SlaveID slaveId = offers.get()[0].slave_id();
+  TaskInfo task = createTask(
+      slaveId,
+      Resources::parse("cpus:1;mem:32").get(),
+      SLEEP_COMMAND(1000),
+      executorId);
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  constexpr int GRACE_PERIOD_NANOS = 1000000;
+  DurationInfo maxGracePeriod;
+  maxGracePeriod.set_nanoseconds(GRACE_PERIOD_NANOS);
+
+  DrainConfig drainConfig;
+  drainConfig.set_mark_gone(true);
+  drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod);
+
+  DrainSlaveMessage drainSlaveMessage;
+  drainSlaveMessage.mutable_config()->CopyFrom(drainConfig);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> statusRunning;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning));
+
+  AWAIT_READY(statusRunning);
+  ASSERT_EQ(TaskState::TASK_RUNNING, statusRunning->state());
+
+  // We expect a request to kill the task when the drain request is initially
+  // received. The executor ignores the request and reregisters after agent
+  // restart.
+  Future<Nothing> killTask1;
+  EXPECT_CALL(exec, killTask(_, _))
+    .WillOnce(FutureSatisfy(&killTask1));
+
+  process::post(master.get()->pid, slave.get()->pid, drainSlaveMessage);
+
+  AWAIT_READY(killTask1);
+
+  Future<Nothing> reregistered;
+  EXPECT_CALL(exec, reregistered(_, _))
+    .WillOnce(DoAll(
+        Invoke(&exec, &MockExecutor::reregistered),
+        FutureSatisfy(&reregistered)))
+    .WillRepeatedly(DoDefault());
+
+  // Once the agent has finished recovering executors it should send
+  // another task kill request to the executor.
+  Future<Nothing> killTask2;
+  EXPECT_CALL(exec, killTask(_, _))
+    .WillOnce(FutureSatisfy(&killTask2));
+
+  // Restart the agent.
+  slave.get()->terminate();
+  slave = StartSlave(&detector, &containerizer, slaveFlags);
+
+  AWAIT_READY(reregistered);
+
+  // Advance the clock to finish the executor reregistration phase.
+  Clock::advance(slaveFlags.executor_reregistration_timeout);
+
+  AWAIT_READY(killTask2);
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[mesos] 03/14: Added test for DrainConfig in agent API outputs.

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 04d25afd70bede23f46463d67ef80c6e2c1f8167
Author: Greg Mann <gr...@mesosphere.io>
AuthorDate: Mon Jul 15 10:25:37 2019 -0700

    Added test for DrainConfig in agent API outputs.
    
    Review: https://reviews.apache.org/r/70836/
---
 src/tests/slave_tests.cpp | 90 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 90 insertions(+)
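
For orientation, the second half of the test below checks that the agent's
"/state" endpoint carries the drain configuration verbatim. With the values
used in the test, the relevant part of the JSON output would look roughly
like this (field names follow the `DrainConfig` proto; exact number
formatting may differ):

    "drain_config": {
      "mark_gone": true,
      "max_grace_period": {
        "nanoseconds": 1000000
      }
    }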

diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 5d930bd..8098a1a 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -11791,6 +11791,96 @@ TEST_F(SlaveTest, AgentFailoverHTTPExecutorUsingResourceProviderResources)
   AWAIT_READY(executorSubscribed);
 }
 
+
+// When the agent receives a `DrainSlaveMessage` from the master, the agent's
+// drain info should be visible in the agent's API output.
+TEST_F(SlaveTest, DrainInfoInAPIOutputs)
+{
+  Clock::pause();
+
+  const int GRACE_PERIOD_NANOS = 1000000;
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Simulate the master sending a `DrainSlaveMessage` to the agent.
+  DurationInfo maxGracePeriod;
+  maxGracePeriod.set_nanoseconds(GRACE_PERIOD_NANOS);
+
+  DrainConfig drainConfig;
+  drainConfig.set_mark_gone(true);
+  drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod);
+
+  DrainSlaveMessage drainSlaveMessage;
+  drainSlaveMessage.mutable_config()->CopyFrom(drainConfig);
+
+  process::post(master.get()->pid, slave.get()->pid, drainSlaveMessage);
+
+  Clock::settle();
+
+  {
+    v1::agent::Call call;
+    call.set_type(v1::agent::Call::GET_AGENT);
+
+    const ContentType contentType = ContentType::PROTOBUF;
+
+    process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Accept"] = stringify(contentType);
+
+    Future<process::http::Response> httpResponse =
+      process::http::post(
+          slave.get()->pid,
+          "api/v1",
+          headers,
+          serialize(contentType, call),
+          stringify(contentType));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, httpResponse);
+
+    Future<v1::agent::Response> responseMessage =
+      deserialize<v1::agent::Response>(contentType, httpResponse->body);
+
+    AWAIT_READY(responseMessage);
+    ASSERT_TRUE(responseMessage->IsInitialized());
+    ASSERT_EQ(v1::agent::Response::GET_AGENT, responseMessage->type());
+    ASSERT_TRUE(responseMessage->get_agent().has_drain_config());
+    EXPECT_EQ(
+        drainConfig,
+        devolve(responseMessage->get_agent().drain_config()));
+  }
+
+  {
+    Future<Response> response = process::http::get(
+        slave.get()->pid,
+        "state",
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
+
+    Try<JSON::Object> state = JSON::parse<JSON::Object>(response->body);
+
+    ASSERT_SOME(state);
+
+    EXPECT_EQ(JSON::protobuf(drainConfig), state->values["drain_config"]);
+  }
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[mesos] 09/14: Fixed pid checkpointing for `TestContainerizer`.

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 1a32b31496600e3ad5d54d8c4e8497e7ef420b18
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Mon Jul 15 10:26:01 2019 -0700

    Fixed pid checkpointing for `TestContainerizer`.
    
    In order for a `MockExecutor` to be able to reregister after an agent
    restart, a persisted pid is required. This patch adds checkpointing of
    the pid.
    
    Review: https://reviews.apache.org/r/70906/
---
 src/tests/containerizer.cpp | 12 ++++++++++++
 1 file changed, 12 insertions(+)
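
The hunk below is small but load-bearing for the drain recovery tests: as
the commit message notes, the persisted pid is what allows the in-process
`MockExecutor` to reregister after an agent restart. A minimal sketch of
the idea (illustrative only; the real hunk follows):

    // In the test containerizer's launch path: persist the pid the agent
    // should associate with the container. The mock executor runs inside
    // the test process itself, so that process's own pid stands in for the
    // container's forked pid.
    if (pidCheckpointPath.isSome()) {
      Try<Nothing> checkpointed = slave::state::checkpoint(
          pidCheckpointPath.get(), stringify(::getpid()));

      if (checkpointed.isError()) {
        return Failure("Could not checkpoint container's pid");
      }
    }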

diff --git a/src/tests/containerizer.cpp b/src/tests/containerizer.cpp
index fab7e81..3ac992f 100644
--- a/src/tests/containerizer.cpp
+++ b/src/tests/containerizer.cpp
@@ -163,6 +163,18 @@ public:
               ContentType::PROTOBUF, executor, fullEnvironment));
     }
 
+    // Checkpoint the forked pid if requested by the agent.
+    if (pidCheckpointPath.isSome()) {
+      Try<Nothing> checkpointed = slave::state::checkpoint(
+          pidCheckpointPath.get(), stringify(::getpid()));
+
+      if (checkpointed.isError()) {
+        LOG(ERROR) << "Failed to checkpoint container's forked pid to '"
+                   << pidCheckpointPath.get() << "': " << checkpointed.error();
+        return Failure("Could not checkpoint container's pid");
+      }
+    }
+
     return slave::Containerizer::LaunchResult::SUCCESS;
   }
 


[mesos] 04/14: Refactored the agent's task-killing code.

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit ef19f297be6c192f4d2cea0f9ed413a1dfaaf882
Author: Greg Mann <gr...@mesosphere.io>
AuthorDate: Mon Jul 15 10:25:42 2019 -0700

    Refactored the agent's task-killing code.
    
    This patch factors the code responsible for killing tasks out into
    two helper functions. This will allow the agent-draining handler to
    call the same common code.
    
    Review: https://reviews.apache.org/r/70899/
---
 src/slave/slave.cpp | 133 +++++++++++++++++++++++++++++++---------------------
 src/slave/slave.hpp |  19 +++++++-
 2 files changed, 97 insertions(+), 55 deletions(-)
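
The net effect of the refactor below is that `Slave::killTask` becomes a
thin dispatcher over the two new helpers; a condensed, illustrative view
(the real hunks follow):

    // Pending tasks have no Executor struct yet; killPendingTask() sends
    // TASK_KILLED synchronously so the task/task group is never launched.
    if (framework->isPending(taskId)) {
      killPendingTask(frameworkId, framework, taskId);
      return;
    }

    // Queued or launched tasks are killed via their executor, optionally
    // honoring a kill policy from the original request.
    kill(frameworkId,
         framework,
         executor,
         taskId,
         (killTaskMessage.has_kill_policy()
            ? killTaskMessage.kill_policy()
            : Option<KillPolicy>::none()));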

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index fc688dc..741c1f6 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3670,10 +3670,6 @@ void Slave::killTask(
     return;
   }
 
-  CHECK(framework->state == Framework::RUNNING ||
-        framework->state == Framework::TERMINATING)
-    << framework->state;
-
   // We don't send a status update here because a terminating
   // framework cannot send acknowledgements.
   if (framework->state == Framework::TERMINATING) {
@@ -3683,54 +3679,10 @@ void Slave::killTask(
     return;
   }
 
-  // If the task is pending, we send a TASK_KILLED immediately.
-  // This will trigger a synchronous removal of the pending task,
-  // which prevents it from being launched.
-  if (framework->isPending(taskId)) {
-    LOG(WARNING) << "Killing task " << taskId
-                 << " of framework " << frameworkId
-                 << " before it was launched";
-
-    Option<TaskGroupInfo> taskGroup =
-      framework->getTaskGroupForPendingTask(taskId);
-
-    vector<StatusUpdate> updates;
-    if (taskGroup.isSome()) {
-      foreach (const TaskInfo& task, taskGroup->tasks()) {
-        updates.push_back(protobuf::createStatusUpdate(
-            frameworkId,
-            info.id(),
-            task.task_id(),
-            TASK_KILLED,
-            TaskStatus::SOURCE_SLAVE,
-            id::UUID::random(),
-            "A task within the task group was killed before"
-            " delivery to the executor",
-            TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
-            CHECK_NOTNONE(
-                framework->getExecutorIdForPendingTask(task.task_id()))));
-      }
-    } else {
-      updates.push_back(protobuf::createStatusUpdate(
-          frameworkId,
-          info.id(),
-          taskId,
-          TASK_KILLED,
-          TaskStatus::SOURCE_SLAVE,
-          id::UUID::random(),
-          "Killed before delivery to the executor",
-          TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
-          CHECK_NOTNONE(
-              framework->getExecutorIdForPendingTask(taskId))));
-    }
+  CHECK(framework->state == Framework::RUNNING) << framework->state;
 
-    foreach (const StatusUpdate& update, updates) {
-      // NOTE: Sending a terminal update (TASK_KILLED) synchronously
-      // removes the task/task group from 'framework->pendingTasks'
-      // and 'framework->pendingTaskGroups', so that it will not be
-      // launched.
-      statusUpdate(update, UPID());
-    }
+  if (framework->isPending(taskId)) {
+    killPendingTask(frameworkId, framework, taskId);
 
     return;
   }
@@ -3763,6 +3715,80 @@ void Slave::killTask(
     return;
   }
 
+  kill(frameworkId,
+       framework,
+       executor,
+       taskId,
+       (killTaskMessage.has_kill_policy()
+          ? killTaskMessage.kill_policy()
+          : Option<KillPolicy>::none()));
+}
+
+
+void Slave::killPendingTask(
+    const FrameworkID& frameworkId,
+    Framework* framework,
+    const TaskID& taskId)
+{
+  LOG(WARNING) << "Killing task " << taskId
+               << " of framework " << frameworkId
+               << " before it was launched";
+
+  Option<TaskGroupInfo> taskGroup =
+    framework->getTaskGroupForPendingTask(taskId);
+
+  vector<StatusUpdate> updates;
+  if (taskGroup.isSome()) {
+    foreach (const TaskInfo& task, taskGroup->tasks()) {
+      updates.push_back(protobuf::createStatusUpdate(
+          frameworkId,
+          info.id(),
+          task.task_id(),
+          TASK_KILLED,
+          TaskStatus::SOURCE_SLAVE,
+          id::UUID::random(),
+          "A task within the task group was killed before"
+          " delivery to the executor",
+          TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
+          CHECK_NOTNONE(
+              framework->getExecutorIdForPendingTask(task.task_id()))));
+    }
+  } else {
+    updates.push_back(protobuf::createStatusUpdate(
+        frameworkId,
+        info.id(),
+        taskId,
+        TASK_KILLED,
+        TaskStatus::SOURCE_SLAVE,
+        id::UUID::random(),
+        "Killed before delivery to the executor",
+        TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
+        CHECK_NOTNONE(
+            framework->getExecutorIdForPendingTask(taskId))));
+  }
+
+  foreach (const StatusUpdate& update, updates) {
+    // NOTE: Sending a terminal update (TASK_KILLED) synchronously
+    // removes the task/task group from 'framework->pendingTasks'
+    // and 'framework->pendingTaskGroups', so that it will not be
+    // launched.
+    statusUpdate(update, UPID());
+  }
+}
+
+
+void Slave::kill(
+    const FrameworkID& frameworkId,
+    Framework* framework,
+    Executor* executor,
+    const TaskID& taskId,
+    const Option<KillPolicy>& killPolicy)
+{
+  // This function should only be called on tasks which are queued or launched,
+  // so both the framework and executor should always exist.
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(executor);
+
   switch (executor->state) {
     case Executor::REGISTERING: {
       LOG(WARNING) << "Transitioning the state of task " << taskId
@@ -3888,9 +3914,8 @@ void Slave::killTask(
         KillTaskMessage message;
         message.mutable_framework_id()->MergeFrom(frameworkId);
         message.mutable_task_id()->MergeFrom(taskId);
-        if (killTaskMessage.has_kill_policy()) {
-          message.mutable_kill_policy()->MergeFrom(
-              killTaskMessage.kill_policy());
+        if (killPolicy.isSome()) {
+          message.mutable_kill_policy()->MergeFrom(killPolicy.get());
         }
 
         executor->send(message);
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index dbcceed..58bdd2a 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -226,11 +226,28 @@ public:
       const std::vector<ResourceVersionUUID>& resourceVersionUuids,
       const Option<bool>& launchExecutor);
 
-  // Made 'virtual' for Slave mocking.
+  // Handler for the `KillTaskMessage`. Made 'virtual' for Slave mocking.
   virtual void killTask(
       const process::UPID& from,
       const KillTaskMessage& killTaskMessage);
 
+  // Helper to kill a pending task, which may or may not be associated with a
+  // valid `Executor` struct.
+  void killPendingTask(
+      const FrameworkID& frameworkId,
+      Framework* framework,
+      const TaskID& taskId);
+
+  // Helper to kill a task belonging to a valid framework and executor. This
+  // function should be used to kill tasks which are queued or launched, but
+  // not tasks which are pending.
+  void kill(
+      const FrameworkID& frameworkId,
+      Framework* framework,
+      Executor* executor,
+      const TaskID& taskId,
+      const Option<KillPolicy>& killPolicy);
+
   // Made 'virtual' for Slave mocking.
   virtual void shutdownExecutor(
       const process::UPID& from,