You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/01/29 21:14:39 UTC
[mesos] 07/08: Added a unit test for RPC retry in SLRP.
This is an automated email from the ASF dual-hosted git repository.
chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 4839917fc219398a2929fa56ca094b32b2f3a75d
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Tue Jan 22 23:03:02 2019 -0800
Added a unit test for RPC retry in SLRP.
This patch adds a unit test to verify that SLRP will retry
`CreateVolume` and `DeleteVolume` CSI calls with a random exponential
backoff upon receiving `DEADLINE_EXCEEDED` or `UNAVAILABLE`.
Review: https://reviews.apache.org/r/69815
---
.../storage_local_resource_provider_tests.cpp | 407 ++++++++++++++++++++-
1 file changed, 400 insertions(+), 7 deletions(-)
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index d232abf..fb001aa 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -14,11 +14,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+#include <algorithm>
+
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/http.hpp>
+#include <process/grpc.hpp>
#include <process/gtest.hpp>
#include <process/gmock.hpp>
+#include <process/queue.hpp>
#include <process/reap.hpp>
#include <stout/hashmap.hpp>
@@ -27,6 +31,7 @@
#include <stout/os/realpath.hpp>
#include "csi/paths.hpp"
+#include "csi/rpc.hpp"
#include "csi/state.hpp"
#include "linux/fs.hpp"
@@ -35,6 +40,8 @@
#include "module/manager.hpp"
+#include "resource_provider/storage/provider_process.hpp"
+
#include "slave/container_daemon_process.hpp"
#include "slave/paths.hpp"
#include "slave/state.hpp"
@@ -46,6 +53,7 @@
#include "tests/disk_profile_server.hpp"
#include "tests/flags.hpp"
#include "tests/mesos.hpp"
+#include "tests/mock_csi_plugin.hpp"
namespace http = process::http;
@@ -64,8 +72,11 @@ using process::Future;
using process::Owned;
using process::Promise;
using process::post;
+using process::Queue;
using process::reap;
+using process::grpc::StatusError;
+
using testing::AtMost;
using testing::Between;
using testing::DoAll;
@@ -197,7 +208,8 @@ public:
void setupResourceProviderConfig(
const Bytes& capacity,
const Option<string> volumes = None(),
- const Option<string> createParameters = None())
+ const Option<string> createParameters = None(),
+ const Option<string> forward = None())
{
const string testCsiPluginPath =
path::join(tests::flags.build_dir, "src", "test-csi-plugin");
@@ -231,10 +243,11 @@ public:
"value": "%s",
"arguments": [
"%s",
+ "--work_dir=%s",
"--available_capacity=%s",
- "--create_parameters=%s",
- "--volumes=%s",
- "--work_dir=%s"
+ "%s",
+ "%s",
+ "%s"
]
},
"resources": [
@@ -265,10 +278,12 @@ public:
TEST_CSI_PLUGIN_NAME,
testCsiPluginPath,
testCsiPluginPath,
+ testCsiPluginWorkDir,
stringify(capacity),
- createParameters.getOrElse(""),
- volumes.getOrElse(""),
- testCsiPluginWorkDir);
+ createParameters.isSome()
+ ? "--create_parameters=" + createParameters.get() : "",
+ volumes.isSome() ? "--volumes=" + volumes.get() : "",
+ forward.isSome() ? "--forward=" + forward.get() : "");
ASSERT_SOME(resourceProviderConfig);
@@ -4675,6 +4690,384 @@ TEST_F(
ASSERT_TRUE(operationStatus.has_uuid());
}
+
+// This test verifies that the storage local resource provider will retry
+// `CreateVolume` and `DeleteVolume` CSI calls with a random exponential backoff
+// upon receiving `DEADLINE_EXCEEDED` or `UNAVAILABLE`.
+//
+// To accomplish this:
+// 1. Creates a MOUNT disk from a RAW disk resource.
+// 2. Returns `DEADLINE_EXCEEDED` or `UNAVAILABLE` for the first n
+// `CreateVolume` calls, where `n = numRetryableErrors` defined below. The
+// clock is advanced exponentially to trigger retries.
+// 3. Returns `OK` for the next `CreateVolume` call.
+// 4. Destroys the MOUNT disk.
+// 5. Returns `DEADLINE_EXCEEDED` or `UNAVAILABLE` for the first n
+// `DeleteVolume` calls, where `n = numRetryableErrors` defined below. The
+// clock is advanced exponentially to trigger retries.
+// 6. Returns `UNIMPLEMENTED` for the next `DeleteVolume` call to verify that
+// there is no retry on a non-retryable error.
+TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
+{
+ Clock::pause();
+
+ // The number of retryable errors to return for each RPC; must be >= 1.
+ const size_t numRetryableErrors = 10;
+
+ const string profilesPath = path::join(sandbox.get(), "profiles.json");
+
+ ASSERT_SOME(
+ os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
+ loadUriDiskProfileAdaptorModule(profilesPath);
+
+ const string mockCsiEndpoint =
+ "unix://" + path::join(sandbox.get(), "mock_csi.sock");
+
+ MockCSIPlugin plugin;
+ ASSERT_SOME(plugin.startup(mockCsiEndpoint));
+
+ EXPECT_CALL(plugin, GetCapacity(_, _, _))
+ .WillRepeatedly(Invoke([](
+ grpc::ServerContext* context,
+ const csi::v0::GetCapacityRequest* request,
+ csi::v0::GetCapacityResponse* response) {
+ response->set_available_capacity(Gigabytes(4).bytes());
+
+ return grpc::Status::OK;
+ }));
+
+ Queue<csi::v0::CreateVolumeRequest> createVolumeRequests;
+ Queue<Try<csi::v0::CreateVolumeResponse, StatusError>> createVolumeResults;
+ EXPECT_CALL(plugin, CreateVolume(_, _, _))
+ .WillRepeatedly(Invoke([&](
+ grpc::ServerContext* context,
+ const csi::v0::CreateVolumeRequest* request,
+ csi::v0::CreateVolumeResponse* response) -> grpc::Status {
+ Future<Try<csi::v0::CreateVolumeResponse, StatusError>> result =
+ createVolumeResults.get();
+
+ EXPECT_TRUE(result.isPending());
+ createVolumeRequests.put(*request);
+
+ // This extra closure is necessary in order to use `AWAIT_ASSERT_*`, as
+ // these macros require a void return type.
+ [&] { AWAIT_ASSERT_READY(result); }();
+
+ if (result->isError()) {
+ return result->error().status;
+ }
+
+ *response = result->get();
+ return grpc::Status::OK;
+ }));
+
+ Queue<csi::v0::DeleteVolumeRequest> deleteVolumeRequests;
+ Queue<Try<csi::v0::DeleteVolumeResponse, StatusError>> deleteVolumeResults;
+ EXPECT_CALL(plugin, DeleteVolume(_, _, _))
+ .WillRepeatedly(Invoke([&](
+ grpc::ServerContext* context,
+ const csi::v0::DeleteVolumeRequest* request,
+ csi::v0::DeleteVolumeResponse* response) -> grpc::Status {
+ Future<Try<csi::v0::DeleteVolumeResponse, StatusError>> result =
+ deleteVolumeResults.get();
+
+ EXPECT_TRUE(result.isPending());
+ deleteVolumeRequests.put(*request);
+
+ // This extra closure is necessary in order to use `AWAIT_ASSERT_*`, as
+ // these macros require a void return type.
+ [&] { AWAIT_ASSERT_READY(result); }();
+
+ if (result->isError()) {
+ return result->error().status;
+ }
+
+ *response = result->get();
+ return grpc::Status::OK;
+ }));
+
+ setupResourceProviderConfig(Bytes(0), None(), None(), mockCsiEndpoint);
+
+ master::Flags masterFlags = CreateMasterFlags();
+
+ // Raise the number of tolerated ping timeouts to avoid agent disconnection.
+ masterFlags.max_agent_ping_timeouts = 1000;
+
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+ // Since the local resource provider daemon is started after the agent
+ // is registered, it is guaranteed that the slave will send two
+ // `UpdateSlaveMessage`s, where the latter one contains resources from
+ // the storage local resource provider.
+ //
+ // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because
+ // Google Mock will search the expectations in reverse order.
+ Future<UpdateSlaveMessage> updateSlave2 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+ Future<UpdateSlaveMessage> updateSlave1 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+ ASSERT_SOME(slave);
+
+ // Advance the clock to trigger agent registration.
+ Clock::advance(flags.registration_backoff_factor);
+
+ AWAIT_READY(updateSlave1);
+
+ // NOTE: We need to resume the clock so that the resource provider can
+ // periodically check if the CSI endpoint socket has been created by
+ // the plugin container, which runs in another Linux process.
+ Clock::resume();
+
+ AWAIT_READY(updateSlave2);
+ ASSERT_TRUE(updateSlave2->has_resource_providers());
+ ASSERT_EQ(1, updateSlave2->resource_providers().providers_size());
+
+ Clock::pause();
+
+ // Register a framework to receive offers.
+ FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+ framework.set_roles(0, "storage");
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ // Decline offers without RAW disk resources. The master can send such offers
+ // before the resource provider receives profile updates.
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillRepeatedly(DeclineOffers());
+
+ Future<vector<Offer>> offers;
+
+ auto isStoragePool = [](const Resource& r, const string& profile) {
+ return r.has_disk() &&
+ r.disk().has_source() &&
+ r.disk().source().type() == Resource::DiskInfo::Source::RAW &&
+ r.disk().source().has_vendor() &&
+ r.disk().source().vendor() == TEST_CSI_VENDOR &&
+ !r.disk().source().has_id() &&
+ r.disk().source().has_profile() &&
+ r.disk().source().profile() == profile;
+ };
+
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+ std::bind(isStoragePool, lambda::_1, "test"))))
+ .WillOnce(FutureArg<1>(&offers));
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ ASSERT_EQ(1u, offers->size());
+
+ Resource raw = *Resources(offers->at(0).resources())
+ .filter(std::bind(isStoragePool, lambda::_1, "test"))
+ .begin();
+
+ // Create a MOUNT disk.
+ Future<UpdateOperationStatusMessage> updateOperationStatus =
+ FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+
+ driver.acceptOffers(
+ {offers->at(0).id()},
+ {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)});
+
+ AWAIT_READY(createVolumeRequests.get())
+ << "Failed to wait for " << csi::v0::CREATE_VOLUME << " call #1";
+
+ // Settle the clock to verify that there are no more outstanding requests.
+ Clock::settle();
+ ASSERT_EQ(0u, createVolumeRequests.size());
+
+ Future<Nothing> createVolumeCall = FUTURE_DISPATCH(
+ _, &StorageLocalResourceProviderProcess::__call<csi::v0::CREATE_VOLUME>);
+
+ // Return `DEADLINE_EXCEEDED` for the first `CreateVolume` call.
+ createVolumeResults.put(
+ StatusError(grpc::Status(grpc::DEADLINE_EXCEEDED, "")));
+
+ AWAIT_READY(createVolumeCall);
+
+ Duration createVolumeBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
+
+ // Settle the clock to ensure that the retry timer has been set, then advance
+ // the clock by the maximum backoff to trigger a retry.
+ Clock::settle();
+ Clock::advance(createVolumeBackoff);
+
+ // Return `UNAVAILABLE` for subsequent `CreateVolume` calls.
+ for (size_t i = 1; i < numRetryableErrors; i++) {
+ AWAIT_READY(createVolumeRequests.get())
+ << "Failed to wait for " << csi::v0::CREATE_VOLUME << " call #"
+ << (i + 1);
+
+ // Settle the clock to verify that there are no more outstanding requests.
+ Clock::settle();
+ ASSERT_EQ(0u, createVolumeRequests.size());
+
+ createVolumeCall = FUTURE_DISPATCH(
+ _,
+ &StorageLocalResourceProviderProcess::__call<csi::v0::CREATE_VOLUME>);
+
+ createVolumeResults.put(StatusError(grpc::Status(grpc::UNAVAILABLE, "")));
+
+ AWAIT_READY(createVolumeCall);
+
+ createVolumeBackoff =
+ std::min(createVolumeBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
+
+ // Settle the clock to ensure that the retry timer has been set, then
+ // advance the clock by the maximum backoff to trigger a retry.
+ Clock::settle();
+ Clock::advance(createVolumeBackoff);
+ }
+
+ AWAIT_READY(createVolumeRequests.get())
+ << "Failed to wait for " << csi::v0::CREATE_VOLUME << " call #"
+ << (numRetryableErrors + 1);
+
+ // Settle the clock to verify that there are no more outstanding requests.
+ Clock::settle();
+ ASSERT_EQ(0u, createVolumeRequests.size());
+
+ auto isMountDisk = [](const Resource& r, const string& profile) {
+ return r.has_disk() &&
+ r.disk().has_source() &&
+ r.disk().source().type() == Resource::DiskInfo::Source::MOUNT &&
+ r.disk().source().has_vendor() &&
+ r.disk().source().vendor() == TEST_CSI_VENDOR &&
+ r.disk().source().has_id() &&
+ r.disk().source().has_profile() &&
+ r.disk().source().profile() == profile;
+ };
+
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+ std::bind(isMountDisk, lambda::_1, "test"))))
+ .WillOnce(FutureArg<1>(&offers));
+
+ // Return a successful response for the last `CreateVolume` call.
+ csi::v0::CreateVolumeResponse createVolumeResponse;
+ createVolumeResponse.mutable_volume()->set_id(id::UUID::random().toString());
+ createVolumeResponse.mutable_volume()->set_capacity_bytes(
+ Megabytes(raw.scalar().value()).bytes());
+
+ createVolumeResults.put(std::move(createVolumeResponse));
+
+ AWAIT_READY(updateOperationStatus);
+ EXPECT_EQ(OPERATION_FINISHED, updateOperationStatus->status().state());
+
+ // Advance the clock to trigger a batch allocation.
+ Clock::advance(masterFlags.allocation_interval);
+
+ AWAIT_READY(offers);
+ ASSERT_EQ(1u, offers->size());
+
+ Resource created = *Resources(offers->at(0).resources())
+ .filter(std::bind(isMountDisk, lambda::_1, "test"))
+ .begin();
+
+ // Destroy the MOUNT disk.
+ updateOperationStatus = FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+
+ driver.acceptOffers({offers->at(0).id()}, {DESTROY_DISK(created)});
+
+ AWAIT_READY(deleteVolumeRequests.get())
+ << "Failed to wait for " << csi::v0::DELETE_VOLUME << " call #1";
+
+ // Settle the clock to verify that there are no more outstanding requests.
+ Clock::settle();
+ ASSERT_EQ(0u, deleteVolumeRequests.size());
+
+ Future<Nothing> deleteVolumeCall = FUTURE_DISPATCH(
+ _, &StorageLocalResourceProviderProcess::__call<csi::v0::DELETE_VOLUME>);
+
+ // Return `DEADLINE_EXCEEDED` for the first `DeleteVolume` call.
+ deleteVolumeResults.put(
+ StatusError(grpc::Status(grpc::DEADLINE_EXCEEDED, "")));
+
+ AWAIT_READY(deleteVolumeCall);
+
+ Duration deleteVolumeBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
+
+ // Settle the clock to ensure that the retry timer has been set, then advance
+ // the clock by the maximum backoff to trigger a retry.
+ Clock::settle();
+ Clock::advance(deleteVolumeBackoff);
+
+ // Return `UNAVAILABLE` for subsequent `DeleteVolume` calls.
+ for (size_t i = 1; i < numRetryableErrors; i++) {
+ AWAIT_READY(deleteVolumeRequests.get())
+ << "Failed to wait for " << csi::v0::DELETE_VOLUME << " call #"
+ << (i + 1);
+
+ // Settle the clock to verify that there are no more outstanding requests.
+ Clock::settle();
+ ASSERT_EQ(0u, deleteVolumeRequests.size());
+
+ deleteVolumeCall = FUTURE_DISPATCH(
+ _,
+ &StorageLocalResourceProviderProcess::__call<csi::v0::DELETE_VOLUME>);
+
+ deleteVolumeResults.put(StatusError(grpc::Status(grpc::UNAVAILABLE, "")));
+
+ AWAIT_READY(deleteVolumeCall);
+
+ deleteVolumeBackoff =
+ std::min(deleteVolumeBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
+
+ // Settle the clock to ensure that the retry timer has been set, then
+ // advance the clock by the maximum backoff to trigger a retry.
+ Clock::settle();
+ Clock::advance(deleteVolumeBackoff);
+ }
+
+ AWAIT_READY(deleteVolumeRequests.get())
+ << "Failed to wait for " << csi::v0::DELETE_VOLUME << " call #"
+ << (numRetryableErrors + 1);
+
+ // Settle the clock to verify that there are no more outstanding requests.
+ Clock::settle();
+ ASSERT_EQ(0u, deleteVolumeRequests.size());
+
+ // Return a non-retryable error for the last `DeleteVolume` call.
+ deleteVolumeResults.put(StatusError(grpc::Status(grpc::UNIMPLEMENTED, "")));
+
+ AWAIT_READY(updateOperationStatus);
+ EXPECT_EQ(OPERATION_FAILED, updateOperationStatus->status().state());
+
+ // Verify that the RPC metrics count the successes and errors correctly.
+ //
+ // TODO(chhsiao): verify the retry metrics instead once they are in place.
+ JSON::Object snapshot = Metrics();
+
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.CreateVolume/successes")));
+ EXPECT_EQ(1, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.CreateVolume/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.CreateVolume/errors")));
+ EXPECT_EQ(numRetryableErrors, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.CreateVolume/errors")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/successes")));
+ EXPECT_EQ(0, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/errors")));
+ EXPECT_EQ(numRetryableErrors + 1, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/errors")));
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {