You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2019/08/20 10:02:37 UTC
[mesos] 07/07: Performed periodic storage local provider
reconciliations.
This is an automated email from the ASF dual-hosted git repository.
bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit deeae143ec0c2cc21137058a0f848a7081f85062
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Wed Aug 14 09:33:43 2019 +0200
Performed periodic storage local provider reconciliations.
Review: https://reviews.apache.org/r/71151/
---
src/resource_provider/storage/provider.cpp | 120 +++++++++----
.../storage_local_resource_provider_tests.cpp | 195 ++++++++++++++++++++-
2 files changed, 279 insertions(+), 36 deletions(-)
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 2f91fe0..f180af8 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -38,6 +38,7 @@
#include <mesos/v1/resource_provider.hpp>
+#include <process/after.hpp>
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
@@ -79,6 +80,7 @@
#include "internal/devolve.hpp"
#include "internal/evolve.hpp"
+#include "resource_provider/constants.hpp"
#include "resource_provider/detector.hpp"
#include "resource_provider/state.hpp"
@@ -130,6 +132,7 @@ using mesos::internal::protobuf::convertLabelsToStringMap;
using mesos::internal::protobuf::convertStringMapToLabels;
using mesos::resource_provider::Call;
+using mesos::resource_provider::DEFAULT_STORAGE_RECONCILIATION_INTERVAL;
using mesos::resource_provider::Event;
using mesos::resource_provider::ResourceProviderState;
@@ -274,8 +277,10 @@ private:
Future<Nothing> reconcileResourceProviderState();
Future<Nothing> reconcileOperationStatuses();
- // Query the plugin for its resources and update the providers state.
- Future<Nothing> reconcileResources();
+ // Query the plugin for its resources and update the provider's
+ // state. If `alwaysUpdate` is `true`, an update will always be
+ // sent, even if no changes are detected.
+ Future<Nothing> reconcileResources(bool alwaysUpdate);
ResourceConversion computeConversion(
const Resources& checkpointed, const Resources& discovered) const;
@@ -311,6 +316,13 @@ private:
const Event::AcknowledgeOperationStatus& acknowledge);
void reconcileOperations(const Event::ReconcileOperations& reconcile);
+ // Periodically poll the provider for resource changes. The poll interval is
+ // controlled by
+ // `ResourceProviderInfo.Storage.reconciliation_interval_seconds`. When this
+ // function is invoked it will perform the first poll after one reconciliation
+ // interval.
+ void watchResources();
+
// Applies the operation. Speculative operations will be synchronously
// applied. Do nothing if the operation is already in a terminal state.
Future<Nothing> _applyOperation(const id::UUID& operationUuid);
@@ -373,6 +385,8 @@ private:
const Option<string> authToken;
const bool strict;
+ const Duration reconciliationInterval;
+
shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
Owned<Driver> driver;
@@ -442,6 +456,10 @@ StorageLocalResourceProviderProcess::StorageLocalResourceProviderProcess(
slaveId(_slaveId),
authToken(_authToken),
strict(_strict),
+ reconciliationInterval(
+ _info.storage().has_reconciliation_interval_seconds()
+ ? Seconds(info.storage().reconciliation_interval_seconds())
+ : DEFAULT_STORAGE_RECONCILIATION_INTERVAL),
metrics("resource_providers/" + info.type() + "." + info.name() + "/"),
resourceVersion(id::UUID::random()),
sequence("storage-local-resource-provider-sequence")
@@ -716,24 +734,73 @@ Future<Nothing>
StorageLocalResourceProviderProcess::reconcileResourceProviderState()
{
return reconcileOperationStatuses()
- .then(defer(self(), &Self::reconcileResources));
+ .then(defer(self(), &Self::reconcileResources, true))
+ .then(defer(self(), [this] {
+ statusUpdateManager.resume();
+
+ switch (state) {
+ case RECOVERING:
+ case DISCONNECTED:
+ case CONNECTED:
+ case SUBSCRIBED: {
+ LOG(INFO) << "Resource provider " << info.id()
+ << " is in READY state";
+
+ state = READY;
+ }
+ case READY:
+ break;
+ }
+
+ return Nothing();
+ }));
}
-Future<Nothing> StorageLocalResourceProviderProcess::reconcileResources()
+void StorageLocalResourceProviderProcess::watchResources()
+{
+ // A specified reconciliation interval of zero
+ // denotes disabled periodic reconciliations.
+ if (reconciliationInterval == Seconds(0)) {
+ return;
+ }
+
+ CHECK(info.has_id());
+
+ loop(
+ self(),
+ std::bind(&process::after, reconciliationInterval),
+ [this](const Nothing&) {
+ // Poll resource provider state in `sequence` to
+ // prevent concurrent non-reconcilable operations.
+ reconciled = sequence.add(std::function<Future<Nothing>()>(
+ defer(self(), &Self::reconcileResources, false)));
+
+ return reconciled.then(
+ [](const Nothing&) -> ControlFlow<Nothing> { return Continue(); });
+ });
+}
+
+
+Future<Nothing> StorageLocalResourceProviderProcess::reconcileResources(
+ bool alwaysUpdate)
{
LOG(INFO) << "Reconciling storage pools and volumes";
+ CHECK_PENDING(reconciled);
+
return collect<vector<ResourceConversion>>(
{getExistingVolumes(), getStoragePools()})
.then(defer(
- self(), [this](const vector<vector<ResourceConversion>>& collected) {
+ self(),
+ [alwaysUpdate, this]
+ (const vector<vector<ResourceConversion>>& collected) {
Resources result = totalResources;
foreach (const vector<ResourceConversion>& conversions, collected) {
result = CHECK_NOTERROR(result.apply(conversions));
}
- bool shouldSendUpdate = false;
+ bool shouldSendUpdate = alwaysUpdate;
if (result != totalResources) {
LOG(INFO) << "Removing '" << (totalResources - result)
@@ -742,33 +809,14 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcileResources()
// Update the resource version since the total resources changed.
totalResources = result;
- resourceVersion = id::UUID::random();
checkpointResourceProviderState();
shouldSendUpdate = true;
}
- switch (state) {
- case RECOVERING:
- case DISCONNECTED:
- case CONNECTED:
- case SUBSCRIBED: {
- LOG(INFO) << "Resource provider " << info.id()
- << " is in READY state";
-
- state = READY;
-
- // This is the first resource update of the current subscription.
- shouldSendUpdate = true;
- }
- case READY:
- break;
- }
-
if (shouldSendUpdate) {
sendResourceProviderStateUpdate();
- statusUpdateManager.resume();
}
return Nothing();
@@ -1168,7 +1216,7 @@ void StorageLocalResourceProviderProcess::watchProfiles()
std::function<Future<Nothing>()> update = defer(self(), [=] {
return updateProfiles(profiles)
- .then(defer(self(), &Self::reconcileResources));
+ .then(defer(self(), &Self::reconcileResources, false));
});
// Update the profile mapping and storage pools in `sequence` to wait
@@ -1261,10 +1309,12 @@ void StorageLocalResourceProviderProcess::subscribed(
// Reconcile resources after obtaining the resource provider ID and start
// watching for profile changes after the reconciliation.
// TODO(chhsiao): Reconcile and watch for profile changes early.
- reconciled = reconcileResourceProviderState()
- .onReady(defer(self(), &Self::watchProfiles))
- .onFailed(defer(self(), std::bind(die, lambda::_1)))
- .onDiscarded(defer(self(), std::bind(die, "future discarded")));
+ reconciled =
+ reconcileResourceProviderState()
+ .onReady(defer(self(), &Self::watchProfiles))
+ .onReady(defer(self(), &Self::watchResources))
+ .onFailed(defer(self(), std::bind(die, lambda::_1)))
+ .onDiscarded(defer(self(), std::bind(die, "future discarded")));
}
@@ -1728,7 +1778,7 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
// TODO(chhsiao): Consider calling `createVolume` sequentially with other
// create or delete operations, and send an `UPDATE_STATE` for storage pools
- // afterward. See MESOS-9254.
+ // afterward.
Future<VolumeInfo> created;
if (resource.disk().source().has_profile()) {
created = volumeManager->createVolume(
@@ -1866,7 +1916,7 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
reconciled =
sequence
.add(std::function<Future<Nothing>()>(
- defer(self(), &Self::reconcileResources)))
+ defer(self(), &Self::reconcileResources, false)))
.onFailed(std::bind(err, resource, lambda::_1))
.onDiscard(std::bind(err, resource, "future discarded"));
}
@@ -2123,6 +2173,12 @@ void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
{
+ // Set a new resource version here since we typically send state
+ // updates when resources change. This ensures that every state
+ // update carries a new resource version, but it also means that
+ // this function is no longer idempotent.
+ resourceVersion = id::UUID::random();
+
Call call;
call.set_type(Call::UPDATE_STATE);
call.mutable_resource_provider_id()->CopyFrom(info.id());
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 7624ea1..05daf2a 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -96,8 +96,6 @@ using mesos::internal::slave::ContainerDaemonProcess;
using mesos::master::detector::MasterDetector;
using mesos::master::detector::StandaloneMasterDetector;
-using mesos::resource_provider::DEFAULT_STORAGE_RECONCILIATION_INTERVAL;
-
using process::Clock;
using process::Future;
using process::Owned;
@@ -1643,8 +1641,6 @@ TEST_P(StorageLocalResourceProviderTest, ProfileDisappeared)
// The resource provider will reconcile the storage pools to reclaim the
// space freed by destroying a MOUNT disk of a disappeared profile, which
// would in turn trigger another agent update and thus another allocation.
- //
- // TODO(chhsiao): This might change once MESOS-9254 is done.
AWAIT_READY(offers);
ASSERT_EQ(1, offers->offers_size());
@@ -6708,6 +6704,197 @@ TEST_P(
}
}
+
+// This test validates that the SLRP periodically
+// reconciles resources with the CSI plugin.
+TEST_P(StorageLocalResourceProviderTest, Update)
+{
+ Clock::pause();
+
+ const string profilesPath = path::join(sandbox.get(), "profiles.json");
+
+ ASSERT_SOME(
+ os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
+ loadUriDiskProfileAdaptorModule(profilesPath);
+
+ const string mockCsiEndpoint =
+ "unix://" + path::join(sandbox.get(), "mock_csi.sock");
+
+ MockCSIPlugin plugin;
+ ASSERT_SOME(plugin.startup(mockCsiEndpoint));
+
+ constexpr Duration reconciliationInterval = Seconds(15);
+
+ setupResourceProviderConfig(
+ Bytes(0),
+ None(),
+ mockCsiEndpoint,
+ None(),
+ None(),
+ reconciliationInterval);
+
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ process::Queue<Nothing> getCapacityCalls;
+ process::Queue<Nothing> listVolumesCalls;
+ if (GetParam() == csi::v0::API_VERSION) {
+ EXPECT_CALL(plugin, ListVolumes(_, _, A<csi::v0::ListVolumesResponse*>()))
+ .WillOnce(Invoke([&](
+ grpc::ServerContext* context,
+ const csi::v0::ListVolumesRequest* request,
+ csi::v0::ListVolumesResponse* response) {
+ listVolumesCalls.put({});
+ return grpc::Status::OK;
+ }))
+ .WillRepeatedly(Invoke([&](
+ grpc::ServerContext* context,
+ const csi::v0::ListVolumesRequest* request,
+ csi::v0::ListVolumesResponse* response) {
+ csi::v0::Volume* volume = response->add_entries()->mutable_volume();
+ volume->set_capacity_bytes(Bytes(1024).bytes());
+ volume->set_id("volume1");
+
+ listVolumesCalls.put({});
+ return grpc::Status::OK;
+ }));
+
+ EXPECT_CALL(plugin, GetCapacity(_, _, A<csi::v0::GetCapacityResponse*>()))
+ .WillOnce(Invoke([&](
+ grpc::ServerContext* context,
+ const csi::v0::GetCapacityRequest* request,
+ csi::v0::GetCapacityResponse* response) {
+ getCapacityCalls.put({});
+ return grpc::Status::OK;
+ }))
+ .WillRepeatedly(Invoke([&](
+ grpc::ServerContext* context,
+ const csi::v0::GetCapacityRequest* request,
+ csi::v0::GetCapacityResponse* response) {
+ response->set_available_capacity(Bytes(1024).bytes());
+
+ getCapacityCalls.put({});
+ return grpc::Status::OK;
+ }));
+ } else if (GetParam() == csi::v1::API_VERSION) {
+ EXPECT_CALL(plugin, ListVolumes(_, _, A<csi::v1::ListVolumesResponse*>()))
+ .WillOnce(Invoke([&](
+ grpc::ServerContext* context,
+ const csi::v1::ListVolumesRequest* request,
+ csi::v1::ListVolumesResponse* response) {
+ listVolumesCalls.put({});
+ return grpc::Status::OK;
+ }))
+ .WillRepeatedly(Invoke([&](
+ grpc::ServerContext* context,
+ const csi::v1::ListVolumesRequest* request,
+ csi::v1::ListVolumesResponse* response) {
+ csi::v1::Volume* volume = response->add_entries()->mutable_volume();
+ volume->set_capacity_bytes(Bytes(1024).bytes());
+ volume->set_volume_id("volume1");
+
+ listVolumesCalls.put({});
+ return grpc::Status::OK;
+ }));
+
+ EXPECT_CALL(plugin, GetCapacity(_, _, A<csi::v1::GetCapacityResponse*>()))
+ .WillOnce(Invoke([&](
+ grpc::ServerContext* context,
+ const csi::v1::GetCapacityRequest* request,
+ csi::v1::GetCapacityResponse* response) {
+ getCapacityCalls.put({});
+ return grpc::Status::OK;
+ }))
+ .WillRepeatedly(Invoke([&](
+ grpc::ServerContext* context,
+ const csi::v1::GetCapacityRequest* request,
+ csi::v1::GetCapacityResponse* response) {
+ response->set_available_capacity(Bytes(1024).bytes());
+
+ getCapacityCalls.put({});
+ return grpc::Status::OK;
+ }));
+ }
+
+ Future<Nothing> listVolumes1 = listVolumesCalls.get();
+ Future<Nothing> listVolumes2 = listVolumesCalls.get();
+
+ Future<Nothing> getCapacity1 = getCapacityCalls.get();
+ Future<Nothing> getCapacity2 = getCapacityCalls.get();
+
+
+ // Since the local resource provider daemon gets subscribed after the agent
+ // is registered, it is guaranteed that the slave will send two
+ // `UpdateSlaveMessage`s, where the latter one contains resources from
+ // the storage local resource provider. After that, a single update
+ // will be sent since the underlying provider resources have changed.
+ //
+ // NOTE: The order of the three `FUTURE_PROTOBUF`s is reversed because
+ // Google Mock will search the expectations in reverse order.
+ Future<UpdateSlaveMessage> updateSlave3 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+ Future<UpdateSlaveMessage> updateSlave2 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+ Future<UpdateSlaveMessage> updateSlave1 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(slave);
+
+ // Advance the clock to trigger agent registration and prevent retry.
+ Clock::advance(slaveFlags.registration_backoff_factor);
+
+ AWAIT_READY(updateSlave1);
+ ASSERT_TRUE(updateSlave1->has_resource_providers());
+ ASSERT_TRUE(updateSlave1->resource_providers().providers().empty());
+
+ // NOTE: We need to resume the clock so that the resource provider can
+ // periodically check if the CSI endpoint socket has been created by
+ // the plugin container, which runs in another Linux process.
+ Clock::resume();
+
+ AWAIT_READY(getCapacity1);
+ AWAIT_READY(listVolumes1);
+
+ AWAIT_READY(updateSlave2);
+ ASSERT_TRUE(updateSlave2->has_resource_providers());
+ ASSERT_FALSE(updateSlave2->resource_providers().providers().empty());
+
+ Clock::pause();
+
+ // Advance the clock so the SLRP polls for volume and storage pool updates.
+ Clock::settle();
+ Clock::advance(reconciliationInterval);
+
+ AWAIT_READY(listVolumes2);
+ AWAIT_READY(getCapacity2);
+ ASSERT_TRUE(updateSlave3.isPending());
+
+ // Advance the clock so the SLRP polls again.
+ Future<Nothing> listVolumes3 = listVolumesCalls.get();
+ Future<Nothing> getCapacity3 = getCapacityCalls.get();
+
+ Clock::settle();
+ Clock::advance(reconciliationInterval);
+
+ AWAIT_READY(listVolumes3);
+ AWAIT_READY(getCapacity3);
+ AWAIT_READY(updateSlave3);
+ ASSERT_TRUE(updateSlave3->has_resource_providers());
+ ASSERT_FALSE(updateSlave3->resource_providers().providers().empty());
+
+ // Resource changes are reported and the resource version changes.
+ ASSERT_NE(
+ updateSlave2->resource_providers().providers(0).resource_version_uuid(),
+ updateSlave3->resource_providers().providers(0).resource_version_uuid());
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {