Posted to commits@mesos.apache.org by gr...@apache.org on 2018/12/11 23:39:30 UTC

[mesos] branch master updated: Added an operation status update manager to the agent.

This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new 18356bf  Added an operation status update manager to the agent.
18356bf is described below

commit 18356bf3f4ac730b4a798261aad042555c4a4834
Author: Greg Mann <gr...@mesosphere.io>
AuthorDate: Tue Dec 11 13:09:20 2018 -0800

    Added an operation status update manager to the agent.
    
    This patch adds an operation status update manager to the agent
    in order to handle updates for operations on agent default
    resources. A new test is also added which verifies that such
    updates are retried.
    
    Later patches will integrate this status update manager with
    the agent's checkpointing/recovery code.
    
    Review: https://reviews.apache.org/r/69505/
---
 src/master/master.cpp     | 16 +++++---
 src/slave/slave.cpp       | 71 +++++++++++++++++++++++++++++++----
 src/slave/slave.hpp       |  8 ++++
 src/tests/slave_tests.cpp | 96 +++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 179 insertions(+), 12 deletions(-)
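
The core behaviour this patch adds is retry-until-acknowledged: the agent hands each operation status update for its default resources to the manager, which keeps resending the update to the master until it is acknowledged, and which stays paused while the agent is recovering or disconnected. The standalone sketch below illustrates only that idea; the class name, method names, and explicit retry "tick" are invented for illustration and are not the Mesos OperationStatusUpdateManager interface (the real manager runs on libprocess actors and retries on a timer, as the new test exercises with STATUS_UPDATE_RETRY_INTERVAL_MIN).

    // Illustrative sketch of retry-until-acknowledged; not Mesos code.
    #include <functional>
    #include <iostream>
    #include <map>
    #include <string>
    #include <utility>

    class RetryingUpdateSender
    {
    public:
      // `forward` plays the role of the agent's send-to-master callback.
      explicit RetryingUpdateSender(std::function<void(const std::string&)> forward)
        : forward_(std::move(forward)) {}

      void pause() { paused_ = true; }
      void resume() { paused_ = false; retryAll(); }

      // A new update is retained and forwarded until acknowledged.
      void update(const std::string& uuid, const std::string& status)
      {
        pending_[uuid] = status;
        if (!paused_) {
          forward_(status);
        }
      }

      // An acknowledgement from the master stops the retries.
      void acknowledge(const std::string& uuid) { pending_.erase(uuid); }

      // Called on each retry interval; the real manager schedules this
      // itself with libprocess timers.
      void onRetryInterval() { retryAll(); }

    private:
      void retryAll()
      {
        if (paused_) {
          return;
        }
        for (const auto& entry : pending_) {
          forward_(entry.second);
        }
      }

      bool paused_ = true;  // Start paused, like the agent during recovery.
      std::function<void(const std::string&)> forward_;
      std::map<std::string, std::string> pending_;
    };

    int main()
    {
      RetryingUpdateSender sender([](const std::string& s) {
        std::cout << "forwarding: " << s << std::endl;
      });

      sender.update("uuid-1", "OPERATION_FINISHED");  // Held: still paused.
      sender.resume();                                // Sent on (re-)registration.
      sender.onRetryInterval();                       // Sent again: no ack yet.
      sender.acknowledge("uuid-1");
      sender.onRetryInterval();                       // Nothing left to resend.
      return 0;
    }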

diff --git a/src/master/master.cpp b/src/master/master.cpp
index ae5b240..3de0fd3 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -8551,15 +8551,21 @@ void Master::updateOperationStatus(UpdateOperationStatusMessage&& update)
       Result<ResourceProviderID> resourceProviderId =
         getResourceProviderId(operation->info());
 
-      // TODO(greggomann): Remove this CHECK once the agent is sending reliable
-      // updates for operations on its default resources. See MESOS-8194.
-      CHECK_SOME(resourceProviderId);
+      CHECK(!resourceProviderId.isError())
+        << "Could not determine resource provider of operation with no ID"
+        << (frameworkId.isSome()
+              ? " from framework " + stringify(frameworkId.get())
+              : " from an operator")
+        << ": " << resourceProviderId.error();
 
       AcknowledgeOperationStatusMessage acknowledgement;
       acknowledgement.mutable_status_uuid()->CopyFrom(latestStatus.uuid());
       acknowledgement.mutable_operation_uuid()->CopyFrom(operation->uuid());
-      acknowledgement.mutable_resource_provider_id()->CopyFrom(
-          resourceProviderId.get());
+
+      if (resourceProviderId.isSome()) {
+        acknowledgement.mutable_resource_provider_id()->CopyFrom(
+            resourceProviderId.get());
+      }
 
       CHECK(slave->capabilities.resourceProvider);
 
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index e13b955..8467a60 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -635,10 +635,18 @@ void Slave::initialize()
   taskStatusUpdateManager->initialize(defer(self(), &Slave::forward, lambda::_1)
     .operator std::function<void(StatusUpdate)>());
 
-  // We pause the task status update manager so that it doesn't forward any
-  // updates while the slave is still recovering. It is unpaused/resumed when
-  // the slave (re-)registers with the master.
+  // We pause the status update managers so that they don't forward any updates
+  // while the agent is still recovering. They are unpaused/resumed when the
+  // agent (re-)registers with the master.
   taskStatusUpdateManager->pause();
+  operationStatusUpdateManager.pause();
+
+  operationStatusUpdateManager.initialize(
+      defer(self(), &Self::sendOperationStatusUpdate, lambda::_1),
+      std::bind(
+          &slave::paths::getOperationUpdatesPath,
+          metaDir,
+          lambda::_1));
 
   // Start disk monitoring.
   // NOTE: We send a delayed message here instead of directly calling
@@ -1237,6 +1245,7 @@ void Slave::detected(const Future<Option<MasterInfo>>& _master)
 
   // Pause the status updates.
   taskStatusUpdateManager->pause();
+  operationStatusUpdateManager.pause();
 
   if (_master.isFailed()) {
     EXIT(EXIT_FAILURE) << "Failed to detect a master: " << _master.failure();
@@ -1492,6 +1501,7 @@ void Slave::registered(
       Clock::cancel(agentRegistrationTimer);
 
       taskStatusUpdateManager->resume(); // Resume status updates.
+      operationStatusUpdateManager.resume();
 
       info.mutable_id()->CopyFrom(slaveId); // Store the slave id.
 
@@ -1590,6 +1600,7 @@ void Slave::reregistered(
       LOG(INFO) << "Re-registered with master " << master.get();
       state = RUNNING;
       taskStatusUpdateManager->resume(); // Resume status updates.
+      operationStatusUpdateManager.resume();
 
       // We start the local resource providers daemon once the agent is
       // running, so the resource providers can use the agent API.
@@ -4397,7 +4408,7 @@ void Slave::applyOperation(const ApplyOperationMessage& message)
             operationId,
             None(),
             None(),
-            None(),
+            id::UUID::random(),
             info.id(),
             resourceProviderId.isSome()
               ? resourceProviderId.get() : Option<ResourceProviderID>::none()),
@@ -4409,9 +4420,7 @@ void Slave::applyOperation(const ApplyOperationMessage& message)
 
   removeOperation(operation);
 
-  // TODO(nfnt): Use the status update manager to reliably send
-  // this message to the master.
-  send(master.get(), update);
+  operationStatusUpdateManager.update(update);
 }
 
 
@@ -5804,6 +5813,54 @@ void Slave::forward(StatusUpdate update)
 }
 
 
+void Slave::sendOperationStatusUpdate(
+    const UpdateOperationStatusMessage& update)
+{
+  const UUID& operationUUID = update.operation_uuid();
+
+  Operation* operation = getOperation(operationUUID);
+
+  // TODO(greggomann): Make a note here of which cases may lead to
+  // the operation being unknown by the agent.
+  if (operation != nullptr) {
+    updateOperation(operation, update);
+  }
+
+  switch (state) {
+    case RECOVERING:
+    case DISCONNECTED:
+    case TERMINATING: {
+      LOG(WARNING)
+        << "Dropping status update of operation"
+        << (update.status().has_operation_id()
+             ? " '" + stringify(update.status().operation_id()) + "'"
+             : " with no ID")
+        << " (operation_uuid: " << operationUUID << ")"
+        << (update.has_framework_id()
+             ? " for framework " + stringify(update.framework_id())
+             : " for an operator API call")
+        << " because agent is in " << state << " state";
+      break;
+    }
+    case RUNNING: {
+      LOG(INFO)
+        << "Forwarding status update of"
+        << (operation == nullptr ? " unknown" : "") << " operation"
+        << (update.status().has_operation_id()
+             ? " '" + stringify(update.status().operation_id()) + "'"
+             : " with no ID")
+        << " (operation_uuid: " << operationUUID << ")"
+        << (update.has_framework_id()
+             ? " for framework " + stringify(update.framework_id())
+             : " for an operator API call");
+
+      send(master.get(), update);
+      break;
+    }
+  }
+}
+
+
 void Slave::executorMessage(
     const SlaveID& slaveId,
     const FrameworkID& frameworkId,
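
In the first slave.cpp hunk above, initialize() is handed two callbacks: one that forwards an update to the master (deferred onto the Slave actor so that sendOperationStatusUpdate runs in the agent's actor context) and one that maps an operation's UUID (the bound lambda::_1 placeholder) to its checkpoint path under metaDir. A stripped-down, standalone illustration of that shape follows; the names, types, and path layout are stand-ins, not the Mesos interface:

    // Stand-in illustration of the initialize(forward, getPath) shape;
    // names and types are invented, only the call shape mirrors the hunk.
    #include <functional>
    #include <iostream>
    #include <string>

    struct Update
    {
      std::string uuid;
      std::string status;
    };

    // Stand-in for the manager's initialize(): it uses one callback to
    // (re)send an update to the master and one to map an operation UUID
    // to its checkpoint location on disk.
    void initializeManager(
        const std::function<void(const Update&)>& forward,
        const std::function<std::string(const std::string&)>& getPath)
    {
      Update update{"operation-uuid", "OPERATION_FINISHED"};
      std::cout << "checkpoint path: " << getPath(update.uuid) << std::endl;
      forward(update);  // In the real manager this is retried until acknowledged.
    }

    int main()
    {
      const std::string metaDir = "/var/lib/mesos/meta";

      initializeManager(
          [](const Update& update) {
            std::cout << "forwarding update " << update.uuid << std::endl;
          },
          [&metaDir](const std::string& operationUuid) {
            // The real layout comes from slave::paths::getOperationUpdatesPath()
            // and is not reproduced here.
            return metaDir + "/" + operationUuid;
          });

      return 0;
    }
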
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index edf7269..d57aed8 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -87,6 +87,8 @@
 #include "slave/paths.hpp"
 #include "slave/state.hpp"
 
+#include "status_update_manager/operation.hpp"
+
 // `REGISTERING` is used as an enum value, but it's actually defined as a
 // constant in the Windows SDK.
 #ifdef __WINDOWS__
@@ -348,6 +350,10 @@ public:
   // added to the update before forwarding.
   void forward(StatusUpdate update);
 
+  // This is called by the operation status update manager to forward operation
+  // status updates to the master.
+  void sendOperationStatusUpdate(const UpdateOperationStatusMessage& update);
+
   void statusUpdateAcknowledgement(
       const process::UPID& from,
       const SlaveID& slaveId,
@@ -761,6 +767,8 @@ private:
 
   TaskStatusUpdateManager* taskStatusUpdateManager;
 
+  OperationStatusUpdateManager operationStatusUpdateManager;
+
   // Master detection future.
   process::Future<Option<MasterInfo>> detection;
 
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 1e91871..4aed5d6 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -6455,6 +6455,102 @@ TEST_F(SlaveTest, ReregisterWithStatusUpdateTaskState)
 }
 
 
+// The agent's operation status update manager should retry updates for
+// operations on agent default resources. Here we drop the first such update and
+// verify that the update is sent again after the retry interval elapses.
+TEST_F(SlaveTest, UpdateOperationStatusRetry)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.clear_roles();
+  frameworkInfo.add_roles("test-role");
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      contentType,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+
+  v1::Resources unreserved = offer.resources();
+  v1::Resources reserved = unreserved.pushReservation(
+      v1::createDynamicReservationInfo(
+          frameworkInfo.roles(0),
+          frameworkInfo.principal()));
+
+  v1::Offer::Operation reserve = v1::RESERVE(reserved);
+
+  Future<ApplyOperationMessage> applyOperationMessage =
+    FUTURE_PROTOBUF(ApplyOperationMessage(), _, slave.get()->pid);
+
+  // Drop the first operation status update.
+  Future<UpdateOperationStatusMessage> droppedOperation =
+    DROP_PROTOBUF(UpdateOperationStatusMessage(), _, master.get()->pid);
+
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {reserve}));
+
+  AWAIT_READY(applyOperationMessage);
+  UUID operationUuid = applyOperationMessage->operation_uuid();
+
+  AWAIT_READY(droppedOperation);
+  ASSERT_EQ(droppedOperation->operation_uuid(), operationUuid);
+
+  // Confirm that the agent retries the update.
+  Future<UpdateOperationStatusMessage> updateOperationStatusMessage =
+    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, master.get()->pid);
+
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+
+  AWAIT_READY(updateOperationStatusMessage);
+  ASSERT_EQ(updateOperationStatusMessage->operation_uuid(), operationUuid);
+
+  Clock::resume();
+}
+
+
 // This test verifies that the slave should properly handle the case
 // where the containerizer usage call fails when getting the usage
 // information.