You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/04/03 23:14:08 UTC
[mesos] 11/15: Made `StorageLocalResourceProviderProcess` no longer
exposed.
This is an automated email from the ASF dual-hosted git repository.
chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 995873fbaad780d80f0d1aae285c388211e993c2
Author: Chun-Hung Hsiao <ch...@apache.org>
AuthorDate: Mon Apr 1 23:24:00 2019 -0700
Made `StorageLocalResourceProviderProcess` no longer exposed.
Since we now moved the CSI calling logics to `v0::VolumeManagerProcess`,
SLRP tests no longer rely on intercepting member function dispatches of
`StorageLocalResourceProviderProcess`, so we undo the exposure made in
r/69827 (commit e9d2a296081).
Review: https://reviews.apache.org/r/70223/
---
src/Makefile.am | 3 +-
src/resource_provider/storage/provider.cpp | 198 +++++++++++++++--
src/resource_provider/storage/provider_process.hpp | 240 ---------------------
3 files changed, 187 insertions(+), 254 deletions(-)
diff --git a/src/Makefile.am b/src/Makefile.am
index 89a4090..d03e56f 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1528,8 +1528,7 @@ libmesos_no_3rdparty_la_SOURCES += \
resource_provider/storage/disk_profile_utils.cpp \
resource_provider/storage/disk_profile_utils.hpp \
resource_provider/storage/provider.cpp \
- resource_provider/storage/provider.hpp \
- resource_provider/storage/provider_process.hpp
+ resource_provider/storage/provider.hpp
libmesos_no_3rdparty_la_CPPFLAGS = $(MESOS_CPPFLAGS)
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index fba5b18..2dc5c26 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -28,6 +28,16 @@
#include <glog/logging.h>
+#include <mesos/http.hpp>
+#include <mesos/resources.hpp>
+#include <mesos/type_utils.hpp>
+
+#include <mesos/resource_provider/resource_provider.hpp>
+
+#include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
+
+#include <mesos/v1/resource_provider.hpp>
+
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
@@ -36,21 +46,12 @@
#include <process/id.hpp>
#include <process/loop.hpp>
#include <process/process.hpp>
+#include <process/sequence.hpp>
#include <process/metrics/counter.hpp>
#include <process/metrics/metrics.hpp>
#include <process/metrics/push_gauge.hpp>
-#include <mesos/http.hpp>
-#include <mesos/resources.hpp>
-#include <mesos/type_utils.hpp>
-
-#include <mesos/resource_provider/resource_provider.hpp>
-
-#include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
-
-#include <mesos/v1/resource_provider.hpp>
-
#include <stout/bytes.hpp>
#include <stout/duration.hpp>
#include <stout/foreach.hpp>
@@ -79,8 +80,6 @@
#include "resource_provider/detector.hpp"
#include "resource_provider/state.hpp"
-#include "resource_provider/storage/provider_process.hpp"
-
#include "slave/paths.hpp"
#include "slave/state.hpp"
@@ -106,8 +105,10 @@ using process::Failure;
using process::Future;
using process::loop;
using process::Owned;
+using process::Process;
using process::ProcessBase;
using process::Promise;
+using process::Sequence;
using process::spawn;
using process::grpc::StatusError;
@@ -224,6 +225,179 @@ static inline Resource createRawDiskResource(
}
+class StorageLocalResourceProviderProcess
+ : public Process<StorageLocalResourceProviderProcess>
+{
+public:
+ explicit StorageLocalResourceProviderProcess(
+ const http::URL& _url,
+ const string& _workDir,
+ const ResourceProviderInfo& _info,
+ const SlaveID& _slaveId,
+ const Option<string>& _authToken,
+ bool _strict);
+
+ StorageLocalResourceProviderProcess(
+ const StorageLocalResourceProviderProcess& other) = delete;
+
+ StorageLocalResourceProviderProcess& operator=(
+ const StorageLocalResourceProviderProcess& other) = delete;
+
+ void connected();
+ void disconnected();
+ void received(const Event& event);
+
+private:
+ void initialize() override;
+ void fatal();
+
+ Future<Nothing> recover();
+
+ void doReliableRegistration();
+
+ // The reconcile functions are responsible to reconcile the state of
+ // the resource provider from the recovered state and other sources of
+ // truth, such as CSI plugin responses or the status update manager.
+ Future<Nothing> reconcileResourceProviderState();
+ Future<Nothing> reconcileOperationStatuses();
+ ResourceConversion reconcileResources(
+ const Resources& checkpointed,
+ const Resources& discovered);
+
+ Future<Resources> getRawVolumes();
+ Future<Resources> getStoragePools();
+
+ // Spawns a loop to watch for changes in the set of known profiles and update
+ // the profile mapping and storage pools accordingly.
+ void watchProfiles();
+
+ // Update the profile mapping when the set of known profiles changes.
+ // NOTE: This function never fails. If it fails to translate a new
+ // profile, the resource provider will continue to operate with the
+ // set of profiles it knows about.
+ Future<Nothing> updateProfiles(const hashset<string>& profiles);
+
+ // Reconcile the storage pools when the set of known profiles changes,
+ // or a volume with an unknown profile is destroyed.
+ Future<Nothing> reconcileStoragePools();
+
+ // Returns true if the storage pools are allowed to be reconciled when
+ // the operation is being applied.
+ static bool allowsReconciliation(const Offer::Operation& operation);
+
+ // Functions for received events.
+ void subscribed(const Event::Subscribed& subscribed);
+ void applyOperation(const Event::ApplyOperation& operation);
+ void publishResources(const Event::PublishResources& publish);
+ void acknowledgeOperationStatus(
+ const Event::AcknowledgeOperationStatus& acknowledge);
+ void reconcileOperations(const Event::ReconcileOperations& reconcile);
+
+ // Applies the operation. Speculative operations will be synchronously
+ // applied. Do nothing if the operation is already in a terminal state.
+ Future<Nothing> _applyOperation(const id::UUID& operationUuid);
+
+ // Sends `OPERATION_DROPPED` status update. The operation status will be
+ // checkpointed if `operation` is set.
+ void dropOperation(
+ const id::UUID& operationUuid,
+ const Option<FrameworkID>& frameworkId,
+ const Option<Offer::Operation>& operation,
+ const string& message);
+
+ Future<vector<ResourceConversion>> applyCreateDisk(
+ const Resource& resource,
+ const id::UUID& operationUuid,
+ const Resource::DiskInfo::Source::Type& targetType,
+ const Option<string>& targetProfile);
+
+ Future<vector<ResourceConversion>> applyDestroyDisk(
+ const Resource& resource);
+
+ // Synchronously creates persistent volumes.
+ Try<vector<ResourceConversion>> applyCreate(
+ const Offer::Operation& operation) const;
+
+ // Synchronously cleans up and destroys persistent volumes.
+ Try<vector<ResourceConversion>> applyDestroy(
+ const Offer::Operation& operation) const;
+
+ // Synchronously updates `totalResources` and the operation status and
+ // then asks the status update manager to send status updates.
+ Try<Nothing> updateOperationStatus(
+ const id::UUID& operationUuid,
+ const Try<vector<ResourceConversion>>& conversions);
+
+ void garbageCollectOperationPath(const id::UUID& operationUuid);
+
+ void checkpointResourceProviderState();
+
+ void sendResourceProviderStateUpdate();
+
+ void sendOperationStatusUpdate(const UpdateOperationStatusMessage& update);
+
+ enum State
+ {
+ RECOVERING,
+ DISCONNECTED,
+ CONNECTED,
+ SUBSCRIBED,
+ READY
+ } state;
+
+ const http::URL url;
+ const string workDir;
+ const string metaDir;
+ const ContentType contentType;
+ ResourceProviderInfo info;
+ const string vendor;
+ const SlaveID slaveId;
+ const Option<string> authToken;
+ const bool strict;
+
+ shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
+
+ Owned<Driver> driver;
+ OperationStatusUpdateManager statusUpdateManager;
+
+ // The mapping of known profiles fetched from the DiskProfileAdaptor.
+ hashmap<string, DiskProfileAdaptor::ProfileInfo> profileInfos;
+
+ Owned<VolumeManager> volumeManager;
+
+ // We maintain the following invariant: if one operation depends on
+ // another, they cannot be in PENDING state at the same time, i.e.,
+ // the result of the preceding operation must have been reflected in
+ // the total resources.
+ //
+ // NOTE: We store the list of operations in a `LinkedHashMap` to
+ // preserve the order we receive the operations in case we need it.
+ LinkedHashMap<id::UUID, Operation> operations;
+ Resources totalResources;
+ id::UUID resourceVersion;
+
+ // If pending, it means that the storage pools are being reconciled, and all
+ // incoming operations that disallow reconciliation will be dropped.
+ Future<Nothing> reconciled;
+
+ // We maintain a sequence to coordinate reconciliations of storage pools. It
+ // keeps track of pending operations that disallow reconciliation, and ensures
+ // that any reconciliation waits for these operations to finish.
+ Sequence sequence;
+
+ struct Metrics : public csi::Metrics
+ {
+ explicit Metrics(const string& prefix);
+ ~Metrics();
+
+ hashmap<Offer::Operation::Type, PushGauge> operations_pending;
+ hashmap<Offer::Operation::Type, Counter> operations_finished;
+ hashmap<Offer::Operation::Type, Counter> operations_failed;
+ hashmap<Offer::Operation::Type, Counter> operations_dropped;
+ } metrics;
+};
+
+
StorageLocalResourceProviderProcess::StorageLocalResourceProviderProcess(
const http::URL& _url,
const string& _workDir,
diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp
deleted file mode 100644
index 6e14d90..0000000
--- a/src/resource_provider/storage/provider_process.hpp
+++ /dev/null
@@ -1,240 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#ifndef __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__
-#define __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__
-
-#include <memory>
-#include <string>
-#include <vector>
-
-#include <mesos/mesos.hpp>
-#include <mesos/resources.hpp>
-
-#include <mesos/resource_provider/resource_provider.hpp>
-
-#include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
-
-#include <mesos/v1/resource_provider.hpp>
-
-#include <process/future.hpp>
-#include <process/http.hpp>
-#include <process/owned.hpp>
-#include <process/process.hpp>
-#include <process/sequence.hpp>
-
-#include <process/metrics/counter.hpp>
-#include <process/metrics/push_gauge.hpp>
-
-#include <stout/hashset.hpp>
-#include <stout/linkedhashmap.hpp>
-#include <stout/nothing.hpp>
-#include <stout/option.hpp>
-#include <stout/try.hpp>
-#include <stout/uuid.hpp>
-
-#include "csi/metrics.hpp"
-#include "csi/volume_manager.hpp"
-
-#include "status_update_manager/operation.hpp"
-
-namespace mesos {
-namespace internal {
-
-class StorageLocalResourceProviderProcess
- : public process::Process<StorageLocalResourceProviderProcess>
-{
-public:
- explicit StorageLocalResourceProviderProcess(
- const process::http::URL& _url,
- const std::string& _workDir,
- const ResourceProviderInfo& _info,
- const SlaveID& _slaveId,
- const Option<std::string>& _authToken,
- bool _strict);
-
- StorageLocalResourceProviderProcess(
- const StorageLocalResourceProviderProcess& other) = delete;
-
- StorageLocalResourceProviderProcess& operator=(
- const StorageLocalResourceProviderProcess& other) = delete;
-
- void connected();
- void disconnected();
- void received(const resource_provider::Event& event);
-
-private:
- void initialize() override;
- void fatal();
-
- process::Future<Nothing> recover();
-
- void doReliableRegistration();
-
- // The reconcile functions are responsible to reconcile the state of
- // the resource provider from the recovered state and other sources of
- // truth, such as CSI plugin responses or the status update manager.
- process::Future<Nothing> reconcileResourceProviderState();
- process::Future<Nothing> reconcileOperationStatuses();
- ResourceConversion reconcileResources(
- const Resources& checkpointed,
- const Resources& discovered);
-
- process::Future<Resources> getRawVolumes();
- process::Future<Resources> getStoragePools();
-
- // Spawns a loop to watch for changes in the set of known profiles and update
- // the profile mapping and storage pools accordingly.
- void watchProfiles();
-
- // Update the profile mapping when the set of known profiles changes.
- // NOTE: This function never fails. If it fails to translate a new
- // profile, the resource provider will continue to operate with the
- // set of profiles it knows about.
- process::Future<Nothing> updateProfiles(const hashset<std::string>& profiles);
-
- // Reconcile the storage pools when the set of known profiles changes,
- // or a volume with an unknown profile is destroyed.
- process::Future<Nothing> reconcileStoragePools();
-
- // Returns true if the storage pools are allowed to be reconciled when
- // the operation is being applied.
- static bool allowsReconciliation(const Offer::Operation& operation);
-
- // Functions for received events.
- void subscribed(const resource_provider::Event::Subscribed& subscribed);
- void applyOperation(
- const resource_provider::Event::ApplyOperation& operation);
- void publishResources(
- const resource_provider::Event::PublishResources& publish);
- void acknowledgeOperationStatus(
- const resource_provider::Event::AcknowledgeOperationStatus& acknowledge);
- void reconcileOperations(
- const resource_provider::Event::ReconcileOperations& reconcile);
-
- // Applies the operation. Speculative operations will be synchronously
- // applied. Do nothing if the operation is already in a terminal state.
- process::Future<Nothing> _applyOperation(const id::UUID& operationUuid);
-
- // Sends `OPERATION_DROPPED` status update. The operation status will be
- // checkpointed if `operation` is set.
- void dropOperation(
- const id::UUID& operationUuid,
- const Option<FrameworkID>& frameworkId,
- const Option<Offer::Operation>& operation,
- const std::string& message);
-
- process::Future<std::vector<ResourceConversion>> applyCreateDisk(
- const Resource& resource,
- const id::UUID& operationUuid,
- const Resource::DiskInfo::Source::Type& targetType,
- const Option<std::string>& targetProfile);
-
- process::Future<std::vector<ResourceConversion>> applyDestroyDisk(
- const Resource& resource);
-
- // Synchronously creates persistent volumes.
- Try<std::vector<ResourceConversion>> applyCreate(
- const Offer::Operation& operation) const;
-
- // Synchronously cleans up and destroys persistent volumes.
- Try<std::vector<ResourceConversion>> applyDestroy(
- const Offer::Operation& operation) const;
-
- // Synchronously updates `totalResources` and the operation status and
- // then asks the status update manager to send status updates.
- Try<Nothing> updateOperationStatus(
- const id::UUID& operationUuid,
- const Try<std::vector<ResourceConversion>>& conversions);
-
- void garbageCollectOperationPath(const id::UUID& operationUuid);
-
- void checkpointResourceProviderState();
-
- void sendResourceProviderStateUpdate();
-
- void sendOperationStatusUpdate(
- const UpdateOperationStatusMessage& update);
-
- enum State
- {
- RECOVERING,
- DISCONNECTED,
- CONNECTED,
- SUBSCRIBED,
- READY
- } state;
-
- const process::http::URL url;
- const std::string workDir;
- const std::string metaDir;
- const ContentType contentType;
- ResourceProviderInfo info;
- const std::string vendor;
- const SlaveID slaveId;
- const Option<std::string> authToken;
- const bool strict;
-
- std::shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
-
- process::Owned<v1::resource_provider::Driver> driver;
- OperationStatusUpdateManager statusUpdateManager;
-
- // The mapping of known profiles fetched from the DiskProfileAdaptor.
- hashmap<std::string, DiskProfileAdaptor::ProfileInfo> profileInfos;
-
- process::Owned<csi::VolumeManager> volumeManager;
-
- // We maintain the following invariant: if one operation depends on
- // another, they cannot be in PENDING state at the same time, i.e.,
- // the result of the preceding operation must have been reflected in
- // the total resources.
- //
- // NOTE: We store the list of operations in a `LinkedHashMap` to
- // preserve the order we receive the operations in case we need it.
- LinkedHashMap<id::UUID, Operation> operations;
- Resources totalResources;
- id::UUID resourceVersion;
-
- // If pending, it means that the storage pools are being reconciled, and all
- // incoming operations that disallow reconciliation will be dropped.
- process::Future<Nothing> reconciled;
-
- // We maintain a sequence to coordinate reconciliations of storage pools. It
- // keeps track of pending operations that disallow reconciliation, and ensures
- // that any reconciliation waits for these operations to finish.
- process::Sequence sequence;
-
- struct Metrics : public csi::Metrics
- {
- explicit Metrics(const std::string& prefix);
- ~Metrics();
-
- hashmap<Offer::Operation::Type, process::metrics::PushGauge>
- operations_pending;
- hashmap<Offer::Operation::Type, process::metrics::Counter>
- operations_finished;
- hashmap<Offer::Operation::Type, process::metrics::Counter>
- operations_failed;
- hashmap<Offer::Operation::Type, process::metrics::Counter>
- operations_dropped;
- } metrics;
-};
-
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__