You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/07 23:46:01 UTC
[2/3] mesos git commit: Publish resource provider resources before
container launch or update.
Publish resource provider resources before container launch or update.
`Slave::publishResources()` will compute the total allocated
resources for all currently running executor containers, and takes an
optional `additionalResources` argument for resources that will be used
by the executor that is about to launch, then sums them up and asks the
resource provider manager to publish the resources.
Review: https://reviews.apache.org/r/63555
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4e715399
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4e715399
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4e715399
Branch: refs/heads/master
Commit: 4e71539976013b76e85ce79d2a123a02adfb832a
Parents: 3119776
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Nov 3 18:43:30 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Dec 7 15:07:54 2017 -0800
----------------------------------------------------------------------
src/internal/devolve.cpp | 13 +--
src/internal/devolve.hpp | 2 +-
src/internal/evolve.cpp | 10 +++
src/internal/evolve.hpp | 1 +
src/resource_provider/manager.cpp | 9 ++
src/slave/slave.cpp | 72 ++++++++++++---
src/slave/slave.hpp | 8 ++
src/tests/mesos.hpp | 37 +++++++-
src/tests/slave_tests.cpp | 158 +++++++++++++++++++++++++++++++++
9 files changed, 288 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/internal/devolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/devolve.cpp b/src/internal/devolve.cpp
index 289c6e3..60a768d 100644
--- a/src/internal/devolve.cpp
+++ b/src/internal/devolve.cpp
@@ -116,16 +116,19 @@ Resource devolve(const v1::Resource& resource)
}
-Resources devolve(const v1::Resources& resources)
+ResourceProviderID devolve(const v1::ResourceProviderID& resourceProviderId)
{
- return devolve<Resource>(
- static_cast<const RepeatedPtrField<v1::Resource>&>(resources));
+ // NOTE: We do not use the common 'devolve' call for performance.
+ ResourceProviderID id;
+ id.set_value(resourceProviderId.value());
+ return id;
}
-ResourceProviderID devolve(const v1::ResourceProviderID& resourceProviderId)
+Resources devolve(const v1::Resources& resources)
{
- return devolve<ResourceProviderID>(resourceProviderId);
+ return devolve<Resource>(
+ static_cast<const RepeatedPtrField<v1::Resource>&>(resources));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/internal/devolve.hpp
----------------------------------------------------------------------
diff --git a/src/internal/devolve.hpp b/src/internal/devolve.hpp
index 17ab76e..9e56fd9 100644
--- a/src/internal/devolve.hpp
+++ b/src/internal/devolve.hpp
@@ -62,8 +62,8 @@ InverseOffer devolve(const v1::InverseOffer& inverseOffer);
Offer devolve(const v1::Offer& offer);
OfferOperationStatus devolve(const v1::OfferOperationStatus& status);
Resource devolve(const v1::Resource& resource);
-Resources devolve(const v1::Resources& resources);
ResourceProviderID devolve(const v1::ResourceProviderID& resourceProviderId);
+Resources devolve(const v1::Resources& resources);
SlaveID devolve(const v1::AgentID& agentId);
SlaveInfo devolve(const v1::AgentInfo& agentInfo);
TaskID devolve(const v1::TaskID& taskId);
http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/internal/evolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.cpp b/src/internal/evolve.cpp
index f46f864..6ce6150 100644
--- a/src/internal/evolve.cpp
+++ b/src/internal/evolve.cpp
@@ -164,6 +164,16 @@ v1::Resource evolve(const Resource& resource)
}
+v1::ResourceProviderID evolve(
+ const ResourceProviderID& resourceProviderId)
+{
+ // NOTE: We do not use the common 'evolve' call for performance.
+ v1::ResourceProviderID id;
+ id.set_value(resourceProviderId.value());
+ return id;
+}
+
+
v1::Resources evolve(const Resources& resources)
{
return evolve<v1::Resource>(
http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/internal/evolve.hpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.hpp b/src/internal/evolve.hpp
index d796f32..77b7172 100644
--- a/src/internal/evolve.hpp
+++ b/src/internal/evolve.hpp
@@ -75,6 +75,7 @@ v1::MasterInfo evolve(const MasterInfo& masterInfo);
v1::Offer evolve(const Offer& offer);
v1::OfferID evolve(const OfferID& offerId);
v1::Resource evolve(const Resource& resource);
+v1::ResourceProviderID evolve(const ResourceProviderID& resourceProviderId);
v1::Resources evolve(const Resources& resources);
v1::Task evolve(const Task& task);
v1::TaskID evolve(const TaskID& taskId);
http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 2aee46e..e75d528 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -538,6 +538,10 @@ Future<Nothing> ResourceProviderManagerProcess::publish(
ResourceProvider* resourceProvider =
resourceProviders.subscribed.at(resourceProviderId).get();
+ LOG(INFO)
+ << "Sending PUBLISH event " << uuid << " with resources '" << resources
+ << "' to resource provider " << resourceProviderId;
+
if (!resourceProvider->http.send(event)) {
return Failure(
"Failed to send PUBLISH event to resource provider " +
@@ -677,6 +681,11 @@ void ResourceProviderManagerProcess::updatePublishStatus(
return;
}
+ LOG(INFO)
+ << "Received UPDATE_PUBLISH_STATUS call for PUBLISH event " << uuid.get()
+ << " with " << update.status() << " status from resource provider "
+ << resourceProvider->info.id();
+
if (update.status() == Call::UpdatePublishStatus::OK) {
resourceProvider->publishes.at(uuid.get())->set(Nothing());
} else {
http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index fb077b7..1bdc9d8 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2554,9 +2554,12 @@ void Slave::__run(
LOG(INFO) << "Queued " << taskOrTaskGroup(task, taskGroup)
<< " for executor " << *executor;
- containerizer->update(
- executor->containerId,
- executor->allocatedResources())
+ publishResources()
+ .then(defer(self(), [=] {
+ return containerizer->update(
+ executor->containerId,
+ executor->allocatedResources());
+ }))
.onAny(defer(self(),
&Self::___run,
lambda::_1,
@@ -2998,11 +3001,19 @@ void Slave::launchExecutor(
<< "' of framework " << framework->id();
// Launch the container.
- containerizer->launch(
- executor->containerId,
- containerConfig,
- environment,
- pidCheckpointPath)
+ // NOTE: Since we modify the ExecutorInfo to include the task's
+ // resources when launching the executor, these resources need to be
+ // published before the containerizer prepares them. This should be
+ // revisited after MESOS-600.
+ publishResources(
+ taskInfo.isSome() ? taskInfo->resources() : Option<Resources>::none())
+ .then(defer(self(), [=] {
+ return containerizer->launch(
+ executor->containerId,
+ containerConfig,
+ environment,
+ pidCheckpointPath);
+ }))
.onAny(defer(self(),
&Self::executorLaunched,
frameworkId,
@@ -4154,9 +4165,12 @@ void Slave::subscribe(
}
}
- containerizer->update(
- executor->containerId,
- executor->allocatedResources())
+ publishResources()
+ .then(defer(self(), [=] {
+ return containerizer->update(
+ executor->containerId,
+ executor->allocatedResources());
+ }))
.onAny(defer(self(),
&Self::___run,
lambda::_1,
@@ -4358,9 +4372,12 @@ void Slave::registerExecutor(
}
}
- containerizer->update(
- executor->containerId,
- executor->allocatedResources())
+ publishResources()
+ .then(defer(self(), [=] {
+ return containerizer->update(
+ executor->containerId,
+ executor->allocatedResources());
+ }))
.onAny(defer(self(),
&Self::___run,
lambda::_1,
@@ -7323,6 +7340,33 @@ void Slave::apply(const vector<ResourceConversion>& conversions)
}
+Future<Nothing> Slave::publishResources(
+ const Option<Resources>& additionalResources)
+{
+ Resources resources;
+
+ // NOTE: For resources providers that serve quantity-based resources
+ // without any identifiers (such as memory), it is very hard to keep
+ // track of published resources. So instead of implementing diff-based
+ // resource publishing, we implement an "ensure-all" semantics, and
+ // always calculate the total resources that need to remain published.
+ foreachvalue (const Framework* framework, frameworks) {
+ // NOTE: We do not call `framework->allocatedResource()` here
+ // because we do not want to publish resources for pending tasks that
+ // have not been authorized yet.
+ foreachvalue (const Executor* executor, framework->executors) {
+ resources += executor->allocatedResources();
+ }
+ }
+
+ if (additionalResources.isSome()) {
+ resources += additionalResources.get();
+ }
+
+ return resourceProviderManager.publish(resources);
+}
+
+
void Slave::qosCorrections()
{
qosController->corrections()
http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index bbf5b79..d9b0469 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -567,6 +567,14 @@ private:
void apply(const std::vector<ResourceConversion>& conversions);
+ // Publish all resources that are needed to run the current set of
+ // tasks and executors on the agent.
+ // NOTE: The `additionalResources` parameter is for publishing
+ // additional task resources when launching executors. Consider
+ // removing this parameter once we have revisited MESOS-600.
+ process::Future<Nothing> publishResources(
+ const Option<Resources>& additionalResources = None());
+
// Gauge methods.
double _frameworks_active()
{
http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 53890d8..657f925 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -440,6 +440,7 @@ using mesos::v1::MachineID;
using mesos::v1::Metric;
using mesos::v1::Offer;
using mesos::v1::Resource;
+using mesos::v1::ResourceProviderInfo;
using mesos::v1::Resources;
using mesos::v1::TaskID;
using mesos::v1::TaskInfo;
@@ -1345,7 +1346,7 @@ inline typename TOffer::Operation CREATE_VOLUME(
typename TOffer::Operation operation;
operation.set_type(TOffer::Operation::CREATE_VOLUME);
operation.mutable_create_volume()->mutable_source()->CopyFrom(source);
- operation.set_target_type(type);
+ operation.mutable_create_volume()->set_target_type(type);
return operation;
}
@@ -2848,6 +2849,22 @@ public:
Operation,
Source>::operationDefault));
EXPECT_CALL(*this, operation(_)).WillRepeatedly(DoDefault());
+
+ ON_CALL(*this, publish(_))
+ .WillByDefault(Invoke(
+ this,
+ &MockResourceProvider<
+ Event,
+ Call,
+ Driver,
+ ResourceProviderInfo,
+ Resource,
+ Resources,
+ ResourceProviderID,
+ OfferOperationState,
+ Operation,
+ Source>::publishDefault));
+ EXPECT_CALL(*this, publish(_)).WillRepeatedly(DoDefault());
}
MOCK_METHOD0_T(connected, void());
@@ -3027,7 +3044,7 @@ public:
->Mutable(0)
->mutable_disk()
->mutable_source()
- ->set_type(Source::BLOCK);
+ ->set_type(Source::RAW);
break;
case Operation::CREATE_BLOCK:
update->mutable_status()->add_converted_resources()->CopyFrom(
@@ -3058,6 +3075,22 @@ public:
driver->send(call);
}
+ void publishDefault(const typename Event::Publish& publish)
+ {
+ CHECK(info.has_id());
+
+ Call call;
+ call.set_type(Call::UPDATE_PUBLISH_STATUS);
+ call.mutable_resource_provider_id()->CopyFrom(info.id());
+
+ typename Call::UpdatePublishStatus* update =
+ call.mutable_update_publish_status();
+ update->set_uuid(publish.uuid());
+ update->set_status(Call::UpdatePublishStatus::OK);
+
+ driver->send(call);
+ }
+
ResourceProviderInfo info;
private:
http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 25cfd47..6640620 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -8775,6 +8775,164 @@ TEST_F(SlaveTest, ResourceProviderSubscribe)
}
+// This test checks that before a workload (executor or task) is
+// launched, all resources from resource providers needed to run the
+// current set of workloads are properly published.
+TEST_F(SlaveTest, ResourceProviderPublishAll)
+{
+ // Start an agent and a master.
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.authenticate_http_readwrite = false;
+
+ // Set the resource provider capability and other required capabilities.
+ constexpr SlaveInfo::Capability::Type capabilities[] = {
+ SlaveInfo::Capability::MULTI_ROLE,
+ SlaveInfo::Capability::HIERARCHICAL_ROLE,
+ SlaveInfo::Capability::RESERVATION_REFINEMENT,
+ SlaveInfo::Capability::RESOURCE_PROVIDER
+ };
+
+ flags.agent_features = SlaveCapabilities();
+ foreach (SlaveInfo::Capability::Type type, capabilities) {
+ flags.agent_features->add_capabilities()->set_type(type);
+ }
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ // Register a mock local resource provider with the agent.
+ v1::ResourceProviderInfo resourceProviderInfo;
+ resourceProviderInfo.set_type("org.apache.mesos.rp.local.mock");
+ resourceProviderInfo.set_name("test");
+
+ vector<v1::Resource> resources = {
+ v1::Resources::parse("disk", "4096", "role1").get(),
+ v1::Resources::parse("disk", "4096", "role2").get()
+ };
+
+ v1::MockResourceProvider resourceProvider(resourceProviderInfo, resources);
+
+ string scheme = "http";
+
+#ifdef USE_SSL_SOCKET
+ if (process::network::openssl::flags().enabled) {
+ scheme = "https";
+ }
+#endif
+
+ process::http::URL url(
+ scheme,
+ slave.get()->pid.address.ip,
+ slave.get()->pid.address.port,
+ slave.get()->pid.id + "/api/v1/resource_provider");
+
+ Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+
+ resourceProvider.start(
+ endpointDetector,
+ ContentType::PROTOBUF,
+ v1::DEFAULT_CREDENTIAL);
+
+ // We want to register two frameworks to launch two concurrent tasks
+ // that use the provider resources, and verify that when the second
+ // task is launched, all provider resources are published.
+ // NOTE: The mock schedulers and drivers are stored outside the loop
+ // to avoid implicit destruction before the test ends.
+ vector<Owned<MockScheduler>> scheds;
+ vector<Owned<MesosSchedulerDriver>> drivers;
+
+ // We use the filter explicitly here so that the resources will not
+ // be filtered for 5 seconds (the default).
+ Filters filters;
+ filters.set_refuse_seconds(0);
+
+ for (size_t i = 0; i < resources.size(); i++) {
+ FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+ framework.set_roles(0, resources.at(i).reservations(0).role());
+
+ Owned<MockScheduler> sched(new MockScheduler());
+ Owned<MesosSchedulerDriver> driver(new MesosSchedulerDriver(
+ sched.get(), framework, master.get()->pid, DEFAULT_CREDENTIAL));
+
+ EXPECT_CALL(*sched, registered(driver.get(), _, _));
+
+ Future<vector<Offer>> offers;
+
+ // Decline unmatched offers.
+ // NOTE: This ensures that this framework does not hold the agent's
+ // default resources. Otherwise, the other one will get no offer.
+ EXPECT_CALL(*sched, resourceOffers(driver.get(), _))
+ .WillRepeatedly(DeclineOffers());
+
+ EXPECT_CALL(*sched, resourceOffers(driver.get(), OffersHaveAnyResource(
+ std::bind(&Resources::isReserved, lambda::_1, framework.roles(0)))))
+ .WillOnce(FutureArg<1>(&offers));
+
+ driver->start();
+
+ AWAIT_READY(offers);
+ ASSERT_FALSE(offers->empty());
+
+ Future<mesos::v1::resource_provider::Event::Publish> publish;
+
+ // Two PUBLISH events will be received: one for launching the
+ // executor, and the other for launching the task.
+ EXPECT_CALL(resourceProvider, publish(_))
+ .WillOnce(
+ Invoke(&resourceProvider,
+ &v1::MockResourceProvider::publishDefault))
+ .WillOnce(DoAll(
+ FutureArg<0>(&publish),
+ Invoke(&resourceProvider,
+ &v1::MockResourceProvider::publishDefault)));
+
+ Future<TaskStatus> taskStarting;
+ Future<TaskStatus> taskRunning;
+
+ EXPECT_CALL(*sched, statusUpdate(driver.get(), _))
+ .WillOnce(FutureArg<1>(&taskStarting))
+ .WillOnce(FutureArg<1>(&taskRunning));
+
+ // Launch a task using a provider resource.
+ driver->acceptOffers(
+ {offers->at(0).id()},
+ {LAUNCH({createTask(
+ offers->at(0).slave_id(),
+ Resources(offers->at(0).resources()).reserved(framework.roles(0)),
+ createCommandInfo("sleep 1000"))})},
+ filters);
+
+ AWAIT_READY(publish);
+
+ // Test if the resources of all running executors are published.
+ // This is checked through counting how many reservations there are
+ // in the published resources: one (role1) when launching the first
+ // task, two (role1, role2) when the second task is launched.
+ EXPECT_EQ(i + 1, v1::Resources(publish->resources()).reservations().size());
+
+ AWAIT_READY(taskStarting);
+ EXPECT_EQ(TASK_STARTING, taskStarting->state());
+
+ AWAIT_READY(taskRunning);
+ EXPECT_EQ(TASK_RUNNING, taskRunning->state());
+
+ // Store the mock scheduler and driver to prevent destruction.
+ scheds.emplace_back(std::move(sched));
+ drivers.emplace_back(std::move(driver));
+ }
+}
+
+
// This test checks that the agent correctly updates and sends
// resource version values when it registers or reregisters.
TEST_F(SlaveTest, ResourceVersions)