You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2019/01/18 10:02:47 UTC

[mesos] 01/02: Added agent support to remove local resource providers.

This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit c5a11ce7adcad2f024b64d5cfc678ac678e1d5df
Author: Benjamin Bannier <be...@mesosphere.io>
AuthorDate: Fri Jan 18 09:39:43 2019 +0100

    Added agent support to remove local resource providers.
    
    This patch adds support for triggering permanent removal of local
    resource providers. We also add authorization and tests as part of this
    patch.
    
    Review: https://reviews.apache.org/r/68147/
---
 include/mesos/agent/agent.proto    |   9 +++
 include/mesos/v1/agent/agent.proto |   9 +++
 src/common/protobuf_utils.cpp      |  12 ++--
 src/master/master.cpp              |   6 +-
 src/slave/http.cpp                 |  35 ++++++++++
 src/slave/http.hpp                 |   4 ++
 src/slave/slave.cpp                | 117 +++++++++++++++++++++++++++++++-
 src/slave/slave.hpp                |   3 +
 src/slave/validation.cpp           |   8 +++
 src/tests/api_tests.cpp            | 133 +++++++++++++++++++++++++++++++++++++
 10 files changed, 324 insertions(+), 12 deletions(-)

diff --git a/include/mesos/agent/agent.proto b/include/mesos/agent/agent.proto
index 74488e8..ff408a4 100644
--- a/include/mesos/agent/agent.proto
+++ b/include/mesos/agent/agent.proto
@@ -97,6 +97,8 @@ message Call {
     UPDATE_RESOURCE_PROVIDER_CONFIG = 28; // See 'UpdateResourceProviderConfig' below. // NOLINT
     REMOVE_RESOURCE_PROVIDER_CONFIG = 29; // See 'RemoveResourceProviderConfig' below. // NOLINT
 
+    MARK_RESOURCE_PROVIDER_GONE = 32; // See 'MarkResourceProviderGone' below.
+
     // Prune unused container images.
     PRUNE_IMAGES = 30;
   }
@@ -363,6 +365,11 @@ message Call {
     required string name = 2;
   }
 
+  // Mark a resource provider as gone, i.e., permanently remove it.
+  message MarkResourceProviderGone {
+    required ResourceProviderID resource_provider_id = 1;
+  }
+
   // Prune unused container images from image store.
   //
   // Images and layers referenced by active containers as well as
@@ -400,6 +407,8 @@ message Call {
   optional UpdateResourceProviderConfig update_resource_provider_config = 18;
   optional RemoveResourceProviderConfig remove_resource_provider_config = 19;
 
+  optional MarkResourceProviderGone mark_resource_provider_gone = 22;
+
   optional PruneImages prune_images = 21;
 }
 
diff --git a/include/mesos/v1/agent/agent.proto b/include/mesos/v1/agent/agent.proto
index 5d1ab6f..19d6c42 100644
--- a/include/mesos/v1/agent/agent.proto
+++ b/include/mesos/v1/agent/agent.proto
@@ -97,6 +97,8 @@ message Call {
     UPDATE_RESOURCE_PROVIDER_CONFIG = 28; // See 'UpdateResourceProviderConfig' below. // NOLINT
     REMOVE_RESOURCE_PROVIDER_CONFIG = 29; // See 'RemoveResourceProviderConfig' below. // NOLINT
 
+    MARK_RESOURCE_PROVIDER_GONE = 32; // See 'MarkResourceProviderGone' below.
+
     // Prune unused container images.
     PRUNE_IMAGES = 30;
   }
@@ -363,6 +365,11 @@ message Call {
     required string name = 2;
   }
 
+  // Mark a resource provider as gone, i.e., permanently remove it.
+  message MarkResourceProviderGone {
+    required ResourceProviderID resource_provider_id = 1;
+  }
+
   // Prune unused container images from image store.
   //
   // Images and layers referenced by active containers as well as
@@ -400,6 +407,8 @@ message Call {
   optional UpdateResourceProviderConfig update_resource_provider_config = 18;
   optional RemoveResourceProviderConfig remove_resource_provider_config = 19;
 
+  optional MarkResourceProviderGone mark_resource_provider_gone = 22;
+
   optional PruneImages prune_images = 21;
 }
 
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index a0159fe..9a03c35 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -421,17 +421,17 @@ Option<ContainerStatus> getTaskContainerStatus(const Task& task)
 bool isTerminalState(const OperationState& state)
 {
   switch (state) {
-    case OPERATION_FINISHED:
-    case OPERATION_FAILED:
-    case OPERATION_ERROR:
     case OPERATION_DROPPED:
+    case OPERATION_ERROR:
+    case OPERATION_FAILED:
+    case OPERATION_FINISHED:
+    case OPERATION_GONE_BY_OPERATOR:
       return true;
-    case OPERATION_UNSUPPORTED:
     case OPERATION_PENDING:
-    case OPERATION_UNREACHABLE:
-    case OPERATION_GONE_BY_OPERATOR:
     case OPERATION_RECOVERING:
     case OPERATION_UNKNOWN:
+    case OPERATION_UNREACHABLE:
+    case OPERATION_UNSUPPORTED:
       return false;
   }
 
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 7c8d3ce..2e0a0de 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -11460,9 +11460,10 @@ void Master::updateOperation(
     }
 
     // Terminal state, and the conversion has failed.
-    case OPERATION_FAILED:
+    case OPERATION_DROPPED:
     case OPERATION_ERROR:
-    case OPERATION_DROPPED: {
+    case OPERATION_FAILED:
+    case OPERATION_GONE_BY_OPERATOR: {
       allocator->recoverResources(
           operation->framework_id(),
           operation->slave_id(),
@@ -11476,7 +11477,6 @@ void Master::updateOperation(
     case OPERATION_UNSUPPORTED:
     case OPERATION_PENDING:
     case OPERATION_UNREACHABLE:
-    case OPERATION_GONE_BY_OPERATOR:
     case OPERATION_RECOVERING:
     case OPERATION_UNKNOWN: {
       LOG(FATAL) << "Unexpected operation state "
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 4b48756..d66ae52 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -104,6 +104,7 @@ using mesos::authorization::KILL_STANDALONE_CONTAINER;
 using mesos::authorization::REMOVE_NESTED_CONTAINER;
 using mesos::authorization::REMOVE_STANDALONE_CONTAINER;
 using mesos::authorization::MODIFY_RESOURCE_PROVIDER_CONFIG;
+using mesos::authorization::MARK_RESOURCE_PROVIDER_GONE;
 using mesos::authorization::PRUNE_IMAGES;
 
 using mesos::internal::recordio::Reader;
@@ -655,6 +656,9 @@ Future<Response> Http::_api(
     case mesos::agent::Call::REMOVE_RESOURCE_PROVIDER_CONFIG:
       return removeResourceProviderConfig(call, principal);
 
+    case mesos::agent::Call::MARK_RESOURCE_PROVIDER_GONE:
+      return markResourceProviderGone(call, principal);
+
     case mesos::agent::Call::PRUNE_IMAGES:
       return pruneImages(call, mediaTypes.accept, principal);
   }
@@ -3325,6 +3329,37 @@ Future<Response> Http::removeResourceProviderConfig(
 }
 
 
+Future<Response> Http::markResourceProviderGone(
+    const mesos::agent::Call& call,
+    const Option<Principal>& principal) const
+{
+  CHECK_EQ(mesos::agent::Call::MARK_RESOURCE_PROVIDER_GONE, call.type());
+  CHECK(call.has_mark_resource_provider_gone());
+
+  const ResourceProviderID& resourceProviderId =
+    call.mark_resource_provider_gone().resource_provider_id();
+
+  LOG(INFO) << "Processing MARK_RESOURCE_PROVIDER_GONE for resource provider "
+            << resourceProviderId;
+
+  return ObjectApprovers::create(
+      slave->authorizer, principal, {MARK_RESOURCE_PROVIDER_GONE})
+    .then(defer(
+        slave->self(),
+        [this, resourceProviderId](
+            const Owned<ObjectApprovers>& approvers) -> Future<Response> {
+          if (!approvers->approved<MARK_RESOURCE_PROVIDER_GONE>()) {
+            return Forbidden();
+          }
+
+          // `OK` is only returned if the removal succeeded.
+          return slave->markResourceProviderGone(resourceProviderId)
+            .then([]() -> Future<Response> {
+              return OK();
+            });
+        }));
+}
+
 // Helper that reads data from `writer` and writes to `reader`.
 // Returns a failed future if there are any errors reading or writing.
 // The future is satisfied when we get a EOF.
diff --git a/src/slave/http.hpp b/src/slave/http.hpp
index 5b113fa..b8c83f1 100644
--- a/src/slave/http.hpp
+++ b/src/slave/http.hpp
@@ -350,6 +350,10 @@ private:
       const mesos::agent::Call& call,
       const Option<process::http::authentication::Principal>& principal) const;
 
+  process::Future<process::http::Response> markResourceProviderGone(
+      const mesos::agent::Call& call,
+      const Option<process::http::authentication::Principal>& principal) const;
+
   process::Future<process::http::Response> pruneImages(
       const mesos::agent::Call& call,
       ContentType acceptType,
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 10cbc19..ed92f67 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -8000,8 +8000,98 @@ void Slave::handleResourceProviderMessage(
       const ResourceProviderID& resourceProviderId =
         message->remove->resourceProviderId;
 
+      if (!resourceProviders.contains(resourceProviderId)) {
+        break;
+      }
+
+      const ResourceProvider* resourceProvider =
+        resourceProviders.at(resourceProviderId);
+
+      CHECK_NOTNULL(resourceProvider);
+
+      // Transition all non-terminal operations on the resource provider to a
+      // terminal state.
+      //
+      // NOTE: We operate on a copy of the operations container since we trigger
+      // removal of current operation in below loop. This invalidates the loop
+      // iterator so it cannot be safely incremented after the loop body.
+      const hashmap<UUID, Operation*> operations = resourceProvider->operations;
+      foreachpair (const UUID& uuid, Operation * operation, operations) {
+        CHECK_NOTNULL(operation);
+
+        if (protobuf::isTerminalState(operation->latest_status().state())) {
+          continue;
+        }
+
+        // The operation might be from an operator API call, thus the framework
+        // ID here is optional.
+        Option<FrameworkID> frameworkId =
+          operation->has_framework_id()
+            ? operation->framework_id()
+            : Option<FrameworkID>::none();
+
+        Option<OperationID> operationId =
+          operation->info().has_id()
+            ? operation->info().id()
+            : Option<OperationID>::none();
+
+        UpdateOperationStatusMessage update =
+          protobuf::createUpdateOperationStatusMessage(
+              uuid,
+              protobuf::createOperationStatus(
+                  OPERATION_GONE_BY_OPERATOR,
+                  operationId,
+                  "The resource provider was removed before a terminal "
+                  "operation status update was received",
+                  None(),
+                  None(),
+                  info.id()),
+              None(),
+              frameworkId);
+
+        updateOperation(operation, update);
+
+        removeOperation(operation);
+
+        // Forward the operation status update to the master.
+        //
+        // The status update from the resource provider does not
+        // provide the agent ID (because the resource provider doesn't
+        // know it), so we inject it here.
+        UpdateOperationStatusMessage _update;
+        _update.CopyFrom(update);
+        _update.mutable_slave_id()->CopyFrom(info.id());
+        send(master.get(), _update);
+      };
+
+      // TODO(bbannier): Consider transitioning all tasks using resources from
+      // this resource provider to e.g., `TASK_GONE_BY_OPERATOR` and terminating
+      // them.
+
+      // Remove the resources of the resource provider from the agent's total.
+      // This needs to be done after triggering the operation status update so
+      // that master does not receive a operations status update for an unknown
+      // operation (gone from `UpdateSlaveMessage`).
+      totalResources -= resourceProvider->totalResources;
+
       resourceProviders.erase(resourceProviderId);
 
+      switch (state) {
+        case RECOVERING:
+        case DISCONNECTED:
+        case TERMINATING: {
+          break;
+        }
+        case RUNNING: {
+          LOG(INFO) << "Forwarding new total resources " << totalResources;
+
+          // Inform the master about the updated resources.
+          send(master.get(), generateResourceProviderUpdate());
+
+          break;
+        }
+      }
+
       LOG(INFO) << "Removed resource provider '" << resourceProviderId << "'";
       break;
     }
@@ -8112,9 +8202,10 @@ void Slave::updateOperation(
     }
 
     // Terminal state, and the conversion has failed.
-    case OPERATION_FAILED:
+    case OPERATION_DROPPED:
     case OPERATION_ERROR:
-    case OPERATION_DROPPED: {
+    case OPERATION_FAILED:
+    case OPERATION_GONE_BY_OPERATOR: {
       break;
     }
 
@@ -8122,7 +8213,6 @@ void Slave::updateOperation(
     case OPERATION_UNSUPPORTED:
     case OPERATION_PENDING:
     case OPERATION_UNREACHABLE:
-    case OPERATION_GONE_BY_OPERATOR:
     case OPERATION_RECOVERING:
     case OPERATION_UNKNOWN: {
       LOG(FATAL)
@@ -8201,6 +8291,27 @@ ResourceProvider* Slave::getResourceProvider(const ResourceProviderID& id) const
 }
 
 
+Future<Nothing> Slave::markResourceProviderGone(
+    const ResourceProviderID& resourceProviderId) const
+{
+  auto failure = [&resourceProviderId](const string& reason) {
+    return Failure("Could not mark resource provider '" +
+                   stringify(resourceProviderId) + "' as gone: " + reason);
+  };
+
+  if (!resourceProviderManager.get()) {
+    return failure("Agent has not registered yet");
+  }
+
+  // Refuse if the agent knows the provider and it still has resources.
+  const bool hasResources = resourceProviders.contains(resourceProviderId) &&
+    !resourceProviders.at(resourceProviderId)->totalResources.empty();
+  if (hasResources) {
+    return failure("Resource provider has resources");
+  }
+  return resourceProviderManager->removeResourceProvider(resourceProviderId);
+}
+
 void Slave::apply(Operation* operation)
 {
   vector<ResourceConversion> conversions;
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 2eadf5f..2bcd7a9 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -661,6 +661,9 @@ private:
 
   void removeOperation(Operation* operation);
 
+  process::Future<Nothing> markResourceProviderGone(
+      const ResourceProviderID& resourceProviderId) const;
+
   Operation* getOperation(const UUID& uuid) const;
 
   void addResourceProvider(ResourceProvider* resourceProvider);
diff --git a/src/slave/validation.cpp b/src/slave/validation.cpp
index df5e137..99b17c9 100644
--- a/src/slave/validation.cpp
+++ b/src/slave/validation.cpp
@@ -533,6 +533,14 @@ Option<Error> validate(
       return None();
     }
 
+    case mesos::agent::Call::MARK_RESOURCE_PROVIDER_GONE: {
+      if (!call.has_mark_resource_provider_gone()) {
+        return Error("Expecting 'mark_resource_provider_gone' to be present");
+      }
+
+      return None();
+    }
+
     case mesos::agent::Call::PRUNE_IMAGES: {
       return None();
     }
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 76ec56b..12ecea1 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -8334,6 +8334,139 @@ TEST_P(AgentAPITest, GetResourceProviders)
 }
 
 
+TEST_P(AgentAPITest, MarkResourceProviderGone)
+{
+  Clock::pause();
+
+  const ContentType contentType = GetParam();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.authenticate_http_readwrite = true;
+
+  {
+    // Default principal 2 is not allowed to mark any resource provider gone.
+    mesos::ACL::MarkResourceProvidersGone* acl =
+      slaveFlags.acls->add_mark_resource_providers_gone();
+    acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL_2.principal());
+    acl->mutable_resource_providers()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  mesos::v1::ResourceProviderInfo info;
+  info.set_type("org.apache.mesos.rp.test");
+  info.set_name("test");
+
+  // Start a resource provider without resources since resource
+  // providers with resources cannot be marked gone.
+  v1::MockResourceProvider resourceProvider(info, v1::Resource());
+
+  // Start and register a resource provider.
+  Owned<EndpointDetector> endpointDetector(
+      resource_provider::createEndpointDetector(slave.get()->pid));
+
+  Future<mesos::v1::resource_provider::Event::Subscribed> subscribed;
+
+  EXPECT_CALL(resourceProvider, subscribed(_))
+    .WillOnce(DoAll(
+        Invoke(&resourceProvider, &v1::MockResourceProvider::subscribedDefault),
+        FutureArg<0>(&subscribed)))
+    .WillRepeatedly(Return());
+
+  // After subscribing, the resource provider will update its resources,
+  // which triggers an `UpdateSlaveMessage`.
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  resourceProvider.start(std::move(endpointDetector), contentType);
+
+  AWAIT_READY(subscribed);
+  AWAIT_READY(updateSlaveMessage);
+
+  const mesos::v1::ResourceProviderID& resourceProviderId =
+    subscribed->provider_id();
+
+  // Removing the resource provider should trigger an `UpdateSlaveMessage`.
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  // Mark the resource provider gone.
+  {
+    // We explicitly track whether the resource provider was disconnected as it
+    // indicates whether the agent has cleaned up the connection to the
+    // provider.
+    //
+    // NOTE: The resource provider driver will try to resubscribe after the
+    // connection is broken; as these attempts do not succeed, we might
+    // observe further `disconnected` events.
+    Future<Nothing> disconnected;
+    EXPECT_CALL(resourceProvider, disconnected())
+      .WillOnce(FutureSatisfy(&disconnected))
+      .WillRepeatedly(Return());
+
+    v1::agent::Call v1Call;
+    v1Call.set_type(v1::agent::Call::MARK_RESOURCE_PROVIDER_GONE);
+    v1Call.mutable_mark_resource_provider_gone()
+      ->mutable_resource_provider_id()
+      ->CopyFrom(resourceProviderId);
+
+    // `DEFAULT_CREDENTIAL_2` is not able to mark the resource provider as gone.
+    http::Headers headers2 = createBasicAuthHeaders(DEFAULT_CREDENTIAL_2);
+    headers2["Accept"] = stringify(contentType);
+
+    Future<http::Response> v1Response = http::post(
+        slave.get()->pid,
+        "api/v1",
+        headers2,
+        serialize(contentType, v1Call),
+        stringify(contentType));
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, v1Response);
+
+    // Other principals are able to remove the resource provider.
+    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Accept"] = stringify(contentType);
+
+    v1Response = http::post(
+        slave.get()->pid,
+        "api/v1",
+        headers,
+        serialize(contentType, v1Call),
+        stringify(contentType));
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, v1Response);
+
+    AWAIT_READY(disconnected);
+  }
+
+  // Verify that the resource provider is not there anymore.
+  {
+    AWAIT_READY(updateSlaveMessage);
+
+    v1::agent::Call v1Call;
+    v1Call.set_type(v1::agent::Call::GET_RESOURCE_PROVIDERS);
+
+    auto v1Response = post(slave.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response->IsInitialized());
+    ASSERT_EQ(v1::agent::Response::GET_RESOURCE_PROVIDERS, v1Response->type());
+
+    EXPECT_TRUE(
+        v1Response->get_resource_providers().resource_providers().empty());
+  }
+}
+
+
 TEST_P(AgentAPITest, GetOperations)
 {
   Clock::pause();