You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/01/29 21:14:39 UTC

[mesos] 07/08: Added a unit test for RPC retry in SLRP.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 4839917fc219398a2929fa56ca094b32b2f3a75d
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Tue Jan 22 23:03:02 2019 -0800

    Added a unit test for RPC retry in SLRP.
    
    This patch adds a unit test to verify that the storage local resource
    provider (SLRP) retries `CreateVolume` and `DeleteVolume` CSI calls with a
    random exponential backoff upon receiving `DEADLINE_EXCEEDED` or
    `UNAVAILABLE`.
    
    Review: https://reviews.apache.org/r/69815
---
 .../storage_local_resource_provider_tests.cpp      | 407 ++++++++++++++++++++-
 1 file changed, 400 insertions(+), 7 deletions(-)

diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index d232abf..fb001aa 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -14,11 +14,15 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <algorithm>
+
 #include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/http.hpp>
+#include <process/grpc.hpp>
 #include <process/gtest.hpp>
 #include <process/gmock.hpp>
+#include <process/queue.hpp>
 #include <process/reap.hpp>
 
 #include <stout/hashmap.hpp>
@@ -27,6 +31,7 @@
 #include <stout/os/realpath.hpp>
 
 #include "csi/paths.hpp"
+#include "csi/rpc.hpp"
 #include "csi/state.hpp"
 
 #include "linux/fs.hpp"
@@ -35,6 +40,8 @@
 
 #include "module/manager.hpp"
 
+#include "resource_provider/storage/provider_process.hpp"
+
 #include "slave/container_daemon_process.hpp"
 #include "slave/paths.hpp"
 #include "slave/state.hpp"
@@ -46,6 +53,7 @@
 #include "tests/disk_profile_server.hpp"
 #include "tests/flags.hpp"
 #include "tests/mesos.hpp"
+#include "tests/mock_csi_plugin.hpp"
 
 namespace http = process::http;
 
@@ -64,8 +72,11 @@ using process::Future;
 using process::Owned;
 using process::Promise;
 using process::post;
+using process::Queue;
 using process::reap;
 
+using process::grpc::StatusError;
+
 using testing::AtMost;
 using testing::Between;
 using testing::DoAll;
@@ -197,7 +208,8 @@ public:
   void setupResourceProviderConfig(
       const Bytes& capacity,
       const Option<string> volumes = None(),
-      const Option<string> createParameters = None())
+      const Option<string> createParameters = None(),
+      const Option<string> forward = None())
   {
     const string testCsiPluginPath =
       path::join(tests::flags.build_dir, "src", "test-csi-plugin");
@@ -231,10 +243,11 @@ public:
                     "value": "%s",
                     "arguments": [
                       "%s",
+                      "--work_dir=%s",
                       "--available_capacity=%s",
-                      "--create_parameters=%s",
-                      "--volumes=%s",
-                      "--work_dir=%s"
+                      "%s",
+                      "%s",
+                      "%s"
                     ]
                   },
                   "resources": [
@@ -265,10 +278,12 @@ public:
         TEST_CSI_PLUGIN_NAME,
         testCsiPluginPath,
         testCsiPluginPath,
+        testCsiPluginWorkDir,
         stringify(capacity),
-        createParameters.getOrElse(""),
-        volumes.getOrElse(""),
-        testCsiPluginWorkDir);
+        createParameters.isSome()
+          ? "--create_parameters=" + createParameters.get() : "",
+        volumes.isSome() ? "--volumes=" + volumes.get() : "",
+        forward.isSome() ? "--forward=" + forward.get() : "");
 
     ASSERT_SOME(resourceProviderConfig);
 
@@ -4675,6 +4690,384 @@ TEST_F(
   ASSERT_TRUE(operationStatus.has_uuid());
 }
 
+
+// This test verifies that the storage local resource provider will retry
+// `CreateVolume` and `DeleteVolume` CSI calls with a random exponential backoff
+// upon receiving `DEADLINE_EXCEEDED` or `UNAVAILABLE`.
+//
+// To accomplish this:
+//   1. Creates a MOUNT disk from a RAW disk resource.
+//   2. Fails the first n `CreateVolume` calls (`n = numRetryableErrors`): the
+//      first with `DEADLINE_EXCEEDED`, the rest with `UNAVAILABLE`. The clock
+//      is advanced by the doubling, capped backoff to trigger each retry.
+//   3. Returns `OK` for the next `CreateVolume` call.
+//   4. Destroys the MOUNT disk.
+//   5. Fails the first n `DeleteVolume` calls (`n = numRetryableErrors`): the
+//      first with `DEADLINE_EXCEEDED`, the rest with `UNAVAILABLE`. The clock
+//      is advanced by the doubling, capped backoff to trigger each retry.
+//   6. Returns `UNIMPLEMENTED` for the next `DeleteVolume` call to verify that
+//      there is no retry on a non-retryable error (the operation then fails).
+TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
+{
+  Clock::pause();
+
+  // Number of retryable errors to return for each RPC; must be at least 1.
+  const size_t numRetryableErrors = 10;
+
+  const string profilesPath = path::join(sandbox.get(), "profiles.json");
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
+  loadUriDiskProfileAdaptorModule(profilesPath);
+
+  const string mockCsiEndpoint =
+    "unix://" + path::join(sandbox.get(), "mock_csi.sock");
+
+  MockCSIPlugin plugin;
+  ASSERT_SOME(plugin.startup(mockCsiEndpoint));
+
+  EXPECT_CALL(plugin, GetCapacity(_, _, _))
+    .WillRepeatedly(Invoke([](
+        grpc::ServerContext* context,
+        const csi::v0::GetCapacityRequest* request,
+        csi::v0::GetCapacityResponse* response) {
+      response->set_available_capacity(Gigabytes(4).bytes());
+
+      return grpc::Status::OK;
+    }));
+
+  Queue<csi::v0::CreateVolumeRequest> createVolumeRequests;
+  Queue<Try<csi::v0::CreateVolumeResponse, StatusError>> createVolumeResults;
+  EXPECT_CALL(plugin, CreateVolume(_, _, _))
+    .WillRepeatedly(Invoke([&](
+        grpc::ServerContext* context,
+        const csi::v0::CreateVolumeRequest* request,
+        csi::v0::CreateVolumeResponse* response) -> grpc::Status {
+      Future<Try<csi::v0::CreateVolumeResponse, StatusError>> result =
+        createVolumeResults.get();
+
+      EXPECT_TRUE(result.isPending());  // No result queued ahead of the call.
+      createVolumeRequests.put(*request);
+
+      // Wait for the test to supply a result; the closure is needed since
+      // `AWAIT_ASSERT_*` macros can only be used in void-returning functions.
+      [&] { AWAIT_ASSERT_READY(result); }();
+
+      if (result->isError()) {
+        return result->error().status;
+      }
+
+      *response = result->get();
+      return grpc::Status::OK;
+    }));
+
+  Queue<csi::v0::DeleteVolumeRequest> deleteVolumeRequests;
+  Queue<Try<csi::v0::DeleteVolumeResponse, StatusError>> deleteVolumeResults;
+  EXPECT_CALL(plugin, DeleteVolume(_, _, _))
+    .WillRepeatedly(Invoke([&](
+        grpc::ServerContext* context,
+        const csi::v0::DeleteVolumeRequest* request,
+        csi::v0::DeleteVolumeResponse* response) -> grpc::Status {
+      Future<Try<csi::v0::DeleteVolumeResponse, StatusError>> result =
+        deleteVolumeResults.get();
+
+      EXPECT_TRUE(result.isPending());  // No result queued ahead of the call.
+      deleteVolumeRequests.put(*request);
+
+      // Wait for the test to supply a result; the closure is needed since
+      // `AWAIT_ASSERT_*` macros can only be used in void-returning functions.
+      [&] { AWAIT_ASSERT_READY(result); }();
+
+      if (result->isError()) {
+        return result->error().status;
+      }
+
+      *response = result->get();
+      return grpc::Status::OK;
+    }));
+
+  setupResourceProviderConfig(Bytes(0), None(), None(), mockCsiEndpoint);
+
+  master::Flags masterFlags = CreateMasterFlags();
+
+  // Allow many agent ping timeouts before the master disconnects the agent.
+  masterFlags.max_agent_ping_timeouts = 1000;
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  // Since the local resource provider daemon is started after the agent
+  // is registered, it is guaranteed that the slave will send two
+  // `UpdateSlaveMessage`s, where the latter one contains resources from
+  // the storage local resource provider.
+  //
+  // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because
+  // Google Mock will search the expectations in reverse order.
+  Future<UpdateSlaveMessage> updateSlave2 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlave1 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(flags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlave1);
+
+  // NOTE: We need to resume the clock so that the resource provider can
+  // periodically check if the CSI endpoint socket has been created by
+  // the plugin container, which runs in another Linux process.
+  Clock::resume();
+
+  AWAIT_READY(updateSlave2);
+  ASSERT_TRUE(updateSlave2->has_resource_providers());
+  ASSERT_EQ(1, updateSlave2->resource_providers().providers_size());
+
+  Clock::pause();
+
+  // Register a framework to receive offers.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // Decline offers without RAW disk resources. The master can send such offers
+  // before the resource provider receives profile updates.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers());
+
+  Future<vector<Offer>> offers;
+
+  auto isStoragePool = [](const Resource& r, const string& profile) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().type() == Resource::DiskInfo::Source::RAW &&
+      r.disk().source().has_vendor() &&
+      r.disk().source().vendor() == TEST_CSI_VENDOR &&
+      !r.disk().source().has_id() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().profile() == profile;
+  };
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isStoragePool, lambda::_1, "test"))))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  Resource raw = *Resources(offers->at(0).resources())
+    .filter(std::bind(isStoragePool, lambda::_1, "test"))
+    .begin();
+
+  // Create a MOUNT disk.
+  Future<UpdateOperationStatusMessage> updateOperationStatus =
+    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+
+  driver.acceptOffers(
+      {offers->at(0).id()},
+      {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)});
+
+  AWAIT_READY(createVolumeRequests.get())
+    << "Failed to wait for " << csi::v0::CREATE_VOLUME << " call #1";
+
+  // Settle the clock, then verify that no additional request has been sent.
+  Clock::settle();
+  ASSERT_EQ(0u, createVolumeRequests.size());
+
+  Future<Nothing> createVolumeCall = FUTURE_DISPATCH(
+      _, &StorageLocalResourceProviderProcess::__call<csi::v0::CREATE_VOLUME>);
+
+  // Return `DEADLINE_EXCEEDED` for the first `CreateVolume` call.
+  createVolumeResults.put(
+      StatusError(grpc::Status(grpc::DEADLINE_EXCEEDED, "")));
+
+  AWAIT_READY(createVolumeCall);
+
+  Duration createVolumeBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
+
+  // Settle the clock so the retry timer is set, then advance it by the upper
+  // bound of the random backoff so that the retry is guaranteed to fire.
+  Clock::settle();
+  Clock::advance(createVolumeBackoff);
+
+  // Return `UNAVAILABLE` for the remaining `numRetryableErrors - 1` calls.
+  for (size_t i = 1; i < numRetryableErrors; i++) {
+    AWAIT_READY(createVolumeRequests.get())
+      << "Failed to wait for " << csi::v0::CREATE_VOLUME << " call #"
+      << (i + 1);
+
+    // Settle the clock, then verify that no additional request has been sent.
+    Clock::settle();
+    ASSERT_EQ(0u, createVolumeRequests.size());
+
+    createVolumeCall = FUTURE_DISPATCH(
+        _,
+        &StorageLocalResourceProviderProcess::__call<csi::v0::CREATE_VOLUME>);
+
+    createVolumeResults.put(StatusError(grpc::Status(grpc::UNAVAILABLE, "")));
+
+    AWAIT_READY(createVolumeCall);
+
+    createVolumeBackoff =
+      std::min(createVolumeBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
+
+    // Settle the clock so the retry timer is set, then advance it by the upper
+    // bound of the random backoff so that the retry is guaranteed to fire.
+    Clock::settle();
+    Clock::advance(createVolumeBackoff);
+  }
+
+  AWAIT_READY(createVolumeRequests.get())
+    << "Failed to wait for " << csi::v0::CREATE_VOLUME << " call #"
+    << (numRetryableErrors + 1);
+
+  // Settle the clock, then verify that no additional request has been sent.
+  Clock::settle();
+  ASSERT_EQ(0u, createVolumeRequests.size());
+
+  auto isMountDisk = [](const Resource& r, const string& profile) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().type() == Resource::DiskInfo::Source::MOUNT &&
+      r.disk().source().has_vendor() &&
+      r.disk().source().vendor() == TEST_CSI_VENDOR &&
+      r.disk().source().has_id() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().profile() == profile;
+  };
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isMountDisk, lambda::_1, "test"))))
+    .WillOnce(FutureArg<1>(&offers));
+
+  // Return a successful response for the last `CreateVolume` call.
+  csi::v0::CreateVolumeResponse createVolumeResponse;
+  createVolumeResponse.mutable_volume()->set_id(id::UUID::random().toString());
+  createVolumeResponse.mutable_volume()->set_capacity_bytes(
+      Megabytes(raw.scalar().value()).bytes());
+
+  createVolumeResults.put(std::move(createVolumeResponse));
+
+  AWAIT_READY(updateOperationStatus);
+  EXPECT_EQ(OPERATION_FINISHED, updateOperationStatus->status().state());
+
+  // Advance the clock to trigger a batch allocation.
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  Resource created = *Resources(offers->at(0).resources())
+    .filter(std::bind(isMountDisk, lambda::_1, "test"))
+    .begin();
+
+  // Destroy the MOUNT disk.
+  updateOperationStatus = FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+
+  driver.acceptOffers({offers->at(0).id()}, {DESTROY_DISK(created)});
+
+  AWAIT_READY(deleteVolumeRequests.get())
+    << "Failed to wait for " << csi::v0::DELETE_VOLUME << " call #1";
+
+  // Settle the clock, then verify that no additional request has been sent.
+  Clock::settle();
+  ASSERT_EQ(0u, deleteVolumeRequests.size());
+
+  Future<Nothing> deleteVolumeCall = FUTURE_DISPATCH(
+      _, &StorageLocalResourceProviderProcess::__call<csi::v0::DELETE_VOLUME>);
+
+  // Return `DEADLINE_EXCEEDED` for the first `DeleteVolume` call.
+  deleteVolumeResults.put(
+      StatusError(grpc::Status(grpc::DEADLINE_EXCEEDED, "")));
+
+  AWAIT_READY(deleteVolumeCall);
+
+  Duration deleteVolumeBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
+
+  // Settle the clock so the retry timer is set, then advance it by the upper
+  // bound of the random backoff so that the retry is guaranteed to fire.
+  Clock::settle();
+  Clock::advance(deleteVolumeBackoff);
+
+  // Return `UNAVAILABLE` for the remaining `numRetryableErrors - 1` calls.
+  for (size_t i = 1; i < numRetryableErrors; i++) {
+    AWAIT_READY(deleteVolumeRequests.get())
+      << "Failed to wait for " << csi::v0::DELETE_VOLUME << " call #"
+      << (i + 1);
+
+    // Settle the clock, then verify that no additional request has been sent.
+    Clock::settle();
+    ASSERT_EQ(0u, deleteVolumeRequests.size());
+
+    deleteVolumeCall = FUTURE_DISPATCH(
+        _,
+        &StorageLocalResourceProviderProcess::__call<csi::v0::DELETE_VOLUME>);
+
+    deleteVolumeResults.put(StatusError(grpc::Status(grpc::UNAVAILABLE, "")));
+
+    AWAIT_READY(deleteVolumeCall);
+
+    deleteVolumeBackoff =
+      std::min(deleteVolumeBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
+
+    // Settle the clock so the retry timer is set, then advance it by the upper
+    // bound of the random backoff so that the retry is guaranteed to fire.
+    Clock::settle();
+    Clock::advance(deleteVolumeBackoff);
+  }
+
+  AWAIT_READY(deleteVolumeRequests.get())
+    << "Failed to wait for " << csi::v0::DELETE_VOLUME << " call #"
+    << (numRetryableErrors + 1);
+
+  // Settle the clock, then verify that no additional request has been sent.
+  Clock::settle();
+  ASSERT_EQ(0u, deleteVolumeRequests.size());
+
+  // Return a non-retryable error for the last `DeleteVolume` call.
+  deleteVolumeResults.put(StatusError(grpc::Status(grpc::UNIMPLEMENTED, "")));
+
+  AWAIT_READY(updateOperationStatus);
+  EXPECT_EQ(OPERATION_FAILED, updateOperationStatus->status().state());
+
+  // Verify that the RPC metrics count the successes and errors correctly.
+  //
+  // TODO(chhsiao): verify the retry metrics instead once they are in place.
+  JSON::Object snapshot = Metrics();
+
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.CreateVolume/successes")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.CreateVolume/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.CreateVolume/errors")));
+  EXPECT_EQ(numRetryableErrors, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.CreateVolume/errors")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/successes")));
+  EXPECT_EQ(0, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/successes")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/errors")));
+  EXPECT_EQ(numRetryableErrors + 1, snapshot.values.at(metricName(
+      "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/errors")));
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {