You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original message and is not preserved in this plain-text rendering.
Posted to commits@kudu.apache.org by ad...@apache.org on 2018/10/30 04:20:46 UTC
[2/2] kudu git commit: rpc-test: fix TestClientConnectionMetrics
rpc-test: fix TestClientConnectionMetrics
Every now and then, this test would fail with:
rpc-test.cc:542: Failure
Expected: (dump_resp.outbound_connections(0).outbound_queue_size()) > (0), actual: 0 vs 0
Unfortunately, the test would go on to crash (and trigger a TSAN warning)
due to the lack of proper cleanup in the event of an ASSERT failure. I've
fixed that in this patch.
I also tried to address the root cause of the test flakiness (verifying that
the outbound transfer queue contains at least one element at the moment it is
dumped), but I couldn't find a good way to do it. Blocking the server reactor thread has no effect on
client-side queuing. And we can't block the client reactor thread outright
because DumpRunningRpcs runs on it. Some of this is touched on in the
original code review[1] that committed the test.
Having given up, I wrapped the whole thing in an ASSERT_EVENTUALLY. It's
ham-fisted for sure, but it seems to work: without it, the test fails every
100-200 runs on my laptop, and with it I can't get it to fail at all. I also
looped it 1000 times in TSAN mode with 8 stress threads and didn't see any
failures. I don't understand the krpc subsystem very well, so if there's a
better way, I'm all ears.
1. https://gerrit.cloudera.org/c/9343/
Change-Id: I9c565b80bdca435d18787c7df0ec992728363980
Reviewed-on: http://gerrit.cloudera.org:8080/11819
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Adar Dembo <ad...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/3a77ba13
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/3a77ba13
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/3a77ba13
Branch: refs/heads/master
Commit: 3a77ba131b68d0ef5affc043c29c81f4d07a2659
Parents: d0205b9
Author: Adar Dembo <ad...@cloudera.com>
Authored: Mon Oct 29 13:05:21 2018 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Tue Oct 30 04:20:21 2018 +0000
----------------------------------------------------------------------
src/kudu/rpc/rpc-test.cc | 94 ++++++++++++++++++++++---------------------
1 file changed, 48 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/3a77ba13/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 077b5a3..1cffdfd 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-#include "kudu/rpc/rpc-test-base.h"
-
#include <cerrno>
#include <cstdint>
#include <cstdlib>
@@ -26,13 +24,11 @@
#include <ostream>
#include <set>
#include <string>
-#include <unistd.h>
#include <unordered_map>
#include <vector>
#include <boost/bind.hpp>
#include <boost/core/ref.hpp>
-#include <boost/function.hpp>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
@@ -49,6 +45,7 @@
#include "kudu/rpc/outbound_call.h"
#include "kudu/rpc/proxy.h"
#include "kudu/rpc/reactor.h"
+#include "kudu/rpc/rpc-test-base.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_introspection.pb.h"
#include "kudu/rpc/rpc_sidecar.h"
@@ -495,59 +492,64 @@ TEST_P(TestRpc, TestConnectionAlwaysKeepalive) {
// Test that the metrics on a per connection level work accurately.
TEST_P(TestRpc, TestClientConnectionMetrics) {
- // Only run one reactor per messenger, so we can grab the metrics from that
- // one without having to check all.
- n_server_reactor_threads_ = 1;
- keepalive_time_ms_ = -1;
-
// Set up server.
Sockaddr server_addr;
bool enable_ssl = GetParam();
ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
- // Set up client.
+ // Set up client with one reactor so that we can grab the metrics from just
+ // that reactor.
LOG(INFO) << "Connecting to " << server_addr.ToString();
shared_ptr<Messenger> client_messenger;
ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
Proxy p(client_messenger, server_addr, server_addr.host(),
GenericCalculatorService::static_service_name());
- // Cause the reactor thread to be blocked for 2 seconds.
- server_messenger_->ScheduleOnReactor(boost::bind(sleep, 2), MonoDelta::FromSeconds(0));
-
- RpcController controller;
- DumpRunningRpcsRequestPB dump_req;
- DumpRunningRpcsResponsePB dump_resp;
- dump_req.set_include_traces(false);
-
- // We'll send several calls asynchronously to force RPC queueing on the sender side.
- int n_calls = 1000;
- AddRequestPB add_req;
- add_req.set_x(rand());
- add_req.set_y(rand());
- AddResponsePB add_resp;
-
- vector<unique_ptr<RpcController>> controllers;
- CountDownLatch latch(n_calls);
- for (int i = 0; i < n_calls; i++) {
- controllers.emplace_back(new RpcController());
- p.AsyncRequest(GenericCalculatorService::kAddMethodName, add_req, &add_resp,
- controllers.back().get(), boost::bind(&CountDownLatch::CountDown, boost::ref(latch)));
- }
-
- // Since we blocked the only reactor thread for sometime, we should see RPCs queued on the
- // OutboundTransfer queue, unless the main thread is very slow.
- ASSERT_OK(client_messenger->DumpRunningRpcs(dump_req, &dump_resp));
- ASSERT_EQ(1, dump_resp.outbound_connections_size());
- ASSERT_GT(dump_resp.outbound_connections(0).outbound_queue_size(), 0);
-
- // Wait for the calls to be marked finished.
- latch.Wait();
+ // Here we queue a bunch of calls to the server and test that the sender's
+ // OutboundTransfer queue is indeed populated with those calls. Unfortunately,
+ // we have no surefire way of controlling the queue directly; a fast client
+ // reactor thread or a slow main thread could cause all of the outbound calls
+ // to be sent before we test the queue size, even though the server can't yet process them.
+ //
+ // So we repeat the entire exercise until we get a non-zero queue size.
+ ASSERT_EVENTUALLY([&]{
+ // We'll send several calls asynchronously to force RPC queueing on the sender side.
+ constexpr int n_calls = 1000;
+ AddRequestPB add_req;
+ add_req.set_x(rand());
+ add_req.set_y(rand());
+ AddResponsePB add_resp;
+
+ // Send the calls.
+ vector<unique_ptr<RpcController>> controllers;
+ CountDownLatch latch(n_calls);
+ for (int i = 0; i < n_calls; i++) {
+ controllers.emplace_back(new RpcController());
+ p.AsyncRequest(GenericCalculatorService::kAddMethodName, add_req, &add_resp,
+ controllers.back().get(), boost::bind(
+ &CountDownLatch::CountDown, boost::ref(latch)));
+ }
+ auto cleanup = MakeScopedCleanup([&](){
+ latch.Wait();
+ });
+
+ // Test the OutboundTransfer queue.
+ DumpRunningRpcsRequestPB dump_req;
+ DumpRunningRpcsResponsePB dump_resp;
+ dump_req.set_include_traces(false);
+ ASSERT_OK(client_messenger->DumpRunningRpcs(dump_req, &dump_resp));
+ ASSERT_EQ(1, dump_resp.outbound_connections_size());
+ ASSERT_GT(dump_resp.outbound_connections(0).outbound_queue_size(), 0);
+
+ // Unblock all of the calls and wait for them to finish.
+ latch.Wait();
+ cleanup.cancel();
- // Verify that all the RPCs have finished.
- for (const auto& controller : controllers) {
- ASSERT_TRUE(controller->finished());
- }
+ // Verify that all the RPCs have finished.
+ for (const auto& controller : controllers) {
+ ASSERT_TRUE(controller->finished());
+ }
+ });
}
// Test that outbound connections to the same server are reopen upon every RPC
@@ -1134,7 +1136,7 @@ TEST_P(TestRpc, TestApplicationFeatureFlag) {
TEST_P(TestRpc, TestApplicationFeatureFlagUnsupportedServer) {
auto savedFlags = kSupportedServerRpcFeatureFlags;
- auto cleanup = MakeScopedCleanup([&] () { kSupportedServerRpcFeatureFlags = savedFlags; });
+ SCOPED_CLEANUP({ kSupportedServerRpcFeatureFlags = savedFlags; });
kSupportedServerRpcFeatureFlags = {};
// Set up server.