You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2017/11/08 16:01:24 UTC

[3/9] mesos git commit: Triggered 'UpdateSlaveMessage' when 'ResourceProviderManager' updates.

Triggered 'UpdateSlaveMessage' when 'ResourceProviderManager' updates.

The agent's resource provider manager sends a
'ResourceProviderMessage' when its managed resources change. This
commit adds handling in the agent so that an 'UpdateSlaveMessage' is
sent to the master to update the total resource available on the
agent.

In order to provide push-like handling of the resource provider
manager's message queue, we chain recursive calls to the handler for
continuous processing. Initially, processing is kicked off from
'Slave::initialize'. This simple implementation does not yet provide a
direct way to stop processing of messages, but that can be achieved by,
e.g., replacing the manager with a new instance (this would also
require updating routes).

Since the agent can only send an 'UpdateSlaveMessage' when it is
registered with a master, a simple back-off of 5 s is implemented which
will defer processing of a ready message should the agent not yet have
registered.

To facilitate logging we add a stringification function for
'ResourceProviderMessage's.

This patch also adjusts a number of tests to no expect two
'UpdateSlaveMessage's.

Review: https://reviews.apache.org/r/61183/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e37b1336
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e37b1336
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e37b1336

Branch: refs/heads/master
Commit: e37b1336c21d0b2d60fead1d6489122615a8a8c9
Parents: 9850482
Author: Benjamin Bannier <bb...@apache.org>
Authored: Wed Nov 8 00:51:52 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Nov 8 15:51:20 2017 +0100

----------------------------------------------------------------------
 src/resource_provider/message.hpp |  25 ++++++
 src/slave/slave.cpp               | 130 ++++++++++++++++++++++++++++-
 src/slave/slave.hpp               |   7 +-
 src/tests/slave_tests.cpp         | 144 +++++++++++++++++++++++++++++++++
 4 files changed, 300 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e37b1336/src/resource_provider/message.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/message.hpp b/src/resource_provider/message.hpp
index 3c7c3f2..931aab6 100644
--- a/src/resource_provider/message.hpp
+++ b/src/resource_provider/message.hpp
@@ -17,10 +17,14 @@
 #ifndef __RESOURCE_PROVIDER_MESSAGE_HPP__
 #define __RESOURCE_PROVIDER_MESSAGE_HPP__
 
+#include <ostream>
+
 #include <mesos/mesos.hpp>
 #include <mesos/resources.hpp>
 
+#include <stout/check.hpp>
 #include <stout/option.hpp>
+#include <stout/unreachable.hpp>
 
 namespace mesos {
 namespace internal {
@@ -42,6 +46,27 @@ struct ResourceProviderMessage
   Option<UpdateTotalResources> updateTotalResources;
 };
 
+
+inline std::ostream& operator<<(
+    std::ostream& stream,
+    const ResourceProviderMessage& resourceProviderMessage)
+{
+  switch (resourceProviderMessage.type) {
+    case ResourceProviderMessage::Type::UPDATE_TOTAL_RESOURCES:
+      const Option<ResourceProviderMessage::UpdateTotalResources>&
+        updateTotalResources = resourceProviderMessage.updateTotalResources;
+
+      CHECK_SOME(updateTotalResources);
+
+      return stream
+          << "UPDATE_TOTAL_RESOURCES: "
+          << updateTotalResources->id << " "
+          << updateTotalResources->total;
+  }
+
+  UNREACHABLE();
+}
+
 } // namespace internal {
 } // namespace mesos {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e37b1336/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 0e342cd..494d793 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1270,17 +1270,36 @@ void Slave::registered(
       break;
   }
 
+  // Send the latest total, including resources from resource providers. We send
+  // this message here as a resource provider might have registered with the
+  // agent between recovery completion and agent registration.
+  bool sendUpdateSlaveMessage = false;
+
+  UpdateSlaveMessage message;
+  message.mutable_slave_id()->CopyFrom(info.id());
+
+  if (capabilities.resourceProvider) {
+    LOG(INFO) << "Forwarding total resources " << totalResources;
+
+    message.mutable_resource_categories()->set_total(true);
+    message.mutable_total_resources()->CopyFrom(totalResources);
+
+    sendUpdateSlaveMessage = true;
+  }
+
   // Send the latest estimate for oversubscribed resources.
   if (oversubscribedResources.isSome()) {
     LOG(INFO) << "Forwarding total oversubscribed resources "
               << oversubscribedResources.get();
 
-    UpdateSlaveMessage message;
-    message.mutable_slave_id()->CopyFrom(info.id());
     message.mutable_resource_categories()->set_oversubscribed(true);
     message.mutable_oversubscribed_resources()->CopyFrom(
         oversubscribedResources.get());
 
+    sendUpdateSlaveMessage = true;
+  }
+
+  if (sendUpdateSlaveMessage) {
     send(master.get(), message);
   }
 }
@@ -1350,17 +1369,36 @@ void Slave::reregistered(
       return;
   }
 
+  // Send the latest total, including resources from resource providers. We send
+  // this message here as a resource provider might have registered with the
+  // agent between recovery completion and agent registration.
+  bool sendUpdateSlaveMessage = false;
+
+  UpdateSlaveMessage message;
+  message.mutable_slave_id()->CopyFrom(info.id());
+
+  if (capabilities.resourceProvider) {
+    LOG(INFO) << "Forwarding total resources " << totalResources;
+
+    message.mutable_resource_categories()->set_total(true);
+    message.mutable_total_resources()->CopyFrom(totalResources);
+
+    sendUpdateSlaveMessage = true;
+  }
+
   // Send the latest estimate for oversubscribed resources.
   if (oversubscribedResources.isSome()) {
     LOG(INFO) << "Forwarding total oversubscribed resources "
               << oversubscribedResources.get();
 
-    UpdateSlaveMessage message;
-    message.mutable_slave_id()->CopyFrom(info.id());
     message.mutable_resource_categories()->set_oversubscribed(true);
     message.mutable_oversubscribed_resources()->CopyFrom(
         oversubscribedResources.get());
 
+    sendUpdateSlaveMessage = true;
+  }
+
+  if (sendUpdateSlaveMessage) {
     send(master.get(), message);
   }
 
@@ -6429,6 +6467,12 @@ void Slave::__recover(const Future<Nothing>& future)
     detection = detector->detect()
       .onAny(defer(self(), &Slave::detected, lambda::_1));
 
+    if (capabilities.resourceProvider) {
+      // Start listening for messages from the resource provider manager.
+      resourceProviderManager.messages().get().onAny(
+          defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
+    }
+
     // Forward oversubscribed resources.
     forwardOversubscribed();
 
@@ -6621,6 +6665,84 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
 }
 
 
+void Slave::handleResourceProviderMessage(
+    const Future<ResourceProviderMessage>& message)
+{
+  // Skip terminal messages which are not ready (failed or discarded
+  // futures); log the reason and keep listening for further messages.
+  if (!message.isReady()) {
+    LOG(ERROR) << "Last resource provider message became terminal before "
+                  "becoming ready: "
+               << (message.isFailed() ? message.failure() : "future discarded");
+
+    // Chain this handler onto the next message from the manager.
+    resourceProviderManager.messages().get()
+      .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
+
+    return;
+  }
+
+  LOG(INFO) << "Handling resource provider message '" << message.get() << "'";
+
+  switch(message->type) {
+    case ResourceProviderMessage::Type::UPDATE_TOTAL_RESOURCES: {
+      CHECK_SOME(message->updateTotalResources);
+
+      const Resources& newTotal = message->updateTotalResources->total;
+
+      const ResourceProviderID& resourceProviderId =
+        message->updateTotalResources->id;
+
+      const Resources oldTotal =
+        totalResources.filter([&resourceProviderId](const Resource& resource) {
+          return resource.provider_id() == resourceProviderId;
+        });
+
+      // Ignore the update if it contained no new information.
+      if (newTotal == oldTotal) {
+        break;
+      }
+
+      totalResources -= oldTotal;
+      totalResources += newTotal;
+
+      // Send the updated resources to the master if the agent is running. Note
+      // that since we have already updated our copy of the latest resource
+      // provider resources, it is safe to consume this message and wait for the
+      // next one; even if we do not send the update to the master right now, an
+      // update will be sent once the agent (re)registers.
+      switch (state) {
+        case RECOVERING:
+        case DISCONNECTED:
+        case TERMINATING: {
+          break;
+        }
+        case RUNNING: {
+          LOG(INFO) << "Forwarding new total resources " << totalResources;
+
+          // Inform the master that the total capacity of this agent has
+          // changed.
+          UpdateSlaveMessage updateSlaveMessage;
+          updateSlaveMessage.mutable_slave_id()->CopyFrom(info.id());
+          updateSlaveMessage.mutable_resource_categories()->set_total(true);
+          updateSlaveMessage.mutable_total_resources()->CopyFrom(
+              totalResources);
+
+          send(master.get(), updateSlaveMessage);
+
+          break;
+        }
+      }
+      break;
+    }
+  }
+
+  // Chain this handler onto the next message so processing continues.
+  resourceProviderManager.messages().get()
+    .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
+}
+
+
 void Slave::qosCorrections()
 {
   qosController->corrections()

http://git-wip-us.apache.org/repos/asf/mesos/blob/e37b1336/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 79c7f02..b2dc002 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -530,6 +530,9 @@ private:
   void _forwardOversubscribed(
       const process::Future<Resources>& oversubscribable);
 
+  void handleResourceProviderMessage(
+      const process::Future<ResourceProviderMessage>& message);
+
   // Gauge methods.
   double _frameworks_active()
   {
@@ -576,8 +579,8 @@ private:
   // Resources that are checkpointed by the slave.
   Resources checkpointedResources;
 
-  // The current total resources of the agent, i.e.,
-  // `info.resources()` with checkpointed resources applied.
+  // The current total resources of the agent, i.e., `info.resources()` with
+  // checkpointed resources applied and resource provider resources.
   Resources totalResources;
 
   Option<process::UPID> master;

http://git-wip-us.apache.org/repos/asf/mesos/blob/e37b1336/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index f9c2e6b..9928cfc 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -31,6 +31,8 @@
 
 #include <mesos/authentication/http/basic_authenticator_factory.hpp>
 
+#include <mesos/v1/resource_provider/resource_provider.hpp>
+
 #include <process/clock.hpp>
 #include <process/collect.hpp>
 #include <process/future.hpp>
@@ -41,6 +43,8 @@
 #include <process/reap.hpp>
 #include <process/subprocess.hpp>
 
+#include <process/ssl/flags.hpp>
+
 #include <stout/hashset.hpp>
 #include <stout/json.hpp>
 #include <stout/none.hpp>
@@ -8515,6 +8519,146 @@ TEST_P(DefaultContainerDNSFlagTest, ValidateFlag)
   }
 }
 
+
+// This test checks that when a resource provider subscribes with the
+// agent's resource provider manager, the agent sends an
+// `UpdateSlaveMessage` reflecting the updated capacity.
+//
+// TODO(bbannier): We should also add tests for the agent behavior
+// with resource providers where the agent ultimately resends the
+// previous total when the master fails over, or for the interaction
+// with the usual oversubscription protocol (oversubscribed resources
+// vs. updates of total).
+TEST_F(SlaveTest, ResourceProviderSubscribe)
+{
+  Clock::pause();
+
+  // Start an agent and a master.
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  // Specify the agent resources so we can check the reported total later.
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resources = "cpus:2;mem:512;disk:512;ports:[]";
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  slaveFlags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability and other required capabilities.
+  constexpr SlaveInfo::Capability::Type capabilities[] = {
+    SlaveInfo::Capability::MULTI_ROLE,
+    SlaveInfo::Capability::HIERARCHICAL_ROLE,
+    SlaveInfo::Capability::RESERVATION_REFINEMENT,
+    SlaveInfo::Capability::RESOURCE_PROVIDER};
+
+  slaveFlags.agent_features = SlaveCapabilities();
+  foreach (SlaveInfo::Capability::Type type, capabilities) {
+    SlaveInfo::Capability* capability =
+      slaveFlags.agent_features->add_capabilities();
+    capability->set_type(type);
+  }
+
+  StandaloneMasterDetector detector(master.get()->pid);
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a local resource provider with the agent.
+  v1::MockResourceProvider resourceProvider;
+
+  Future<Nothing> connected;
+  EXPECT_CALL(resourceProvider, connected())
+    .WillOnce(FutureSatisfy(&connected));
+
+  string scheme = "http";
+
+#ifdef USE_SSL_SOCKET
+  if (process::network::openssl::flags().enabled) {
+    scheme = "https";
+  }
+#endif
+
+  process::http::URL url(
+      scheme,
+      slave.get()->pid.address.ip,
+      slave.get()->pid.address.port,
+      slave.get()->pid.id + "/api/v1/resource_provider");
+
+  Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+
+  resourceProvider.start(
+      endpointDetector,
+      ContentType::PROTOBUF,
+      v1::DEFAULT_CREDENTIAL);
+
+  AWAIT_READY(connected);
+
+  Future<mesos::v1::resource_provider::Event::Subscribed> subscribed;
+  EXPECT_CALL(resourceProvider, subscribed(_))
+    .WillOnce(FutureArg<0>(&subscribed));
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  {
+    mesos::v1::resource_provider::Call call;
+    call.set_type(mesos::v1::resource_provider::Call::SUBSCRIBE);
+
+    mesos::v1::ResourceProviderInfo* info =
+      call.mutable_subscribe()->mutable_resource_provider_info();
+
+    info->set_type("org.apache.mesos.resource_provider.test");
+    info->set_name("test");
+
+    resourceProvider.send(call);
+  }
+
+  // The subscription event contains the assigned resource provider id.
+  AWAIT_READY(subscribed);
+
+  const mesos::v1::ResourceProviderID& resourceProviderId =
+    subscribed->provider_id();
+
+  v1::Resource resourceProviderResources =
+    v1::Resources::parse("disk", "8096", "*").get();
+
+  resourceProviderResources.mutable_provider_id()->CopyFrom(resourceProviderId);
+
+  {
+    mesos::v1::resource_provider::Call call;
+    call.set_type(mesos::v1::resource_provider::Call::UPDATE_STATE);
+
+    call.mutable_resource_provider_id()->CopyFrom(resourceProviderId);
+
+    auto updateState = call.mutable_update_state();
+    updateState->mutable_resources()->CopyFrom(
+        v1::Resources(resourceProviderResources));
+    updateState->set_resource_version_uuid(UUID::random().toBytes());
+
+    resourceProvider.send(call);
+  }
+
+  AWAIT_READY(updateSlaveMessage);
+
+  EXPECT_TRUE(updateSlaveMessage->has_resource_categories());
+  EXPECT_TRUE(updateSlaveMessage->resource_categories().has_total());
+  EXPECT_TRUE(updateSlaveMessage->resource_categories().total());
+
+  // We expect the updated agent total to contain both the resources of the
+  // agent and of the newly subscribed resource provider. The resources from the
+  // resource provider have a matching `ResourceProviderId` set.
+  Resources expectedResources =
+    Resources::parse(slaveFlags.resources.get()).get();
+  expectedResources += devolve(resourceProviderResources);
+
+  EXPECT_EQ(expectedResources, updateSlaveMessage->total_resources());
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {