You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2019/08/21 00:44:03 UTC

[mesos] branch 1.8.x updated (35bfd8a -> 13e4cd1)

This is an automated email from the ASF dual-hosted git repository.

grag pushed a change to branch 1.8.x
in repository https://gitbox.apache.org/repos/asf/mesos.git.


    from 35bfd8a  Added MESOS-9925 to 1.8.2 CHANGELOG.
     new 6f90cc3  Fixed a memory leak in the master's 'removeTask()' helper.
     new 13e4cd1  Transitioned tasks when an unreachable agent is marked as gone.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/master/http.cpp     |  10 +--
 src/master/master.cpp   | 105 +++++++++++++++++++++++---
 src/master/master.hpp   |   2 +-
 src/tests/api_tests.cpp | 196 ++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 293 insertions(+), 20 deletions(-)


[mesos] 02/02: Transitioned tasks when an unreachable agent is marked as gone.

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch 1.8.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 13e4cd1c42ae88094f14d6b05cfb9832d4494193
Author: Greg Mann <gr...@mesosphere.io>
AuthorDate: Tue Apr 23 22:25:29 2019 -0700

    Transitioned tasks when an unreachable agent is marked as gone.
    
    This patch updates the master code responsible for marking
    agents as gone to properly transition tasks on agents which
    were previously marked as unreachable.
    
    Review: https://reviews.apache.org/r/70519/
---
 src/master/http.cpp     |  10 +--
 src/master/master.cpp   | 100 +++++++++++++++++++++---
 src/master/master.hpp   |   2 +-
 src/tests/api_tests.cpp | 196 ++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 289 insertions(+), 19 deletions(-)

diff --git a/src/master/http.cpp b/src/master/http.cpp
index e7a92d0..765bbf1 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -4171,15 +4171,7 @@ Future<Response> Master::Http::_markAgentGone(const SlaveID& slaveId) const
                  << registrarResult.failure();
     }
 
-    Slave* slave = master->slaves.registered.get(slaveId);
-
-    // This can happen if the agent that is being marked as
-    // gone is not currently registered (unreachable/recovered).
-    if (slave == nullptr) {
-      return;
-    }
-
-    master->markGone(slave, goneTime);
+    master->markGone(slaveId, goneTime);
   }));
 
   return gone.then([]() -> Future<Response> {
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 9730e65..c9b0a38 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -9246,18 +9246,100 @@ void Master::_markUnreachable(
 }
 
 
-void Master::markGone(Slave* slave, const TimeInfo& goneTime)
+void Master::markGone(const SlaveID& slaveId, const TimeInfo& goneTime)
 {
-  CHECK_NOTNULL(slave);
-  CHECK(slaves.markingGone.contains(slave->info.id()));
-  slaves.markingGone.erase(slave->info.id());
+  CHECK(slaves.markingGone.contains(slaveId));
+
+  slaves.markingGone.erase(slaveId);
+
+  slaves.gone[slaveId] = goneTime;
+
+  const string message = "Agent has been marked gone";
+
+  Slave* slave = slaves.registered.get(slaveId);
 
-  slaves.gone[slave->id] = goneTime;
+  // If the `Slave` struct does not exist, then the agent
+  // must be either recovered or unreachable.
+  if (slave == nullptr) {
+    CHECK(slaves.recovered.contains(slaveId) ||
+          slaves.unreachable.contains(slaveId));
+
+    // When a recovered agent is marked gone, we have no task metadata to use in
+    // order to send task status updates. We could retain this agent ID and send
+    // updates upon reregistration but do not currently do this. See MESOS-9739.
+    if (slaves.recovered.contains(slaveId)) {
+      return;
+    }
+
+    slaves.unreachable.erase(slaveId);
+
+    // Transition the tasks which were unreachable on this agent to a
+    // terminal state, forward status updates to connected frameworks,
+    // and move the tasks into their frameworks' completed tasks.
+    // Partition-aware frameworks get `TASK_GONE_BY_OPERATOR`; all other
+    // frameworks get `TASK_LOST`.
+    if (slaves.unreachableTasks.contains(slaveId)) {
+      foreachkey (const FrameworkID& frameworkId,
+                  slaves.unreachableTasks.at(slaveId)) {
+        Framework* framework = getFramework(frameworkId);
+        if (framework == nullptr) {
+          continue;
+        }
+
+        TaskState newTaskState = TASK_GONE_BY_OPERATOR;
+        TaskStatus::Reason newTaskReason =
+          TaskStatus::REASON_SLAVE_REMOVED_BY_OPERATOR;
+
+        if (!framework->capabilities.partitionAware) {
+          newTaskState = TASK_LOST;
+          newTaskReason = TaskStatus::REASON_SLAVE_REMOVED;
+        }
+
+        foreach (const TaskID& taskId,
+                 slaves.unreachableTasks.at(slaveId).get(frameworkId)) {
+          if (framework->unreachableTasks.contains(taskId)) {
+            const Owned<Task>& task = framework->unreachableTasks.at(taskId);
+
+            const StatusUpdate& update = protobuf::createStatusUpdate(
+                task->framework_id(),
+                task->slave_id(),
+                task->task_id(),
+                newTaskState,
+                TaskStatus::SOURCE_MASTER,
+                None(),
+                message,
+                newTaskReason,
+                (task->has_executor_id()
+                   ? Option<ExecutorID>(task->executor_id())
+                   : None()));
+
+            updateTask(task.get(), update);
+
+            if (!framework->connected()) {
+              LOG(WARNING) << "Dropping update " << update
+                           << " for disconnected "
+                           << " framework " << frameworkId;
+            } else {
+              forward(update, UPID(), framework);
+            }
+
+            // Move task from unreachable map to completed map.
+            framework->addCompletedTask(std::move(*task));
+            framework->unreachableTasks.erase(taskId);
+          }
+        }
+      }
+
+      slaves.unreachableTasks.erase(slaveId);
+    }
+
+    return;
+  }
 
   // Shutdown the agent if it transitioned to gone.
-  ShutdownMessage message;
-  message.set_message("Agent has been marked gone");
-  send(slave->pid, message);
+  ShutdownMessage shutdownMessage;
+  shutdownMessage.set_message(message);
+  send(slave->pid, shutdownMessage);
 
   // Notify frameworks that their operations have been transitioned
   // to status `OPERATION_GONE_BY_OPERATOR`.
@@ -9266,7 +9348,7 @@ void Master::markGone(Slave* slave, const TimeInfo& goneTime)
     OperationState::OPERATION_GONE_BY_OPERATOR,
     "Agent has been marked gone");
 
-  __removeSlave(slave, "Agent has been marked gone", None());
+  __removeSlave(slave, message, None());
 }
 
 
diff --git a/src/master/master.hpp b/src/master/master.hpp
index ed83167..27d4de8 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -475,7 +475,7 @@ public:
       bool duringMasterFailover,
       const std::string& message);
 
-  void markGone(Slave* slave, const TimeInfo& goneTime);
+  void markGone(const SlaveID& slaveId, const TimeInfo& goneTime);
 
   void authenticate(
       const process::UPID& from,
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 539c704..8c3d534 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -110,6 +110,7 @@ using std::tuple;
 using std::vector;
 
 using testing::_;
+using testing::AllOf;
 using testing::AtMost;
 using testing::DoAll;
 using testing::Eq;
@@ -5064,6 +5065,201 @@ TEST_P(MasterAPITest, TaskUpdatesUponAgentGone)
 }
 
 
+// This test verifies that the master correctly sends 'TASK_GONE_BY_OPERATOR'
+// status updates and transitions unreachable tasks to completed when an
+// unreachable agent which was running the tasks is marked as gone.
+TEST_P(MasterAPITest, UnreachableAgentMarkedGone)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(agentFlags.registration_backoff_factor);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      v1::FrameworkInfo::Capability::PARTITION_AWARE);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  auto mesos = std::make_shared<v1::scheduler::TestMesos>(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  // Launch a task on this agent so that agent removal will cause
+  // the master to look for the framework struct.
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  Try<v1::Resources> resources =
+    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
+
+  ASSERT_SOME(resources);
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
+
+  testing::Sequence updateSequence;
+  Future<v1::scheduler::Event::Update> startingUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate;
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&startingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo),
+            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(Return());
+
+  mesos->send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH({taskInfo})}));
+
+  AWAIT_READY(startingUpdate);
+  AWAIT_READY(runningUpdate);
+
+  Future<v1::scheduler::Event::Update> unreachableUpdate;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo),
+            TaskStatusUpdateStateEq(v1::TASK_UNREACHABLE))))
+    .WillOnce(FutureArg<1>(&unreachableUpdate));
+
+  EXPECT_CALL(*scheduler, failure(_, _));
+
+  // Detect pings from master to agent.
+  Future<process::Message> ping = FUTURE_MESSAGE(
+      Eq(PingSlaveMessage().GetTypeName()), _, _);
+
+  // Drop all PONGs to simulate agent partition.
+  DROP_PROTOBUFS(PongSlaveMessage(), _, _);
+
+  // Advance the clock to produce a ping.
+  Clock::advance(masterFlags.agent_ping_timeout);
+
+  // Now advance through enough pings to mark the agent unreachable.
+  size_t pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == masterFlags.max_agent_ping_timeouts) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _);
+    Clock::advance(masterFlags.agent_ping_timeout);
+  }
+
+  Clock::advance(masterFlags.agent_ping_timeout);
+
+  AWAIT_READY(unreachableUpdate);
+
+  Future<v1::scheduler::Event::Update> goneUpdate;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo),
+            TaskStatusUpdateStateEq(v1::TASK_GONE_BY_OPERATOR))))
+    .WillOnce(FutureArg<1>(&goneUpdate));
+
+  ContentType contentType = GetParam();
+
+  // Mark the agent as gone. This should result in the master sending
+  // a 'TASK_GONE_BY_OPERATOR' update for the unreachable task.
+  {
+    v1::master::Call v1Call;
+    v1Call.set_type(v1::master::Call::MARK_AGENT_GONE);
+
+    v1::master::Call::MarkAgentGone* markAgentGone =
+      v1Call.mutable_mark_agent_gone();
+
+    markAgentGone->mutable_agent_id()->CopyFrom(agentId);
+
+    Future<http::Response> response = http::post(
+        master.get()->pid,
+        "api/v1",
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+        serialize(contentType, v1Call),
+        stringify(contentType));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+  }
+
+  AWAIT_READY(goneUpdate);
+
+  // GetState after agent is marked gone to ensure that the previously
+  // unreachable task has been moved to completed.
+  {
+    v1::master::Call v1Call;
+    v1Call.set_type(v1::master::Call::GET_STATE);
+
+    Future<v1::master::Response> v1Response =
+      post(master.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response->IsInitialized());
+    ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type());
+
+    const v1::master::Response::GetState& getState = v1Response->get_state();
+    ASSERT_TRUE(getState.get_tasks().unreachable_tasks().empty());
+    ASSERT_EQ(1, getState.get_tasks().completed_tasks_size());
+    ASSERT_EQ(
+        taskInfo.task_id(),
+        getState.get_tasks().completed_tasks(0).task_id());
+  }
+
+  Clock::resume();
+}
+
+
 // This test verifies that the master correctly sends
 // `OPERATION_GONE_BY_OPERATOR` status updates for operations
 // that are pending when the agent is marked as gone.


[mesos] 01/02: Fixed a memory leak in the master's 'removeTask()' helper.

Posted by gr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch 1.8.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 6f90cc334701fad10e721312cd4cbd0690e1c6ec
Author: Greg Mann <gr...@mesosphere.io>
AuthorDate: Tue Apr 23 22:25:21 2019 -0700

    Fixed a memory leak in the master's 'removeTask()' helper.
    
    Previously, all removed tasks were added to the
    `slaves.unreachableTasks` map. This patch adds a conditional
    so that removed tasks are only added to that structure when
    they are being marked unreachable.
    
    Review: https://reviews.apache.org/r/70518/
---
 src/master/master.cpp | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/master/master.cpp b/src/master/master.cpp
index 5488b7b..9730e65 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -11780,7 +11780,10 @@ void Master::removeTask(Task* task, bool unreachable)
               << " on agent " << *slave;
   }
 
-  slaves.unreachableTasks[slave->id].put(task->framework_id(), task->task_id());
+  if (unreachable) {
+    slaves.unreachableTasks[slave->id].put(
+        task->framework_id(), task->task_id());
+  }
 
   // Remove from framework.
   Framework* framework = getFramework(task->framework_id());