You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/12/05 22:06:53 UTC

[mesos] 05/06: Set agent and/or resource provider ID in operation status updates.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 9fd65a6f8862e52a6d8c9ed970498f4a8db113f7
Author: Benjamin Bannier <be...@mesosphere.io>
AuthorDate: Wed Dec 5 13:03:09 2018 -0800

    Set agent and/or resource provider ID in operation status updates.
    
    This patch sets the agent and/or resource provider ID in operation
    status update messages. This is not always possible; e.g., some
    operations might fail validation, in which case no corresponding IDs
    can be extracted.
    
    Since operations that fail validation are currently rejected directly
    by the master without going through a status update manager, status
    updates for them are not retried either. If a master status update
    manager for operations is introduced later, it should be possible to
    forward acknowledgements for those updates to the master's update
    manager.
    
    Review: https://reviews.apache.org/r/69163/
---
 src/common/protobuf_utils.cpp              |   4 +-
 src/common/protobuf_utils.hpp              |   4 +-
 src/master/master.cpp                      |  74 +++++++++++++----
 src/resource_provider/manager.cpp          |   1 +
 src/resource_provider/storage/provider.cpp |  48 ++++++-----
 src/slave/slave.cpp                        |  41 +++++++--
 src/tests/master_tests.cpp                 | 129 +++++++++++++++++++++++++++++
 src/tests/mesos.hpp                        |   7 +-
 8 files changed, 259 insertions(+), 49 deletions(-)

diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index 77139d8..14e582a 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -426,7 +426,9 @@ OperationStatus createOperationStatus(
     const Option<OperationID>& operationId,
     const Option<string>& message,
     const Option<Resources>& convertedResources,
-    const Option<id::UUID>& uuid)
+    const Option<id::UUID>& uuid,
+    const Option<SlaveID>& slaveId,
+    const Option<ResourceProviderID>& resourceProviderId)
 {
   OperationStatus status;
   status.set_state(state);
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index 1662125..ca27e1c 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -165,7 +165,9 @@ OperationStatus createOperationStatus(
     const Option<OperationID>& operationId = None(),
     const Option<std::string>& message = None(),
     const Option<Resources>& convertedResources = None(),
-    const Option<id::UUID>& statusUUID = None());
+    const Option<id::UUID>& statusUUID = None(),
+    const Option<SlaveID>& slaveId = None(),
+    const Option<ResourceProviderID>& resourceProviderId = None());
 
 
 Operation createOperation(
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 14f0f12..fc29781 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2275,6 +2275,11 @@ void Master::drop(
     scheduler::Event update;
     update.set_type(scheduler::Event::UPDATE_OPERATION_STATUS);
 
+    // NOTE: We do not attempt to set the agent or resource provider IDs for
+    // dropped operations as we cannot guarantee to always know their values.
+    //
+    // TODO(bbannier): Set agent or resource provider ID if we know
+    // for certain that the operation was valid.
     *update.mutable_update_operation_status()->mutable_status() =
       protobuf::createOperationStatus(
           OperationState::OPERATION_ERROR,
@@ -9146,6 +9151,11 @@ scheduler::Response::ReconcileOperations Master::reconcileOperations(
       slaveId = operation.slave_id();
     }
 
+    Option<ResourceProviderID> resourceProviderId = None();
+    if (operation.has_resource_provider_id()) {
+      resourceProviderId = operation.resource_provider_id();
+    }
+
     Option<Operation*> frameworkOperation =
       framework->getOperation(operation.operation_id());
 
@@ -9163,38 +9173,62 @@ scheduler::Response::ReconcileOperations Master::reconcileOperations(
       *status = protobuf::createOperationStatus(
           OperationState::OPERATION_RECOVERING,
           operation.operation_id(),
-          "Reconciliation: Agent is recovered but has not re-registered");
+          "Reconciliation: Agent is recovered but has not re-registered",
+          None(),
+          None(),
+          slaveId,
+          resourceProviderId);
     } else if (slaveId.isSome() && slaves.registered.contains(slaveId.get())) {
       // (3) Operation is unknown, slave is registered: OPERATION_UNKNOWN.
       *status = protobuf::createOperationStatus(
           OperationState::OPERATION_UNKNOWN,
           operation.operation_id(),
-          "Reconciliation: Operation is unknown");
+          "Reconciliation: Operation is unknown",
+          None(),
+          None(),
+          slaveId,
+          resourceProviderId);
     } else if (slaveId.isSome() && slaves.unreachable.contains(slaveId.get())) {
       // (4) Operation is unknown, slave is unreachable: OPERATION_UNREACHABLE.
       *status = protobuf::createOperationStatus(
           OperationState::OPERATION_UNREACHABLE,
           operation.operation_id(),
-          "Reconciliation: Agent is unreachable");
+          "Reconciliation: Agent is unreachable",
+          None(),
+          None(),
+          slaveId,
+          resourceProviderId);
     } else if (slaveId.isSome() && slaves.gone.contains(slaveId.get())) {
       // (5) Operation is unknown, slave is gone: OPERATION_GONE_BY_OPERATOR.
       *status = protobuf::createOperationStatus(
           OperationState::OPERATION_GONE_BY_OPERATOR,
           operation.operation_id(),
-          "Reconciliation: Agent marked gone by operator");
+          "Reconciliation: Agent marked gone by operator",
+          None(),
+          None(),
+          slaveId,
+          resourceProviderId);
     } else if (slaveId.isSome()) {
       // (6) Operation is unknown, slave is unknown: OPERATION_UNKNOWN.
       *status = protobuf::createOperationStatus(
           OperationState::OPERATION_UNKNOWN,
           operation.operation_id(),
-          "Reconciliation: Both operation and agent are unknown");
+          "Reconciliation: Both operation and agent are unknown",
+          None(),
+          None(),
+          slaveId,
+          resourceProviderId);
     } else {
       // (7) Operation is unknown, slave is unknown: OPERATION_UNKNOWN.
       *status = protobuf::createOperationStatus(
           OperationState::OPERATION_UNKNOWN,
           operation.operation_id(),
           "Reconciliation: Operation is unknown and no 'agent_id' was"
-          " provided");
+          " provided",
+          None(),
+          None(),
+          slaveId,
+          resourceProviderId);
     }
   }
 
@@ -11251,18 +11285,22 @@ void Master::_apply(
       ? slave->resourceVersion.get()
       : slave->resourceProviders.get(resourceProviderId.get())->resourceVersion;
 
-    Operation* operation = new Operation(
-        protobuf::createOperation(
-            operationInfo,
-            protobuf::createOperationStatus(
-              OPERATION_PENDING,
-              operationInfo.has_id()
-                ? operationInfo.id()
-                : Option<OperationID>::none()),
-            framework != nullptr
-              ? framework->id()
-              : Option<FrameworkID>::none(),
-            slave->id));
+    Operation* operation = new Operation(protobuf::createOperation(
+        operationInfo,
+        protobuf::createOperationStatus(
+            OPERATION_PENDING,
+            operationInfo.has_id()
+              ? operationInfo.id()
+              : Option<OperationID>::none(),
+            None(),
+            None(),
+            None(),
+            slave->id,
+            resourceProviderId.isSome()
+              ? Some(resourceProviderId.get())
+              : Option<ResourceProviderID>::none()),
+        framework != nullptr ? framework->id() : Option<FrameworkID>::none(),
+        slave->id));
 
     addOperation(framework, slave, operation);
 
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index abd7e38..bb72a5c 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -572,6 +572,7 @@ void ResourceProviderManagerProcess::reconcileOperations(
     if (operation.has_resource_provider_id()) {
       if (!resourceProviders.subscribed.contains(
               operation.resource_provider_id())) {
+        // TODO(bbannier): We should send `OPERATION_UNREACHABLE` here.
         LOG(WARNING) << "Dropping operation reconciliation message with"
                      << " operation_uuid " << operation.operation_uuid()
                      << " because resource provider "
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index e931300..93df4a2 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -1572,7 +1572,12 @@ void StorageLocalResourceProviderProcess::applyOperation(
       protobuf::createOperationStatus(
           OPERATION_PENDING,
           operation.info().has_id()
-            ? operation.info().id() : Option<OperationID>::none()),
+            ? operation.info().id() : Option<OperationID>::none(),
+          None(),
+          None(),
+          None(),
+          slaveId,
+          info.id()),
       frameworkId,
       slaveId,
       protobuf::createUUID(uuid.get()));
@@ -3072,17 +3077,19 @@ void StorageLocalResourceProviderProcess::dropOperation(
 
   UpdateOperationStatusMessage update =
     protobuf::createUpdateOperationStatusMessage(
-       protobuf::createUUID(operationUuid),
-       protobuf::createOperationStatus(
-           OPERATION_DROPPED,
-           operation.isSome() && operation->has_id()
-             ? operation->id() : Option<OperationID>::none(),
-           message,
-           None(),
-           id::UUID::random()),
-       None(),
-       frameworkId,
-       slaveId);
+        protobuf::createUUID(operationUuid),
+        protobuf::createOperationStatus(
+            OPERATION_DROPPED,
+            operation.isSome() && operation->has_id()
+              ? operation->id() : Option<OperationID>::none(),
+            message,
+            None(),
+            id::UUID::random(),
+            slaveId,
+            info.id()),
+        None(),
+        frameworkId,
+        slaveId);
 
   auto die = [=](const string& message) {
     LOG(ERROR)
@@ -3318,14 +3325,15 @@ Try<Nothing> StorageLocalResourceProviderProcess::updateOperationStatus(
     error = conversions.error();
   }
 
-  operation.mutable_latest_status()->CopyFrom(
-      protobuf::createOperationStatus(
-          error.isNone() ? OPERATION_FINISHED : OPERATION_FAILED,
-          operation.info().has_id()
-            ? operation.info().id() : Option<OperationID>::none(),
-          error.isNone() ? Option<string>::none() : error->message,
-          error.isNone() ? convertedResources : Option<Resources>::none(),
-          id::UUID::random()));
+  operation.mutable_latest_status()->CopyFrom(protobuf::createOperationStatus(
+      error.isNone() ? OPERATION_FINISHED : OPERATION_FAILED,
+      operation.info().has_id()
+        ? operation.info().id() : Option<OperationID>::none(),
+      error.isNone() ? Option<string>::none() : error->message,
+      error.isNone() ? convertedResources : Option<Resources>::none(),
+      id::UUID::random(),
+      slaveId,
+      info.id()));
 
   operation.add_statuses()->CopyFrom(operation.latest_status());
 
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 50a4729..1d5dad5 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -4364,13 +4364,20 @@ void Slave::applyOperation(const ApplyOperationMessage& message)
     return;
   }
 
-  Operation* operation = new Operation(
-      protobuf::createOperation(
-          message.operation_info(),
-          protobuf::createOperationStatus(OPERATION_PENDING, operationId),
-          frameworkId,
+  Operation* operation = new Operation(protobuf::createOperation(
+      message.operation_info(),
+      protobuf::createOperationStatus(
+          OPERATION_PENDING,
+          operationId,
+          None(),
+          None(),
+          None(),
           info.id(),
-          uuid));
+          resourceProviderId.isSome()
+            ? resourceProviderId.get() : Option<ResourceProviderID>::none()),
+      frameworkId,
+      info.id(),
+      uuid));
 
   addOperation(operation);
 
@@ -4401,7 +4408,15 @@ void Slave::applyOperation(const ApplyOperationMessage& message)
   UpdateOperationStatusMessage update =
     protobuf::createUpdateOperationStatusMessage(
         uuid,
-        protobuf::createOperationStatus(OPERATION_FINISHED, operationId),
+        protobuf::createOperationStatus(
+            OPERATION_FINISHED,
+            operationId,
+            None(),
+            None(),
+            None(),
+            info.id(),
+            resourceProviderId.isSome()
+              ? resourceProviderId.get() : Option<ResourceProviderID>::none()),
         None(),
         frameworkId,
         info.id());
@@ -4442,7 +4457,13 @@ void Slave::reconcileOperations(const ReconcileOperationsMessage& message)
       UpdateOperationStatusMessage update =
         protobuf::createUpdateOperationStatusMessage(
             operation.operation_uuid(),
-            protobuf::createOperationStatus(OPERATION_DROPPED),
+            protobuf::createOperationStatus(
+                OPERATION_DROPPED,
+                None(),
+                None(),
+                None(),
+                None(),
+                info.id()),
             None(),
             None(),
             info.id());
@@ -7794,6 +7815,10 @@ void Slave::handleResourceProviderMessage(
         message->updateOperationStatus->update;
 
       update.mutable_slave_id()->CopyFrom(info.id());
+      update.mutable_status()->mutable_slave_id()->CopyFrom(info.id());
+      if (update.has_latest_status()) {
+        update.mutable_latest_status()->mutable_slave_id()->CopyFrom(info.id());
+      }
 
       const UUID& operationUUID = update.operation_uuid();
 
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 9d5d5a3..4467799 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -9030,6 +9030,135 @@ TEST_F(MasterTest, OperationUpdateDuringFailover)
 }
 
 
+// Verifies that the status update for an operation on resource provider
+// resources carries both the agent ID and the resource provider ID.
+TEST_F(MasterTest, OperationUpdateResourceProvider)
+{
+  Clock::pause(); // Registration and allocation are driven manually below.
+
+  master::Flags masterFlags = CreateMasterFlags();
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  // Register a mock resource provider offering a raw disk with the agent.
+  mesos::v1::ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.set_type("org.apache.mesos.resource_provider.test");
+  resourceProviderInfo.set_name("test");
+
+  v1::Resources resourceProviderResources = v1::createDiskResource(
+      "200", "*", None(), None(), v1::createDiskSourceRaw(None(), "profile"));
+
+  v1::MockResourceProvider resourceProvider(
+      resourceProviderInfo, resourceProviderResources);
+
+  Owned<EndpointDetector> endpointDetector(
+      resource_provider::createEndpointDetector(slave.get()->pid));
+
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  resourceProvider.start(endpointDetector, ContentType::PROTOBUF);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  const v1::AgentID agentId = evolve(updateSlaveMessage->slave_id());
+
+  ASSERT_TRUE(resourceProvider.info.has_id());
+  const v1::ResourceProviderID resourceProviderId = resourceProvider.info.id();
+
+  // Subscribe a framework that will accept an offer of those resources.
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  v1::scheduler::TestMesos driver(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  Future<Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  AWAIT_READY(subscribed);
+
+  const v1::FrameworkID& frameworkId = subscribed->framework_id();
+
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offers);
+
+  Future<Event::UpdateOperationStatus> updateOperationStatus;
+  EXPECT_CALL(*scheduler, updateOperationStatus(_, _))
+    .WillOnce(FutureArg<1>(&updateOperationStatus));
+
+  ASSERT_FALSE(offers->offers().empty());
+  const v1::Offer& offer = offers->offers(0);
+
+  // Pick an offered resource provider resource to apply CREATE_DISK to.
+  Option<v1::Resource> resource;
+  foreach (const v1::Resource& resource_, offer.resources()) {
+    if (resource_.has_provider_id()) {
+      resource = resource_;
+      break;
+    }
+  }
+
+  ASSERT_SOME(resource);
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::CREATE_DISK);
+    operation->mutable_id()->set_value("create_disk");
+
+    v1::Offer::Operation::CreateDisk* createDisk =
+      operation->mutable_create_disk();
+    createDisk->mutable_source()->CopyFrom(resource.get());
+    createDisk->set_target_type(v1::Resource::DiskInfo::Source::MOUNT);
+
+    driver.send(call);
+  }
+
+  AWAIT_READY(updateOperationStatus);
+
+  const v1::OperationStatus& status = updateOperationStatus->status();
+  ASSERT_EQ("create_disk", status.operation_id().value());
+
+  ASSERT_TRUE(status.has_agent_id());
+  EXPECT_EQ(agentId, status.agent_id());
+
+  ASSERT_TRUE(status.has_resource_provider_id());
+  EXPECT_EQ(resourceProviderId, status.resource_provider_id());
+}
+
+
 // Tests that the master correctly drops an operation if the operation's 'id'
 // field is set and the operation affects resources not managed by a resource
 // provider.
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index e4a1ab4..60bd6e0 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -3203,7 +3203,12 @@ public:
         break;
     }
 
-    update->mutable_latest_status()->CopyFrom(update->status());
+    if (update->has_status()) {
+      update->mutable_status()->mutable_resource_provider_id()->CopyFrom(
+          info.id());
+
+      update->mutable_latest_status()->CopyFrom(update->status());
+    }
 
     driver->send(call);
   }