You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/04/03 23:14:08 UTC

[mesos] 11/15: Made `StorageLocalResourceProviderProcess` no longer exposed.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 995873fbaad780d80f0d1aae285c388211e993c2
Author: Chun-Hung Hsiao <ch...@apache.org>
AuthorDate: Mon Apr 1 23:24:00 2019 -0700

    Made `StorageLocalResourceProviderProcess` no longer exposed.
    
    Since we now moved the CSI calling logics to `v0::VolumeManagerProcess`,
    SLRP tests no longer rely on intercepting member function dispatches of
    `StorageLocalResourceProviderProcess`, so we undo the exposure made in
    r/69827 (commit e9d2a296081).
    
    Review: https://reviews.apache.org/r/70223/
---
 src/Makefile.am                                    |   3 +-
 src/resource_provider/storage/provider.cpp         | 198 +++++++++++++++--
 src/resource_provider/storage/provider_process.hpp | 240 ---------------------
 3 files changed, 187 insertions(+), 254 deletions(-)

diff --git a/src/Makefile.am b/src/Makefile.am
index 89a4090..d03e56f 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1528,8 +1528,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   resource_provider/storage/disk_profile_utils.cpp			\
   resource_provider/storage/disk_profile_utils.hpp			\
   resource_provider/storage/provider.cpp				\
-  resource_provider/storage/provider.hpp				\
-  resource_provider/storage/provider_process.hpp
+  resource_provider/storage/provider.hpp
 
 libmesos_no_3rdparty_la_CPPFLAGS = $(MESOS_CPPFLAGS)
 
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index fba5b18..2dc5c26 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -28,6 +28,16 @@
 
 #include <glog/logging.h>
 
+#include <mesos/http.hpp>
+#include <mesos/resources.hpp>
+#include <mesos/type_utils.hpp>
+
+#include <mesos/resource_provider/resource_provider.hpp>
+
+#include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
+
+#include <mesos/v1/resource_provider.hpp>
+
 #include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
@@ -36,21 +46,12 @@
 #include <process/id.hpp>
 #include <process/loop.hpp>
 #include <process/process.hpp>
+#include <process/sequence.hpp>
 
 #include <process/metrics/counter.hpp>
 #include <process/metrics/metrics.hpp>
 #include <process/metrics/push_gauge.hpp>
 
-#include <mesos/http.hpp>
-#include <mesos/resources.hpp>
-#include <mesos/type_utils.hpp>
-
-#include <mesos/resource_provider/resource_provider.hpp>
-
-#include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
-
-#include <mesos/v1/resource_provider.hpp>
-
 #include <stout/bytes.hpp>
 #include <stout/duration.hpp>
 #include <stout/foreach.hpp>
@@ -79,8 +80,6 @@
 #include "resource_provider/detector.hpp"
 #include "resource_provider/state.hpp"
 
-#include "resource_provider/storage/provider_process.hpp"
-
 #include "slave/paths.hpp"
 #include "slave/state.hpp"
 
@@ -106,8 +105,10 @@ using process::Failure;
 using process::Future;
 using process::loop;
 using process::Owned;
+using process::Process;
 using process::ProcessBase;
 using process::Promise;
+using process::Sequence;
 using process::spawn;
 
 using process::grpc::StatusError;
@@ -224,6 +225,179 @@ static inline Resource createRawDiskResource(
 }
 
 
+class StorageLocalResourceProviderProcess
+  : public Process<StorageLocalResourceProviderProcess>
+{
+public:
+  explicit StorageLocalResourceProviderProcess(
+      const http::URL& _url,
+      const string& _workDir,
+      const ResourceProviderInfo& _info,
+      const SlaveID& _slaveId,
+      const Option<string>& _authToken,
+      bool _strict);
+
+  StorageLocalResourceProviderProcess(
+      const StorageLocalResourceProviderProcess& other) = delete;
+
+  StorageLocalResourceProviderProcess& operator=(
+      const StorageLocalResourceProviderProcess& other) = delete;
+
+  void connected();
+  void disconnected();
+  void received(const Event& event);
+
+private:
+  void initialize() override;
+  void fatal();
+
+  Future<Nothing> recover();
+
+  void doReliableRegistration();
+
+  // The reconcile functions are responsible to reconcile the state of
+  // the resource provider from the recovered state and other sources of
+  // truth, such as CSI plugin responses or the status update manager.
+  Future<Nothing> reconcileResourceProviderState();
+  Future<Nothing> reconcileOperationStatuses();
+  ResourceConversion reconcileResources(
+      const Resources& checkpointed,
+      const Resources& discovered);
+
+  Future<Resources> getRawVolumes();
+  Future<Resources> getStoragePools();
+
+  // Spawns a loop to watch for changes in the set of known profiles and update
+  // the profile mapping and storage pools accordingly.
+  void watchProfiles();
+
+  // Update the profile mapping when the set of known profiles changes.
+  // NOTE: This function never fails. If it fails to translate a new
+  // profile, the resource provider will continue to operate with the
+  // set of profiles it knows about.
+  Future<Nothing> updateProfiles(const hashset<string>& profiles);
+
+  // Reconcile the storage pools when the set of known profiles changes,
+  // or a volume with an unknown profile is destroyed.
+  Future<Nothing> reconcileStoragePools();
+
+  // Returns true if the storage pools are allowed to be reconciled when
+  // the operation is being applied.
+  static bool allowsReconciliation(const Offer::Operation& operation);
+
+  // Functions for received events.
+  void subscribed(const Event::Subscribed& subscribed);
+  void applyOperation(const Event::ApplyOperation& operation);
+  void publishResources(const Event::PublishResources& publish);
+  void acknowledgeOperationStatus(
+      const Event::AcknowledgeOperationStatus& acknowledge);
+  void reconcileOperations(const Event::ReconcileOperations& reconcile);
+
+  // Applies the operation. Speculative operations will be synchronously
+  // applied. Do nothing if the operation is already in a terminal state.
+  Future<Nothing> _applyOperation(const id::UUID& operationUuid);
+
+  // Sends `OPERATION_DROPPED` status update. The operation status will be
+  // checkpointed if `operation` is set.
+  void dropOperation(
+      const id::UUID& operationUuid,
+      const Option<FrameworkID>& frameworkId,
+      const Option<Offer::Operation>& operation,
+      const string& message);
+
+  Future<vector<ResourceConversion>> applyCreateDisk(
+      const Resource& resource,
+      const id::UUID& operationUuid,
+      const Resource::DiskInfo::Source::Type& targetType,
+      const Option<string>& targetProfile);
+
+  Future<vector<ResourceConversion>> applyDestroyDisk(
+      const Resource& resource);
+
+  // Synchronously creates persistent volumes.
+  Try<vector<ResourceConversion>> applyCreate(
+      const Offer::Operation& operation) const;
+
+  // Synchronously cleans up and destroys persistent volumes.
+  Try<vector<ResourceConversion>> applyDestroy(
+      const Offer::Operation& operation) const;
+
+  // Synchronously updates `totalResources` and the operation status and
+  // then asks the status update manager to send status updates.
+  Try<Nothing> updateOperationStatus(
+      const id::UUID& operationUuid,
+      const Try<vector<ResourceConversion>>& conversions);
+
+  void garbageCollectOperationPath(const id::UUID& operationUuid);
+
+  void checkpointResourceProviderState();
+
+  void sendResourceProviderStateUpdate();
+
+  void sendOperationStatusUpdate(const UpdateOperationStatusMessage& update);
+
+  enum State
+  {
+    RECOVERING,
+    DISCONNECTED,
+    CONNECTED,
+    SUBSCRIBED,
+    READY
+  } state;
+
+  const http::URL url;
+  const string workDir;
+  const string metaDir;
+  const ContentType contentType;
+  ResourceProviderInfo info;
+  const string vendor;
+  const SlaveID slaveId;
+  const Option<string> authToken;
+  const bool strict;
+
+  shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
+
+  Owned<Driver> driver;
+  OperationStatusUpdateManager statusUpdateManager;
+
+  // The mapping of known profiles fetched from the DiskProfileAdaptor.
+  hashmap<string, DiskProfileAdaptor::ProfileInfo> profileInfos;
+
+  Owned<VolumeManager> volumeManager;
+
+  // We maintain the following invariant: if one operation depends on
+  // another, they cannot be in PENDING state at the same time, i.e.,
+  // the result of the preceding operation must have been reflected in
+  // the total resources.
+  //
+  // NOTE: We store the list of operations in a `LinkedHashMap` to
+  // preserve the order we receive the operations in case we need it.
+  LinkedHashMap<id::UUID, Operation> operations;
+  Resources totalResources;
+  id::UUID resourceVersion;
+
+  // If pending, it means that the storage pools are being reconciled, and all
+  // incoming operations that disallow reconciliation will be dropped.
+  Future<Nothing> reconciled;
+
+  // We maintain a sequence to coordinate reconciliations of storage pools. It
+  // keeps track of pending operations that disallow reconciliation, and ensures
+  // that any reconciliation waits for these operations to finish.
+  Sequence sequence;
+
+  struct Metrics : public csi::Metrics
+  {
+    explicit Metrics(const string& prefix);
+    ~Metrics();
+
+    hashmap<Offer::Operation::Type, PushGauge> operations_pending;
+    hashmap<Offer::Operation::Type, Counter> operations_finished;
+    hashmap<Offer::Operation::Type, Counter> operations_failed;
+    hashmap<Offer::Operation::Type, Counter> operations_dropped;
+  } metrics;
+};
+
+
 StorageLocalResourceProviderProcess::StorageLocalResourceProviderProcess(
     const http::URL& _url,
     const string& _workDir,
diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp
deleted file mode 100644
index 6e14d90..0000000
--- a/src/resource_provider/storage/provider_process.hpp
+++ /dev/null
@@ -1,240 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#ifndef __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__
-#define __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__
-
-#include <memory>
-#include <string>
-#include <vector>
-
-#include <mesos/mesos.hpp>
-#include <mesos/resources.hpp>
-
-#include <mesos/resource_provider/resource_provider.hpp>
-
-#include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
-
-#include <mesos/v1/resource_provider.hpp>
-
-#include <process/future.hpp>
-#include <process/http.hpp>
-#include <process/owned.hpp>
-#include <process/process.hpp>
-#include <process/sequence.hpp>
-
-#include <process/metrics/counter.hpp>
-#include <process/metrics/push_gauge.hpp>
-
-#include <stout/hashset.hpp>
-#include <stout/linkedhashmap.hpp>
-#include <stout/nothing.hpp>
-#include <stout/option.hpp>
-#include <stout/try.hpp>
-#include <stout/uuid.hpp>
-
-#include "csi/metrics.hpp"
-#include "csi/volume_manager.hpp"
-
-#include "status_update_manager/operation.hpp"
-
-namespace mesos {
-namespace internal {
-
-class StorageLocalResourceProviderProcess
-  : public process::Process<StorageLocalResourceProviderProcess>
-{
-public:
-  explicit StorageLocalResourceProviderProcess(
-      const process::http::URL& _url,
-      const std::string& _workDir,
-      const ResourceProviderInfo& _info,
-      const SlaveID& _slaveId,
-      const Option<std::string>& _authToken,
-      bool _strict);
-
-  StorageLocalResourceProviderProcess(
-      const StorageLocalResourceProviderProcess& other) = delete;
-
-  StorageLocalResourceProviderProcess& operator=(
-      const StorageLocalResourceProviderProcess& other) = delete;
-
-  void connected();
-  void disconnected();
-  void received(const resource_provider::Event& event);
-
-private:
-  void initialize() override;
-  void fatal();
-
-  process::Future<Nothing> recover();
-
-  void doReliableRegistration();
-
-  // The reconcile functions are responsible to reconcile the state of
-  // the resource provider from the recovered state and other sources of
-  // truth, such as CSI plugin responses or the status update manager.
-  process::Future<Nothing> reconcileResourceProviderState();
-  process::Future<Nothing> reconcileOperationStatuses();
-  ResourceConversion reconcileResources(
-      const Resources& checkpointed,
-      const Resources& discovered);
-
-  process::Future<Resources> getRawVolumes();
-  process::Future<Resources> getStoragePools();
-
-  // Spawns a loop to watch for changes in the set of known profiles and update
-  // the profile mapping and storage pools accordingly.
-  void watchProfiles();
-
-  // Update the profile mapping when the set of known profiles changes.
-  // NOTE: This function never fails. If it fails to translate a new
-  // profile, the resource provider will continue to operate with the
-  // set of profiles it knows about.
-  process::Future<Nothing> updateProfiles(const hashset<std::string>& profiles);
-
-  // Reconcile the storage pools when the set of known profiles changes,
-  // or a volume with an unknown profile is destroyed.
-  process::Future<Nothing> reconcileStoragePools();
-
-  // Returns true if the storage pools are allowed to be reconciled when
-  // the operation is being applied.
-  static bool allowsReconciliation(const Offer::Operation& operation);
-
-  // Functions for received events.
-  void subscribed(const resource_provider::Event::Subscribed& subscribed);
-  void applyOperation(
-      const resource_provider::Event::ApplyOperation& operation);
-  void publishResources(
-      const resource_provider::Event::PublishResources& publish);
-  void acknowledgeOperationStatus(
-      const resource_provider::Event::AcknowledgeOperationStatus& acknowledge);
-  void reconcileOperations(
-      const resource_provider::Event::ReconcileOperations& reconcile);
-
-  // Applies the operation. Speculative operations will be synchronously
-  // applied. Do nothing if the operation is already in a terminal state.
-  process::Future<Nothing> _applyOperation(const id::UUID& operationUuid);
-
-  // Sends `OPERATION_DROPPED` status update. The operation status will be
-  // checkpointed if `operation` is set.
-  void dropOperation(
-      const id::UUID& operationUuid,
-      const Option<FrameworkID>& frameworkId,
-      const Option<Offer::Operation>& operation,
-      const std::string& message);
-
-  process::Future<std::vector<ResourceConversion>> applyCreateDisk(
-      const Resource& resource,
-      const id::UUID& operationUuid,
-      const Resource::DiskInfo::Source::Type& targetType,
-      const Option<std::string>& targetProfile);
-
-  process::Future<std::vector<ResourceConversion>> applyDestroyDisk(
-      const Resource& resource);
-
-  // Synchronously creates persistent volumes.
-  Try<std::vector<ResourceConversion>> applyCreate(
-      const Offer::Operation& operation) const;
-
-  // Synchronously cleans up and destroys persistent volumes.
-  Try<std::vector<ResourceConversion>> applyDestroy(
-      const Offer::Operation& operation) const;
-
-  // Synchronously updates `totalResources` and the operation status and
-  // then asks the status update manager to send status updates.
-  Try<Nothing> updateOperationStatus(
-      const id::UUID& operationUuid,
-      const Try<std::vector<ResourceConversion>>& conversions);
-
-  void garbageCollectOperationPath(const id::UUID& operationUuid);
-
-  void checkpointResourceProviderState();
-
-  void sendResourceProviderStateUpdate();
-
-  void sendOperationStatusUpdate(
-      const UpdateOperationStatusMessage& update);
-
-  enum State
-  {
-    RECOVERING,
-    DISCONNECTED,
-    CONNECTED,
-    SUBSCRIBED,
-    READY
-  } state;
-
-  const process::http::URL url;
-  const std::string workDir;
-  const std::string metaDir;
-  const ContentType contentType;
-  ResourceProviderInfo info;
-  const std::string vendor;
-  const SlaveID slaveId;
-  const Option<std::string> authToken;
-  const bool strict;
-
-  std::shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
-
-  process::Owned<v1::resource_provider::Driver> driver;
-  OperationStatusUpdateManager statusUpdateManager;
-
-  // The mapping of known profiles fetched from the DiskProfileAdaptor.
-  hashmap<std::string, DiskProfileAdaptor::ProfileInfo> profileInfos;
-
-  process::Owned<csi::VolumeManager> volumeManager;
-
-  // We maintain the following invariant: if one operation depends on
-  // another, they cannot be in PENDING state at the same time, i.e.,
-  // the result of the preceding operation must have been reflected in
-  // the total resources.
-  //
-  // NOTE: We store the list of operations in a `LinkedHashMap` to
-  // preserve the order we receive the operations in case we need it.
-  LinkedHashMap<id::UUID, Operation> operations;
-  Resources totalResources;
-  id::UUID resourceVersion;
-
-  // If pending, it means that the storage pools are being reconciled, and all
-  // incoming operations that disallow reconciliation will be dropped.
-  process::Future<Nothing> reconciled;
-
-  // We maintain a sequence to coordinate reconciliations of storage pools. It
-  // keeps track of pending operations that disallow reconciliation, and ensures
-  // that any reconciliation waits for these operations to finish.
-  process::Sequence sequence;
-
-  struct Metrics : public csi::Metrics
-  {
-    explicit Metrics(const std::string& prefix);
-    ~Metrics();
-
-    hashmap<Offer::Operation::Type, process::metrics::PushGauge>
-      operations_pending;
-    hashmap<Offer::Operation::Type, process::metrics::Counter>
-      operations_finished;
-    hashmap<Offer::Operation::Type, process::metrics::Counter>
-      operations_failed;
-    hashmap<Offer::Operation::Type, process::metrics::Counter>
-      operations_dropped;
-  } metrics;
-};
-
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__