You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/28 23:31:20 UTC

[3/3] impala git commit: KUDU-2301: (Part-1) Add instrumentation on a per connection level

KUDU-2301: (Part-1) Add instrumentation on a per connection level

This patch returns the OutboundTransfer queue size on a per
connection level and makes them accessible via the
DumpRunningRpcs() call.

A test is added in rpc-test to ensure that this metric works as
expected.

A future patch will add more metrics.

Change-Id: Iae1a5fe0066adf644a9cac41ad6696e1bbf00465
Reviewed-on: http://gerrit.cloudera.org:8080/9343
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-on: http://gerrit.cloudera.org:8080/9383
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/cf5ef7f7
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/cf5ef7f7
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/cf5ef7f7

Branch: refs/heads/master
Commit: cf5ef7f70983747103ca2d5052a9a61f7eb4b349
Parents: 6ec44ea
Author: Sailesh Mukil <sa...@apache.org>
Authored: Mon Feb 12 14:30:49 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 28 23:04:54 2018 +0000

----------------------------------------------------------------------
 be/src/kudu/rpc/connection.cc           |  4 +-
 be/src/kudu/rpc/connection.h            |  5 +++
 be/src/kudu/rpc/messenger.h             |  1 +
 be/src/kudu/rpc/rpc-test.cc             | 67 ++++++++++++++++++++++++++++
 be/src/kudu/rpc/rpc_introspection.proto |  1 +
 5 files changed, 77 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/cf5ef7f7/be/src/kudu/rpc/connection.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection.cc b/be/src/kudu/rpc/connection.cc
index bc7446e..2cde8c8 100644
--- a/be/src/kudu/rpc/connection.cc
+++ b/be/src/kudu/rpc/connection.cc
@@ -20,6 +20,7 @@
 #include <stdint.h>
 
 #include <algorithm>
+#include <cerrno>
 #include <iostream>
 #include <memory>
 #include <set>
@@ -50,7 +51,6 @@
 #include "kudu/util/status.h"
 #include "kudu/util/trace.h"
 
-using std::function;
 using std::includes;
 using std::set;
 using std::shared_ptr;
@@ -746,6 +746,8 @@ Status Connection::DumpPB(const DumpRunningRpcsRequestPB& req,
         c->call->DumpPB(req, resp->add_calls_in_flight());
       }
     }
+
+    resp->set_outbound_queue_size(num_queued_outbound_transfers());
   } else if (direction_ == SERVER) {
     if (negotiation_complete_) {
       // It's racy to dump credentials while negotiating, since the Connection

http://git-wip-us.apache.org/repos/asf/impala/blob/cf5ef7f7/be/src/kudu/rpc/connection.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection.h b/be/src/kudu/rpc/connection.h
index 9a78c14..e40f814 100644
--- a/be/src/kudu/rpc/connection.h
+++ b/be/src/kudu/rpc/connection.h
@@ -18,6 +18,7 @@
 #ifndef KUDU_RPC_CONNECTION_H
 #define KUDU_RPC_CONNECTION_H
 
+#include <cstddef>
 #include <cstdint>
 #include <limits>
 #include <memory>
@@ -220,6 +221,10 @@ class Connection : public RefCountedThreadSafe<Connection> {
     scheduled_for_shutdown_ = true;
   }
 
+  size_t num_queued_outbound_transfers() const {
+    return outbound_transfers_.size();
+  }
+
  private:
   friend struct CallAwaitingResponse;
   friend class QueueTransferTask;

http://git-wip-us.apache.org/repos/asf/impala/blob/cf5ef7f7/be/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/messenger.h b/be/src/kudu/rpc/messenger.h
index b0099d3..d1a1d79 100644
--- a/be/src/kudu/rpc/messenger.h
+++ b/be/src/kudu/rpc/messenger.h
@@ -311,6 +311,7 @@ class Messenger {
  private:
   FRIEND_TEST(TestRpc, TestConnectionKeepalive);
   FRIEND_TEST(TestRpc, TestConnectionAlwaysKeepalive);
+  FRIEND_TEST(TestRpc, TestClientConnectionsMetrics);
   FRIEND_TEST(TestRpc, TestCredentialsPolicy);
   FRIEND_TEST(TestRpc, TestReopenOutboundConnections);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/cf5ef7f7/be/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test.cc b/be/src/kudu/rpc/rpc-test.cc
index f76cc8f..dcbe5a7 100644
--- a/be/src/kudu/rpc/rpc-test.cc
+++ b/be/src/kudu/rpc/rpc-test.cc
@@ -20,10 +20,15 @@
 #include <limits.h>
 #include <memory>
 #include <string>
+#include <unistd.h>
 #include <unordered_map>
 #include <vector>
 
 #include <boost/bind.hpp>
+#include <boost/core/ref.hpp>
+#include <boost/function.hpp>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
 #include <gtest/gtest.h>
 
 #include "kudu/gutil/map-util.h"
@@ -31,6 +36,12 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/constants.h"
 #include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/proxy.h"
+#include "kudu/rpc/reactor.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/rpc/rtest.pb.h"
 #include "kudu/rpc/serialization.h"
 #include "kudu/security/test/test_certs.h"
 #include "kudu/util/countdown_latch.h"
@@ -418,6 +429,62 @@ TEST_P(TestRpc, TestConnectionAlwaysKeepalive) {
   ASSERT_EQ(1, metrics.num_client_connections_) << "Client should have 1 client connections";
 }
 
+// Test that the metrics on a per connection level work accurately.
+TEST_P(TestRpc, TestClientConnectionMetrics) {
+  // Only run one reactor per messenger, so we can grab the metrics from that
+  // one without having to check all.
+  n_server_reactor_threads_ = 1;
+  keepalive_time_ms_ = -1;
+
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  StartTestServer(&server_addr, enable_ssl);
+
+  // Set up client.
+  LOG(INFO) << "Connecting to " << server_addr.ToString();
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
+
+  // Cause the reactor thread to be blocked for 2 seconds.
+  server_messenger_->ScheduleOnReactor(boost::bind(sleep, 2), MonoDelta::FromSeconds(0));
+
+  RpcController controller;
+  DumpRunningRpcsRequestPB dump_req;
+  DumpRunningRpcsResponsePB dump_resp;
+  dump_req.set_include_traces(false);
+
+  // We'll send several calls asynchronously to force RPC queueing on the sender side.
+  int n_calls = 1000;
+  AddRequestPB add_req;
+  add_req.set_x(rand());
+  add_req.set_y(rand());
+  AddResponsePB add_resp;
+
+  vector<unique_ptr<RpcController>> controllers;
+  CountDownLatch latch(n_calls);
+  for (int i = 0; i < n_calls; i++) {
+    controllers.emplace_back(new RpcController());
+    p.AsyncRequest(GenericCalculatorService::kAddMethodName, add_req, &add_resp,
+        controllers.back().get(), boost::bind(&CountDownLatch::CountDown, boost::ref(latch)));
+  }
+
+  // Since we blocked the only reactor thread for sometime, we should see RPCs queued on the
+  // OutboundTransfer queue, unless the main thread is very slow.
+  ASSERT_OK(client_messenger->DumpRunningRpcs(dump_req, &dump_resp));
+  ASSERT_EQ(1, dump_resp.outbound_connections_size());
+  ASSERT_GT(dump_resp.outbound_connections(0).outbound_queue_size(), 0);
+
+  // Wait for the calls to be marked finished.
+  latch.Wait();
+
+  // Verify that all the RPCs have finished.
+  for (const auto& controller : controllers) {
+    ASSERT_TRUE(controller->finished());
+  }
+}
+
 // Test that outbound connections to the same server are reopen upon every RPC
 // call when the 'rpc_reopen_outbound_connections' flag is set.
 TEST_P(TestRpc, TestReopenOutboundConnections) {

http://git-wip-us.apache.org/repos/asf/impala/blob/cf5ef7f7/be/src/kudu/rpc/rpc_introspection.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_introspection.proto b/be/src/kudu/rpc/rpc_introspection.proto
index 5c4f4f1..7685903 100644
--- a/be/src/kudu/rpc/rpc_introspection.proto
+++ b/be/src/kudu/rpc/rpc_introspection.proto
@@ -62,6 +62,7 @@ message RpcConnectionPB {
   // TODO: swap out for separate fields
   optional string remote_user_credentials = 3;
   repeated RpcCallInProgressPB calls_in_flight = 4;
+  optional int64 outbound_queue_size = 5;
 }
 
 message DumpRunningRpcsRequestPB {