Posted to commits@kudu.apache.org by ad...@apache.org on 2018/10/30 04:20:46 UTC

[2/2] kudu git commit: rpc-test: fix TestClientConnectionMetrics

rpc-test: fix TestClientConnectionMetrics

Every now and then, this test would fail with:

  rpc-test.cc:542: Failure
  Expected: (dump_resp.outbound_connections(0).outbound_queue_size()) > (0), actual: 0 vs 0

Unfortunately, the test would go on to crash (and trigger a TSAN warning)
due to the lack of proper cleanup in the event of an ASSERT failure. I've
fixed that in this patch.
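
The fix registers a scoped cleanup so the test still waits for its in-flight
calls even when a gtest ASSERT returns early. Condensed from the diff below
(names match the test):

  CountDownLatch latch(n_calls);
  // ... issue n_calls async RPCs, each counting down 'latch' on completion ...
  auto cleanup = MakeScopedCleanup([&]() {
    // Runs on scope exit, including the early return from a failed ASSERT,
    // so the outstanding calls can't outlive the test's state.
    latch.Wait();
  });
  ASSERT_GT(dump_resp.outbound_connections(0).outbound_queue_size(), 0);
  latch.Wait();      // normal path: wait here...
  cleanup.cancel();  // ...and skip the now-redundant cleanup wait.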

I also tried to address the root cause of the test flakiness (guaranteeing
that the outbound transfer queue contains at least one element at the moment
the test inspects it), but I couldn't find a good way to do it. Blocking the
server reactor thread has no effect on client-side queuing, and we can't
block the client reactor thread outright because DumpRunningRpcs runs on it.
Some of this is touched on in the original code review[1] that committed
the test.
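
For reference, the blocking approach being removed scheduled a two-second
sleep on the server's lone reactor thread (taken from the deleted lines in
the diff below); since the client messenger drains its OutboundTransfer
queue on its own reactor thread, this never reliably backed up the client
side:

  // From the old test (removed by this patch): block the only server reactor
  // for two seconds in the hope that the client's outbound queue backs up.
  // The client writes to the socket from its own reactor thread, so this
  // provides no real guarantee.
  server_messenger_->ScheduleOnReactor(boost::bind(sleep, 2),
                                       MonoDelta::FromSeconds(0));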

Having given up, I wrapped the whole thing in an ASSERT_EVENTUALLY. It's
ham-fisted for sure, but it seems to work: without it, the test fails every
100-200 runs on my laptop, and with it I can't get it to fail at all. I also
looped it 1000 times in TSAN mode with 8 stress threads and didn't see any
failures. I don't understand the krpc subsystem very well, so if there's a
better way, I'm all ears.
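
The resulting structure, abridged from the diff below: ASSERT_EVENTUALLY
(Kudu's test macro) re-runs the body until it finishes with no assertion
failures or an overall deadline elapses, so an iteration where the queue
happened to drain before the check just triggers another attempt:

  ASSERT_EVENTUALLY([&]{
    // (Re)send a batch of async Add() RPCs so the sender-side
    // OutboundTransfer queue has a chance to build up...
    ASSERT_OK(client_messenger->DumpRunningRpcs(dump_req, &dump_resp));
    ASSERT_EQ(1, dump_resp.outbound_connections_size());
    ASSERT_GT(dump_resp.outbound_connections(0).outbound_queue_size(), 0);
    // ...then wait for the batch to finish before the next attempt.
  });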

1. https://gerrit.cloudera.org/c/9343/

Change-Id: I9c565b80bdca435d18787c7df0ec992728363980
Reviewed-on: http://gerrit.cloudera.org:8080/11819
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/3a77ba13
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/3a77ba13
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/3a77ba13

Branch: refs/heads/master
Commit: 3a77ba131b68d0ef5affc043c29c81f4d07a2659
Parents: d0205b9
Author: Adar Dembo <ad...@cloudera.com>
Authored: Mon Oct 29 13:05:21 2018 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Tue Oct 30 04:20:21 2018 +0000

----------------------------------------------------------------------
 src/kudu/rpc/rpc-test.cc | 94 ++++++++++++++++++++++---------------------
 1 file changed, 48 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/3a77ba13/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 077b5a3..1cffdfd 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "kudu/rpc/rpc-test-base.h"
-
 #include <cerrno>
 #include <cstdint>
 #include <cstdlib>
@@ -26,13 +24,11 @@
 #include <ostream>
 #include <set>
 #include <string>
-#include <unistd.h>
 #include <unordered_map>
 #include <vector>
 
 #include <boost/bind.hpp>
 #include <boost/core/ref.hpp>
-#include <boost/function.hpp>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
@@ -49,6 +45,7 @@
 #include "kudu/rpc/outbound_call.h"
 #include "kudu/rpc/proxy.h"
 #include "kudu/rpc/reactor.h"
+#include "kudu/rpc/rpc-test-base.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_introspection.pb.h"
 #include "kudu/rpc/rpc_sidecar.h"
@@ -495,59 +492,64 @@ TEST_P(TestRpc, TestConnectionAlwaysKeepalive) {
 
 // Test that the metrics on a per connection level work accurately.
 TEST_P(TestRpc, TestClientConnectionMetrics) {
-  // Only run one reactor per messenger, so we can grab the metrics from that
-  // one without having to check all.
-  n_server_reactor_threads_ = 1;
-  keepalive_time_ms_ = -1;
-
   // Set up server.
   Sockaddr server_addr;
   bool enable_ssl = GetParam();
   ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
 
-  // Set up client.
+  // Set up client with one reactor so that we can grab the metrics from just
+  // that reactor.
   LOG(INFO) << "Connecting to " << server_addr.ToString();
   shared_ptr<Messenger> client_messenger;
   ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
   Proxy p(client_messenger, server_addr, server_addr.host(),
           GenericCalculatorService::static_service_name());
 
-  // Cause the reactor thread to be blocked for 2 seconds.
-  server_messenger_->ScheduleOnReactor(boost::bind(sleep, 2), MonoDelta::FromSeconds(0));
-
-  RpcController controller;
-  DumpRunningRpcsRequestPB dump_req;
-  DumpRunningRpcsResponsePB dump_resp;
-  dump_req.set_include_traces(false);
-
-  // We'll send several calls asynchronously to force RPC queueing on the sender side.
-  int n_calls = 1000;
-  AddRequestPB add_req;
-  add_req.set_x(rand());
-  add_req.set_y(rand());
-  AddResponsePB add_resp;
-
-  vector<unique_ptr<RpcController>> controllers;
-  CountDownLatch latch(n_calls);
-  for (int i = 0; i < n_calls; i++) {
-    controllers.emplace_back(new RpcController());
-    p.AsyncRequest(GenericCalculatorService::kAddMethodName, add_req, &add_resp,
-        controllers.back().get(), boost::bind(&CountDownLatch::CountDown, boost::ref(latch)));
-  }
-
-  // Since we blocked the only reactor thread for sometime, we should see RPCs queued on the
-  // OutboundTransfer queue, unless the main thread is very slow.
-  ASSERT_OK(client_messenger->DumpRunningRpcs(dump_req, &dump_resp));
-  ASSERT_EQ(1, dump_resp.outbound_connections_size());
-  ASSERT_GT(dump_resp.outbound_connections(0).outbound_queue_size(), 0);
-
-  // Wait for the calls to be marked finished.
-  latch.Wait();
+  // Here we queue a bunch of calls to the server and test that the sender's
+  // OutboundTransfer queue is indeed populated with those calls. Unfortunately,
+  // we have no surefire way of controlling the queue directly; a fast client
+  // reactor thread or a slow main thread could cause all of the outbound calls
+  // to be sent before we test the queue size, even though the server can't yet process them.
+  //
+  // So we repeat the entire exercise until we get a non-zero queue size.
+  ASSERT_EVENTUALLY([&]{
+    // We'll send several calls asynchronously to force RPC queueing on the sender side.
+    constexpr int n_calls = 1000;
+    AddRequestPB add_req;
+    add_req.set_x(rand());
+    add_req.set_y(rand());
+    AddResponsePB add_resp;
+
+    // Send the calls.
+    vector<unique_ptr<RpcController>> controllers;
+    CountDownLatch latch(n_calls);
+    for (int i = 0; i < n_calls; i++) {
+      controllers.emplace_back(new RpcController());
+      p.AsyncRequest(GenericCalculatorService::kAddMethodName, add_req, &add_resp,
+                     controllers.back().get(), boost::bind(
+                         &CountDownLatch::CountDown, boost::ref(latch)));
+    }
+    auto cleanup = MakeScopedCleanup([&](){
+      latch.Wait();
+    });
+
+    // Test the OutboundTransfer queue.
+    DumpRunningRpcsRequestPB dump_req;
+    DumpRunningRpcsResponsePB dump_resp;
+    dump_req.set_include_traces(false);
+    ASSERT_OK(client_messenger->DumpRunningRpcs(dump_req, &dump_resp));
+    ASSERT_EQ(1, dump_resp.outbound_connections_size());
+    ASSERT_GT(dump_resp.outbound_connections(0).outbound_queue_size(), 0);
+
+    // Unblock all of the calls and wait for them to finish.
+    latch.Wait();
+    cleanup.cancel();
 
-  // Verify that all the RPCs have finished.
-  for (const auto& controller : controllers) {
-    ASSERT_TRUE(controller->finished());
-  }
+    // Verify that all the RPCs have finished.
+    for (const auto& controller : controllers) {
+      ASSERT_TRUE(controller->finished());
+    }
+  });
 }
 
 // Test that outbound connections to the same server are reopened upon every RPC
@@ -1134,7 +1136,7 @@ TEST_P(TestRpc, TestApplicationFeatureFlag) {
 
 TEST_P(TestRpc, TestApplicationFeatureFlagUnsupportedServer) {
   auto savedFlags = kSupportedServerRpcFeatureFlags;
-  auto cleanup = MakeScopedCleanup([&] () { kSupportedServerRpcFeatureFlags = savedFlags; });
+  SCOPED_CLEANUP({ kSupportedServerRpcFeatureFlags = savedFlags; });
   kSupportedServerRpcFeatureFlags = {};
 
   // Set up server.