Posted to commits@mesos.apache.org by ch...@apache.org on 2018/11/29 18:31:24 UTC

[mesos] branch master updated (066e0f8 -> bf4e8b3)

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git.


    from 066e0f8  Fixed Mesos-Tidy warning: use of redundant 'get'.
     new 252b554  Changed the semantics of `CREATE_DISK` and `DESTROY_DISK` operations.
     new d9bd4d0  Added profiles to storage pools in tests for `CREATE_DISK`.
     new 0c2b715  Rewrote test `ReconcileDroppedOperation` for `CREATE_DISK`.
     new 4c9f1a0  Rewrote test `ConvertPreExistingVolume` for `CREATE_DISK`.
     new f11de38  Implemented the new `CREATE_DISK`/`DESTROY_DISK` semantics in SLRP.
     new 20b5dda  Added validation for `Offer.Operation.CreateDisk.target_profile`.
     new 83ed35c  Cleaned up `include/mesos/type_utils.hpp`.
     new 67b9972  Checkpointed creation parameters for CSI volumes.
     new 96e6eb1  Added MESOS-9275 to the 1.7.1 CHANGELOG.
     new 00a29b7  Refactored the test CSI plugin.
     new 9c0af9a  Fixed `CreateVolume` of the test CSI plugin.
     new 3718c89  Added the `--create_parameters` flag to the test CSI plugin.
     new ea4534a  Recovered disk through `CREATE_DISK` in test `AgentRegisteredWithNewId`.
     new bf4e8b3  Used `OperationID` instead of `string` in test helpers.

The 14 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGELOG                                          |   1 +
 include/mesos/mesos.proto                          |  46 +-
 include/mesos/type_utils.hpp                       |  46 +-
 include/mesos/v1/mesos.hpp                         |  27 +-
 include/mesos/v1/mesos.proto                       |  46 +-
 src/csi/state.proto                                |   6 +
 src/csi/utils.hpp                                  |   8 +
 src/examples/test_csi_plugin.cpp                   | 153 +++--
 src/master/validation.cpp                          |   6 +
 src/resource_provider/storage/provider.cpp         | 272 ++++----
 .../storage/uri_disk_profile_adaptor.cpp           |  26 +-
 src/tests/api_tests.cpp                            |  12 +-
 src/tests/master_slave_reconciliation_tests.cpp    |   2 +-
 src/tests/master_tests.cpp                         |   8 +-
 src/tests/master_validation_tests.cpp              |  44 +-
 src/tests/mesos.hpp                                |  62 +-
 src/tests/operation_reconciliation_tests.cpp       |  13 +-
 src/tests/resource_provider_manager_tests.cpp      |   2 +-
 .../storage_local_resource_provider_tests.cpp      | 688 ++++++++++++---------
 19 files changed, 857 insertions(+), 611 deletions(-)


[mesos] 11/14: Fixed `CreateVolume` of the test CSI plugin.

Posted by ch...@apache.org.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 9c0af9a2fa1c5effec84c2c221fb66fc13d099c1
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Mon Nov 19 15:18:11 2018 -0800

    Fixed `CreateVolume` of the test CSI plugin.
    
    This patch makes sure that `CreateVolume` is idempotent, and checks
    whether the specified volume capability is supported.
    
    Review: https://reviews.apache.org/r/69402
---
 src/examples/test_csi_plugin.cpp | 48 ++++++++++++++++++++++++++--------------
 1 file changed, 32 insertions(+), 16 deletions(-)

diff --git a/src/examples/test_csi_plugin.cpp b/src/examples/test_csi_plugin.cpp
index cbb61a5..8b6adb2 100644
--- a/src/examples/test_csi_plugin.cpp
+++ b/src/examples/test_csi_plugin.cpp
@@ -325,13 +325,36 @@ Status TestCSIPlugin::CreateVolume(
     return Status(grpc::INVALID_ARGUMENT, "Volume name cannot be empty");
   }
 
-  if (request->name().find_first_of(os::PATH_SEPARATOR) != string::npos) {
-    return Status(grpc::INVALID_ARGUMENT, "Volume name cannot contain '/'");
+  if (strings::contains(request->name(), stringify(os::PATH_SEPARATOR))) {
+    return Status(
+        grpc::INVALID_ARGUMENT,
+        "Volume name cannot contain '" + stringify(os::PATH_SEPARATOR) + "'");
   }
 
-  bool alreadyExists = volumes.contains(request->name());
+  foreach (const csi::v0::VolumeCapability& capability,
+           request->volume_capabilities()) {
+    if (capability != defaultVolumeCapability) {
+      return Status(grpc::INVALID_ARGUMENT, "Unsupported volume capabilities");
+    }
+  }
 
-  if (!alreadyExists) {
+  // The volume ID is determined by `name`, so we check whether the volume
+  // corresponding to `name` is compatible to the request if it exists.
+  if (volumes.contains(request->name())) {
+    const VolumeInfo volumeInfo = volumes.at(request->name());
+
+    if (request->has_capacity_range()) {
+      const csi::v0::CapacityRange& range = request->capacity_range();
+
+      if (range.limit_bytes() != 0 &&
+          volumeInfo.size > Bytes(range.limit_bytes())) {
+        return Status(grpc::ALREADY_EXISTS, "Cannot satisfy 'limit_bytes'");
+      } else if (range.required_bytes() != 0 &&
+                 volumeInfo.size < Bytes(range.required_bytes())) {
+        return Status(grpc::ALREADY_EXISTS, "Cannot satisfy 'required_bytes'");
+      }
+    }
+  } else {
     if (availableCapacity == Bytes(0)) {
       return Status(grpc::OUT_OF_RANGE, "Insufficient capacity");
     }
@@ -344,13 +367,12 @@ Status TestCSIPlugin::CreateVolume(
       const csi::v0::CapacityRange& range = request->capacity_range();
 
       // The highest we can pick.
-      Bytes limit = availableCapacity;
-      if (range.limit_bytes() != 0) {
-        limit = min(availableCapacity, Bytes(range.limit_bytes()));
-      }
+      Bytes limit = range.limit_bytes() != 0
+        ? min(availableCapacity, Bytes(range.limit_bytes()))
+        : availableCapacity;
 
       if (range.required_bytes() != 0 &&
-          static_cast<size_t>(range.required_bytes()) > limit.bytes()) {
+          limit < Bytes(range.required_bytes())) {
         return Status(grpc::OUT_OF_RANGE, "Cannot satisfy 'required_bytes'");
       }
 
@@ -370,7 +392,7 @@ Status TestCSIPlugin::CreateVolume(
 
     CHECK_GE(availableCapacity, volumeInfo.size);
     availableCapacity -= volumeInfo.size;
-    volumes.put(volumeInfo.id, volumeInfo);
+    volumes.put(volumeInfo.id, std::move(volumeInfo));
   }
 
   const VolumeInfo& volumeInfo = volumes.at(request->name());
@@ -380,12 +402,6 @@ Status TestCSIPlugin::CreateVolume(
   (*response->mutable_volume()->mutable_attributes())["path"] =
     getVolumePath(volumeInfo);
 
-  if (alreadyExists) {
-    return Status(
-        grpc::ALREADY_EXISTS,
-        "Volume with name '" + request->name() + "' already exists");
-  }
-
   return Status::OK;
 }
 

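The idempotency rule introduced by this patch can be illustrated with a small
standalone sketch. It is not the test plugin's code: `Code`, `CapacityRange`
and `VolumeStore` are hypothetical stand-ins for the gRPC status codes, the CSI
capacity range and the plugin's volume map, and the choice of volume size
(the required bytes if set, otherwise the limit) is an assumption, since the
patch does not show that step. A repeated request with the same name succeeds
as long as the existing volume fits the requested range, and is rejected with
`ALREADY_EXISTS` otherwise.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-in for the gRPC status codes used by the plugin.
enum class Code { OK, INVALID_ARGUMENT, ALREADY_EXISTS, OUT_OF_RANGE };

// Hypothetical stand-in for the CSI capacity range; 0 means "not set".
struct CapacityRange
{
  uint64_t requiredBytes = 0;
  uint64_t limitBytes = 0;
};

class VolumeStore
{
public:
  explicit VolumeStore(uint64_t capacity) : available(capacity) {}

  Code create(const std::string& name, const CapacityRange& range)
  {
    if (name.empty() || name.find('/') != std::string::npos) {
      return Code::INVALID_ARGUMENT;
    }

    // The volume is identified by its name, so an existing volume is
    // returned again if it is compatible with the request.
    std::map<std::string, uint64_t>::const_iterator it = volumes.find(name);
    if (it != volumes.end()) {
      const uint64_t size = it->second;
      if (range.limitBytes != 0 && size > range.limitBytes) {
        return Code::ALREADY_EXISTS;
      }
      if (range.requiredBytes != 0 && size < range.requiredBytes) {
        return Code::ALREADY_EXISTS;
      }
      return Code::OK;
    }

    if (available == 0) {
      return Code::OUT_OF_RANGE;
    }

    // The highest size we can pick.
    const uint64_t limit = range.limitBytes != 0
      ? std::min(available, range.limitBytes)
      : available;

    if (range.requiredBytes != 0 && limit < range.requiredBytes) {
      return Code::OUT_OF_RANGE;
    }

    // Assumption: allocate the required size if given, else the limit.
    const uint64_t size =
      range.requiredBytes != 0 ? range.requiredBytes : limit;

    assert(available >= size);
    available -= size;
    volumes[name] = size;
    return Code::OK;
  }

  uint64_t availableBytes() const { return available; }

private:
  uint64_t available;
  std::map<std::string, uint64_t> volumes; // Name to size in bytes.
};
```

Calling `create` twice with the same name and a compatible range leaves the
available capacity unchanged after the first call, which is the property the
patch is after.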

[mesos] 05/14: Implemented the new `CREATE_DISK`/`DESTROY_DISK` semantics in SLRP.

Posted by ch...@apache.org.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit f11de38a492669f38b8df006b25f504ae2bea4d3
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Thu Nov 15 12:25:18 2018 -0800

    Implemented the new `CREATE_DISK`/`DESTROY_DISK` semantics in SLRP.
    
    The default mount/block volume capabilities are removed from SLRP.
    Instead, `CREATE_DISK` will convert a preprovisioned RAW disk to a
    profile disk, and `DESTROY_DISK` will always deprovision a profile disk
    as long as the CSI plugin is capable of deprovisioning volumes.
    
    Review: https://reviews.apache.org/r/69361
---
 src/csi/utils.hpp                          |   8 +
 src/resource_provider/storage/provider.cpp | 235 ++++++++++++-----------------
 2 files changed, 103 insertions(+), 140 deletions(-)

diff --git a/src/csi/utils.hpp b/src/csi/utils.hpp
index 5ce318e..9145c67 100644
--- a/src/csi/utils.hpp
+++ b/src/csi/utils.hpp
@@ -45,6 +45,14 @@ bool operator==(
 bool operator==(const VolumeCapability& left, const VolumeCapability& right);
 
 
+inline bool operator!=(
+    const VolumeCapability& left,
+    const VolumeCapability& right)
+{
+  return !(left == right);
+}
+
+
 std::ostream& operator<<(
     std::ostream& stream,
     const ControllerServiceCapability::RPC::Type& type);
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index c137fa4..ebd5a88 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -402,7 +402,7 @@ private:
       const string& name,
       const Bytes& capacity,
       const DiskProfileAdaptor::ProfileInfo& profileInfo);
-  Future<Nothing> deleteVolume(const string& volumeId, bool preExisting);
+  Future<bool> deleteVolume(const string& volumeId);
   Future<string> validateCapability(
       const string& volumeId,
       const Option<Labels>& metadata,
@@ -420,7 +420,8 @@ private:
   Future<vector<ResourceConversion>> applyCreateDisk(
       const Resource& resource,
       const id::UUID& operationUuid,
-      const Resource::DiskInfo::Source::Type& type);
+      const Resource::DiskInfo::Source::Type& targetType,
+      const Option<string>& targetProfile);
   Future<vector<ResourceConversion>> applyDestroyDisk(
       const Resource& resource);
 
@@ -460,8 +461,6 @@ private:
 
   shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
 
-  csi::v0::VolumeCapability defaultMountCapability;
-  csi::v0::VolumeCapability defaultBlockCapability;
   string bootId;
   process::grpc::client::Runtime runtime;
   Owned<v1::resource_provider::Driver> driver;
@@ -590,14 +589,6 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
 
 void StorageLocalResourceProviderProcess::initialize()
 {
-  // Default mount and block capabilities for pre-existing volumes.
-  defaultMountCapability.mutable_mount();
-  defaultMountCapability.mutable_access_mode()
-    ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
-  defaultBlockCapability.mutable_block();
-  defaultBlockCapability.mutable_access_mode()
-    ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
-
   Try<string> _bootId = os::bootId();
   if (_bootId.isError()) {
     LOG(ERROR) << "Failed to get boot ID: " << _bootId.error();
@@ -2650,6 +2641,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
 
 
 // Returns a CSI volume ID.
+//
 // NOTE: This can only be called after `prepareControllerService`.
 Future<string> StorageLocalResourceProviderProcess::createVolume(
     const string& name,
@@ -2680,9 +2672,8 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
           const csi::v0::Volume& volume = response.volume();
 
           if (volumes.contains(volume.id())) {
-            // The resource provider failed over after the last
-            // `CreateVolume` call, but before the operation status was
-            // checkpointed.
+            // The resource provider failed over after the last `createVolume`
+            // call, but before the operation status was checkpointed.
             CHECK_EQ(VolumeState::CREATED,
                      volumes.at(volume.id()).state.state());
           } else {
@@ -2702,19 +2693,13 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
 }
 
 
+// Returns true if the volume has been deprovisioned.
+//
 // NOTE: This can only be called after `prepareControllerService` and
 // `prepareNodeService` (since it may require `NodeUnpublishVolume`).
-Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
-    const string& volumeId,
-    bool preExisting)
+Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
+    const string& volumeId)
 {
-  // We do not need the capability for pre-existing volumes since no
-  // actual `DeleteVolume` call will be made.
-  if (!preExisting && !controllerCapabilities.createDeleteVolume) {
-    return Failure(
-        "Controller capability 'CREATE_DELETE_VOLUME' is not supported");
-  }
-
   CHECK_SOME(controllerContainerId);
 
   const string volumePath = csi::paths::getVolumePath(
@@ -2724,11 +2709,11 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
       volumeId);
 
   if (!volumes.contains(volumeId)) {
-    // The resource provider failed over after the last `DeleteVolume`
-    // call, but before the operation status was checkpointed.
+    // The resource provider failed over after the last `deleteVolume` call, but
+    // before the operation status was checkpointed.
     CHECK(!os::exists(volumePath));
 
-    return Nothing();
+    return controllerCapabilities.createDeleteVolume;
   }
 
   const VolumeData& volume = volumes.at(volumeId);
@@ -2766,7 +2751,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
       // state once the above is done.
     }
     case VolumeState::CREATED: {
-      if (!preExisting) {
+      // We only delete the volume if the `CREATE_DELETE_VOLUME` capability is
+      // supported. Otherwise, we simply leave it as a preprovisioned volume.
+      if (controllerCapabilities.createDeleteVolume) {
         deleted = deleted
           .then(defer(self(), &Self::getService, controllerContainerId.get()))
           .then(defer(self(), [this, volumeId](csi::v0::Client client) {
@@ -2804,7 +2791,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
       volumes.erase(volumeId);
       CHECK_SOME(os::rmdir(volumePath));
 
-      return Nothing();
+      return controllerCapabilities.createDeleteVolume;
     }));
 }
 
@@ -2994,7 +2981,10 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
       conversions = applyCreateDisk(
           operation.info().create_disk().source(),
           operationUuid,
-          operation.info().create_disk().target_type());
+          operation.info().create_disk().target_type(),
+          operation.info().create_disk().has_target_profile()
+            ? operation.info().create_disk().target_profile()
+            : Option<string>::none());
 
       break;
     }
@@ -3098,120 +3088,50 @@ Future<vector<ResourceConversion>>
 StorageLocalResourceProviderProcess::applyCreateDisk(
     const Resource& resource,
     const id::UUID& operationUuid,
-    const Resource::DiskInfo::Source::Type& type)
+    const Resource::DiskInfo::Source::Type& targetType,
+    const Option<string>& targetProfile)
 {
   CHECK_EQ(Resource::DiskInfo::Source::RAW, resource.disk().source().type());
 
-  // NOTE: Currently we only support two type of RAW disk resources:
+  // NOTE: Currently we only support two types of RAW disk resources:
   //   1. RAW disk from `GetCapacity` with a profile but no volume ID.
-  //   2. RAW disk from `ListVolumes` for a pre-existing volume, which
-  //      has a volume ID but no profile.
+  //   2. RAW disk from `ListVolumes` for a preprovisioned volume, which has a
+  //      volume ID but no profile.
   //
   // For 1, we check if its profile is mount or block capable, then
-  // call `CreateVolume` with the operation UUID as the name (so that
+  // call `createVolume` with the operation UUID as the name (so that
   // the same volume will be returned when recovering from a failover).
   //
-  // For 2, there are two scenarios:
+  // For 2, the target profile will be specified, so we first check if the
+  // profile is mount or block capable. Then, there are two scenarios:
   //   a. If the volume has a checkpointed state (because it was created
-  //      by a previous resource provider), we simply check if its
-  //      checkpointed capability supports the conversion.
+  //      by a previous resource provider), we simply check if its checkpointed
+  //      capability and parameters match the profile.
   //   b. If the volume is newly discovered, `ValidateVolumeCapabilities`
-  //      is called with a default mount or block capability.
+  //      is called with the capability of the profile.
   CHECK_NE(resource.disk().source().has_profile(),
-           resource.disk().source().has_id());
+           resource.disk().source().has_id() && targetProfile.isSome());
 
-  Future<string> created;
-
-  switch (type) {
-    case Resource::DiskInfo::Source::MOUNT: {
-      if (resource.disk().source().has_profile()) {
-        // The profile exists since any operation with a stale profile must have
-        // been dropped for a mismatched resource version or a reconciliation.
-        CHECK(profileInfos.contains(resource.disk().source().profile()))
-          << "Profile '" << resource.disk().source().profile() << "' not found";
+  const string profile =
+    targetProfile.getOrElse(resource.disk().source().profile());
 
-        const DiskProfileAdaptor::ProfileInfo& profileInfo =
-          profileInfos.at(resource.disk().source().profile());
-
-        if (!profileInfo.capability.has_mount()) {
-          return Failure(
-              "Profile '" + resource.disk().source().profile() +
-              "' cannot be used to create a MOUNT disk");
-        }
-
-        // TODO(chhsiao): Call `CreateVolume` sequentially with other
-        // create or delete operations, and send an `UPDATE_STATE` for
-        // RAW profiled resources afterward.
-        created = createVolume(
-            operationUuid.toString(),
-            Bytes(resource.scalar().value() * Bytes::MEGABYTES),
-            profileInfo);
-      } else {
-        const string& volumeId = resource.disk().source().id();
-
-        if (volumes.contains(volumeId)) {
-          if (!volumes.at(volumeId).state.volume_capability().has_mount()) {
-            return Failure(
-                "Volume '" + volumeId +
-                "' cannot be converted to a MOUNT disk");
-          }
+  if (!profileInfos.contains(profile)) {
+    return Failure("Profile '" + profile + "' not found");
+  }
 
-          created = volumeId;
-        } else {
-          // No need to call `ValidateVolumeCapabilities` sequentially
-          // since the volume is not used and thus not in `volumes` yet.
-          created = validateCapability(
-              volumeId,
-              resource.disk().source().has_metadata()
-                ? resource.disk().source().metadata() : Option<Labels>::none(),
-              defaultMountCapability);
-        }
+  const DiskProfileAdaptor::ProfileInfo& profileInfo = profileInfos.at(profile);
+  switch (targetType) {
+    case Resource::DiskInfo::Source::MOUNT: {
+      if (!profileInfo.capability.has_mount()) {
+        return Failure(
+            "Profile '" + profile + "' cannot be used to create a MOUNT disk");
       }
       break;
     }
     case Resource::DiskInfo::Source::BLOCK: {
-      if (resource.disk().source().has_profile()) {
-        // The profile exists since any operation with a stale profile must have
-        // been dropped for a mismatched resource version or a reconciliation.
-        CHECK(profileInfos.contains(resource.disk().source().profile()))
-          << "Profile '" << resource.disk().source().profile() << "' not found";
-
-        const DiskProfileAdaptor::ProfileInfo& profileInfo =
-          profileInfos.at(resource.disk().source().profile());
-
-        if (!profileInfo.capability.has_block()) {
-          return Failure(
-              "Profile '" + resource.disk().source().profile() +
-              "' cannot be used to create a BLOCK disk");
-        }
-
-        // TODO(chhsiao): Call `CreateVolume` sequentially with other
-        // create or delete operations, and send an `UPDATE_STATE` for
-        // RAW profiled resources afterward.
-        created = createVolume(
-            operationUuid.toString(),
-            Bytes(resource.scalar().value() * Bytes::MEGABYTES),
-            profileInfo);
-      } else {
-        const string& volumeId = resource.disk().source().id();
-
-        if (volumes.contains(volumeId)) {
-          if (!volumes.at(volumeId).state.volume_capability().has_block()) {
-            return Failure(
-                "Volume '" + volumeId +
-                "' cannot be converted to a BLOCK disk");
-          }
-
-          created = volumeId;
-        } else {
-          // No need to call `ValidateVolumeCapabilities` sequentially
-          // since the volume is not used and thus not in `volumes` yet.
-          created = validateCapability(
-              volumeId,
-              resource.disk().source().has_metadata()
-                ? resource.disk().source().metadata() : Option<Labels>::none(),
-              defaultBlockCapability);
-        }
+      if (!profileInfo.capability.has_block()) {
+        return Failure(
+            "Profile '" + profile + "' cannot be used to create a BLOCK disk");
       }
       break;
     }
@@ -3222,6 +3142,40 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
     }
   }
 
+  Future<string> created;
+  if (resource.disk().source().has_id()) {
+    const string& volumeId = resource.disk().source().id();
+
+    if (volumes.contains(volumeId)) {
+      const VolumeState& volumeState = volumes.at(volumeId).state;
+
+      // TODO(chhsiao): Validate the volume against the parameters of the
+      // profile once they are checkpointed.
+      if (volumeState.volume_capability() != profileInfo.capability) {
+        return Failure(
+            "Profile '" + profile + "' cannot be applied to volume '" +
+            volumeId + "'");
+      }
+
+      created = volumeId;
+    } else {
+      created = validateCapability(
+          volumeId,
+          resource.disk().source().has_metadata()
+            ? resource.disk().source().metadata()
+            : Option<Labels>::none(),
+          profileInfo.capability);
+    }
+  } else {
+    // TODO(chhsiao): Consider calling `CreateVolume` sequentially with other
+    // create or delete operations, and send an `UPDATE_STATE` for storage pools
+    // afterward. See MESOS-9254.
+    created = createVolume(
+        operationUuid.toString(),
+        Bytes(resource.scalar().value() * Bytes::MEGABYTES),
+        profileInfo);
+  }
+
   return created
     .then(defer(self(), [=](const string& volumeId) {
       CHECK(volumes.contains(volumeId));
@@ -3229,7 +3183,8 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
 
       Resource converted = resource;
       converted.mutable_disk()->mutable_source()->set_id(volumeId);
-      converted.mutable_disk()->mutable_source()->set_type(type);
+      converted.mutable_disk()->mutable_source()->set_type(targetType);
+      converted.mutable_disk()->mutable_source()->set_profile(profile);
 
       if (!volumeState.volume_attributes().empty()) {
         converted.mutable_disk()->mutable_source()->mutable_metadata()
@@ -3241,7 +3196,7 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
           info.storage().plugin().type(),
           info.storage().plugin().name());
 
-      switch (type) {
+      switch (targetType) {
         case Resource::DiskInfo::Source::MOUNT: {
           // Set the root path relative to agent work dir.
           converted.mutable_disk()->mutable_source()->mutable_mount()
@@ -3274,24 +3229,22 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
   CHECK(resource.disk().source().type() == Resource::DiskInfo::Source::MOUNT ||
         resource.disk().source().type() == Resource::DiskInfo::Source::BLOCK);
   CHECK(resource.disk().source().has_id());
-  CHECK(volumes.contains(resource.disk().source().id()));
+
+  const string& volumeId = resource.disk().source().id();
+  CHECK(volumes.contains(volumeId));
 
   // Sequentialize the deletion with other operation on the same volume.
-  return volumes.at(resource.disk().source().id()).sequence->add(
-      std::function<Future<Nothing>()>(defer(
-          self(),
-          &Self::deleteVolume,
-          resource.disk().source().id(),
-          !resource.disk().source().has_profile())))
-    .then(defer(self(), [=]() {
+  return volumes.at(volumeId).sequence->add(std::function<Future<bool>()>(
+      defer(self(), &Self::deleteVolume, volumeId)))
+    .then(defer(self(), [=](bool deprovisioned) {
       Resource converted = resource;
       converted.mutable_disk()->mutable_source()->set_type(
           Resource::DiskInfo::Source::RAW);
       converted.mutable_disk()->mutable_source()->clear_mount();
 
-      // We only clear the volume ID and metadata if the destroyed volume is not
-      // a pre-existing volume.
-      if (resource.disk().source().has_profile()) {
+      // We clear the volume ID and metadata if the volume has been
+      // deprovisioned. Otherwise, we clear the profile.
+      if (deprovisioned) {
         converted.mutable_disk()->mutable_source()->clear_id();
         converted.mutable_disk()->mutable_source()->clear_metadata();
 
@@ -3321,6 +3274,8 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
                 defer(self(), &Self::reconcileStoragePools)));
           }
         }
+      } else {
+        converted.mutable_disk()->mutable_source()->clear_profile();
       }
 
       vector<ResourceConversion> conversions;

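The `DESTROY_DISK` conversion in this patch depends on whether the volume was
actually deprovisioned. The following standalone sketch illustrates that
branch only. `DiskType`, `DiskSource` and `destroyDisk` are hypothetical names
rather than Mesos types, an empty string stands for an unset field, and the
case where the profile is no longer known to the disk profile adaptor is
left out.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-ins for `Resource.DiskInfo.Source`; an empty string
// means the field is not set.
enum class DiskType { RAW, MOUNT, BLOCK };

struct DiskSource
{
  DiskType type;
  std::string id;
  std::string profile;
};

// Sketch of the conversion: `canDeprovision` plays the role of the
// `CREATE_DELETE_VOLUME` controller capability of the CSI plugin.
DiskSource destroyDisk(const DiskSource& disk, bool canDeprovision)
{
  assert(disk.type == DiskType::MOUNT || disk.type == DiskType::BLOCK);
  assert(!disk.id.empty());

  DiskSource converted = disk;
  converted.type = DiskType::RAW;

  if (canDeprovision) {
    // The volume is gone, so the space returns to the storage pool of the
    // same profile and no longer has a volume ID.
    converted.id.clear();
  } else {
    // The volume still exists, so it becomes a preprovisioned RAW disk
    // that keeps its volume ID but has no profile.
    converted.profile.clear();
  }

  return converted;
}
```

With a plugin that cannot deprovision volumes, the resulting `RAW` disk keeps
its ID, so a later `CREATE_DISK` with a target profile can import it again.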

[mesos] 01/14: Changed the semantics of `CREATE_DISK` and `DESTROY_DISK` operations.

Posted by ch...@apache.org.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 252b5548b981529ee1e65da1b385979ab3db1d92
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Mon Oct 15 20:15:41 2018 -0700

    Changed the semantics of `CREATE_DISK` and `DESTROY_DISK` operations.
    
    The semantics of these two operations have been updated to provide
    primitives to import CSI volumes and recover CSI volumes against agent
    ID changes and metadata loss.
    
    Review: https://reviews.apache.org/r/69036
---
 include/mesos/mesos.proto    | 46 +++++++++++++++++++++++++++++++++++++++++---
 include/mesos/v1/mesos.proto | 46 +++++++++++++++++++++++++++++++++++++++++---
 2 files changed, 86 insertions(+), 6 deletions(-)

diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 56107f4..c822cc7 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -1982,7 +1982,26 @@ message Offer {
       required Value.Scalar subtract = 2;
     }
 
-    // Create a `MOUNT` or `BLOCK` disk resource from a `RAW` disk resource.
+    // Create a `MOUNT` or `BLOCK` disk resource backed by a CSI volume from a
+    // `RAW` disk resource.
+    //
+    // In the typical case where the `RAW` disk resource has a profile and no
+    // source ID, a new CSI volume will be provisioned by Mesos to back the
+    // returned `MOUNT` or `BLOCK` disk resource. However, the `RAW` disk
+    // resource can instead have no profile but a source ID, indicating that
+    // it is already backed by a CSI volume in one of the following scenarios:
+    //
+    // (1) The CSI volume is preprovisioned out-of-band.
+    //
+    // (2) The CSI volume is provisioned by Mesos, but Mesos has lost the
+    //     corresponding `MOUNT` or `BLOCK` resource metadata. This could
+    //     happen if there has been a change in the agent ID or resource
+    //     provider ID where the volume belongs.
+    //
+    // In the above cases, Mesos won't provision a new CSI volume, but instead
+    // will simply return a `MOUNT` or `BLOCK` disk resource backed by the same
+    // CSI volume, with the profile specified in this call.
+    //
     // NOTE: For the time being, this API is subject to change and the related
     // feature is experimental.
     message CreateDisk {
@@ -1990,10 +2009,31 @@ message Offer {
 
       // NOTE: Only `MOUNT` or `BLOCK` is allowed in the `target_type` field.
       required Resource.DiskInfo.Source.Type target_type = 2;
+
+      // Apply the specified profile to the created disk. This field must be set
+      // if `source` does not have a profile, and must not be set if it has one.
+      //
+      // NOTE: The operation will fail If the specified profile is unknown to
+      // Mesos, i.e., not reported by the disk profile adaptor.
+      optional string target_profile = 3;
     }
 
-    // Destroy a `MOUNT` or `BLOCK` disk resource. This will result in a `RAW`
-    // disk resource.
+    // Destroy a `MOUNT` or `BLOCK` disk resource backed by a CSI volume.
+    //
+    // In the typical case where the CSI plugin of the volume supports volume
+    // deprovisioning and the profile of the disk resource is known to Mesos,
+    // the volume will be deprovisioned and a `RAW` disk resource with the same
+    // profile but no source ID will be returned. However, the following corner
+    // cases could lead to different outcomes:
+    //
+    // (1) If the CSI plugin supports volume deprovisioning but the profile of
+    //     the disk resource is no longer reported by the disk profile adaptor,
+    //     the volume will be deprovisioned but no resource will be returned.
+    //
+    // (2) If the CSI plugin does not support volume deprovisioning, the volume
+    //     won't be deprovisioned and a `RAW` disk resource with no profile but
+    //     the same source ID will be returned.
+    //
     // NOTE: For the time being, this API is subject to change and the related
     // feature is experimental.
     message DestroyDisk {
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index c6e7515..51c1bfd 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -1974,7 +1974,26 @@ message Offer {
       required Value.Scalar subtract = 2;
     }
 
-    // Create a `MOUNT` or `BLOCK` disk resource from a `RAW` disk resource.
+    // Create a `MOUNT` or `BLOCK` disk resource backed by a CSI volume from a
+    // `RAW` disk resource.
+    //
+    // In the typical case where the `RAW` disk resource has a profile and no
+    // source ID, a new CSI volume will be provisioned by Mesos to back the
+    // returned `MOUNT` or `BLOCK` disk resource. However, the `RAW` disk
+    // resource can instead have no profile but a source ID, indicating that
+    // it is already backed by a CSI volume in one of the following scenarios:
+    //
+    // (1) The CSI volume is preprovisioned out-of-band.
+    //
+    // (2) The CSI volume is provisioned by Mesos, but Mesos has lost the
+    //     corresponding `MOUNT` or `BLOCK` resource metadata. This could
+    //     happen if there has been a change in the agent ID or resource
+    //     provider ID where the volume belongs.
+    //
+    // In the above cases, Mesos won't provision a new CSI volume, but instead
+    // will simply return a `MOUNT` or `BLOCK` disk resource backed by the same
+    // CSI volume, with the profile specified in this call.
+    //
     // NOTE: For the time being, this API is subject to change and the related
     // feature is experimental.
     message CreateDisk {
@@ -1982,10 +2001,31 @@ message Offer {
 
       // NOTE: Only `MOUNT` or `BLOCK` is allowed in the `target_type` field.
       required Resource.DiskInfo.Source.Type target_type = 2;
+
+      // Apply the specified profile to the created disk. This field must be set
+      // if `source` does not have a profile, and must not be set if it has one.
+      //
+      // NOTE: The operation will fail If the specified profile is unknown to
+      // Mesos, i.e., not reported by the disk profile adaptor.
+      optional string target_profile = 3;
     }
 
-    // Destroy a `MOUNT` or `BLOCK` disk resource. This will result in a `RAW`
-    // disk resource.
+    // Destroy a `MOUNT` or `BLOCK` disk resource backed by a CSI volume.
+    //
+    // In the typical case where the CSI plugin of the volume supports volume
+    // deprovisioning and the profile of the disk resource is known to Mesos,
+    // the volume will be deprovisioned and a `RAW` disk resource with the same
+    // profile but no source ID will be returned. However, the following corner
+    // cases could lead to different outcomes:
+    //
+    // (1) If the CSI plugin supports volume deprovisioning but the profile of
+    //     the disk resource is no longer reported by the disk profile adaptor,
+    //     the volume will be deprovisioned but no resource will be returned.
+    //
+    // (2) If the CSI plugin does not support volume deprovisioning, the volume
+    //     won't be deprovisioned and a `RAW` disk resource with no profile but
+    //     the same source ID will be returned.
+    //
     // NOTE: For the time being, this API is subject to change and the related
     // feature is experimental.
     message DestroyDisk {
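
The `CreateDisk` and `DestroyDisk` comments above can be read as a small decision table. The following is only a sketch of that table, not the SLRP implementation: `Disk`, `createDisk`, `destroyDisk`, the `knownProfiles` set and the placeholder volume ID are all hypothetical names made up for illustration, and inputs outside the cases the comments describe are simply rejected here.

```cpp
#include <cassert>
#include <optional>
#include <set>
#include <string>

// Hypothetical, simplified stand-in for `Resource.DiskInfo.Source`.
enum class DiskType { RAW, MOUNT, BLOCK };

struct Disk
{
  DiskType type;
  std::optional<std::string> id;       // CSI volume ID (the "source ID").
  std::optional<std::string> profile;
};

// Models `CREATE_DISK`. Returns `std::nullopt` when the operation would fail
// or when the input is outside the two cases described in the comments.
std::optional<Disk> createDisk(
    const Disk& source,
    DiskType targetType,
    const std::optional<std::string>& targetProfile,
    const std::set<std::string>& knownProfiles)
{
  if (source.type != DiskType::RAW || targetType == DiskType::RAW) {
    return std::nullopt;
  }

  // Typical case: a profile but no source ID, so a new volume is provisioned.
  if (source.profile.has_value() && !source.id.has_value()) {
    if (targetProfile.has_value()) {
      return std::nullopt; // Must not be set if `source` has a profile.
    }
    // The ID is a placeholder; a real plugin would report the actual one.
    return Disk{targetType, std::string("new-volume-id"), source.profile};
  }

  // Already backed by a volume: a source ID but no profile, so the same
  // volume is returned with the profile specified in the call.
  if (!source.profile.has_value() && source.id.has_value()) {
    if (!targetProfile.has_value() ||
        knownProfiles.count(*targetProfile) == 0) {
      return std::nullopt; // Missing or unknown `target_profile`.
    }
    return Disk{targetType, source.id, targetProfile};
  }

  return std::nullopt; // Not covered by this sketch.
}

// Models `DESTROY_DISK`. Returns `std::nullopt` when no resource is returned.
std::optional<Disk> destroyDisk(
    const Disk& disk,
    bool pluginCanDeprovision,
    const std::set<std::string>& knownProfiles)
{
  if (!pluginCanDeprovision) {
    // Corner case (2): the volume is kept; `RAW`, no profile, same source ID.
    return Disk{DiskType::RAW, disk.id, std::nullopt};
  }

  if (!disk.profile.has_value() || knownProfiles.count(*disk.profile) == 0) {
    // Corner case (1): the volume is deprovisioned, nothing is returned.
    return std::nullopt;
  }

  // Typical case: deprovisioned; `RAW`, same profile, no source ID.
  return Disk{DiskType::RAW, std::nullopt, disk.profile};
}
```

Under these assumptions, a `RAW` disk with source ID `vol-1` and no profile can only be converted when `target_profile` names a known profile, which is the recovery path that test `AgentRegisteredWithNewId` exercises later in this series.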


[mesos] 02/14: Added profiles to storage pools in tests for `CREATE_DISK`.

Posted by ch...@apache.org.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit d9bd4d0cd2b2a6862d74163225040fe86d24567b
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Thu Nov 15 01:18:04 2018 -0800

    Added profiles to storage pools in tests for `CREATE_DISK`.
    
    This patch adds a new `targetProfile` parameter to the `CREATE_DISK`
    test helper, and adds profiles to all storage pools in tests, to adhere
    to the new semantics of `CREATE_DISK`.
    
    Review: https://reviews.apache.org/r/69357
---
 src/tests/api_tests.cpp                            | 12 ++++++++++--
 src/tests/master_tests.cpp                         |  6 +-----
 src/tests/mesos.hpp                                | 22 ++++++++++++++++------
 src/tests/operation_reconciliation_tests.cpp       | 11 ++++++-----
 src/tests/resource_provider_manager_tests.cpp      |  2 +-
 .../storage_local_resource_provider_tests.cpp      | 10 ++++++----
 6 files changed, 40 insertions(+), 23 deletions(-)

diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index fdd9f87..edde48a 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -1086,7 +1086,11 @@ TEST_P(MasterAPITest, GetOperations)
   v1::MockResourceProvider resourceProvider(
       info,
       v1::createDiskResource(
-          "200", "*", None(), None(), v1::createDiskSourceRaw()));
+          "200",
+          "*",
+          None(),
+          None(),
+          v1::createDiskSourceRaw(None(), "profile")));
 
   // Start and register resource provider.
   Owned<EndpointDetector> endpointDetector(
@@ -7878,7 +7882,11 @@ TEST_P(AgentAPITest, GetOperations)
   v1::MockResourceProvider resourceProvider(
       info,
       v1::createDiskResource(
-          "200", "*", None(), None(), v1::createDiskSourceRaw()));
+          "200",
+          "*",
+          None(),
+          None(),
+          v1::createDiskSourceRaw(None(), "profile")));
 
   // Start and register a resource provider.
   Owned<EndpointDetector> endpointDetector(
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 61b72ed..561c382 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -9060,11 +9060,7 @@ TEST_F(MasterTest, OperationUpdateDuringFailover)
   resourceProviderInfo.set_name("test");
 
   v1::Resources resourceProviderResources = v1::createDiskResource(
-      "200",
-      "*",
-      None(),
-      None(),
-      v1::createDiskSourceRaw());
+      "200", "*", None(), None(), v1::createDiskSourceRaw(None(), "profile"));
 
   v1::MockResourceProvider resourceProvider(
       resourceProviderInfo,
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index b181f73..9850821 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1413,13 +1413,18 @@ inline typename TOffer::Operation LAUNCH_GROUP(
 template <typename TResource, typename TTargetType, typename TOffer>
 inline typename TOffer::Operation CREATE_DISK(
     const TResource& source,
-    const TTargetType& type,
+    const TTargetType& targetType,
+    const Option<std::string>& targetProfile = None(),
     const Option<std::string>& operationId = None())
 {
   typename TOffer::Operation operation;
   operation.set_type(TOffer::Operation::CREATE_DISK);
   operation.mutable_create_disk()->mutable_source()->CopyFrom(source);
-  operation.mutable_create_disk()->set_target_type(type);
+  operation.mutable_create_disk()->set_target_type(targetType);
+
+  if (targetProfile.isSome()) {
+    operation.mutable_create_disk()->set_target_profile(targetProfile.get());
+  }
 
   if (operationId.isSome()) {
     operation.mutable_id()->set_value(operationId.get());
@@ -3120,18 +3125,23 @@ public:
         update->mutable_status()->add_converted_resources()->CopyFrom(
             operation.info().create_disk().source());
         update->mutable_status()
-          ->mutable_converted_resources()
-          ->Mutable(0)
+          ->mutable_converted_resources(0)
           ->mutable_disk()
           ->mutable_source()
           ->set_type(operation.info().create_disk().target_type());
+        if (operation.info().create_disk().has_target_profile()) {
+          update->mutable_status()
+            ->mutable_converted_resources(0)
+            ->mutable_disk()
+            ->mutable_source()
+            ->set_profile(operation.info().create_disk().target_profile());
+        }
         break;
       case Operation::DESTROY_DISK:
         update->mutable_status()->add_converted_resources()->CopyFrom(
             operation.info().destroy_disk().source());
         update->mutable_status()
-          ->mutable_converted_resources()
-          ->Mutable(0)
+          ->mutable_converted_resources(0)
           ->mutable_disk()
           ->mutable_source()
           ->set_type(Source::RAW);
diff --git a/src/tests/operation_reconciliation_tests.cpp b/src/tests/operation_reconciliation_tests.cpp
index 37d38b3..8afac9a 100644
--- a/src/tests/operation_reconciliation_tests.cpp
+++ b/src/tests/operation_reconciliation_tests.cpp
@@ -678,8 +678,8 @@ TEST_P(OperationReconciliationTest, AgentPendingOperationAfterMasterFailover)
   resourceProviderInfo.set_type("org.apache.mesos.rp.test");
   resourceProviderInfo.set_name("test");
 
-  Resource disk =
-    createDiskResource("200", "*", None(), None(), createDiskSourceRaw());
+  Resource disk = createDiskResource(
+      "200", "*", None(), None(), createDiskSourceRaw(None(), "profile"));
 
   Owned<MockResourceProvider> resourceProvider(
       new MockResourceProvider(
@@ -792,9 +792,10 @@ TEST_P(OperationReconciliationTest, AgentPendingOperationAfterMasterFailover)
       frameworkId,
       offer,
       {CREATE_DISK(
-          source.get(),
-          Resource::DiskInfo::Source::MOUNT,
-          operationId.value())}));
+           source.get(),
+           Resource::DiskInfo::Source::MOUNT,
+           None(),
+           operationId.value())}));
 
   AWAIT_READY(applyOperation);
 
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 5bb740e..b61c50f 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -1001,7 +1001,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources)
   AWAIT_READY(updateSlaveMessage);
 
   v1::Resource disk = v1::createDiskResource(
-      "200", "*", None(), None(), v1::createDiskSourceRaw());
+      "200", "*", None(), None(), v1::createDiskSourceRaw(None(), "profile"));
 
   updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
 
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 077a465..dddb608 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -3998,9 +3998,7 @@ TEST_F(StorageLocalResourceProviderTest, ReconcileDroppedOperation)
 
 // This test verifies that if an operation ID is specified, operation status
 // updates are resent to the scheduler until acknowledged.
-TEST_F(
-    StorageLocalResourceProviderTest,
-    RetryOperationStatusUpdateToScheduler)
+TEST_F(StorageLocalResourceProviderTest, RetryOperationStatusUpdateToScheduler)
 {
   Clock::pause();
 
@@ -4134,7 +4132,10 @@ TEST_F(
       frameworkId,
       offer,
       {v1::CREATE_DISK(
-          source.get(), v1::Resource::DiskInfo::Source::MOUNT, operationId)}));
+           source.get(),
+           v1::Resource::DiskInfo::Source::MOUNT,
+           None(),
+           operationId)}));
 
   AWAIT_READY(update);
 
@@ -4318,6 +4319,7 @@ TEST_F(
       {v1::CREATE_DISK(
           source.get(),
           v1::Resource::DiskInfo::Source::MOUNT,
+          None(),
           operationId.value())}));
 
   AWAIT_READY(update);
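
The call-site changes in this patch (the extra `None()` before `operationId`) follow from inserting a defaulted parameter ahead of an existing defaulted one of the same type. The sketch below illustrates that hazard using `std::optional` in place of stout's `Option`; the `Operation` struct and `createDisk` function are hypothetical stand-ins, not the real `CREATE_DISK` helper.

```cpp
#include <cassert>
#include <optional>
#include <string>

// Hypothetical stand-in for the fields the helper fills in.
struct Operation
{
  std::string targetType;
  std::optional<std::string> targetProfile;
  std::optional<std::string> operationId;
};

// Mirrors the shape of the updated helper: `targetProfile` was inserted
// before `operationId`, and both are optional strings with defaults.
Operation createDisk(
    const std::string& targetType,
    const std::optional<std::string>& targetProfile = std::nullopt,
    const std::optional<std::string>& operationId = std::nullopt)
{
  Operation operation;
  operation.targetType = targetType;
  operation.targetProfile = targetProfile;
  operation.operationId = operationId;
  return operation;
}
```

With this signature, an old-style call such as `createDisk("MOUNT", "op-1")` still compiles, but the string now binds to `targetProfile` and `operationId` stays unset. Passing an explicit empty value, as in `createDisk("MOUNT", std::nullopt, "op-1")`, keeps the operation ID in the intended slot.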


[mesos] 13/14: Recovered disk through `CREATE_DISK` in test `AgentRegisteredWithNewId`.

Posted by ch...@apache.org.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit ea4534a39ec5823f13e7b62bf9928faf79c4e881
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Wed Nov 14 20:27:09 2018 -0800

    Recovered disk through `CREATE_DISK` in test `AgentRegisteredWithNewId`.
    
    Test `AgentRegisteredWithNewId` is now improved to exercise the code
    path for recovering disks created by the last agent through
    `CREATE_DISK`.
    
    Review: https://reviews.apache.org/r/69365
---
 .../storage_local_resource_provider_tests.cpp      | 351 ++++++++++++++-------
 1 file changed, 233 insertions(+), 118 deletions(-)

diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index e2a4d02..8c88c14 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -67,6 +67,7 @@ using process::post;
 using process::reap;
 
 using testing::AtMost;
+using testing::Between;
 using testing::DoAll;
 using testing::Not;
 using testing::Sequence;
@@ -192,7 +193,8 @@ public:
 
   void setupResourceProviderConfig(
       const Bytes& capacity,
-      const Option<string> volumes = None())
+      const Option<string> volumes = None(),
+      const Option<string> createParameters = None())
   {
     const string testCsiPluginName = "test_csi_plugin";
 
@@ -230,6 +232,7 @@ public:
                     "arguments": [
                       "%s",
                       "--available_capacity=%s",
+                      "--create_parameters=%s",
                       "--volumes=%s",
                       "--work_dir=%s"
                     ]
@@ -262,6 +265,7 @@ public:
         testCsiPluginPath,
         testCsiPluginPath,
         stringify(capacity),
+        createParameters.getOrElse(""),
         volumes.getOrElse(""),
         testCsiPluginWorkDir);
 
@@ -273,34 +277,41 @@ public:
   }
 
   // Create a JSON string representing a disk profile mapping containing the
-  // given profile.
-  static string createDiskProfileMapping(const string& profile)
+  // given profile-parameter pairs.
+  static string createDiskProfileMapping(
+      const hashmap<string, Option<JSON::Object>>& profiles)
   {
-    Try<string> diskProfileMapping = strings::format(
-        R"~(
-        {
-          "profile_matrix": {
-            "%s": {
-              "csi_plugin_type_selector": {
-                "plugin_type": "org.apache.mesos.csi.test"
-              },
-              "volume_capabilities": {
-                "mount": {},
-                "access_mode": {
-                  "mode": "SINGLE_NODE_WRITER"
-                }
-              }
-            }
-          }
-        }
-        )~",
-        profile);
-
-    // This extra closure is necessary in order to use `ASSERT_*`, as
-    // these macros require a void return type.
-    [&] { ASSERT_SOME(diskProfileMapping); }();
+    JSON::Object diskProfileMapping{
+      {"profile_matrix", JSON::Object{}}
+    };
+
+    foreachpair (const string& profile,
+                 const Option<JSON::Object>& parameters,
+                 profiles) {
+      JSON::Object profileInfo{
+        {"csi_plugin_type_selector", JSON::Object{
+          {"plugin_type", "org.apache.mesos.csi.test"}
+        }},
+        {"volume_capabilities", JSON::Object{
+          {"mount", JSON::Object{}},
+          {"access_mode", JSON::Object{
+            {"mode", "SINGLE_NODE_WRITER"}
+          }}
+        }}};
+
+      diskProfileMapping
+        .values.at("profile_matrix").as<JSON::Object>()
+        .values.emplace(profile, profileInfo);
+
+      if (parameters.isSome()) {
+        diskProfileMapping
+          .values.at("profile_matrix").as<JSON::Object>()
+          .values.at(profile).as<JSON::Object>()
+          .values.emplace("create_parameters", parameters.get());
+      }
+    }
 
-    return diskProfileMapping.get();
+    return stringify(diskProfileMapping);
   }
 
   string metricName(const string& basename)
@@ -454,7 +465,10 @@ TEST_F(StorageLocalResourceProviderTest, DISABLED_ZeroSizedDisk)
 TEST_F(StorageLocalResourceProviderTest, DISABLED_SmallDisk)
 {
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
-  ASSERT_SOME(os::write(profilesPath, createDiskProfileMapping("test")));
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
   loadUriDiskProfileAdaptorModule(profilesPath);
 
   setupResourceProviderConfig(Kilobytes(512), "volume0:512KB");
@@ -649,7 +663,8 @@ TEST_F(StorageLocalResourceProviderTest, ProfileAppeared)
   Clock::settle();
 
   // Update the disk profile mapping.
-  updatedProfileMapping.set(http::OK(createDiskProfileMapping("test")));
+  updatedProfileMapping.set(
+      http::OK(createDiskProfileMapping({{"test", None()}})));
 
   // Advance the clock to make sure another allocation is triggered.
   Clock::advance(masterFlags.allocation_interval);
@@ -681,7 +696,10 @@ TEST_F(StorageLocalResourceProviderTest, ProfileAppeared)
 TEST_F(StorageLocalResourceProviderTest, CreateDestroyDisk)
 {
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
-  ASSERT_SOME(os::write(profilesPath, createDiskProfileMapping("test")));
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
   loadUriDiskProfileAdaptorModule(profilesPath);
 
   setupResourceProviderConfig(Gigabytes(4));
@@ -856,8 +874,9 @@ TEST_F(StorageLocalResourceProviderTest, CreateDestroyDiskRecovery)
 
   Promise<http::Response> recoveredProfileMapping;
   EXPECT_CALL(*server.get()->process, profiles(_))
-    .WillOnce(Return(http::OK(createDiskProfileMapping("test"))))
+    .WillOnce(Return(http::OK(createDiskProfileMapping({{"test", None()}}))))
     .WillOnce(Return(recoveredProfileMapping.future()));
+
   loadUriDiskProfileAdaptorModule(stringify(server.get()->process->url()));
 
   setupResourceProviderConfig(Gigabytes(4));
@@ -1027,7 +1046,8 @@ TEST_F(StorageLocalResourceProviderTest, CreateDestroyDiskRecovery)
 
   // NOTE: We update the disk profile mapping after the `DESTROY_DISK` operation
   // is applied, otherwise it could be dropped due to reconciling storage pools.
-  recoveredProfileMapping.set(http::OK(createDiskProfileMapping("test")));
+  recoveredProfileMapping.set(
+      http::OK(createDiskProfileMapping({{"test", None()}})));
 
   AWAIT_READY(volumeDestroyedOffers);
   ASSERT_FALSE(volumeDestroyedOffers->empty());
@@ -1064,7 +1084,7 @@ TEST_F(StorageLocalResourceProviderTest, ProfileDisappeared)
 
   Promise<http::Response> updatedProfileMapping;
   EXPECT_CALL(*server.get()->process, profiles(_))
-    .WillOnce(Return(http::OK(createDiskProfileMapping("test1"))))
+    .WillOnce(Return(http::OK(createDiskProfileMapping({{"test1", None()}}))))
     .WillOnce(Return(updatedProfileMapping.future()));
 
   const Duration pollInterval = Seconds(10);
@@ -1230,7 +1250,8 @@ TEST_F(StorageLocalResourceProviderTest, ProfileDisappeared)
   Clock::advance(pollInterval);
 
   // Update the disk profile mapping.
-  updatedProfileMapping.set(http::OK(createDiskProfileMapping("test2")));
+  updatedProfileMapping.set(
+      http::OK(createDiskProfileMapping({{"test2", None()}})));
 
   AWAIT_READY(updateSlave3);
 
@@ -1413,14 +1434,18 @@ TEST_F(StorageLocalResourceProviderTest, AgentFailoverPluginKilled)
 
 // This test verifies that if an agent is registered with a new ID,
 // the ID of the resource provider would be changed as well, and any
-// created volume becomes a pre-existing volume.
+// created volume becomes a preprovisioned volume.
 TEST_F(StorageLocalResourceProviderTest, AgentRegisteredWithNewId)
 {
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
-  ASSERT_SOME(os::write(profilesPath, createDiskProfileMapping("test")));
+
+  ASSERT_SOME(os::write(profilesPath, createDiskProfileMapping(
+      {{"test1", JSON::Object{{"label", "foo"}}},
+       {"test2", None()}})));
+
   loadUriDiskProfileAdaptorModule(profilesPath);
 
-  setupResourceProviderConfig(Gigabytes(4));
+  setupResourceProviderConfig(Gigabytes(2), None(), "label=foo");
 
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
@@ -1449,17 +1474,21 @@ TEST_F(StorageLocalResourceProviderTest, AgentRegisteredWithNewId)
   EXPECT_CALL(sched, registered(&driver, _, _));
 
   // The framework is expected to see the following offers in sequence:
-  //   1. One containing a RAW disk resource before `CREATE_DISK`.
-  //   2. One containing a MOUNT disk resource after `CREATE_DISK`.
-  //   3. One containing a RAW pre-existing volume after the agent
-  //      is registered with a new ID.
+  //   1. One containing a RAW disk resource before the 1st `CREATE_DISK`.
+  //   2. One containing a MOUNT disk resource after the 1st `CREATE_DISK`.
+  //   3. One containing a RAW preprovisioned volume after the agent is
+  //      registered with a new ID.
+  //   4. One containing the same preprovisioned volume after the 2nd
+  //      `CREATE_DISK` that specifies a wrong profile.
+  //   5. One containing a MOUNT disk resource after the 3rd `CREATE_DISK` that
+  //      specifies the correct profile.
   //
   // We set up the expectations for these offers as the test progresses.
   Future<vector<Offer>> rawDiskOffers;
-  Future<vector<Offer>> volumeCreatedOffers;
+  Future<vector<Offer>> diskCreatedOffers;
   Future<vector<Offer>> slaveRecoveredOffers;
-
-  Sequence offers;
+  Future<vector<Offer>> operationFailedOffers;
+  Future<vector<Offer>> diskRecoveredOffers;
 
   // We use the following filter to filter offers that do not have
   // wanted resources for 365 days (the maximum).
@@ -1470,44 +1499,42 @@ TEST_F(StorageLocalResourceProviderTest, AgentRegisteredWithNewId)
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillRepeatedly(DeclineOffers(declineFilters));
 
-  // Before the agent fails over, we are interested in any storage pool or
-  // volume with a "test" profile.
-  auto hasSourceType = [](
-      const Resource& r,
-      const Resource::DiskInfo::Source::Type& type) {
+  // Before the agent fails over, we are interested in the 'test1' storage pool.
+  auto isStoragePool = [](const Resource& r, const string& profile) {
     return r.has_disk() &&
       r.disk().has_source() &&
+      r.disk().source().type() == Resource::DiskInfo::Source::RAW &&
+      !r.disk().source().has_id() &&
       r.disk().source().has_profile() &&
-      r.disk().source().profile() == "test" &&
-      r.disk().source().type() == type;
+      r.disk().source().profile() == profile;
   };
 
   EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
-      std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
-    .InSequence(offers)
+      std::bind(isStoragePool, lambda::_1, "test1"))))
     .WillOnce(FutureArg<1>(&rawDiskOffers));
 
   driver.start();
 
   AWAIT_READY(rawDiskOffers);
-  ASSERT_FALSE(rawDiskOffers->empty());
-
-  Option<Resource> source;
+  ASSERT_EQ(1u, rawDiskOffers->size());
 
-  foreach (const Resource& resource, rawDiskOffers->at(0).resources()) {
-    if (hasSourceType(resource, Resource::DiskInfo::Source::RAW)) {
-      source = resource;
-      break;
-    }
-  }
+  Resource raw = *Resources(rawDiskOffers->at(0).resources())
+    .filter(std::bind(isStoragePool, lambda::_1, "test1"))
+    .begin();
 
-  ASSERT_SOME(source);
+  auto isMountDisk = [](const Resource& r, const string& profile) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().type() == Resource::DiskInfo::Source::MOUNT &&
+      r.disk().source().has_id() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().profile() == profile;
+  };
 
-  // Create a volume.
+  // Create a MOUNT disk of profile 'test1'.
   EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
-      std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT))))
-    .InSequence(offers)
-    .WillOnce(FutureArg<1>(&volumeCreatedOffers));
+      std::bind(isMountDisk, lambda::_1, "test1"))))
+    .WillOnce(FutureArg<1>(&diskCreatedOffers));
 
   // We use the following filter so that the resources will not be
   // filtered for 5 seconds (the default).
@@ -1516,34 +1543,27 @@ TEST_F(StorageLocalResourceProviderTest, AgentRegisteredWithNewId)
 
   driver.acceptOffers(
       {rawDiskOffers->at(0).id()},
-      {CREATE_DISK(source.get(), Resource::DiskInfo::Source::MOUNT)},
+      {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)},
       acceptFilters);
 
-  AWAIT_READY(volumeCreatedOffers);
-  ASSERT_FALSE(volumeCreatedOffers->empty());
-
-  Option<Resource> createdVolume;
+  AWAIT_READY(diskCreatedOffers);
+  ASSERT_EQ(1u, diskCreatedOffers->size());
 
-  foreach (const Resource& resource, volumeCreatedOffers->at(0).resources()) {
-    if (hasSourceType(resource, Resource::DiskInfo::Source::MOUNT)) {
-      createdVolume = resource;
-      break;
-    }
-  }
+  Resource created = *Resources(diskCreatedOffers->at(0).resources())
+    .filter(std::bind(isMountDisk, lambda::_1, "test1"))
+    .begin();
 
-  ASSERT_SOME(createdVolume);
-  ASSERT_TRUE(createdVolume->has_provider_id());
-  ASSERT_TRUE(createdVolume->disk().source().has_id());
-  ASSERT_TRUE(createdVolume->disk().source().has_metadata());
-  ASSERT_TRUE(createdVolume->disk().source().has_mount());
-  ASSERT_TRUE(createdVolume->disk().source().mount().has_root());
-  EXPECT_FALSE(path::absolute(createdVolume->disk().source().mount().root()));
+  ASSERT_TRUE(created.has_provider_id());
+  ASSERT_TRUE(created.disk().source().has_id());
+  ASSERT_TRUE(created.disk().source().has_metadata());
+  ASSERT_TRUE(created.disk().source().has_mount());
+  ASSERT_TRUE(created.disk().source().mount().has_root());
+  EXPECT_FALSE(path::absolute(created.disk().source().mount().root()));
 
   // Check if the volume is actually created by the test CSI plugin.
   Option<string> volumePath;
 
-  foreach (const Label& label,
-           createdVolume->disk().source().metadata().labels()) {
+  foreach (const Label& label, created.disk().source().metadata().labels()) {
     if (label.key() == "path") {
       volumePath = label.value();
       break;
@@ -1554,7 +1574,14 @@ TEST_F(StorageLocalResourceProviderTest, AgentRegisteredWithNewId)
   EXPECT_TRUE(os::exists(volumePath.get()));
 
   // Shut down the agent.
-  EXPECT_CALL(sched, offerRescinded(_, _));
+  //
+  // NOTE: In addition to the last offer being rescinded, the master may send
+  // an offer after receiving an `UpdateSlaveMessage` containing only the
+  // preprovisioned volume, and then receive another `UpdateSlaveMessage`
+  // containing both the volume and a 'test1' storage pool before the offer
+  // gets declined. In this case, the offer will be rescinded as well.
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .Times(Between(1, 2));
 
   slave.get()->terminate();
 
@@ -1562,23 +1589,34 @@ TEST_F(StorageLocalResourceProviderTest, AgentRegisteredWithNewId)
   const string metaDir = slave::paths::getMetaRootDir(slaveFlags.work_dir);
   ASSERT_SOME(os::rm(slave::paths::getLatestSlavePath(metaDir)));
 
+  // NOTE: We set up the resource provider with an extra storage pool, so that
+  // when the storage pool is offered, we know that the corresponding profile is
+  // known to the resource provider.
+  setupResourceProviderConfig(Gigabytes(4), None(), "label=foo");
+
   // A new registration would trigger another `SlaveRegisteredMessage`.
   slaveRegisteredMessage =
     FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, Not(slave.get()->pid));
 
   // After the agent fails over, any volume created before becomes a
-  // pre-existing volume, which has an ID but no profile.
-  auto isPreExistingVolume = [](const Resource& r) {
+  // preprovisioned volume, which has an ID but no profile.
+  auto isPreprovisionedVolume = [](const Resource& r) {
     return r.has_disk() &&
       r.disk().has_source() &&
+      r.disk().source().type() == Resource::DiskInfo::Source::RAW &&
       r.disk().source().has_id() &&
       !r.disk().source().has_profile();
   };
 
+  // NOTE: Instead of expecting a preprovisioned volume, we expect an offer with
+  // a 'test1' storage pool as an indication that the profile is known to the
+  // resource provider. The offer should also have the preprovisioned volume.
+  // But, an extra offer with the storage pool may be received as a side effect
+  // of this workaround, so we decline it if this happens.
   EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
-      isPreExistingVolume)))
-    .InSequence(offers)
-    .WillOnce(FutureArg<1>(&slaveRecoveredOffers));
+      std::bind(isStoragePool, lambda::_1, "test1"))))
+    .WillOnce(FutureArg<1>(&slaveRecoveredOffers))
+    .WillRepeatedly(DeclineOffers(declineFilters));
 
   slave = StartSlave(detector.get(), slaveFlags);
   ASSERT_SOME(slave);
@@ -1586,23 +1624,65 @@ TEST_F(StorageLocalResourceProviderTest, AgentRegisteredWithNewId)
   AWAIT_READY(slaveRegisteredMessage);
 
   AWAIT_READY(slaveRecoveredOffers);
-  ASSERT_FALSE(slaveRecoveredOffers->empty());
+  ASSERT_EQ(1u, slaveRecoveredOffers->size());
 
-  Option<Resource> preExistingVolume;
+  Resources _preprovisioned = Resources(slaveRecoveredOffers->at(0).resources())
+    .filter(isPreprovisionedVolume);
 
-  foreach (const Resource& resource, slaveRecoveredOffers->at(0).resources()) {
-    if (isPreExistingVolume(resource) &&
-        resource.disk().source().type() == Resource::DiskInfo::Source::RAW) {
-      preExistingVolume = resource;
-    }
-  }
+  ASSERT_SOME_EQ(Gigabytes(2), _preprovisioned.disk());
 
-  ASSERT_SOME(preExistingVolume);
-  ASSERT_TRUE(preExistingVolume->has_provider_id());
-  ASSERT_NE(createdVolume->provider_id(), preExistingVolume->provider_id());
+  Resource preprovisioned = *_preprovisioned.begin();
+  ASSERT_TRUE(preprovisioned.has_provider_id());
+  ASSERT_NE(created.provider_id(), preprovisioned.provider_id());
+  ASSERT_EQ(created.disk().source().id(), preprovisioned.disk().source().id());
   ASSERT_EQ(
-      createdVolume->disk().source().id(),
-      preExistingVolume->disk().source().id());
+      created.disk().source().metadata(),
+      preprovisioned.disk().source().metadata());
+
+  // Apply profile 'test2' to the preprovisioned volume, which will fail.
+  EXPECT_CALL(
+      sched, resourceOffers(&driver, OffersHaveResource(preprovisioned)))
+    .WillOnce(FutureArg<1>(&operationFailedOffers));
+
+  Future<UpdateOperationStatusMessage> operationFailedStatus =
+    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+
+  driver.acceptOffers(
+      {slaveRecoveredOffers->at(0).id()},
+      {CREATE_DISK(preprovisioned, Resource::DiskInfo::Source::MOUNT, "test2")},
+      acceptFilters);
+
+  AWAIT_READY(operationFailedStatus);
+  EXPECT_EQ(OPERATION_FAILED, operationFailedStatus->status().state());
+
+  AWAIT_READY(operationFailedOffers);
+  ASSERT_EQ(1u, operationFailedOffers->size());
+
+  // Apply profile 'test1' to the preprovisioned volume, which will succeed.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isMountDisk, lambda::_1, "test1"))))
+    .WillOnce(FutureArg<1>(&diskRecoveredOffers));
+
+  driver.acceptOffers(
+      {operationFailedOffers->at(0).id()},
+      {CREATE_DISK(preprovisioned, Resource::DiskInfo::Source::MOUNT, "test1")},
+      acceptFilters);
+
+  AWAIT_READY(diskRecoveredOffers);
+  ASSERT_EQ(1u, diskRecoveredOffers->size());
+
+  Resource recovered = *Resources(diskRecoveredOffers->at(0).resources())
+    .filter(std::bind(isMountDisk, lambda::_1, "test1"))
+    .begin();
+
+  ASSERT_EQ(preprovisioned.provider_id(), recovered.provider_id());
+
+  ASSERT_EQ(
+      preprovisioned.disk().source().id(), recovered.disk().source().id());
+
+  ASSERT_EQ(
+      preprovisioned.disk().source().metadata(),
+      recovered.disk().source().metadata());
 }
 
 
@@ -1612,7 +1692,10 @@ TEST_F(StorageLocalResourceProviderTest, AgentRegisteredWithNewId)
 TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResources)
 {
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
-  ASSERT_SOME(os::write(profilesPath, createDiskProfileMapping("test")));
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
   loadUriDiskProfileAdaptorModule(profilesPath);
 
   setupResourceProviderConfig(Gigabytes(4));
@@ -1821,7 +1904,10 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResources)
 TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
 {
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
-  ASSERT_SOME(os::write(profilesPath, createDiskProfileMapping("test")));
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
   loadUriDiskProfileAdaptorModule(profilesPath);
 
   setupResourceProviderConfig(Gigabytes(4));
@@ -2087,7 +2173,10 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
 TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesReboot)
 {
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
-  ASSERT_SOME(os::write(profilesPath, createDiskProfileMapping("test")));
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
   loadUriDiskProfileAdaptorModule(profilesPath);
 
   setupResourceProviderConfig(Gigabytes(4));
@@ -2394,7 +2483,10 @@ TEST_F(
     ROOT_PublishUnpublishResourcesPluginKilled)
 {
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
-  ASSERT_SOME(os::write(profilesPath, createDiskProfileMapping("test")));
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
   loadUriDiskProfileAdaptorModule(profilesPath);
 
   setupResourceProviderConfig(Gigabytes(4));
@@ -2657,7 +2749,9 @@ TEST_F(
 TEST_F(StorageLocalResourceProviderTest, ImportPreprovisionedVolume)
 {
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
-  ASSERT_SOME(os::write(profilesPath, createDiskProfileMapping("test")));
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
 
   loadUriDiskProfileAdaptorModule(profilesPath);
 
@@ -2830,7 +2924,10 @@ TEST_F(StorageLocalResourceProviderTest, RetryOperationStatusUpdate)
   Clock::pause();
 
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
-  ASSERT_SOME(os::write(profilesPath, createDiskProfileMapping("test")));
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
   loadUriDiskProfileAdaptorModule(profilesPath);
 
   setupResourceProviderConfig(Gigabytes(4));
@@ -2985,7 +3082,10 @@ TEST_F(
   Clock::pause();
 
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
-  ASSERT_SOME(os::write(profilesPath, createDiskProfileMapping("test")));
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
   loadUriDiskProfileAdaptorModule(profilesPath);
 
   setupResourceProviderConfig(Gigabytes(4));
@@ -3234,7 +3334,10 @@ TEST_F(
 TEST_F(StorageLocalResourceProviderTest, OperationStateMetrics)
 {
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
-  ASSERT_SOME(os::write(profilesPath, createDiskProfileMapping("test")));
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
   loadUriDiskProfileAdaptorModule(profilesPath);
 
   setupResourceProviderConfig(Gigabytes(4));
@@ -3479,7 +3582,10 @@ TEST_F(StorageLocalResourceProviderTest, OperationStateMetrics)
 TEST_F(StorageLocalResourceProviderTest, CsiPluginRpcMetrics)
 {
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
-  ASSERT_SOME(os::write(profilesPath, createDiskProfileMapping("test")));
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
   loadUriDiskProfileAdaptorModule(profilesPath);
 
   setupResourceProviderConfig(Gigabytes(4));
@@ -3768,7 +3874,10 @@ TEST_F(StorageLocalResourceProviderTest, ReconcileDroppedOperation)
   Clock::pause();
 
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
-  ASSERT_SOME(os::write(profilesPath, createDiskProfileMapping("test")));
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
   loadUriDiskProfileAdaptorModule(profilesPath);
 
   setupResourceProviderConfig(Gigabytes(4));
@@ -3968,7 +4077,10 @@ TEST_F(StorageLocalResourceProviderTest, RetryOperationStatusUpdateToScheduler)
   Clock::pause();
 
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
-  ASSERT_SOME(os::write(profilesPath, createDiskProfileMapping("test")));
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
   loadUriDiskProfileAdaptorModule(profilesPath);
 
   setupResourceProviderConfig(Gigabytes(4));
@@ -4154,7 +4266,10 @@ TEST_F(
   Clock::pause();
 
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
-  ASSERT_SOME(os::write(profilesPath, createDiskProfileMapping("test")));
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
   loadUriDiskProfileAdaptorModule(profilesPath);
 
   setupResourceProviderConfig(Gigabytes(4));


[mesos] 10/14: Refactored the test CSI plugin.

Posted by ch...@apache.org.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 00a29b7cad7500446678e9c6411352a5929c360d
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Mon Nov 19 15:10:58 2018 -0800

    Refactored the test CSI plugin.
    
    This patch does not introduce any functional change to the test CSI
    plugin. It simply refactors the check for the default mount volume
    capability and the parsing of the `--volumes` flag.
    
    Review: https://reviews.apache.org/r/69400
---
 src/examples/test_csi_plugin.cpp | 64 ++++++++++++++++------------------------
 1 file changed, 25 insertions(+), 39 deletions(-)

diff --git a/src/examples/test_csi_plugin.cpp b/src/examples/test_csi_plugin.cpp
index 66f4ee0..cbb61a5 100644
--- a/src/examples/test_csi_plugin.cpp
+++ b/src/examples/test_csi_plugin.cpp
@@ -58,7 +58,6 @@ using grpc::ServerBuilder;
 using grpc::ServerContext;
 using grpc::Status;
 
-
 constexpr char PLUGIN_NAME[] = "org.apache.mesos.csi.test";
 constexpr char NODE_ID[] = "localhost";
 constexpr Bytes DEFAULT_VOLUME_CAPACITY = Megabytes(64);
@@ -84,7 +83,7 @@ public:
 
     add(&Flags::volumes,
         "volumes",
-        "Creates pre-existing volumes upon start-up. The volumes are\n"
+        "Creates preprovisioned volumes upon start-up. The volumes are\n"
         "specified as a semicolon-delimited list of name:capacity pairs.\n"
         "If a volume with the same name already exists, the pair will be\n"
         "ignored. (Example: 'volume1:1GB;volume2:2GB')");
@@ -112,6 +111,13 @@ public:
       endpoint(_endpoint),
       availableCapacity(_availableCapacity)
   {
+    // Construct the default mount volume capability.
+    defaultVolumeCapability.mutable_mount();
+    defaultVolumeCapability.mutable_access_mode()
+      ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
+
+    // Scan for preprovisioned volumes.
+    //
     // TODO(jieyu): Consider not using CHECKs here.
     Try<list<string>> paths = os::ls(workDir);
     CHECK_SOME(paths);
@@ -260,6 +266,7 @@ private:
   const string endpoint;
 
   Bytes availableCapacity;
+  csi::v0::VolumeCapability defaultVolumeCapability;
   hashmap<string, VolumeInfo> volumes;
 
   unique_ptr<Server> server;
@@ -499,19 +506,9 @@ Status TestCSIPlugin::ValidateVolumeCapabilities(
 
   foreach (const csi::v0::VolumeCapability& capability,
            request->volume_capabilities()) {
-    if (capability.has_mount() &&
-        (!capability.mount().fs_type().empty() ||
-         !capability.mount().mount_flags().empty())) {
-      response->set_supported(false);
-      response->set_message("Only default capability is supported");
-
-      return Status::OK;
-    }
-
-    if (capability.access_mode().mode() !=
-        csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER) {
+    if (capability != defaultVolumeCapability) {
       response->set_supported(false);
-      response->set_message("Access mode is not supported");
+      response->set_message("Unsupported volume capabilities");
 
       return Status::OK;
     }
@@ -561,18 +558,9 @@ Status TestCSIPlugin::GetCapacity(
   foreach (const csi::v0::VolumeCapability& capability,
            request->volume_capabilities()) {
     // We report zero capacity for any capability other than the
-    // default-constructed `MountVolume` capability since this plugin
+    // default-constructed mount volume capability since this plugin
     // does not support any filesystem types and mount flags.
-    if (!capability.has_mount() ||
-        !capability.mount().fs_type().empty() ||
-        !capability.mount().mount_flags().empty()) {
-      response->set_available_capacity(0);
-
-      return Status::OK;
-    }
-
-    if (capability.access_mode().mode() !=
-        csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER) {
+    if (capability != defaultVolumeCapability) {
       response->set_available_capacity(0);
 
       return Status::OK;
@@ -926,31 +914,29 @@ int main(int argc, char** argv)
   hashmap<string, Bytes> volumes;
 
   if (flags.volumes.isSome()) {
-    foreach (const string& token, strings::tokenize(flags.volumes.get(), ";")) {
-      vector<string> pair = strings::tokenize(token, ":");
-
+    foreachpair (const string& name,
+                 const vector<string>& capacities,
+                 strings::pairs(flags.volumes.get(), ";", ":")) {
       Option<Error> error;
 
-      if (pair.size() != 2) {
-        error = "Not a name:capacity pair";
-      } else if (pair[0].empty()) {
-        error = "Volume name cannot be empty";
-      } else if (pair[0].find_first_of(os::PATH_SEPARATOR) != string::npos) {
-        error = "Volume name cannot contain '/'";
-      } else if (volumes.contains(pair[0])) {
+      if (strings::contains(name, stringify(os::PATH_SEPARATOR))) {
+        error =
+          "Volume name cannot contain '" + stringify(os::PATH_SEPARATOR) + "'";
+      } else if (capacities.size() != 1) {
         error = "Volume name must be unique";
+      } else if (volumes.contains(name)) {
+        error = "Volume '" + name + "' already exists";
       } else {
-        Try<Bytes> capacity = Bytes::parse(pair[1]);
+        Try<Bytes> capacity = Bytes::parse(capacities[0]);
         if (capacity.isError()) {
           error = capacity.error();
         } else {
-          volumes.put(pair[0], capacity.get());
+          volumes.put(name, capacity.get());
         }
       }
 
       if (error.isSome()) {
-        cerr << "Failed to parse item '" << token << "' in 'volumes' flag: "
-             << error->message << endl;
+        cerr << "Failed to parse the '--volumes' flag: " << error.get() << endl;
         return EXIT_FAILURE;
       }
     }
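
The reworked `--volumes` parsing above groups capacities by volume name, so a repeated name shows up as a name with more than one capacity. Below is a minimal sketch of that idea using only the standard library. `parsePairs` and `validateVolume` are hypothetical names for illustration; `parsePairs` is a stand-in and is not the stout `strings::pairs` implementation, and hard-coding '/' as the path separator is an assumption.

```cpp
#include <cstddef>
#include <map>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical stand-in for a pairs-style helper: splits `input` into items
// on `itemSeparator`, splits each item at the first `pairSeparator`, and
// groups the values by key. Items with no separator or an empty key are
// skipped in this sketch.
std::map<std::string, std::vector<std::string>> parsePairs(
    const std::string& input, char itemSeparator, char pairSeparator)
{
  std::map<std::string, std::vector<std::string>> result;

  std::istringstream stream(input);
  std::string item;
  while (std::getline(stream, item, itemSeparator)) {
    const std::size_t position = item.find(pairSeparator);
    if (position == std::string::npos || position == 0) {
      continue;
    }

    result[item.substr(0, position)].push_back(item.substr(position + 1));
  }

  return result;
}


// Returns an error message for a parsed entry, or an empty string if the
// entry is acceptable. The path separator is assumed to be '/'.
std::string validateVolume(
    const std::string& name, const std::vector<std::string>& capacities)
{
  if (name.find('/') != std::string::npos) {
    return "Volume name cannot contain '/'";
  }

  // More than one capacity means the same name was listed more than once.
  if (capacities.size() != 1) {
    return "Volume name must be unique";
  }

  return "";
}
```

With this sketch, `parsePairs("volume1:1GB;volume2:2GB", ';', ':')` yields two names with one capacity each, while `"v:1GB;v:2GB"` yields a single name `v` with two capacities, which `validateVolume` rejects.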


[mesos] 07/14: Cleaned up `include/mesos/type_utils.hpp`.

Posted by ch...@apache.org.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 83ed35c45a540ac15177a936a306965039c1145d
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Wed Nov 14 20:13:53 2018 -0800

    Cleaned up `include/mesos/type_utils.hpp`.
    
    This patch does the following cleanups:
    
    1. Moved `google::protobuf::Map` equality operator to `type_utils.hpp`.
    2. Moved the type helper templates for the protobuf library that do not
       involve mesos protobufs into the `google::protobuf` namespaces so ADL
       works appropriately.
    3. Removed the type helper templates for the protobuf library from
       `mesos/v1/mesos.hpp` to avoid redefinition.
    
    Review: https://reviews.apache.org/r/69363
---
 include/mesos/type_utils.hpp                       | 46 ++++++++++++++++++++--
 include/mesos/v1/mesos.hpp                         | 27 +------------
 .../storage/uri_disk_profile_adaptor.cpp           | 26 +-----------
 3 files changed, 47 insertions(+), 52 deletions(-)

diff --git a/include/mesos/type_utils.hpp b/include/mesos/type_utils.hpp
index aa61c0c..3926f49 100644
--- a/include/mesos/type_utils.hpp
+++ b/include/mesos/type_utils.hpp
@@ -23,6 +23,7 @@
 
 #include <boost/functional/hash.hpp>
 
+#include <google/protobuf/map.h>
 #include <google/protobuf/repeated_field.h>
 
 #include <mesos/mesos.hpp>
@@ -475,7 +476,7 @@ std::ostream& operator<<(
 template <typename T>
 inline std::ostream& operator<<(
     std::ostream& stream,
-    const google::protobuf::RepeatedPtrField<T>& messages)
+    const std::vector<T>& messages)
 {
   stream << "[ ";
   for (auto it = messages.begin(); it != messages.end(); ++it) {
@@ -488,11 +489,48 @@ inline std::ostream& operator<<(
   return stream;
 }
 
+} // namespace mesos {
+
+
+/**
+ * Type utilities for the protobuf library that are not specific to particular
+ * protobuf classes. They are defined in the `google::protobuf` namespace for
+ * argument-dependent lookup.
+ */
+namespace google {
+namespace protobuf {
+
+template <typename Key, typename Value>
+inline bool operator==(
+    const Map<Key, Value>& left, const Map<Key, Value>& right)
+{
+  if (left.size() != right.size()) {
+    return false;
+  }
+
+  for (auto it = left.begin(); it != left.end(); ++it) {
+    auto found = right.find(it->first);
+    if (found == right.end() || found->second != it->second) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+
+template <typename Key, typename Value>
+inline bool operator!=(
+    const Map<Key, Value>& left, const Map<Key, Value>& right)
+{
+  return !(left == right);
+}
+
 
 template <typename T>
 inline std::ostream& operator<<(
     std::ostream& stream,
-    const std::vector<T>& messages)
+    const RepeatedPtrField<T>& messages)
 {
   stream << "[ ";
   for (auto it = messages.begin(); it != messages.end(); ++it) {
@@ -505,7 +543,9 @@ inline std::ostream& operator<<(
   return stream;
 }
 
-} // namespace mesos {
+} // namespace protobuf {
+} // namespace google {
+
 
 namespace std {
 
diff --git a/include/mesos/v1/mesos.hpp b/include/mesos/v1/mesos.hpp
index 452bcf2..dc328bf 100644
--- a/include/mesos/v1/mesos.hpp
+++ b/include/mesos/v1/mesos.hpp
@@ -23,8 +23,6 @@
 
 #include <boost/functional/hash.hpp>
 
-#include <google/protobuf/repeated_field.h>
-
 #include <mesos/v1/mesos.pb.h> // ONLY USEFUL AFTER RUNNING PROTOC.
 
 #include <stout/strings.hpp>
@@ -32,7 +30,7 @@
 // This file includes definitions for operators on public protobuf
 // classes (defined in mesos.proto, module.proto, etc.) that don't
 // have these operators generated by the protobuf compiler. The
-// corresponding definitions are in src/v1/type_utils.cpp.
+// corresponding definitions are in src/v1/mesos.cpp.
 //
 // Mesos modules need some of the protobuf classes defined in
 // mesos.proto, module.proto, etc., and require some of these
@@ -469,23 +467,6 @@ std::ostream& operator<<(
 template <typename T>
 inline std::ostream& operator<<(
     std::ostream& stream,
-    const google::protobuf::RepeatedPtrField<T>& messages)
-{
-  stream << "[ ";
-  for (auto it = messages.begin(); it != messages.end(); ++it) {
-    if (it != messages.begin()) {
-      stream << ", ";
-    }
-    stream << *it;
-  }
-  stream << " ]";
-  return stream;
-}
-
-
-template <typename T>
-inline std::ostream& operator<<(
-    std::ostream& stream,
     const std::vector<T>& messages)
 {
   stream << "[ ";
@@ -499,14 +480,10 @@ inline std::ostream& operator<<(
   return stream;
 }
 
-
-std::ostream& operator<<(
-    std::ostream& stream,
-    const hashmap<std::string, std::string>& map);
-
 } // namespace v1 {
 } // namespace mesos {
 
+
 namespace std {
 
 template <>
diff --git a/src/resource_provider/storage/uri_disk_profile_adaptor.cpp b/src/resource_provider/storage/uri_disk_profile_adaptor.cpp
index 6c998ef..cb574be 100644
--- a/src/resource_provider/storage/uri_disk_profile_adaptor.cpp
+++ b/src/resource_provider/storage/uri_disk_profile_adaptor.cpp
@@ -22,6 +22,8 @@
 
 #include <csi/spec.hpp>
 
+#include <mesos/type_utils.hpp>
+
 #include <mesos/module/disk_profile_adaptor.hpp>
 
 #include <process/defer.hpp>
@@ -58,30 +60,6 @@ namespace mesos {
 namespace internal {
 namespace storage {
 
-bool operator==(
-    const Map<string, string>& left,
-    const Map<string, string>& right) {
-  if (left.size() != right.size()) {
-    return false;
-  }
-
-  typename Map<string, string>::const_iterator iterator = left.begin();
-  while (iterator != left.end()) {
-    if (right.count(iterator->first) != 1) {
-      return false;
-    }
-
-    if (iterator->second != right.at(iterator->first)) {
-      return false;
-    }
-
-    ++iterator;
-  }
-
-  return true;
-}
-
-
 UriDiskProfileAdaptor::UriDiskProfileAdaptor(const Flags& _flags)
   : flags(_flags),
     process(new UriDiskProfileAdaptorProcess(flags))
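
Item 2 of the commit message relies on argument-dependent lookup (ADL): an operator declared in the same namespace as its operand type is found from any calling namespace, even when that namespace declares its own unrelated `operator==`. The sketch below illustrates this with a hypothetical `toy::Map`, which only stands in for a library type such as `google::protobuf::Map` and is not the real class. The `app` namespace and `sameParameters` are likewise made up for illustration.

```cpp
#include <map>
#include <string>

namespace toy {

// Hypothetical container standing in for a library map type.
template <typename Key, typename Value>
struct Map
{
  std::map<Key, Value> entries;
};


// Declared next to `Map`, so ADL can find it from any namespace.
template <typename Key, typename Value>
bool operator==(const Map<Key, Value>& left, const Map<Key, Value>& right)
{
  if (left.entries.size() != right.entries.size()) {
    return false;
  }

  for (const auto& entry : left.entries) {
    auto found = right.entries.find(entry.first);
    if (found == right.entries.end() || found->second != entry.second) {
      return false;
    }
  }

  return true;
}


template <typename Key, typename Value>
bool operator!=(const Map<Key, Value>& left, const Map<Key, Value>& right)
{
  return !(left == right);
}

} // namespace toy {


namespace app {

// An unrelated operator in the caller's namespace. Ordinary unqualified
// lookup stops at this declaration, so only ADL can reach `toy::operator==`.
struct Tag {};
bool operator==(const Tag&, const Tag&) { return true; }


bool sameParameters(
    const toy::Map<std::string, std::string>& left,
    const toy::Map<std::string, std::string>& right)
{
  return left == right; // Resolved through ADL into namespace `toy`.
}

} // namespace app {
```

Had the operator for `toy::Map` been declared in some third namespace instead, the comparison inside `app::sameParameters` would not compile, which is the kind of problem the move into `google::protobuf` is meant to avoid.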


[mesos] 04/14: Rewrote test `ConvertPreExistingVolume` for `CREATE_DISK`.

Posted by ch...@apache.org.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 4c9f1a05e35c70918bd21cf5f9b536efda1da28c
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Sat Nov 10 16:56:36 2018 -0800

    Rewrote test `ConvertPreExistingVolume` for `CREATE_DISK`.
    
    Due to the changes to the `CREATE_DISK` semantics, this test is
    rewritten to convert a preprovisioned volume to a profile volume, and
    then to destroy it to return the space back to the storage pool.
    
    NOTE: The updated test will fail unless r/69361 (which implements the
    new `CREATE_DISK` semantics) is also applied.
    
    Review: https://reviews.apache.org/r/69360
---
 .../storage_local_resource_provider_tests.cpp      | 243 +++++++++------------
 1 file changed, 104 insertions(+), 139 deletions(-)

diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 514e56d..e2a4d02 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -2651,51 +2651,35 @@ TEST_F(
 }
 
 
-// This test verifies that the storage local resource provider can
-// convert pre-existing CSI volumes into mount or block volumes.
-TEST_F(StorageLocalResourceProviderTest, ConvertPreExistingVolume)
+// This test verifies that the storage local resource provider can import a
+// preprovisioned CSI volume as a MOUNT disk of a given profile, and return the
+// space back to the storage pool after destroying the volume.
+TEST_F(StorageLocalResourceProviderTest, ImportPreprovisionedVolume)
 {
-  Clock::pause();
+  const string profilesPath = path::join(sandbox.get(), "profiles.json");
+  ASSERT_SOME(os::write(profilesPath, createDiskProfileMapping("test")));
 
-  setupResourceProviderConfig(Bytes(0), "volume1:2GB;volume2:2GB");
+  loadUriDiskProfileAdaptorModule(profilesPath);
 
-  master::Flags masterFlags = CreateMasterFlags();
-  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  // NOTE: We set up the resource provider with an extra storage pool, so that
+  // when the storage pool is offered, we know that the corresponding profile is
+  // known to the resource provider.
+  setupResourceProviderConfig(Gigabytes(2), "volume1:2GB");
+
+  Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
   Owned<MasterDetector> detector = master.get()->createDetector();
 
   slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
 
-  // Since the local resource provider daemon is started after the agent
-  // is registered, it is guaranteed that the slave will send two
-  // `UpdateSlaveMessage`s, where the latter one contains resources from
-  // the storage local resource provider.
-  // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because
-  // Google Mock will search the expectations in reverse order.
-  Future<UpdateSlaveMessage> updateSlave2 =
-    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
-  Future<UpdateSlaveMessage> updateSlave1 =
-    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
 
   Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
   ASSERT_SOME(slave);
 
-  // Advance the clock to trigger agent registration and prevent retry.
-  Clock::advance(slaveFlags.registration_backoff_factor);
-
-  AWAIT_READY(updateSlave1);
-
-  // NOTE: We need to resume the clock so that the resource provider can
-  // periodically check if the CSI endpoint socket has been created by
-  // the plugin container, which runs in another Linux process.
-  Clock::resume();
-
-  AWAIT_READY(updateSlave2);
-  ASSERT_TRUE(updateSlave2->has_resource_providers());
-
-  Clock::pause();
-
   // Register a framework to exercise operations.
   FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
   framework.set_roles(0, "storage");
@@ -2707,147 +2691,128 @@ TEST_F(StorageLocalResourceProviderTest, ConvertPreExistingVolume)
   EXPECT_CALL(sched, registered(&driver, _, _));
 
   // The framework is expected to see the following offers in sequence:
-  //   1. One containing two RAW pre-existing volumes before `CREATE_DISK`s.
-  //   2. One containing a MOUNT and a BLOCK disk resources after
-  //      `CREATE_DISK`s.
-  //   3. One containing two RAW pre-existing volumes after `DESTROY_DISK`s.
+  //   1. One containing a RAW preprovisioned volume before `CREATE_DISK`.
+  //   2. One containing a MOUNT disk resource after `CREATE_DISK`.
+  //   3. One containing a RAW storage pool after `DESTROY_DISK`.
   //
   // We set up the expectations for these offers as the test progresses.
-  Future<vector<Offer>> rawDisksOffers;
-  Future<vector<Offer>> disksConvertedOffers;
-  Future<vector<Offer>> disksRevertedOffers;
+  Future<vector<Offer>> rawDiskOffers;
+  Future<vector<Offer>> diskCreatedOffers;
+  Future<vector<Offer>> diskDestroyedOffers;
 
-  // We are only interested in any pre-existing volume, which has an ID
-  // but no profile.
-  auto isPreExistingVolume = [](const Resource& r) {
+  // We use the following filter to filter offers that do not have
+  // wanted resources for 365 days (the maximum).
+  Filters declineFilters;
+  declineFilters.set_refuse_seconds(Days(365).secs());
+
+  // Decline offers that contain only the agent's default resources.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers(declineFilters));
+
+  auto isStoragePool = [](const Resource& r, const string& profile) {
     return r.has_disk() &&
       r.disk().has_source() &&
-      r.disk().source().has_id() &&
-      !r.disk().source().has_profile();
+      r.disk().source().type() == Resource::DiskInfo::Source::RAW &&
+      !r.disk().source().has_id() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().profile() == profile;
   };
 
+  // NOTE: Instead of expecting a preprovisioned volume, we expect an offer with
+  // a 'test' storage pool as an indication that the profile is known to the
+  // resource provider. The offer should also have the preprovisioned volume.
+  // But, an extra offer with the storage pool may be received as a side effect
+  // of this workaround, so we decline it if this happens.
   EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
-      isPreExistingVolume)))
-    .WillOnce(FutureArg<1>(&rawDisksOffers));
+      std::bind(isStoragePool, lambda::_1, "test"))))
+    .WillOnce(FutureArg<1>(&rawDiskOffers))
+    .WillRepeatedly(DeclineOffers(declineFilters));
 
   driver.start();
 
-  AWAIT_READY(rawDisksOffers);
-  ASSERT_FALSE(rawDisksOffers->empty());
-
-  vector<Resource> sources;
-
-  foreach (const Resource& resource, rawDisksOffers->at(0).resources()) {
-    if (isPreExistingVolume(resource) &&
-        resource.disk().source().type() == Resource::DiskInfo::Source::RAW) {
-      sources.push_back(resource);
-    }
-  }
-
-  ASSERT_EQ(2u, sources.size());
-
-  // Create a volume and a block.
-  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
-      isPreExistingVolume)))
-    .WillOnce(FutureArg<1>(&disksConvertedOffers));
+  AWAIT_READY(rawDiskOffers);
+  ASSERT_EQ(1u, rawDiskOffers->size());
 
-  // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because
-  // Google Mock will search the expectations in reverse order.
-  Future<UpdateOperationStatusMessage> createBlockStatusUpdate =
-    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
-  Future<UpdateOperationStatusMessage> createVolumeStatusUpdate =
-    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+  auto isPreprovisionedVolume = [](const Resource& r) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().type() == Resource::DiskInfo::Source::RAW &&
+      r.disk().source().has_id() &&
+      !r.disk().source().has_profile();
+  };
 
-  driver.acceptOffers(
-      {rawDisksOffers->at(0).id()},
-      {CREATE_DISK(sources.at(0), Resource::DiskInfo::Source::MOUNT),
-       CREATE_DISK(sources.at(1), Resource::DiskInfo::Source::BLOCK)});
+  Resources _preprovisioned = Resources(rawDiskOffers->at(0).resources())
+    .filter(isPreprovisionedVolume);
 
-  AWAIT_READY(createVolumeStatusUpdate);
-  AWAIT_READY(createBlockStatusUpdate);
+  ASSERT_SOME_EQ(Gigabytes(2), _preprovisioned.disk());
 
-  // Advance the clock to trigger another allocation.
-  Clock::advance(masterFlags.allocation_interval);
+  Resource preprovisioned = *_preprovisioned.begin();
 
-  AWAIT_READY(disksConvertedOffers);
-  ASSERT_FALSE(disksConvertedOffers->empty());
+  // Get the volume path of the preprovisioned volume.
+  Option<string> volumePath;
 
-  Option<Resource> volume;
-  Option<Resource> block;
-
-  foreach (const Resource& resource, disksConvertedOffers->at(0).resources()) {
-    if (isPreExistingVolume(resource)) {
-      if (resource.disk().source().type() ==
-            Resource::DiskInfo::Source::MOUNT) {
-        volume = resource;
-      } else if (resource.disk().source().type() ==
-                   Resource::DiskInfo::Source::BLOCK) {
-        block = resource;
-      }
+  foreach (const Label& label,
+           preprovisioned.disk().source().metadata().labels()) {
+    if (label.key() == "path") {
+      volumePath = label.value();
+      break;
     }
   }
 
-  ASSERT_SOME(volume);
-  ASSERT_TRUE(volume->disk().source().has_mount());
-  ASSERT_TRUE(volume->disk().source().mount().has_root());
-  EXPECT_FALSE(path::absolute(volume->disk().source().mount().root()));
+  ASSERT_SOME(volumePath);
+  ASSERT_TRUE(os::exists(volumePath.get()));
 
-  ASSERT_SOME(block);
+  auto isMountDisk = [](const Resource& r, const string& profile) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().type() == Resource::DiskInfo::Source::MOUNT &&
+      r.disk().source().has_id() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().profile() == profile;
+  };
 
-  // Destroy the created volume.
+  // Apply profile 'test' to the preprovisioned volume.
   EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
-      isPreExistingVolume)))
-    .WillOnce(FutureArg<1>(&disksRevertedOffers));
+      std::bind(isMountDisk, lambda::_1, "test"))))
+    .WillOnce(FutureArg<1>(&diskCreatedOffers));
 
-  // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because
-  // Google Mock will search the expectations in reverse order.
-  Future<UpdateOperationStatusMessage> destroyBlockStatusUpdate =
-    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
-  Future<UpdateOperationStatusMessage> destroyVolumeStatusUpdate =
-    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+  // We use the following filter so that the resources will not be
+  // filtered for 5 seconds (the default).
+  Filters acceptFilters;
+  acceptFilters.set_refuse_seconds(0);
 
   driver.acceptOffers(
-      {disksConvertedOffers->at(0).id()},
-      {DESTROY_DISK(volume.get()),
-       DESTROY_DISK(block.get())});
-
-  AWAIT_READY(destroyVolumeStatusUpdate);
-  AWAIT_READY(destroyBlockStatusUpdate);
-
-  // Advance the clock to trigger another allocation.
-  Clock::advance(masterFlags.allocation_interval);
+      {rawDiskOffers->at(0).id()},
+      {CREATE_DISK(preprovisioned, Resource::DiskInfo::Source::MOUNT, "test")},
+      acceptFilters);
 
-  AWAIT_READY(disksRevertedOffers);
-  ASSERT_FALSE(disksRevertedOffers->empty());
+  AWAIT_READY(diskCreatedOffers);
+  ASSERT_EQ(1u, diskCreatedOffers->size());
 
-  vector<Resource> destroyed;
+  Resource created = *Resources(diskCreatedOffers->at(0).resources())
+    .filter(std::bind(isMountDisk, lambda::_1, "test"))
+    .begin();
 
-  foreach (const Resource& resource, disksRevertedOffers->at(0).resources()) {
-    if (isPreExistingVolume(resource) &&
-        resource.disk().source().type() == Resource::DiskInfo::Source::RAW) {
-      destroyed.push_back(resource);
-    }
-  }
+  // Destroy the created disk.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isStoragePool, lambda::_1, "test"))))
+    .WillOnce(FutureArg<1>(&diskDestroyedOffers));
 
-  ASSERT_EQ(2u, destroyed.size());
+  driver.acceptOffers(
+      {diskCreatedOffers->at(0).id()},
+      {DESTROY_DISK(created)},
+      acceptFilters);
 
-  foreach (const Resource& resource, destroyed) {
-    ASSERT_FALSE(resource.disk().source().has_mount());
-    ASSERT_TRUE(resource.disk().source().has_metadata());
+  AWAIT_READY(diskDestroyedOffers);
+  ASSERT_EQ(1u, diskDestroyedOffers->size());
 
-    // Check if the volume is not deleted by the test CSI plugin.
-    Option<string> volumePath;
+  Resources raw = Resources(diskDestroyedOffers->at(0).resources())
+    .filter(std::bind(isStoragePool, lambda::_1, "test"));
 
-    foreach (const Label& label, resource.disk().source().metadata().labels()) {
-      if (label.key() == "path") {
-        volumePath = label.value();
-        break;
-      }
-    }
+  EXPECT_SOME_EQ(Gigabytes(4), raw.disk());
 
-    ASSERT_SOME(volumePath);
-    EXPECT_TRUE(os::exists(volumePath.get()));
-  }
+  // Check if the volume is deleted by the test CSI plugin.
+  EXPECT_FALSE(os::exists(volumePath.get()));
 }
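
The rewritten test distinguishes three kinds of disk resources by source type, ID, and profile, and uses `std::bind` to turn the two-argument predicates into the unary ones that the offer matcher expects. Below is a minimal sketch of that classification. `Source` is a hypothetical, simplified struct standing in for the `Resource::DiskInfo::Source` protobuf message (empty strings model unset fields), and `countMatching` and `countStoragePools` are made-up helpers.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for a disk resource's source.
struct Source
{
  enum Type { RAW, MOUNT, BLOCK };

  Type type;
  std::string id;      // Empty means "no ID".
  std::string profile; // Empty means "no profile".
};


// RAW, no ID, given profile: free space in a storage pool.
bool isStoragePool(const Source& source, const std::string& profile)
{
  return source.type == Source::RAW &&
    source.id.empty() &&
    !source.profile.empty() &&
    source.profile == profile;
}


// RAW, has an ID, no profile: a preprovisioned volume.
bool isPreprovisionedVolume(const Source& source)
{
  return source.type == Source::RAW &&
    !source.id.empty() &&
    source.profile.empty();
}


// MOUNT, has an ID, given profile: a disk created through `CREATE_DISK`.
bool isMountDisk(const Source& source, const std::string& profile)
{
  return source.type == Source::MOUNT &&
    !source.id.empty() &&
    !source.profile.empty() &&
    source.profile == profile;
}


// Counts the sources that satisfy a unary predicate.
std::size_t countMatching(
    const std::vector<Source>& sources,
    const std::function<bool(const Source&)>& predicate)
{
  return static_cast<std::size_t>(
      std::count_if(sources.begin(), sources.end(), predicate));
}


// Fixes the profile argument with `std::bind`, leaving a unary predicate.
std::size_t countStoragePools(
    const std::vector<Source>& sources, const std::string& profile)
{
  return countMatching(
      sources, std::bind(isStoragePool, std::placeholders::_1, profile));
}
```

In this model, importing a preprovisioned volume changes it from (RAW, ID, no profile) to (MOUNT, ID, profile), and destroying that disk leaves only (RAW, no ID, profile) storage pool space, which mirrors the three offers the test waits for.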
 
 


[mesos] 03/14: Rewrote test `ReconcileDroppedOperation` for `CREATE_DISK`.

Posted by ch...@apache.org.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 0c2b715db40d20789c214b8dad6c49196198ea72
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Wed Nov 14 21:03:19 2018 -0800

    Rewrote test `ReconcileDroppedOperation` for `CREATE_DISK`.
    
    Previously the `ReconcileDroppedOperation` test relied on converting
    preprovisioned volumes. To adapt to the new semantics for `CREATE_DISK`,
    this test is rewritten to create two disks from a storage pool, with one
    operation dropped.
    
    Review: https://reviews.apache.org/r/69359
---
 .../storage_local_resource_provider_tests.cpp      | 74 +++++++++++-----------
 1 file changed, 37 insertions(+), 37 deletions(-)

diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index dddb608..514e56d 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -3793,15 +3793,20 @@ TEST_F(StorageLocalResourceProviderTest, CsiPluginRpcMetrics)
 
 
 // Master reconciles operations that are missing from a reregistering slave.
-// In this case, the `ApplyOperationMessage` is dropped, so the resource
-// provider should send OPERATION_DROPPED. Operations on agent default
-// resources are also tested here; for such operations, the agent generates the
-// dropped status.
+// In this case, one of the two `ApplyOperationMessage`s is dropped, so the
+// resource provider should send only one OPERATION_DROPPED.
+//
+// TODO(greggomann): Test operations on agent default resources: for such
+// operations, the agent generates the dropped status.
 TEST_F(StorageLocalResourceProviderTest, ReconcileDroppedOperation)
 {
   Clock::pause();
 
-  setupResourceProviderConfig(Bytes(0), "volume1:2GB;volume2:2GB");
+  const string profilesPath = path::join(sandbox.get(), "profiles.json");
+  ASSERT_SOME(os::write(profilesPath, createDiskProfileMapping("test")));
+  loadUriDiskProfileAdaptorModule(profilesPath);
+
+  setupResourceProviderConfig(Gigabytes(4));
 
   master::Flags masterFlags = CreateMasterFlags();
   Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
@@ -3810,6 +3815,7 @@ TEST_F(StorageLocalResourceProviderTest, ReconcileDroppedOperation)
   StandaloneMasterDetector detector(master.get()->pid);
 
   slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
 
   Future<SlaveRegisteredMessage> slaveRegisteredMessage =
     FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
@@ -3865,40 +3871,33 @@ TEST_F(StorageLocalResourceProviderTest, ReconcileDroppedOperation)
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillRepeatedly(DeclineOffers(declineFilters));
 
-  // We are only interested in pre-existing volumes, which have IDs but no
-  // profile. We use pre-existing volumes to make it easy to send multiple
-  // operations on multiple resources.
-  auto isPreExistingVolume = [](const Resource& r) {
+  auto isRaw = [](const Resource& r) {
     return r.has_disk() &&
       r.disk().has_source() &&
-      r.disk().source().has_id() &&
-      !r.disk().source().has_profile();
+      r.disk().source().has_profile() &&
+      r.disk().source().type() == Resource::DiskInfo::Source::RAW;
   };
 
   Future<vector<Offer>> offersBeforeOperations;
 
-  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
-      isPreExistingVolume)))
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(isRaw)))
     .WillOnce(FutureArg<1>(&offersBeforeOperations))
     .WillRepeatedly(DeclineOffers(declineFilters)); // Decline further offers.
 
   driver.start();
 
   AWAIT_READY(offersBeforeOperations);
-  ASSERT_FALSE(offersBeforeOperations->empty());
-
-  vector<Resource> sources;
+  ASSERT_EQ(1u, offersBeforeOperations->size());
 
-  foreach (
-      const Resource& resource,
-      offersBeforeOperations->at(0).resources()) {
-    if (isPreExistingVolume(resource) &&
-        resource.disk().source().type() == Resource::DiskInfo::Source::RAW) {
-      sources.push_back(resource);
-    }
-  }
+  Resources raw =
+    Resources(offersBeforeOperations->at(0).resources()).filter(isRaw);
 
-  ASSERT_EQ(2u, sources.size());
+  // Create two MOUNT disks of 2GB each.
+  ASSERT_SOME_EQ(Gigabytes(4), raw.disk());
+  Resource source1 = *raw.begin();
+  source1.mutable_scalar()->set_value(
+      static_cast<double>(Gigabytes(2).bytes()) / Bytes::MEGABYTES);
+  Resource source2 = *(raw - source1).begin();
 
   // Drop one of the operations on the way to the agent.
   Future<ApplyOperationMessage> applyOperationMessage =
@@ -3916,8 +3915,8 @@ TEST_F(StorageLocalResourceProviderTest, ReconcileDroppedOperation)
   // Attempt the creation of two volumes.
   driver.acceptOffers(
       {offersBeforeOperations->at(0).id()},
-      {CREATE_DISK(sources.at(0), Resource::DiskInfo::Source::MOUNT),
-       CREATE_DISK(sources.at(1), Resource::DiskInfo::Source::MOUNT)},
+      {CREATE_DISK(source1, Resource::DiskInfo::Source::MOUNT),
+       CREATE_DISK(source2, Resource::DiskInfo::Source::MOUNT)},
       acceptFilters);
 
   // Ensure that the operations are processed.
@@ -3964,8 +3963,15 @@ TEST_F(StorageLocalResourceProviderTest, ReconcileDroppedOperation)
 
   Future<vector<Offer>> offersAfterOperations;
 
-  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
-      isPreExistingVolume)))
+  auto isMountDisk = [](const Resource& r) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().type() == Resource::DiskInfo::Source::MOUNT;
+  };
+
+  EXPECT_CALL(
+      sched, resourceOffers(&driver, OffersHaveAnyResource(isMountDisk)))
     .WillOnce(FutureArg<1>(&offersAfterOperations));
 
   // Advance the clock to trigger a batch allocation.
@@ -3974,14 +3980,8 @@ TEST_F(StorageLocalResourceProviderTest, ReconcileDroppedOperation)
   AWAIT_READY(offersAfterOperations);
   ASSERT_FALSE(offersAfterOperations->empty());
 
-  vector<Resource> converted;
-
-  foreach (const Resource& resource, offersAfterOperations->at(0).resources()) {
-    if (isPreExistingVolume(resource) &&
-        resource.disk().source().type() == Resource::DiskInfo::Source::MOUNT) {
-      converted.push_back(resource);
-    }
-  }
+  Resources converted =
+    Resources(offersAfterOperations->at(0).resources()).filter(isMountDisk);
 
   ASSERT_EQ(1u, converted.size());
 


[mesos] 06/14: Added validation for `Offer.Operation.CreateDisk.target_profile`.

Posted by ch...@apache.org.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 20b5dda99dda43b47e001dd3826b71597debee02
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Wed Nov 14 20:34:51 2018 -0800

    Added validation for `Offer.Operation.CreateDisk.target_profile`.
    
    Review: https://reviews.apache.org/r/69356
---
 src/master/validation.cpp             |  6 +++++
 src/tests/master_validation_tests.cpp | 44 +++++++++++++++++++++++++++++++----
 2 files changed, 46 insertions(+), 4 deletions(-)

diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 5768ac8..5fccc9f 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -2528,6 +2528,12 @@ Option<Error> validate(const Offer::Operation::CreateDisk& createDisk)
     return Error("'target_type' is neither MOUNT or BLOCK");
   }
 
+  if (source.disk().source().has_profile() == createDisk.has_target_profile()) {
+    return createDisk.has_target_profile()
+      ? Error("'target_profile' must not be set when 'source' has a profile")
+      : Error("'target_profile' must be set when 'source' has no profile");
+  }
+
   return None();
 }
 
diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp
index aa7c8f7..3aa7b2a 100644
--- a/src/tests/master_validation_tests.cpp
+++ b/src/tests/master_validation_tests.cpp
@@ -2025,32 +2025,46 @@ TEST_F(ShrinkVolumeOperationValidationTest, MissingCapability)
 TEST(OperationValidationTest, CreateDisk)
 {
   Resource disk1 = createDiskResource(
-      "10", "*", None(), None(), createDiskSourceRaw());
+      "10", "*", None(), None(), createDiskSourceRaw(None(), "profile"));
 
   Resource disk2 = createDiskResource(
-      "20", "*", None(), None(), createDiskSourceMount());
+      "20", "*", None(), None(), createDiskSourceRaw());
 
   Resource disk3 = createDiskResource(
-      "30", "*", None(), None(), createDiskSourceRaw());
+      "30", "*", None(), None(), createDiskSourceMount());
+
+  Resource disk4 = createDiskResource(
+      "40", "*", None(), None(), createDiskSourceRaw(None(), "profile"));
 
   disk1.mutable_provider_id()->set_value("provider1");
   disk2.mutable_provider_id()->set_value("provider2");
+  disk3.mutable_provider_id()->set_value("provider3");
 
   Offer::Operation::CreateDisk createDisk;
   createDisk.mutable_source()->CopyFrom(disk1);
   createDisk.set_target_type(Resource::DiskInfo::Source::MOUNT);
+  createDisk.clear_target_profile();
 
   Option<Error> error = operation::validate(createDisk);
   EXPECT_NONE(error);
 
   createDisk.mutable_source()->CopyFrom(disk1);
   createDisk.set_target_type(Resource::DiskInfo::Source::BLOCK);
+  createDisk.clear_target_profile();
+
+  error = operation::validate(createDisk);
+  EXPECT_NONE(error);
+
+  createDisk.mutable_source()->CopyFrom(disk2);
+  createDisk.set_target_type(Resource::DiskInfo::Source::MOUNT);
+  createDisk.set_target_profile("profile");
 
   error = operation::validate(createDisk);
   EXPECT_NONE(error);
 
   createDisk.mutable_source()->CopyFrom(disk1);
   createDisk.set_target_type(Resource::DiskInfo::Source::PATH);
+  createDisk.clear_target_profile();
 
   error = operation::validate(createDisk);
   ASSERT_SOME(error);
@@ -2058,17 +2072,39 @@ TEST(OperationValidationTest, CreateDisk)
       error->message,
       "'target_type' is neither MOUNT or BLOCK"));
 
+  createDisk.mutable_source()->CopyFrom(disk1);
+  createDisk.set_target_type(Resource::DiskInfo::Source::MOUNT);
+  createDisk.set_target_profile("profile");
+
+  error = operation::validate(createDisk);
+  ASSERT_SOME(error);
+  EXPECT_TRUE(strings::contains(
+      error->message,
+      "'target_profile' must not be set when 'source' has a profile"));
+
   createDisk.mutable_source()->CopyFrom(disk2);
   createDisk.set_target_type(Resource::DiskInfo::Source::MOUNT);
+  createDisk.clear_target_profile();
 
   error = operation::validate(createDisk);
   ASSERT_SOME(error);
   EXPECT_TRUE(strings::contains(
       error->message,
-      "'source' is not a RAW disk resource"));
+      "'target_profile' must be set when 'source' has no profile"));
 
   createDisk.mutable_source()->CopyFrom(disk3);
   createDisk.set_target_type(Resource::DiskInfo::Source::MOUNT);
+  createDisk.clear_target_profile();
+
+  error = operation::validate(createDisk);
+  ASSERT_SOME(error);
+  EXPECT_TRUE(strings::contains(
+      error->message,
+      "'source' is not a RAW disk resource"));
+
+  createDisk.mutable_source()->CopyFrom(disk4);
+  createDisk.set_target_type(Resource::DiskInfo::Source::MOUNT);
+  createDisk.clear_target_profile();
 
   error = operation::validate(createDisk);
   ASSERT_SOME(error);
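
The check added to `validate` above amounts to an exclusive-or: exactly one of the source's profile and `target_profile` may be present. A minimal standalone sketch of that rule follows, assuming simplified stand-in types. `DiskSource`, `CreateDiskRequest` and `validateTargetProfile` are hypothetical names used only for illustration, not Mesos APIs; only the two error strings are taken from the patch.

```cpp
#include <optional>
#include <string>

// Hypothetical stand-ins for the protobuf messages; not Mesos types.
struct DiskSource {
  std::optional<std::string> profile;
};

struct CreateDiskRequest {
  DiskSource source;
  std::optional<std::string> targetProfile;
};

// Returns an error message if the request violates the rule, or
// `std::nullopt` if exactly one of the two profiles is present.
std::optional<std::string> validateTargetProfile(
    const CreateDiskRequest& request)
{
  const bool sourceHasProfile = request.source.profile.has_value();
  const bool hasTargetProfile = request.targetProfile.has_value();

  // Both set or both unset is invalid; the message depends on which case.
  if (sourceHasProfile == hasTargetProfile) {
    return hasTargetProfile
      ? std::string(
            "'target_profile' must not be set when 'source' has a profile")
      : std::string(
            "'target_profile' must be set when 'source' has no profile");
  }

  return std::nullopt;
}
```

A source that already has a profile (a storage pool) is accepted only without a target profile, while a source without one (a pre-existing volume) is accepted only when a target profile is supplied.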


[mesos] 12/14: Added the `--create_parameters` flag to the test CSI plugin.

Posted by ch...@apache.org.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 3718c8952f5449dc797228f39fd80bea5de9c6f0
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Mon Nov 19 15:26:21 2018 -0800

    Added the `--create_parameters` flag to the test CSI plugin.
    
    When the flag is specified, `CreateVolume` and `GetCapacity` work only
    if the `parameters` argument matches this flag. This will be used to
    test the checkpointing of create parameters of CSI volumes in SLRP.
    
    Review: https://reviews.apache.org/r/69364
---
 src/examples/test_csi_plugin.cpp | 49 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 48 insertions(+), 1 deletion(-)

diff --git a/src/examples/test_csi_plugin.cpp b/src/examples/test_csi_plugin.cpp
index 8b6adb2..af18303 100644
--- a/src/examples/test_csi_plugin.cpp
+++ b/src/examples/test_csi_plugin.cpp
@@ -21,6 +21,8 @@
 #include <grpcpp/server_context.h>
 #include <grpcpp/security/server_credentials.h>
 
+#include <mesos/type_utils.hpp>
+
 #include <stout/bytes.hpp>
 #include <stout/flags.hpp>
 #include <stout/hashmap.hpp>
@@ -81,6 +83,12 @@ public:
         "The available disk capacity managed by the plugin, in addition\n"
         "to the pre-existing volumes specified in the --volumes flag.");
 
+    add(&Flags::create_parameters,
+        "create_parameters",
+        "The parameters required for volume creation. The parameters are\n"
+        "specified as a semicolon-delimited list of param=value pairs.\n"
+        "(Example: 'param1=value1;param2=value2')");
+
     add(&Flags::volumes,
         "volumes",
         "Creates preprovisioned volumes upon start-up. The volumes are\n"
@@ -92,6 +100,7 @@ public:
   string endpoint;
   string work_dir;
   Bytes available_capacity;
+  Option<string> create_parameters;
   Option<string> volumes;
 };
 
@@ -106,10 +115,12 @@ public:
       const string& _workDir,
       const string& _endpoint,
       const Bytes& _availableCapacity,
+      const hashmap<string, string>& _createParameters,
       const hashmap<string, Bytes>& _volumes)
     : workDir(_workDir),
       endpoint(_endpoint),
-      availableCapacity(_availableCapacity)
+      availableCapacity(_availableCapacity),
+      createParameters(_createParameters.begin(), _createParameters.end())
   {
     // Construct the default mount volume capability.
     defaultVolumeCapability.mutable_mount();
@@ -267,6 +278,7 @@ private:
 
   Bytes availableCapacity;
   csi::v0::VolumeCapability defaultVolumeCapability;
+  google::protobuf::Map<string, string> createParameters;
   hashmap<string, VolumeInfo> volumes;
 
   unique_ptr<Server> server;
@@ -338,6 +350,10 @@ Status TestCSIPlugin::CreateVolume(
     }
   }
 
+  if (request->parameters() != createParameters) {
+    return Status(grpc::INVALID_ARGUMENT, "Unsupported create parameters");
+  }
+
   // The volume ID is determined by `name`, so we check whether the volume
   // corresponding to `name` is compatible to the request if it exists.
   if (volumes.contains(request->name())) {
@@ -530,6 +546,8 @@ Status TestCSIPlugin::ValidateVolumeCapabilities(
     }
   }
 
+  // TODO(chhsiao): Validate the parameters once we get CSI v1.
+
   response->set_supported(true);
 
   return Status::OK;
@@ -583,6 +601,12 @@ Status TestCSIPlugin::GetCapacity(
     }
   }
 
+  if (request->parameters() != createParameters) {
+    response->set_available_capacity(0);
+
+    return Status::OK;
+  }
+
   response->set_available_capacity(availableCapacity.bytes());
 
   return Status::OK;
@@ -927,6 +951,28 @@ int main(int argc, char** argv)
     return EXIT_FAILURE;
   }
 
+  hashmap<string, string> createParameters;
+
+  if (flags.create_parameters.isSome()) {
+    foreachpair (const string& param,
+                 const vector<string>& values,
+                 strings::pairs(flags.create_parameters.get(), ";", "=")) {
+      Option<Error> error;
+
+      if (values.size() != 1) {
+        error = "Parameter keys must be unique";
+      } else {
+        createParameters.put(param, values[0]);
+      }
+
+      if (error.isSome()) {
+        cerr << "Failed to parse the '--create_parameters' flag: "
+             << error->message << endl;
+        return EXIT_FAILURE;
+      }
+    }
+  }
+
   hashmap<string, Bytes> volumes;
 
   if (flags.volumes.isSome()) {
@@ -971,6 +1017,7 @@ int main(int argc, char** argv)
       flags.work_dir,
       flags.endpoint,
       flags.available_capacity,
+      createParameters,
       volumes));
 
   plugin->wait();
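
The flag format described above ('param1=value1;param2=value2') can be parsed as in the following rough sketch. It uses only the standard library instead of stout's `strings::pairs`, so it is not the plugin's actual parsing code and may differ in edge cases. `parseCreateParameters` is a hypothetical helper name, and skipping empty entries is an assumption of this sketch.

```cpp
#include <map>
#include <optional>
#include <sstream>
#include <string>

// Parses a semicolon-delimited list of `param=value` pairs. Returns
// `std::nullopt` on a malformed entry or a duplicate key. This is a
// standard-library sketch and does not reproduce the exact behavior of
// stout's `strings::pairs`.
std::optional<std::map<std::string, std::string>> parseCreateParameters(
    const std::string& flag)
{
  std::map<std::string, std::string> parameters;

  std::istringstream stream(flag);
  std::string entry;

  while (std::getline(stream, entry, ';')) {
    if (entry.empty()) {
      continue; // Assumption: tolerate empty entries such as a trailing ';'.
    }

    const std::string::size_type separator = entry.find('=');
    if (separator == std::string::npos || separator == 0) {
      return std::nullopt; // Missing '=' or empty key.
    }

    const std::string key = entry.substr(0, separator);
    const std::string value = entry.substr(separator + 1);

    // `emplace` reports whether the key was newly inserted.
    if (!parameters.emplace(key, value).second) {
      return std::nullopt; // Parameter keys must be unique.
    }
  }

  return parameters;
}
```

In the patch, the parsed map is handed to the plugin, which then rejects `CreateVolume` requests and reports zero capacity from `GetCapacity` whenever the request parameters differ from it.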


[mesos] 14/14: Used `OperationID` instead of `string` in test helpers.

Posted by ch...@apache.org.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit bf4e8b392b3fa58ffdbf5f14ce3f0ba7a1674a0c
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Thu Nov 15 01:27:39 2018 -0800

    Used `OperationID` instead of `string` in test helpers.
    
    This change makes the helpers more consistent with the other helpers and
    more robust against future changes to `OperationID`.
    
    Review: https://reviews.apache.org/r/69366
---
 src/tests/master_slave_reconciliation_tests.cpp    |  2 +-
 src/tests/master_tests.cpp                         |  2 +-
 src/tests/mesos.hpp                                | 40 +++++++++++++---------
 src/tests/operation_reconciliation_tests.cpp       |  4 +--
 .../storage_local_resource_provider_tests.cpp      | 16 +++++----
 5 files changed, 37 insertions(+), 27 deletions(-)

diff --git a/src/tests/master_slave_reconciliation_tests.cpp b/src/tests/master_slave_reconciliation_tests.cpp
index 064b4d4..de6e382 100644
--- a/src/tests/master_slave_reconciliation_tests.cpp
+++ b/src/tests/master_slave_reconciliation_tests.cpp
@@ -577,7 +577,7 @@ TEST_F(
   operationId.set_value("operation");
 
   mesos.send(v1::createCallAccept(
-      frameworkId, offer, {v1::RESERVE(reserved, operationId.value())}));
+      frameworkId, offer, {v1::RESERVE(reserved, operationId)}));
 
   AWAIT_READY(applyOperationMessage);
 
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 561c382..ef2c001 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -9286,7 +9286,7 @@ TEST_F(MasterTest, DropOperationWithIDAffectingDefaultResources)
   operationId.set_value("operation");
 
   mesos.send(v1::createCallAccept(
-      frameworkId, offer, {v1::RESERVE(reserved, operationId.value())}));
+      frameworkId, offer, {v1::RESERVE(reserved, operationId)}));
 
   // Wait for the framework to receive the OPERATION_ERROR update.
   AWAIT_READY(operationErrorUpdate);
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 9850821..c08e7e6 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1310,17 +1310,17 @@ inline TDomainInfo createDomainInfo(
 
 
 // Helpers for creating operations.
-template <typename TResources, typename TOffer>
+template <typename TResources, typename TOperationID, typename TOffer>
 inline typename TOffer::Operation RESERVE(
     const TResources& resources,
-    const Option<std::string> operationId = None())
+    const Option<TOperationID>& operationId = None())
 {
   typename TOffer::Operation operation;
   operation.set_type(TOffer::Operation::RESERVE);
   operation.mutable_reserve()->mutable_resources()->CopyFrom(resources);
 
   if (operationId.isSome()) {
-    operation.mutable_id()->set_value(operationId.get());
+    operation.mutable_id()->CopyFrom(operationId.get());
   }
 
   return operation;
@@ -1410,12 +1410,16 @@ inline typename TOffer::Operation LAUNCH_GROUP(
 }
 
 
-template <typename TResource, typename TTargetType, typename TOffer>
+template <
+    typename TResource,
+    typename TTargetType,
+    typename TOperationID,
+    typename TOffer>
 inline typename TOffer::Operation CREATE_DISK(
     const TResource& source,
     const TTargetType& targetType,
     const Option<std::string>& targetProfile = None(),
-    const Option<std::string>& operationId = None())
+    const Option<TOperationID>& operationId = None())
 {
   typename TOffer::Operation operation;
   operation.set_type(TOffer::Operation::CREATE_DISK);
@@ -1427,7 +1431,7 @@ inline typename TOffer::Operation CREATE_DISK(
   }
 
   if (operationId.isSome()) {
-    operation.mutable_id()->set_value(operationId.get());
+    operation.mutable_id()->CopyFrom(operationId.get());
   }
 
   return operation;
@@ -1747,7 +1751,8 @@ inline DomainInfo createDomainInfo(Args&&... args)
 template <typename... Args>
 inline Offer::Operation RESERVE(Args&&... args)
 {
-  return common::RESERVE<Resources, Offer>(std::forward<Args>(args)...);
+  return common::RESERVE<Resources, OperationID, Offer>(
+      std::forward<Args>(args)...);
 }
 
 
@@ -1804,9 +1809,9 @@ inline Offer::Operation LAUNCH_GROUP(Args&&... args)
 template <typename... Args>
 inline Offer::Operation CREATE_DISK(Args&&... args)
 {
-  return common::CREATE_DISK<Resource,
-                             Resource::DiskInfo::Source::Type,
-                             Offer>(std::forward<Args>(args)...);
+  return common::
+    CREATE_DISK<Resource, Resource::DiskInfo::Source::Type, OperationID, Offer>(
+        std::forward<Args>(args)...);
 }
 
 
@@ -2041,8 +2046,10 @@ inline hashmap<std::string, double> convertToHashmap(Args&&... args)
 template <typename... Args>
 inline mesos::v1::Offer::Operation RESERVE(Args&&... args)
 {
-  return common::RESERVE<mesos::v1::Resources, mesos::v1::Offer>(
-      std::forward<Args>(args)...);
+  return common::RESERVE<
+      mesos::v1::Resources,
+      mesos::v1::OperationID,
+      mesos::v1::Offer>(std::forward<Args>(args)...);
 }
 
 
@@ -2107,10 +2114,11 @@ inline mesos::v1::Offer::Operation LAUNCH_GROUP(Args&&... args)
 template <typename... Args>
 inline mesos::v1::Offer::Operation CREATE_DISK(Args&&... args)
 {
-  return common::CREATE_DISK<mesos::v1::Resource,
-                             mesos::v1::Resource::DiskInfo::Source::Type,
-                             mesos::v1::Offer>(
-      std::forward<Args>(args)...);
+  return common::CREATE_DISK<
+      mesos::v1::Resource,
+      mesos::v1::Resource::DiskInfo::Source::Type,
+      mesos::v1::OperationID,
+      mesos::v1::Offer>(std::forward<Args>(args)...);
 }
 
 
diff --git a/src/tests/operation_reconciliation_tests.cpp b/src/tests/operation_reconciliation_tests.cpp
index 8afac9a..a5e81a1 100644
--- a/src/tests/operation_reconciliation_tests.cpp
+++ b/src/tests/operation_reconciliation_tests.cpp
@@ -179,7 +179,7 @@ TEST_P(OperationReconciliationTest, PendingOperation)
   mesos.send(createCallAccept(
       frameworkId,
       offer,
-      {RESERVE(reservedResources, operationId.value())}));
+      {RESERVE(reservedResources, operationId)}));
 
   AWAIT_READY(applyOperationMessage);
 
@@ -795,7 +795,7 @@ TEST_P(OperationReconciliationTest, AgentPendingOperationAfterMasterFailover)
            source.get(),
            Resource::DiskInfo::Source::MOUNT,
            None(),
-           operationId.value())}));
+           operationId)}));
 
   AWAIT_READY(applyOperation);
 
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 8c88c14..422a152 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -4204,7 +4204,9 @@ TEST_F(StorageLocalResourceProviderTest, RetryOperationStatusUpdateToScheduler)
     .WillOnce(FutureArg<1>(&retriedUpdate));
 
   // Create a volume.
-  const string operationId = "operation";
+  v1::OperationID operationId;
+  operationId.set_value("operation");
+
   mesos.send(v1::createCallAccept(
       frameworkId,
       offer,
@@ -4216,7 +4218,7 @@ TEST_F(StorageLocalResourceProviderTest, RetryOperationStatusUpdateToScheduler)
 
   AWAIT_READY(update);
 
-  ASSERT_EQ(operationId, update->status().operation_id().value());
+  ASSERT_EQ(operationId, update->status().operation_id());
   ASSERT_EQ(
       mesos::v1::OperationState::OPERATION_FINISHED, update->status().state());
   ASSERT_TRUE(update->status().has_uuid());
@@ -4230,7 +4232,7 @@ TEST_F(StorageLocalResourceProviderTest, RetryOperationStatusUpdateToScheduler)
   // should resend it after the status update retry interval minimum.
   AWAIT_READY(retriedUpdate);
 
-  ASSERT_EQ(operationId, retriedUpdate->status().operation_id().value());
+  ASSERT_EQ(operationId, retriedUpdate->status().operation_id());
   ASSERT_EQ(
       mesos::v1::OperationState::OPERATION_FINISHED,
       retriedUpdate->status().state());
@@ -4397,10 +4399,10 @@ TEST_F(
       frameworkId,
       offer,
       {v1::CREATE_DISK(
-          source.get(),
-          v1::Resource::DiskInfo::Source::MOUNT,
-          None(),
-          operationId.value())}));
+           source.get(),
+           v1::Resource::DiskInfo::Source::MOUNT,
+           None(),
+           operationId)}));
 
   AWAIT_READY(update);
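
The pattern in this change, a generic helper parameterized on the ID type plus a thin wrapper that fixes the template arguments, can be sketched in isolation as follows. The types below are hypothetical stand-ins for the generated protobuf classes, `std::optional` stands in for stout's `Option`, and `makeReserve` is an illustrative name that does not appear in the patch.

```cpp
#include <optional>
#include <string>

// Hypothetical stand-ins for the generated protobuf classes.
struct OperationID {
  std::string value;
};

struct Operation {
  enum Type { UNKNOWN, RESERVE };
  Type type = UNKNOWN;
  std::optional<OperationID> id;
};

// Generic helper: the ID type is a template parameter, so the same body
// could serve more than one API namespace.
template <typename TOperation, typename TOperationID>
TOperation makeReserve(
    const std::optional<TOperationID>& operationId = std::nullopt)
{
  TOperation operation;
  operation.type = TOperation::RESERVE;

  if (operationId.has_value()) {
    operation.id = *operationId; // Copy the whole ID, not just its string.
  }

  return operation;
}

// Thin wrapper that fixes the template arguments, in the spirit of the
// namespace-level wrappers in `src/tests/mesos.hpp`.
inline Operation RESERVE(
    const std::optional<OperationID>& operationId = std::nullopt)
{
  return makeReserve<Operation, OperationID>(operationId);
}
```

Because the helper copies the whole ID object, callers pass `operationId` directly rather than `operationId.value()`, and a field added to the ID type later would be carried along without touching the call sites.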
 


[mesos] 08/14: Checkpointed creation parameters for CSI volumes.

Posted by ch...@apache.org.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 67b9972436bec23f82b9e30cd23d9651fad2764e
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Thu Nov 15 16:39:59 2018 -0800

    Checkpointed creation parameters for CSI volumes.
    
    The parameters of CSI volumes created by SLRPs are now checkpointed, and
    used to validate volumes created from previous SLRP runs.
    
    Review: https://reviews.apache.org/r/69362
---
 src/csi/state.proto                        |   6 ++
 src/resource_provider/storage/provider.cpp | 103 +++++++++++++++--------------
 2 files changed, 59 insertions(+), 50 deletions(-)

diff --git a/src/csi/state.proto b/src/csi/state.proto
index 8445399..264a565 100644
--- a/src/csi/state.proto
+++ b/src/csi/state.proto
@@ -20,6 +20,9 @@ import "csi.proto";
 
 package mesos.csi.state;
 
+// NOTE: The keywords 'REQUIRED' and 'OPTIONAL' are to be interpreted as
+// described in the CSI specification:
+// https://github.com/container-storage-interface/spec/blob/master/spec.md#required-vs-optional // NOLINT
 
 // Represents the state of a provisioned volume with respect to a node.
 message VolumeState {
@@ -43,6 +46,9 @@ message VolumeState {
   // The capability used to publish the volume. This is a REQUIRED field.
   .csi.v0.VolumeCapability volume_capability = 2;
 
+  // The parameters used when creating the volume. This is an OPTIONAL field.
+  map<string, string> parameters = 6;
+
   // Attributes of the volume to be used on the node. This field MUST match the
   // attributes of the `Volume` returned by `CreateVolume`. This is an OPTIONAL
   // field.
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index ebd5a88..a22c82c 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -403,10 +403,10 @@ private:
       const Bytes& capacity,
       const DiskProfileAdaptor::ProfileInfo& profileInfo);
   Future<bool> deleteVolume(const string& volumeId);
-  Future<string> validateCapability(
+  Future<Nothing> validateVolume(
       const string& volumeId,
       const Option<Labels>& metadata,
-      const csi::v0::VolumeCapability& capability);
+      const DiskProfileAdaptor::ProfileInfo& profileInfo);
   Future<Resources> listVolumes();
   Future<Resources> getCapacities();
 
@@ -2681,6 +2681,7 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
             volumeState.set_state(VolumeState::CREATED);
             volumeState.mutable_volume_capability()
               ->CopyFrom(profileInfo.capability);
+            *volumeState.mutable_parameters() = profileInfo.parameters;
             *volumeState.mutable_volume_attributes() = volume.attributes();
 
             volumes.put(volume.id(), std::move(volumeState));
@@ -2796,17 +2797,33 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
 }
 
 
-// Validates if a volume has the specified capability. This is called when
-// applying `CREATE_DISK` on a pre-existing volume, so we make it return a
-// volume ID, similar to `createVolume`.
-// NOTE: This can only be called after `prepareIdentityService` and only for
-// newly discovered volumes.
-Future<string> StorageLocalResourceProviderProcess::validateCapability(
+// Validates if a volume supports the capability of the specified profile.
+//
+// NOTE: This can only be called after `prepareIdentityService`.
+//
+// TODO(chhsiao): Validate the volume against the parameters of the profile once
+// we get CSI v1.
+Future<Nothing> StorageLocalResourceProviderProcess::validateVolume(
     const string& volumeId,
     const Option<Labels>& metadata,
-    const csi::v0::VolumeCapability& capability)
+    const DiskProfileAdaptor::ProfileInfo& profileInfo)
 {
-  CHECK(!volumes.contains(volumeId));
+  // If the volume has a checkpointed state, the validation succeeds only if the
+  // capability and parameters of the specified profile are the same as those in
+  // the checkpoint.
+  if (volumes.contains(volumeId)) {
+    const VolumeState& volumeState = volumes.at(volumeId).state;
+
+    if (volumeState.volume_capability() != profileInfo.capability) {
+      return Failure("Invalid volume capability for volume '" + volumeId + "'");
+    }
+
+    if (volumeState.parameters() != profileInfo.parameters) {
+      return Failure("Invalid parameters for volume '" + volumeId + "'");
+    }
+
+    return Nothing();
+  }
 
   if (!pluginCapabilities.controllerService) {
     return Failure(
@@ -2820,19 +2837,20 @@ Future<string> StorageLocalResourceProviderProcess::validateCapability(
       google::protobuf::Map<string, string> volumeAttributes;
 
       if (metadata.isSome()) {
-        volumeAttributes = convertLabelsToStringMap(metadata.get()).get();
+        volumeAttributes =
+          CHECK_NOTERROR(convertLabelsToStringMap(metadata.get()));
       }
 
       csi::v0::ValidateVolumeCapabilitiesRequest request;
       request.set_volume_id(volumeId);
-      request.add_volume_capabilities()->CopyFrom(capability);
+      request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
       *request.mutable_volume_attributes() = volumeAttributes;
 
       return call<csi::v0::VALIDATE_VOLUME_CAPABILITIES>(
           client, std::move(request))
         .then(defer(self(), [=](
             const csi::v0::ValidateVolumeCapabilitiesResponse& response)
-            -> Future<string> {
+            -> Future<Nothing> {
           if (!response.supported()) {
             return Failure(
                 "Unsupported volume capability for volume '" + volumeId +
@@ -2841,13 +2859,15 @@ Future<string> StorageLocalResourceProviderProcess::validateCapability(
 
           VolumeState volumeState;
           volumeState.set_state(VolumeState::CREATED);
-          volumeState.mutable_volume_capability()->CopyFrom(capability);
+          volumeState.mutable_volume_capability()
+            ->CopyFrom(profileInfo.capability);
+          *volumeState.mutable_parameters() = profileInfo.parameters;
           *volumeState.mutable_volume_attributes() = volumeAttributes;
 
           volumes.put(volumeId, std::move(volumeState));
           checkpointVolumeState(volumeId);
 
-          return volumeId;
+          return Nothing();
         }));
     }));
 }
@@ -3103,12 +3123,13 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
   // the same volume will be returned when recovering from a failover).
   //
   // For 2, the target profile will be specified, so we first check if the
-  // profile is mount or block capable. Then, there are two scenarios:
-  //   a. If the volume has a checkpointed state (because it was created
-  //      by a previous resource provider), we simply check if its checkpointed
+  // profile is mount or block capable. Then, we call `validateVolume` to handle
+  // the following two scenarios:
+  //   a. If the volume has a checkpointed state (because it is created by a
+  //      previous resource provider), we simply check if its checkpointed
   //      capability and parameters match the profile.
-  //   b. If the volume is newly discovered, `ValidateVolumeCapabilities`
-  //      is called with the capability of the profile.
+  //   b. If the volume is newly discovered, `ValidateVolumeCapabilities` is
+  //      called with the capability of the profile.
   CHECK_NE(resource.disk().source().has_profile(),
            resource.disk().source().has_id() && targetProfile.isSome());
 
@@ -3142,39 +3163,21 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
     }
   }
 
-  Future<string> created;
-  if (resource.disk().source().has_id()) {
-    const string& volumeId = resource.disk().source().id();
-
-    if (volumes.contains(volumeId)) {
-      const VolumeState& volumeState = volumes.at(volumeId).state;
-
-      // TODO(chhsiao): Validate the volume against the parameters of the
-      // profile once they are checkpointed.
-      if (volumeState.volume_capability() != profileInfo.capability) {
-        return Failure(
-            "Profile '" + profile + "' cannot be applied to volume '" +
-            volumeId + "'");
-      }
-
-      created = volumeId;
-    } else {
-      created = validateCapability(
-          volumeId,
+  // TODO(chhsiao): Consider calling `createVolume` sequentially with other
+  // create or delete operations, and send an `UPDATE_STATE` for storage pools
+  // afterward. See MESOS-9254.
+  Future<string> created = resource.disk().source().has_profile()
+    ? createVolume(
+          operationUuid.toString(),
+          Bytes(resource.scalar().value() * Bytes::MEGABYTES),
+          profileInfo)
+    : validateVolume(
+          resource.disk().source().id(),
           resource.disk().source().has_metadata()
             ? resource.disk().source().metadata()
             : Option<Labels>::none(),
-          profileInfo.capability);
-    }
-  } else {
-    // TODO(chhsiao): Consider calling `CreateVolume` sequentially with other
-    // create or delete operations, and send an `UPDATE_STATE` for storage pools
-    // afterward. See MESOS-9254.
-    created = createVolume(
-        operationUuid.toString(),
-        Bytes(resource.scalar().value() * Bytes::MEGABYTES),
-        profileInfo);
-  }
+          profileInfo)
+        .then([=] { return resource.disk().source().id(); });
 
   return created
     .then(defer(self(), [=](const string& volumeId) {
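
The checkpoint comparison that `validateVolume` performs for already-known volumes can be illustrated with the simplified sketch below. All types and names in it are hypothetical stand-ins: the real code compares protobuf messages, returns a `Future<Nothing>`, and falls back to the plugin's `ValidateVolumeCapabilities` call when no checkpoint exists, which this sketch only reports as a separate result.

```cpp
#include <map>
#include <string>

// Hypothetical, simplified stand-ins; the real code uses protobuf
// messages (`VolumeState`, `DiskProfileAdaptor::ProfileInfo`).
using Parameters = std::map<std::string, std::string>;

struct ProfileInfo {
  std::string capability; // Stands in for `csi::v0::VolumeCapability`.
  Parameters parameters;
};

struct CheckpointedVolume {
  std::string capability;
  Parameters parameters;
};

enum class Validation {
  VALID,
  INVALID_CAPABILITY,
  INVALID_PARAMETERS,
  NOT_CHECKPOINTED // The real provider would query the CSI plugin here.
};

// Checks a profile against the checkpointed state of a volume. The
// validation succeeds only if both the capability and the parameters of
// the profile equal the checkpointed ones.
Validation validateCheckpointedVolume(
    const std::map<std::string, CheckpointedVolume>& volumes,
    const std::string& volumeId,
    const ProfileInfo& profileInfo)
{
  const auto it = volumes.find(volumeId);
  if (it == volumes.end()) {
    return Validation::NOT_CHECKPOINTED;
  }

  if (it->second.capability != profileInfo.capability) {
    return Validation::INVALID_CAPABILITY;
  }

  if (it->second.parameters != profileInfo.parameters) {
    return Validation::INVALID_PARAMETERS;
  }

  return Validation::VALID;
}
```

Storing the creation parameters next to the capability is what makes the second comparison possible for volumes created by a previous resource provider run.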


[mesos] 09/14: Added MESOS-9275 to the 1.7.1 CHANGELOG.

Posted by ch...@apache.org.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 96e6eb16874eb5d41891aec2c755a4cdac5b30db
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Tue Nov 20 19:28:47 2018 -0800

    Added MESOS-9275 to the 1.7.1 CHANGELOG.
---
 CHANGELOG | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG b/CHANGELOG
index 6e82394..b37f910 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -18,6 +18,7 @@ Release Notes - Mesos - Version 1.7.1 (WIP)
   * [MESOS-9231] - `docker inspect` may return an unexpected result to Docker executor due to a race condition.
   * [MESOS-9267] - Mesos agent crashes when CNI network is not configured but used.
   * [MESOS-9274] - v1 JAVA scheduler library can drop TEARDOWN upon destruction.
+  * [MESOS-9275] - Allow optional `profile` to be specified in `CREATE_DISK` offer operation.
   * [MESOS-9279] - Docker Containerizer 'usage' call might be expensive if mount table is big.
   * [MESOS-9281] - SLRP gets a stale checkpoint after system crash.
   * [MESOS-9283] - Docker containerizer actor can get backlogged with large number of containers.