You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/07 23:46:01 UTC

[2/3] mesos git commit: Publish resource provider resources before container launch or update.

Publish resource provider resources before container launch or update.

`Slave::publishResources()` computes the total allocated resources for
all currently running executor containers, and takes an optional
`additionalResources` argument for resources that will be used by the
executor that is about to launch, then sums them up and asks the
resource provider manager to publish the resources.

Review: https://reviews.apache.org/r/63555


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4e715399
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4e715399
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4e715399

Branch: refs/heads/master
Commit: 4e71539976013b76e85ce79d2a123a02adfb832a
Parents: 3119776
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Nov 3 18:43:30 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Dec 7 15:07:54 2017 -0800

----------------------------------------------------------------------
 src/internal/devolve.cpp          |  13 +--
 src/internal/devolve.hpp          |   2 +-
 src/internal/evolve.cpp           |  10 +++
 src/internal/evolve.hpp           |   1 +
 src/resource_provider/manager.cpp |   9 ++
 src/slave/slave.cpp               |  72 ++++++++++++---
 src/slave/slave.hpp               |   8 ++
 src/tests/mesos.hpp               |  37 +++++++-
 src/tests/slave_tests.cpp         | 158 +++++++++++++++++++++++++++++++++
 9 files changed, 288 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/internal/devolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/devolve.cpp b/src/internal/devolve.cpp
index 289c6e3..60a768d 100644
--- a/src/internal/devolve.cpp
+++ b/src/internal/devolve.cpp
@@ -116,16 +116,19 @@ Resource devolve(const v1::Resource& resource)
 }
 
 
-Resources devolve(const v1::Resources& resources)
+ResourceProviderID devolve(const v1::ResourceProviderID& resourceProviderId)
 {
-  return devolve<Resource>(
-      static_cast<const RepeatedPtrField<v1::Resource>&>(resources));
+  // NOTE: We do not use the common 'devolve' call for performance.
+  ResourceProviderID id;
+  id.set_value(resourceProviderId.value());
+  return id;
 }
 
 
-ResourceProviderID devolve(const v1::ResourceProviderID& resourceProviderId)
+Resources devolve(const v1::Resources& resources)
 {
-  return devolve<ResourceProviderID>(resourceProviderId);
+  return devolve<Resource>(
+      static_cast<const RepeatedPtrField<v1::Resource>&>(resources));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/internal/devolve.hpp
----------------------------------------------------------------------
diff --git a/src/internal/devolve.hpp b/src/internal/devolve.hpp
index 17ab76e..9e56fd9 100644
--- a/src/internal/devolve.hpp
+++ b/src/internal/devolve.hpp
@@ -62,8 +62,8 @@ InverseOffer devolve(const v1::InverseOffer& inverseOffer);
 Offer devolve(const v1::Offer& offer);
 OfferOperationStatus devolve(const v1::OfferOperationStatus& status);
 Resource devolve(const v1::Resource& resource);
-Resources devolve(const v1::Resources& resources);
 ResourceProviderID devolve(const v1::ResourceProviderID& resourceProviderId);
+Resources devolve(const v1::Resources& resources);
 SlaveID devolve(const v1::AgentID& agentId);
 SlaveInfo devolve(const v1::AgentInfo& agentInfo);
 TaskID devolve(const v1::TaskID& taskId);

http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/internal/evolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.cpp b/src/internal/evolve.cpp
index f46f864..6ce6150 100644
--- a/src/internal/evolve.cpp
+++ b/src/internal/evolve.cpp
@@ -164,6 +164,16 @@ v1::Resource evolve(const Resource& resource)
 }
 
 
+v1::ResourceProviderID evolve(
+    const ResourceProviderID& resourceProviderId)
+{
+  // NOTE: We do not use the common 'evolve' call for performance.
+  v1::ResourceProviderID id;
+  id.set_value(resourceProviderId.value());
+  return id;
+}
+
+
 v1::Resources evolve(const Resources& resources)
 {
   return evolve<v1::Resource>(

http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/internal/evolve.hpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.hpp b/src/internal/evolve.hpp
index d796f32..77b7172 100644
--- a/src/internal/evolve.hpp
+++ b/src/internal/evolve.hpp
@@ -75,6 +75,7 @@ v1::MasterInfo evolve(const MasterInfo& masterInfo);
 v1::Offer evolve(const Offer& offer);
 v1::OfferID evolve(const OfferID& offerId);
 v1::Resource evolve(const Resource& resource);
+v1::ResourceProviderID evolve(const ResourceProviderID& resourceProviderId);
 v1::Resources evolve(const Resources& resources);
 v1::Task evolve(const Task& task);
 v1::TaskID evolve(const TaskID& taskId);

http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 2aee46e..e75d528 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -538,6 +538,10 @@ Future<Nothing> ResourceProviderManagerProcess::publish(
     ResourceProvider* resourceProvider =
       resourceProviders.subscribed.at(resourceProviderId).get();
 
+    LOG(INFO)
+      << "Sending PUBLISH event " << uuid << " with resources '" << resources
+      << "' to resource provider " << resourceProviderId;
+
     if (!resourceProvider->http.send(event)) {
       return Failure(
           "Failed to send PUBLISH event to resource provider " +
@@ -677,6 +681,11 @@ void ResourceProviderManagerProcess::updatePublishStatus(
     return;
   }
 
+  LOG(INFO)
+    << "Received UPDATE_PUBLISH_STATUS call for PUBLISH event " << uuid.get()
+    << " with " << update.status() << " status from resource provider "
+    << resourceProvider->info.id();
+
   if (update.status() == Call::UpdatePublishStatus::OK) {
     resourceProvider->publishes.at(uuid.get())->set(Nothing());
   } else {

http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index fb077b7..1bdc9d8 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2554,9 +2554,12 @@ void Slave::__run(
       LOG(INFO) << "Queued " << taskOrTaskGroup(task, taskGroup)
                 << " for executor " << *executor;
 
-      containerizer->update(
-          executor->containerId,
-          executor->allocatedResources())
+      publishResources()
+        .then(defer(self(), [=] {
+          return containerizer->update(
+              executor->containerId,
+              executor->allocatedResources());
+        }))
         .onAny(defer(self(),
                      &Self::___run,
                      lambda::_1,
@@ -2998,11 +3001,19 @@ void Slave::launchExecutor(
             << "' of framework " << framework->id();
 
   // Launch the container.
-  containerizer->launch(
-      executor->containerId,
-      containerConfig,
-      environment,
-      pidCheckpointPath)
+  // NOTE: Since we modify the ExecutorInfo to include the task's
+  // resources when launching the executor, these resources need to be
+  // published before the containerizer prepares them. This should be
+  // revisited after MESOS-600.
+  publishResources(
+      taskInfo.isSome() ? taskInfo->resources() : Option<Resources>::none())
+    .then(defer(self(), [=] {
+      return containerizer->launch(
+          executor->containerId,
+          containerConfig,
+          environment,
+          pidCheckpointPath);
+    }))
     .onAny(defer(self(),
                  &Self::executorLaunched,
                  frameworkId,
@@ -4154,9 +4165,12 @@ void Slave::subscribe(
         }
       }
 
-      containerizer->update(
-          executor->containerId,
-          executor->allocatedResources())
+      publishResources()
+        .then(defer(self(), [=] {
+          return containerizer->update(
+              executor->containerId,
+              executor->allocatedResources());
+        }))
         .onAny(defer(self(),
                      &Self::___run,
                      lambda::_1,
@@ -4358,9 +4372,12 @@ void Slave::registerExecutor(
         }
       }
 
-      containerizer->update(
-          executor->containerId,
-          executor->allocatedResources())
+      publishResources()
+        .then(defer(self(), [=] {
+          return containerizer->update(
+              executor->containerId,
+              executor->allocatedResources());
+        }))
         .onAny(defer(self(),
                      &Self::___run,
                      lambda::_1,
@@ -7323,6 +7340,33 @@ void Slave::apply(const vector<ResourceConversion>& conversions)
 }
 
 
+Future<Nothing> Slave::publishResources(
+    const Option<Resources>& additionalResources)
+{
+  Resources resources;
+
+  // NOTE: For resource providers that serve quantity-based resources
+  // without any identifiers (such as memory), it is very hard to keep
+  // track of published resources. So instead of implementing diff-based
+  // resource publishing, we implement an "ensure-all" semantics, and
+  // always calculate the total resources that need to remain published.
+  foreachvalue (const Framework* framework, frameworks) {
+    // NOTE: We do not call `framework->allocatedResources()` here
+    // because we do not want to publish resources for pending tasks that
+    // have not been authorized yet.
+    foreachvalue (const Executor* executor, framework->executors) {
+      resources += executor->allocatedResources();
+    }
+  }
+
+  if (additionalResources.isSome()) {
+    resources += additionalResources.get();
+  }
+
+  return resourceProviderManager.publish(resources);
+}
+
+
 void Slave::qosCorrections()
 {
   qosController->corrections()

http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index bbf5b79..d9b0469 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -567,6 +567,14 @@ private:
 
   void apply(const std::vector<ResourceConversion>& conversions);
 
+  // Publish all resources that are needed to run the current set of
+  // tasks and executors on the agent.
+  // NOTE: The `additionalResources` parameter is for publishing
+  // additional task resources when launching executors. Consider
+  // removing this parameter once MESOS-600 is revisited.
+  process::Future<Nothing> publishResources(
+      const Option<Resources>& additionalResources = None());
+
   // Gauge methods.
   double _frameworks_active()
   {

http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 53890d8..657f925 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -440,6 +440,7 @@ using mesos::v1::MachineID;
 using mesos::v1::Metric;
 using mesos::v1::Offer;
 using mesos::v1::Resource;
+using mesos::v1::ResourceProviderInfo;
 using mesos::v1::Resources;
 using mesos::v1::TaskID;
 using mesos::v1::TaskInfo;
@@ -1345,7 +1346,7 @@ inline typename TOffer::Operation CREATE_VOLUME(
   typename TOffer::Operation operation;
   operation.set_type(TOffer::Operation::CREATE_VOLUME);
   operation.mutable_create_volume()->mutable_source()->CopyFrom(source);
-  operation.set_target_type(type);
+  operation.mutable_create_volume()->set_target_type(type);
   return operation;
 }
 
@@ -2848,6 +2849,22 @@ public:
               Operation,
               Source>::operationDefault));
     EXPECT_CALL(*this, operation(_)).WillRepeatedly(DoDefault());
+
+    ON_CALL(*this, publish(_))
+      .WillByDefault(Invoke(
+          this,
+          &MockResourceProvider<
+              Event,
+              Call,
+              Driver,
+              ResourceProviderInfo,
+              Resource,
+              Resources,
+              ResourceProviderID,
+              OfferOperationState,
+              Operation,
+              Source>::publishDefault));
+    EXPECT_CALL(*this, publish(_)).WillRepeatedly(DoDefault());
   }
 
   MOCK_METHOD0_T(connected, void());
@@ -3027,7 +3044,7 @@ public:
           ->Mutable(0)
           ->mutable_disk()
           ->mutable_source()
-          ->set_type(Source::BLOCK);
+          ->set_type(Source::RAW);
         break;
       case Operation::CREATE_BLOCK:
         update->mutable_status()->add_converted_resources()->CopyFrom(
@@ -3058,6 +3075,22 @@ public:
     driver->send(call);
   }
 
+  void publishDefault(const typename Event::Publish& publish)
+  {
+    CHECK(info.has_id());  // `info.id()` is copied into the call below.
+
+    Call call;
+    call.set_type(Call::UPDATE_PUBLISH_STATUS);
+    call.mutable_resource_provider_id()->CopyFrom(info.id());
+
+    typename Call::UpdatePublishStatus* update =
+      call.mutable_update_publish_status();
+    update->set_uuid(publish.uuid());  // Ack the same PUBLISH event.
+    update->set_status(Call::UpdatePublishStatus::OK);  // Always OK.
+
+    driver->send(call);
+  }
+
   ResourceProviderInfo info;
 
 private:

http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 25cfd47..6640620 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -8775,6 +8775,164 @@ TEST_F(SlaveTest, ResourceProviderSubscribe)
 }
 
 
+// This test checks that before a workload (executor or task) is
+// launched, all resources from resource providers needed to run the
+// current set of workloads are properly published.
+TEST_F(SlaveTest, ResourceProviderPublishAll)
+{
+  // Start a master and an agent.
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability and other required capabilities.
+  constexpr SlaveInfo::Capability::Type capabilities[] = {
+    SlaveInfo::Capability::MULTI_ROLE,
+    SlaveInfo::Capability::HIERARCHICAL_ROLE,
+    SlaveInfo::Capability::RESERVATION_REFINEMENT,
+    SlaveInfo::Capability::RESOURCE_PROVIDER
+  };
+
+  flags.agent_features = SlaveCapabilities();
+  foreach (SlaveInfo::Capability::Type type, capabilities) {
+    flags.agent_features->add_capabilities()->set_type(type);
+  }
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a mock local resource provider with the agent.
+  v1::ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.set_type("org.apache.mesos.rp.local.mock");
+  resourceProviderInfo.set_name("test");
+
+  vector<v1::Resource> resources = {
+      v1::Resources::parse("disk", "4096", "role1").get(),
+      v1::Resources::parse("disk", "4096", "role2").get()
+  };
+
+  v1::MockResourceProvider resourceProvider(resourceProviderInfo, resources);
+
+  string scheme = "http";
+
+#ifdef USE_SSL_SOCKET
+  if (process::network::openssl::flags().enabled) {
+    scheme = "https";
+  }
+#endif
+
+  process::http::URL url(
+      scheme,
+      slave.get()->pid.address.ip,
+      slave.get()->pid.address.port,
+      slave.get()->pid.id + "/api/v1/resource_provider");
+
+  Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+
+  resourceProvider.start(
+      endpointDetector,
+      ContentType::PROTOBUF,
+      v1::DEFAULT_CREDENTIAL);
+
+  // We want to register two frameworks to launch two concurrent tasks
+  // that use the provider resources, and verify that when the second
+  // task is launched, all provider resources are published.
+  // NOTE: The mock schedulers and drivers are stored outside the loop
+  // to avoid implicit destruction before the test ends.
+  vector<Owned<MockScheduler>> scheds;
+  vector<Owned<MesosSchedulerDriver>> drivers;
+
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (the default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  for (size_t i = 0; i < resources.size(); i++) {
+    FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+    framework.set_roles(0, resources.at(i).reservations(0).role());
+
+    Owned<MockScheduler> sched(new MockScheduler());
+    Owned<MesosSchedulerDriver> driver(new MesosSchedulerDriver(
+        sched.get(), framework, master.get()->pid, DEFAULT_CREDENTIAL));
+
+    EXPECT_CALL(*sched, registered(driver.get(), _, _));
+
+    Future<vector<Offer>> offers;
+
+    // Decline unmatched offers.
+    // NOTE: This ensures that this framework does not hold the agent's
+    // default resources. Otherwise, the other one will get no offer.
+    EXPECT_CALL(*sched, resourceOffers(driver.get(), _))
+      .WillRepeatedly(DeclineOffers());
+
+    EXPECT_CALL(*sched, resourceOffers(driver.get(), OffersHaveAnyResource(
+        std::bind(&Resources::isReserved, lambda::_1, framework.roles(0)))))
+      .WillOnce(FutureArg<1>(&offers));
+
+    driver->start();
+
+    AWAIT_READY(offers);
+    ASSERT_FALSE(offers->empty());
+
+    Future<mesos::v1::resource_provider::Event::Publish> publish;
+
+    // Two PUBLISH events will be received: one for launching the
+    // executor, and the other for launching the task.
+    EXPECT_CALL(resourceProvider, publish(_))
+      .WillOnce(
+          Invoke(&resourceProvider,
+                 &v1::MockResourceProvider::publishDefault))
+      .WillOnce(DoAll(
+          FutureArg<0>(&publish),
+          Invoke(&resourceProvider,
+                 &v1::MockResourceProvider::publishDefault)));
+
+    Future<TaskStatus> taskStarting;
+    Future<TaskStatus> taskRunning;
+
+    EXPECT_CALL(*sched, statusUpdate(driver.get(), _))
+      .WillOnce(FutureArg<1>(&taskStarting))
+      .WillOnce(FutureArg<1>(&taskRunning));
+
+    // Launch a task using a provider resource.
+    driver->acceptOffers(
+        {offers->at(0).id()},
+        {LAUNCH({createTask(
+            offers->at(0).slave_id(),
+            Resources(offers->at(0).resources()).reserved(framework.roles(0)),
+            createCommandInfo("sleep 1000"))})},
+        filters);
+
+    AWAIT_READY(publish);
+
+    // Test if the resources of all running executors are published.
+    // This is checked by counting how many reservations there are
+    // in the published resources: one (role1) when launching the first
+    // task, two (role1, role2) when the second task is launched.
+    EXPECT_EQ(i + 1, v1::Resources(publish->resources()).reservations().size());
+
+    AWAIT_READY(taskStarting);
+    EXPECT_EQ(TASK_STARTING, taskStarting->state());
+
+    AWAIT_READY(taskRunning);
+    EXPECT_EQ(TASK_RUNNING, taskRunning->state());
+
+    // Store the mock scheduler and driver to prevent destruction.
+    scheds.emplace_back(std::move(sched));
+    drivers.emplace_back(std::move(driver));
+  }
+}
+
+
 // This test checks that the agent correctly updates and sends
 // resource version values when it registers or reregisters.
 TEST_F(SlaveTest, ResourceVersions)