You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/28 23:31:20 UTC
[3/3] impala git commit: KUDU-2301: (Part-1) Add instrumentation on a
per connection level
KUDU-2301: (Part-1) Add instrumentation on a per connection level
This patch returns the OutboundTransfer queue size on a
per-connection level and makes it accessible via the
DumpRunningRpcs() call.
A test is added in rpc-test to ensure that this metric works as
expected.
A future patch will add more metrics.
Change-Id: Iae1a5fe0066adf644a9cac41ad6696e1bbf00465
Reviewed-on: http://gerrit.cloudera.org:8080/9343
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-on: http://gerrit.cloudera.org:8080/9383
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/cf5ef7f7
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/cf5ef7f7
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/cf5ef7f7
Branch: refs/heads/master
Commit: cf5ef7f70983747103ca2d5052a9a61f7eb4b349
Parents: 6ec44ea
Author: Sailesh Mukil <sa...@apache.org>
Authored: Mon Feb 12 14:30:49 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 28 23:04:54 2018 +0000
----------------------------------------------------------------------
be/src/kudu/rpc/connection.cc | 4 +-
be/src/kudu/rpc/connection.h | 5 +++
be/src/kudu/rpc/messenger.h | 1 +
be/src/kudu/rpc/rpc-test.cc | 67 ++++++++++++++++++++++++++++
be/src/kudu/rpc/rpc_introspection.proto | 1 +
5 files changed, 77 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/cf5ef7f7/be/src/kudu/rpc/connection.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection.cc b/be/src/kudu/rpc/connection.cc
index bc7446e..2cde8c8 100644
--- a/be/src/kudu/rpc/connection.cc
+++ b/be/src/kudu/rpc/connection.cc
@@ -20,6 +20,7 @@
#include <stdint.h>
#include <algorithm>
+#include <cerrno>
#include <iostream>
#include <memory>
#include <set>
@@ -50,7 +51,6 @@
#include "kudu/util/status.h"
#include "kudu/util/trace.h"
-using std::function;
using std::includes;
using std::set;
using std::shared_ptr;
@@ -746,6 +746,8 @@ Status Connection::DumpPB(const DumpRunningRpcsRequestPB& req,
c->call->DumpPB(req, resp->add_calls_in_flight());
}
}
+
+ resp->set_outbound_queue_size(num_queued_outbound_transfers());
} else if (direction_ == SERVER) {
if (negotiation_complete_) {
// It's racy to dump credentials while negotiating, since the Connection
http://git-wip-us.apache.org/repos/asf/impala/blob/cf5ef7f7/be/src/kudu/rpc/connection.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection.h b/be/src/kudu/rpc/connection.h
index 9a78c14..e40f814 100644
--- a/be/src/kudu/rpc/connection.h
+++ b/be/src/kudu/rpc/connection.h
@@ -18,6 +18,7 @@
#ifndef KUDU_RPC_CONNECTION_H
#define KUDU_RPC_CONNECTION_H
+#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
@@ -220,6 +221,10 @@ class Connection : public RefCountedThreadSafe<Connection> {
scheduled_for_shutdown_ = true;
}
+ size_t num_queued_outbound_transfers() const {  // Current number of entries in 'outbound_transfers_'.
+ return outbound_transfers_.size();  // NOTE(review): unsynchronized read here; confirm callers' thread.
+ }
+
private:
friend struct CallAwaitingResponse;
friend class QueueTransferTask;
http://git-wip-us.apache.org/repos/asf/impala/blob/cf5ef7f7/be/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/messenger.h b/be/src/kudu/rpc/messenger.h
index b0099d3..d1a1d79 100644
--- a/be/src/kudu/rpc/messenger.h
+++ b/be/src/kudu/rpc/messenger.h
@@ -311,6 +311,7 @@ class Messenger {
private:
FRIEND_TEST(TestRpc, TestConnectionKeepalive);
FRIEND_TEST(TestRpc, TestConnectionAlwaysKeepalive);
+ FRIEND_TEST(TestRpc, TestClientConnectionMetrics);  // Name must match the TEST_P in rpc-test.cc.
FRIEND_TEST(TestRpc, TestCredentialsPolicy);
FRIEND_TEST(TestRpc, TestReopenOutboundConnections);
http://git-wip-us.apache.org/repos/asf/impala/blob/cf5ef7f7/be/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test.cc b/be/src/kudu/rpc/rpc-test.cc
index f76cc8f..dcbe5a7 100644
--- a/be/src/kudu/rpc/rpc-test.cc
+++ b/be/src/kudu/rpc/rpc-test.cc
@@ -20,10 +20,15 @@
#include <limits.h>
#include <memory>
#include <string>
+#include <unistd.h>
#include <unordered_map>
#include <vector>
#include <boost/bind.hpp>
+#include <boost/core/ref.hpp>
+#include <boost/function.hpp>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/gutil/map-util.h"
@@ -31,6 +36,12 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/constants.h"
#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/proxy.h"
+#include "kudu/rpc/reactor.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/rpc/rtest.pb.h"
#include "kudu/rpc/serialization.h"
#include "kudu/security/test/test_certs.h"
#include "kudu/util/countdown_latch.h"
@@ -418,6 +429,62 @@ TEST_P(TestRpc, TestConnectionAlwaysKeepalive) {
ASSERT_EQ(1, metrics.num_client_connections_) << "Client should have 1 client connections";
}
+// Test that the per-connection metrics exposed via DumpRunningRpcs() work as expected.
+TEST_P(TestRpc, TestClientConnectionMetrics) {
+ // Only run one reactor per messenger, so we can grab the metrics from that
+ // one without having to check all.
+ n_server_reactor_threads_ = 1;
+ keepalive_time_ms_ = -1;  // NOTE(review): presumably disables idle-connection teardown; confirm.
+
+ // Set up server.
+ Sockaddr server_addr;
+ bool enable_ssl = GetParam();
+ StartTestServer(&server_addr, enable_ssl);
+
+ // Set up client.
+ LOG(INFO) << "Connecting to " << server_addr.ToString();
+ shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
+ Proxy p(client_messenger, server_addr, server_addr.host(),
+ GenericCalculatorService::static_service_name());
+
+ // Block the server's reactor thread for 2 seconds by scheduling a sleep() with no delay.
+ server_messenger_->ScheduleOnReactor(boost::bind(sleep, 2), MonoDelta::FromSeconds(0));
+
+ RpcController controller;
+ DumpRunningRpcsRequestPB dump_req;
+ DumpRunningRpcsResponsePB dump_resp;
+ dump_req.set_include_traces(false);
+
+ // We'll send several calls asynchronously to force RPC queueing on the sender side.
+ int n_calls = 1000;
+ AddRequestPB add_req;
+ add_req.set_x(rand());
+ add_req.set_y(rand());
+ AddResponsePB add_resp;
+
+ vector<unique_ptr<RpcController>> controllers;
+ CountDownLatch latch(n_calls);
+ for (int i = 0; i < n_calls; i++) {
+ controllers.emplace_back(new RpcController());
+ p.AsyncRequest(GenericCalculatorService::kAddMethodName, add_req, &add_resp,
+ controllers.back().get(), boost::bind(&CountDownLatch::CountDown, boost::ref(latch)));
+ }
+
+ // Since we blocked the only reactor thread for some time, we should see RPCs queued on the
+ // OutboundTransfer queue, unless the main thread is very slow.
+ ASSERT_OK(client_messenger->DumpRunningRpcs(dump_req, &dump_resp));
+ ASSERT_EQ(1, dump_resp.outbound_connections_size());
+ ASSERT_GT(dump_resp.outbound_connections(0).outbound_queue_size(), 0);
+
+ // Wait for the calls to be marked finished.
+ latch.Wait();
+
+ // Verify that all the RPCs have finished.
+ for (const auto& controller : controllers) {
+ ASSERT_TRUE(controller->finished());
+ }
+}
+
// Test that outbound connections to the same server are reopen upon every RPC
// call when the 'rpc_reopen_outbound_connections' flag is set.
TEST_P(TestRpc, TestReopenOutboundConnections) {
http://git-wip-us.apache.org/repos/asf/impala/blob/cf5ef7f7/be/src/kudu/rpc/rpc_introspection.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_introspection.proto b/be/src/kudu/rpc/rpc_introspection.proto
index 5c4f4f1..7685903 100644
--- a/be/src/kudu/rpc/rpc_introspection.proto
+++ b/be/src/kudu/rpc/rpc_introspection.proto
@@ -62,6 +62,7 @@ message RpcConnectionPB {
// TODO: swap out for separate fields
optional string remote_user_credentials = 3;
repeated RpcCallInProgressPB calls_in_flight = 4;
+ optional int64 outbound_queue_size = 5;
}
message DumpRunningRpcsRequestPB {