You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2018/05/04 05:17:48 UTC

mesos git commit: Remove unknown unreachable tasks when agent reregisters.

Repository: mesos
Updated Branches:
  refs/heads/master a483fb0d0 -> 520b72985

Remove unknown unreachable tasks when agent reregisters.

A RunTaskMessage can get dropped for an agent while it is
disconnected from the master, and when such an agent goes unreachable,
the task from this dropped message gets added to the unreachable tasks.
When the agent reregisters, the master sends status updates for the
tasks that the agent reported while reregistering, and these tasks are
also removed from the framework's `unreachableTasks`. However, since the
agent doesn't know about the dropped task, that task is never removed
from `unreachableTasks`, leading to a check failure when
this inconsistency is detected during framework removal.



Branch: refs/heads/master
Commit: 520b729857223aeade345cbdf61209ec4f395ad9
Parents: a483fb0
Author: Megha Sharma <>
Authored: Thu May 3 22:09:02 2018 -0700
Committer: Jiang Yan Xu <>
Committed: Thu May 3 22:12:17 2018 -0700

 src/master/master.cpp         |  20 +++
 src/master/master.hpp         |   7 +
 src/tests/partition_tests.cpp | 326 +++++++++++++++++++++++++++++++++++++
 3 files changed, 353 insertions(+)
diff --git a/src/master/master.cpp b/src/master/master.cpp
index c117b8c..7a2f69c 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1920,6 +1920,7 @@ void Master::_doRegistryGc(
+    slaves.unreachableTasks.erase(slave);
@@ -7342,6 +7343,23 @@ void Master::__reregisterSlave(
+  // All tasks from this agent are now reachable so clean them up from
+  // the master's unreachable task records.
+  if (slaves.unreachableTasks.contains( {
+    foreachkey (FrameworkID frameworkId,
+      {
+      Framework* framework = getFramework(frameworkId);
+      if (framework != nullptr) {
+        foreach (TaskID taskId,
+        {
+          framework->unreachableTasks.erase(taskId);
+        }
+      }
+    }
+  }
+  slaves.unreachableTasks.erase(;
   vector<Resource> checkpointedResources = google::protobuf::convert(
   vector<ExecutorInfo> executorInfos = google::protobuf::convert(
@@ -10889,6 +10907,8 @@ void Master::removeTask(Task* task, bool unreachable)
               << " on agent " << *slave;
+  slaves.unreachableTasks[slave->id].put(task->framework_id(), task->task_id());
   // Remove from framework.
   Framework* framework = getFramework(task->framework_id());
   if (framework != nullptr) { // A framework might not be reregistered yet.
diff --git a/src/master/master.hpp b/src/master/master.hpp
index e4f7b45..067a346 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1960,6 +1960,13 @@ private:
     // `registry_max_agent_age`, and `registry_max_agent_count` flags.
     LinkedHashMap<SlaveID, TimeInfo> unreachable;
+    // This helps us look up all unreachable tasks on an agent so we can remove
+    // them from their primary storage `framework.unreachableTasks` when an
+    // agent reregisters. This map is bounded by the same GC behavior as
+    // `unreachable`. When the agent is GC'd from unreachable it's also
+    // erased from `unreachableTasks`.
+    hashmap<SlaveID, multihashmap<FrameworkID, TaskID>> unreachableTasks;
     // Slaves that have been marked gone. We recover this from the
     // registry, so it includes slaves marked as gone by other instances
     // of the master. Note that we use a LinkedHashMap to ensure the order
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index 606025a..390b7c8 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -163,6 +163,332 @@ TEST_F(PartitionTest, PartitionedSlave)
+// This test verifies that if a `RunTaskMessage` is dropped en route to
+// an agent which later becomes unreachable, the task is removed correctly
+// from the master's unreachable task records when the agent reregisters.
+    PartitionTest,
+    ReregisterPartitionedAgentWithEnrouteTask)
+  Clock::pause();
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+  // Allow the master to PING the slave, but drop all PONG messages
+  // from the slave. Note that we don't match on the master / slave
+  // PIDs because it's actually the `SlaveObserver` process that sends
+  // the pings.
+  Future<Message> ping = FUTURE_MESSAGE(
+      Eq(PingSlaveMessage().GetTypeName()), _, _);
+  DROP_PROTOBUFS(PongSlaveMessage(), _, _);
+  StandaloneMasterDetector detector(master.get()->pid);
+  slave::Flags agentFlags = CreateSlaveFlags();
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, agentFlags);
+  ASSERT_SOME(slave);
+  // Start a scheduler. The scheduler has the PARTITION_AWARE
+  // capability, so we expect its tasks to continue running when the
+  // partitioned agent reregisters.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+  FrameworkInfo::Capability::PARTITION_AWARE);
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+  EXPECT_CALL(sched, registered(&driver, _, _));
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+  driver.start();
+  Clock::advance(agentFlags.registration_backoff_factor);
+  Clock::advance(masterFlags.allocation_interval);
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+  const Offer& offer = offers.get()[0];
+  TaskInfo task = createTask(offer, "sleep 60");
+  // Drop the RunTaskMessage.
+  Future<RunTaskMessage> runTaskMessage =
+    DROP_PROTOBUF(RunTaskMessage(), master.get()->pid, slave.get()->pid);
+  driver.launchTasks(, {task});
+  AWAIT_READY(runTaskMessage);
+  const SlaveID& slaveId = offer.slave_id();
+  // Now, induce a partition of the slave by having the master
+  // timeout the slave.
+  Future<TaskStatus> unreachableStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&unreachableStatus));
+  // We expect to get a `slaveLost` callback, even though this
+  // scheduler is partition-aware.
+  Future<Nothing> slaveLost;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost));
+  size_t pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == masterFlags.max_agent_ping_timeouts) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _);
+    Clock::advance(masterFlags.agent_ping_timeout);
+  }
+  Clock::advance(masterFlags.agent_ping_timeout);
+  AWAIT_READY(unreachableStatus);
+  EXPECT_EQ(TASK_UNREACHABLE, unreachableStatus->state());
+  EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, unreachableStatus->reason());
+  EXPECT_EQ(task.task_id(), unreachableStatus->task_id());
+  EXPECT_EQ(slaveId, unreachableStatus->slave_id());
+  AWAIT_READY(slaveLost);
+  {
+    JSON::Object stats = Metrics();
+    EXPECT_EQ(0, stats.values["master/tasks_lost"]);
+    EXPECT_EQ(1, stats.values["master/tasks_unreachable"]);
+    EXPECT_EQ(1, stats.values["master/slave_unreachable_scheduled"]);
+    EXPECT_EQ(1, stats.values["master/slave_unreachable_completed"]);
+    EXPECT_EQ(1, stats.values["master/slave_removals"]);
+    EXPECT_EQ(1, stats.values["master/slave_removals/reason_unhealthy"]);
+  }
+  // Check the master's "/state" endpoint. There should be a single
+  // unreachable agent. The "tasks" and "completed_tasks" fields
+  // should be empty; there should be a single unreachable task.
+  {
+    Future<Response> response = process::http::get(
+        master.get()->pid,
+        "state",
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
+    ASSERT_SOME(parse);
+    EXPECT_EQ(0, parse->values["activated_slaves"].as<JSON::Number>());
+    EXPECT_EQ(0, parse->values["deactivated_slaves"].as<JSON::Number>());
+    EXPECT_EQ(1, parse->values["unreachable_slaves"].as<JSON::Number>());
+    EXPECT_TRUE(parse->values["orphan_tasks"].as<JSON::Array>().values.empty());
+    JSON::Array frameworks = parse->values["frameworks"].as<JSON::Array>();
+    ASSERT_EQ(1u, frameworks.values.size());
+    JSON::Object framework = frameworks.values.front().as<JSON::Object>();
+    JSON::Array completedTasks =
+      framework.values["completed_tasks"].as<JSON::Array>();
+    EXPECT_TRUE(completedTasks.values.empty());
+    JSON::Array runningTasks = framework.values["tasks"].as<JSON::Array>();
+    EXPECT_TRUE(runningTasks.values.empty());
+    JSON::Array unreachableTasks =
+    framework.values["unreachable_tasks"].as<JSON::Array>();
+    ASSERT_EQ(1u, unreachableTasks.values.size());
+    JSON::Object unreachableTask =
+      unreachableTasks.values.front().as<JSON::Object>();
+    task.task_id(), unreachableTask.values["id"].as<JSON::String>().value);
+        unreachableTask.values["state"].as<JSON::String>().value);
+  }
+  // Check the master's "/tasks" endpoint.
+  {
+    Future<Response> response = process::http::get(
+        master.get()->pid,
+        "tasks",
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
+    ASSERT_SOME(parse);
+    JSON::Array tasks = parse->values["tasks"].as<JSON::Array>();
+    ASSERT_EQ(1u, tasks.values.size());
+    JSON::Object jsonTask = tasks.values.front().as<JSON::Object>();
+    task.task_id(), jsonTask.values["id"].as<JSON::String>().value);
+        jsonTask.values["state"].as<JSON::String>().value);
+  }
+  // Check the master's "/state-summary" endpoint.
+  {
+    Future<Response> response = process::http::get(
+        master.get()->pid,
+        "state-summary",
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
+    ASSERT_SOME(parse);
+    JSON::Array frameworks = parse->values["frameworks"].as<JSON::Array>();
+    ASSERT_EQ(1u, frameworks.values.size());
+    JSON::Object framework = frameworks.values.front().as<JSON::Object>();
+    EXPECT_EQ(0, framework.values["TASK_LOST"].as<JSON::Number>());
+    EXPECT_EQ(0, framework.values["TASK_RUNNING"].as<JSON::Number>());
+    EXPECT_EQ(1, framework.values["TASK_UNREACHABLE"].as<JSON::Number>());
+  }
+  // We now complete the partition on the slave side as well. We
+  // simulate a master loss event, which would normally happen during
+  // a network partition. The slave should then reregister with the
+  // master.
+  detector.appoint(None());
+  Future<SlaveReregisteredMessage> slaveReregistered = FUTURE_PROTOBUF(
+      SlaveReregisteredMessage(), master.get()->pid, slave.get()->pid);
+  detector.appoint(master.get()->pid);
+  Clock::advance(agentFlags.registration_backoff_factor);
+  AWAIT_READY(slaveReregistered);
+  Clock::resume();
+  // Check the master's "/state" endpoint. The "unreachable_tasks" and
+  // "completed_tasks" fields should be empty.
+  {
+    Future<Response> response = process::http::get(
+        master.get()->pid,
+        "state",
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
+    ASSERT_SOME(parse);
+    EXPECT_TRUE(parse->values["orphan_tasks"].as<JSON::Array>().values.empty());
+    JSON::Array frameworks = parse->values["frameworks"].as<JSON::Array>();
+    ASSERT_EQ(1u, frameworks.values.size());
+    JSON::Object framework = frameworks.values.front().as<JSON::Object>();
+    JSON::Array completedTasks =
+    framework.values["completed_tasks"].as<JSON::Array>();
+    EXPECT_TRUE(completedTasks.values.empty());
+    JSON::Array unreachableTasks =
+    framework.values["unreachable_tasks"].as<JSON::Array>();
+    EXPECT_TRUE(unreachableTasks.values.empty());
+    JSON::Array tasks = framework.values["tasks"].as<JSON::Array>();
+    EXPECT_TRUE(tasks.values.empty());
+  }
+  // Check the master's "/tasks" endpoint.
+  {
+    Future<Response> response = process::http::get(
+        master.get()->pid,
+        "tasks",
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
+    ASSERT_SOME(parse);
+    JSON::Array tasks = parse->values["tasks"].as<JSON::Array>();
+    ASSERT_TRUE(tasks.values.empty());
+  }
+  // Check the master's "/state-summary" endpoint.
+  {
+    Future<Response> response = process::http::get(
+        master.get()->pid,
+        "state-summary",
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
+    ASSERT_SOME(parse);
+    JSON::Array frameworks = parse->values["frameworks"].as<JSON::Array>();
+    ASSERT_EQ(1u, frameworks.values.size());
+    JSON::Object framework = frameworks.values.front().as<JSON::Object>();
+    EXPECT_EQ(0, framework.values["TASK_LOST"].as<JSON::Number>());
+    EXPECT_EQ(0, framework.values["TASK_UNREACHABLE"].as<JSON::Number>());
+    EXPECT_EQ(0, framework.values["TASK_RUNNING"].as<JSON::Number>());
+  }
+  {
+    JSON::Object stats = Metrics();
+    EXPECT_EQ(0, stats.values["master/tasks_running"]);
+    EXPECT_EQ(0, stats.values["master/tasks_unreachable"]);
+    EXPECT_EQ(1, stats.values["master/slave_unreachable_scheduled"]);
+    EXPECT_EQ(1, stats.values["master/slave_unreachable_completed"]);
+    EXPECT_EQ(1, stats.values["master/slave_removals"]);
+    EXPECT_EQ(1, stats.values["master/slave_removals/reason_unhealthy"]);
+    EXPECT_EQ(0, stats.values["master/slave_removals/reason_unregistered"]);
+  }
+  driver.stop();
+  driver.join();
 // This test checks that a slave can reregister with the master after
 // a partition, and that PARTITION_AWARE tasks running on the slave
 // continue to run.