You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/21 22:23:02 UTC

[1/4] impala git commit: IMPALA-4874: Increase maximum KRPC message size

Repository: impala
Updated Branches:
  refs/heads/master f562be8c2 -> a25f69355


IMPALA-4874: Increase maximum KRPC message size

The default value for rpc_max_message_size is 50MB.
Impala currently requires support for messages of
up to 2GB. This changes the value of rpc_max_message_size
to INT_MAX for Impala.

Testing:
- Added a test to test_very_large_strings that generates
  a row with multiple large strings. This row requires
  that the RPC framework successfully transmit over
  400MB. This works for both KRPC and Thrift.
  This query operates under the same amount of memory
  as other queries in large_strings.test.
- Tested separately that larger row sizes also work,
  including tests up to almost 2GB.

Change-Id: I876bba0536e1d85e41eacd9c0aeccfe5c2126e58
Reviewed-on: http://gerrit.cloudera.org:8080/9337
Reviewed-by: Joe McDonnell <jo...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/baec8cae
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/baec8cae
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/baec8cae

Branch: refs/heads/master
Commit: baec8cae34591c92c9309333280f85f81d852f8d
Parents: f562be8
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Thu Feb 15 13:10:21 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 21 03:17:57 2018 +0000

----------------------------------------------------------------------
 be/src/rpc/rpc-mgr.cc                              |  7 +++++++
 .../queries/QueryTest/large_strings.test           | 17 +++++++++++++++++
 2 files changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/baec8cae/be/src/rpc/rpc-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index d723280..7e05f38 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -57,6 +57,9 @@ DECLARE_string(ssl_minimum_version);
 // Defined in kudu/rpc/rpcz_store.cc
 DECLARE_int32(rpc_duration_too_long_ms);
 
+// Defined in kudu/rpc/transfer.cc
+DECLARE_int32(rpc_max_message_size);
+
 DEFINE_int32(num_acceptor_threads, 2,
     "Number of threads dedicated to accepting connection requests for RPC services");
 DEFINE_int32(num_reactor_threads, 0,
@@ -75,6 +78,10 @@ Status RpcMgr::Init() {
   // Log any RPCs which take longer than 2 minutes.
   FLAGS_rpc_duration_too_long_ms = 2 * 60 * 1000;
 
+  // IMPALA-4874: Impala requires support for messages up to 2GB. Override KRPC's default
+  //              maximum of 50MB.
+  FLAGS_rpc_max_message_size = numeric_limits<int32_t>::max();
+
   MessengerBuilder bld("impala-server");
   const scoped_refptr<MetricEntity> entity(
       METRIC_ENTITY_server.Instantiate(&registry_, "krpc-metrics"));

http://git-wip-us.apache.org/repos/asf/impala/blob/baec8cae/testdata/workloads/functional-query/queries/QueryTest/large_strings.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/large_strings.test b/testdata/workloads/functional-query/queries/QueryTest/large_strings.test
index 4419953..1d930db 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/large_strings.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/large_strings.test
@@ -209,3 +209,20 @@ select length(madlib_decode_vector(concat_ws(',', s, s, s, s))) from (
 ---- CATCH
 String length larger than allowed limit of 1 GB character data
 =====
+---- QUERY
+# IMPALA-4874: Generate a large row made up of multiple large strings to test RPC
+#              transmission. This uses hashing to make this difficult to compress,
+#              which results in a larger row batch.
+select length(group_concat(h, "!")),
+       length(group_concat(h, "-"))
+from (
+select cast(fnv_hash(concat(l_comment, 'a')) as string) as h from tpch_parquet.lineitem union all
+select cast(fnv_hash(concat(l_comment, 'b')) as string) as h from tpch_parquet.lineitem union all
+select cast(fnv_hash(concat(l_comment, 'c')) as string) as h from tpch_parquet.lineitem union all
+select cast(fnv_hash(concat(l_comment, 'd')) as string) as h from tpch_parquet.lineitem union all
+select cast(fnv_hash(concat(l_comment, 'e')) as string) as h from tpch_parquet.lineitem) a;
+---- TYPES
+INT,INT
+---- RESULTS
+611468161,611468161
+=====
\ No newline at end of file


[4/4] impala git commit: IMPALA-6461 : Micro-optimizations to DataStreamSender::Send

Posted by ta...@apache.org.
IMPALA-6461 : Micro-optimizations to DataStreamSender::Send

While analyzing performance of partition exchange operator,
I noticed that there is dependency and a function call per row in the hot path.

Optimizations in this change are:
1) Remove the data dependency between computing the hash and the channel
2) Inline DataStreamSender::Channel::AddRow
3) Save partition_exprs_.size() to save a couple of instructions

This translates to improving CPI for DataStreamSender::Send by 10%

Change-Id: I642a9dad531a29d4838a3537ab0e04320a69960d
Reviewed-on: http://gerrit.cloudera.org:8080/9221
Reviewed-by: Mostafa Mokhtar <mm...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/a25f6935
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/a25f6935
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/a25f6935

Branch: refs/heads/master
Commit: a25f69355fa6cccaabcea86a434c4ca61ed7c8ac
Parents: 74dd9e9
Author: mmokhtar <mm...@cloudera.com>
Authored: Mon Feb 5 17:05:02 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 21 22:11:22 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/data-stream-sender.cc      | 77 ++++++++++++++++++--------
 be/src/runtime/krpc-data-stream-sender.cc | 77 ++++++++++++++++++--------
 be/src/runtime/row-batch.h                |  4 ++
 3 files changed, 110 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/a25f6935/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc
index c572467..30bf5b6 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -87,7 +87,7 @@ class DataStreamSender::Channel : public CacheLineAligned {
   // Copies a single row into this channel's output buffer and flushes buffer
   // if it reaches capacity.
   // Returns error status if any of the preceding rpcs failed, OK otherwise.
-  Status AddRow(TupleRow* row) WARN_UNUSED_RESULT;
+  Status ALWAYS_INLINE AddRow(TupleRow* row) WARN_UNUSED_RESULT;
 
   // Asynchronously sends a row batch.
   // Returns the status of the most recently finished TransmitData
@@ -243,7 +243,7 @@ void DataStreamSender::Channel::WaitForRpc() {
   }
 }
 
-Status DataStreamSender::Channel::AddRow(TupleRow* row) {
+inline Status DataStreamSender::Channel::AddRow(TupleRow* row) {
   if (batch_->AtCapacity()) {
     // batch_ is full, let's send it; but first wait for an ongoing
     // transmission to finish before modifying thrift_batch_
@@ -443,16 +443,29 @@ Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
   } else if (partition_type_ == TPartitionType::KUDU) {
     DCHECK_EQ(partition_expr_evals_.size(), 1);
     int num_channels = channels_.size();
-    for (int i = 0; i < batch->num_rows(); ++i) {
-      TupleRow* row = batch->GetRow(i);
-      int32_t partition =
-          *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
-      if (partition < 0) {
-        // This row doesn't coorespond to a partition, e.g. it's outside the given ranges.
-        partition = next_unknown_partition_;
-        ++next_unknown_partition_;
+    const int num_rows = batch->num_rows();
+    const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
+    int channel_ids[hash_batch_size];
+
+    for (int batch_start = 0; batch_start < num_rows; batch_start += hash_batch_size) {
+      const int batch_window_size = min(num_rows - batch_start, hash_batch_size);
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        int32_t partition =
+            *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
+        if (partition < 0) {
+          // This row doesn't correspond to a partition,
+          //  e.g. it's outside the given ranges.
+          partition = next_unknown_partition_;
+          ++next_unknown_partition_;
+        }
+        channel_ids[i] = partition % num_channels;
+      }
+
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
       }
-      RETURN_IF_ERROR(channels_[partition % num_channels]->AddRow(row));
     }
   } else {
     DCHECK(partition_type_ == TPartitionType::HASH_PARTITIONED);
@@ -460,20 +473,36 @@ Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
     // TODO: encapsulate this in an Expr as we've done for Kudu above and remove this case
     // once we have codegen here.
     int num_channels = channels_.size();
-    for (int i = 0; i < batch->num_rows(); ++i) {
-      TupleRow* row = batch->GetRow(i);
-      uint64_t hash_val = EXCHANGE_HASH_SEED;
-      for (int j = 0; j < partition_exprs_.size(); ++j) {
-        ScalarExprEvaluator* eval = partition_expr_evals_[j];
-        void* partition_val = eval->GetValue(row);
-        // We can't use the crc hash function here because it does not result in
-        // uncorrelated hashes with different seeds. Instead we use FastHash.
-        // TODO: fix crc hash/GetHashValue()
-        DCHECK(&(eval->root()) == partition_exprs_[j]);
-        hash_val = RawValue::GetHashValueFastHash(
-            partition_val, partition_exprs_[j]->type(), hash_val);
+    const int num_partition_exprs = partition_exprs_.size();
+    const int num_rows = batch->num_rows();
+    const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
+    int channel_ids[hash_batch_size];
+
+    // Break the loop into two parts break the data dependency between computing
+    // the hash and calling AddRow()
+    // To keep stack allocation small a RowBatch::HASH_BATCH is used
+    for (int batch_start = 0; batch_start < num_rows; batch_start += hash_batch_size) {
+      int batch_window_size = min(num_rows - batch_start, hash_batch_size);
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        uint64_t hash_val = EXCHANGE_HASH_SEED;
+        for (int j = 0; j < num_partition_exprs; ++j) {
+          ScalarExprEvaluator* eval = partition_expr_evals_[j];
+          void* partition_val = eval->GetValue(row);
+          // We can't use the crc hash function here because it does not result in
+          // uncorrelated hashes with different seeds. Instead we use FastHash.
+          // TODO: fix crc hash/GetHashValue()
+          DCHECK(&(eval->root()) == partition_exprs_[j]);
+          hash_val = RawValue::GetHashValueFastHash(
+              partition_val, partition_exprs_[j]->type(), hash_val);
+        }
+        channel_ids[i] = hash_val % num_channels;
+      }
+
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
       }
-      RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row));
     }
   }
   COUNTER_ADD(total_sent_rows_counter_, batch->num_rows());

http://git-wip-us.apache.org/repos/asf/impala/blob/a25f6935/be/src/runtime/krpc-data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index 4866e4e..6c0ad01 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -129,7 +129,7 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
   // it reaches capacity. This call may block if the row batch's capacity is reached
   // and the preceding RPC is still in progress. Returns error status if serialization
   // failed or if the preceding RPC failed. Return OK otherwise.
-  Status AddRow(TupleRow* row);
+  Status ALWAYS_INLINE AddRow(TupleRow* row);
 
   // Shutdowns the channel and frees the row batch allocation. Any in-flight RPC will
   // be cancelled. It's expected that clients normally call FlushAndSendEos() before
@@ -474,7 +474,7 @@ Status KrpcDataStreamSender::Channel::SendCurrentBatch() {
   return Status::OK();
 }
 
-Status KrpcDataStreamSender::Channel::AddRow(TupleRow* row) {
+inline Status KrpcDataStreamSender::Channel::AddRow(TupleRow* row) {
   if (batch_->AtCapacity()) {
     // batch_ is full, let's send it.
     RETURN_IF_ERROR(SendCurrentBatch());
@@ -660,16 +660,29 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
   } else if (partition_type_ == TPartitionType::KUDU) {
     DCHECK_EQ(partition_expr_evals_.size(), 1);
     int num_channels = channels_.size();
-    for (int i = 0; i < batch->num_rows(); ++i) {
-      TupleRow* row = batch->GetRow(i);
-      int32_t partition =
-          *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
-      if (partition < 0) {
-        // This row doesn't correspond to a partition, e.g. it's outside the given ranges.
-        partition = next_unknown_partition_;
-        ++next_unknown_partition_;
+    const int num_rows = batch->num_rows();
+    const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
+    int channel_ids[hash_batch_size];
+
+    for (int batch_start = 0; batch_start < num_rows; batch_start += hash_batch_size) {
+      int batch_window_size = min(num_rows - batch_start, hash_batch_size);
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        int32_t partition =
+            *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
+        if (partition < 0) {
+          // This row doesn't correspond to a partition,
+          // e.g. it's outside the given ranges.
+          partition = next_unknown_partition_;
+          ++next_unknown_partition_;
+        }
+        channel_ids[i] = partition % num_channels;
+      }
+
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
       }
-      RETURN_IF_ERROR(channels_[partition % num_channels]->AddRow(row));
     }
   } else {
     DCHECK_EQ(partition_type_, TPartitionType::HASH_PARTITIONED);
@@ -677,20 +690,36 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
     // TODO: encapsulate this in an Expr as we've done for Kudu above and remove this case
     // once we have codegen here.
     int num_channels = channels_.size();
-    for (int i = 0; i < batch->num_rows(); ++i) {
-      TupleRow* row = batch->GetRow(i);
-      uint64_t hash_val = EXCHANGE_HASH_SEED;
-      for (int j = 0; j < partition_exprs_.size(); ++j) {
-        ScalarExprEvaluator* eval = partition_expr_evals_[j];
-        void* partition_val = eval->GetValue(row);
-        // We can't use the crc hash function here because it does not result in
-        // uncorrelated hashes with different seeds. Instead we use FastHash.
-        // TODO: fix crc hash/GetHashValue()
-        DCHECK(&(eval->root()) == partition_exprs_[j]);
-        hash_val = RawValue::GetHashValueFastHash(
-            partition_val, partition_exprs_[j]->type(), hash_val);
+    const int num_partition_exprs = partition_exprs_.size();
+    const int num_rows = batch->num_rows();
+    const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
+    int channel_ids[hash_batch_size];
+
+    // Break the loop into two parts break the data dependency between computing
+    // the hash and calling AddRow()
+    // To keep stack allocation small a RowBatch::HASH_BATCH is used
+    for (int batch_start = 0; batch_start < num_rows; batch_start += hash_batch_size) {
+      int batch_window_size = min(num_rows - batch_start, hash_batch_size);
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        uint64_t hash_val = EXCHANGE_HASH_SEED;
+        for (int j = 0; j < num_partition_exprs; ++j) {
+          ScalarExprEvaluator* eval = partition_expr_evals_[j];
+          void* partition_val = eval->GetValue(row);
+          // We can't use the crc hash function here because it does not result in
+          // uncorrelated hashes with different seeds. Instead we use FastHash.
+          // TODO: fix crc hash/GetHashValue()
+          DCHECK(&(eval->root()) == partition_exprs_[j]);
+          hash_val = RawValue::GetHashValueFastHash(
+              partition_val, partition_exprs_[j]->type(), hash_val);
+        }
+        channel_ids[i] = hash_val % num_channels;
+      }
+
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
       }
-      RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row));
     }
   }
   COUNTER_ADD(total_sent_rows_counter_, batch->num_rows());

http://git-wip-us.apache.org/repos/asf/impala/blob/a25f6935/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index aad5ebe..3bde4d1 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -385,6 +385,10 @@ class RowBatch {
   // in order to leave room for variable-length data.
   static const int FIXED_LEN_BUFFER_LIMIT = AT_CAPACITY_MEM_USAGE / 2;
 
+  // Batch size to compute hash, keep it small to avoid large stack allocations.
+  // 16 provided the same speedup compared to operating over a full batch.
+  static const int HASH_BATCH_SIZE = 16;
+
   /// Allocates a buffer large enough for the fixed-length portion of 'capacity_' rows in
   /// this batch from 'tuple_data_pool_'. 'capacity_' is reduced if the allocation would
   /// exceed FIXED_LEN_BUFFER_LIMIT. Always returns enough space for at least one row.


[3/4] impala git commit: IMPALA-6269: Expose KRPC metrics on debug webpage

Posted by ta...@apache.org.
IMPALA-6269: Expose KRPC metrics on debug webpage

This change exposes KRPC metrics on the /rpcz debug web page.

This change also exposes metrics for rejected RPCs on the /metrics debug
web page. See here for an example: https://git.io/vAczm

This change also fixes a bug in PrettyPrinter::GetByteUnit(), which
previously did not work for unsigned values due to an implicit cast.

This change contains tests to check that the metrics show up in /rpcz
and /metrics and that they update as expected when executing queries.

This change is based on a change by Sailesh Mukil.

Change-Id: I7af7c1a84a5be82c979ca4ef1edf35167493be3f
Reviewed-on: http://gerrit.cloudera.org:8080/9292
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/74dd9e9b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/74dd9e9b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/74dd9e9b

Branch: refs/heads/master
Commit: 74dd9e9bfe32541e9ab6bddb8b839f3e491361f1
Parents: 678bf28
Author: Lars Volker <lv...@cloudera.com>
Authored: Tue Jan 23 11:01:29 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 21 10:19:06 2018 +0000

----------------------------------------------------------------------
 be/src/rpc/impala-service-pool.cc         | 103 +++++++++++++--
 be/src/rpc/impala-service-pool.h          |  26 ++--
 be/src/rpc/rpc-mgr-kerberized-test.cc     |   8 +-
 be/src/rpc/rpc-mgr-test-base.h            |  28 +++--
 be/src/rpc/rpc-mgr-test.cc                |  14 ++-
 be/src/rpc/rpc-mgr.cc                     |  52 +++++++-
 be/src/rpc/rpc-mgr.h                      |  15 ++-
 be/src/rpc/rpc-trace.cc                   |  19 ++-
 be/src/rpc/rpc-trace.h                    |   3 +-
 be/src/runtime/exec-env.cc                |   3 +-
 be/src/runtime/exec-env.h                 |   4 +-
 be/src/service/impalad-main.cc            |   2 +-
 be/src/util/histogram-metric.h            |  37 ++++--
 be/src/util/pretty-printer.h              |  51 +++++---
 common/thrift/Metrics.thrift              |   3 +-
 common/thrift/metrics.json                |  10 ++
 tests/custom_cluster/test_krpc_metrics.py |  97 +++++++++++++++
 tests/webserver/test_web_pages.py         |  30 +++++
 www/rpcz.tmpl                             | 166 ++++++++++++++++++++-----
 19 files changed, 547 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/74dd9e9b/be/src/rpc/impala-service-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/impala-service-pool.cc b/be/src/rpc/impala-service-pool.cc
index 35a5d6d..fb50656 100644
--- a/be/src/rpc/impala-service-pool.cc
+++ b/be/src/rpc/impala-service-pool.cc
@@ -31,30 +31,49 @@
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/service_if.h"
 #include "kudu/rpc/service_queue.h"
+#include "kudu/util/hdr_histogram.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/status.h"
 #include "kudu/util/trace.h"
+#include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 
 #include "common/names.h"
 #include "common/status.h"
 
-METRIC_DEFINE_histogram(server, impala_unused,
+METRIC_DEFINE_histogram(server, impala_incoming_queue_time,
     "RPC Queue Time",
     kudu::MetricUnit::kMicroseconds,
     "Number of microseconds incoming RPC requests spend in the worker queue",
     60000000LU, 3);
 
+using namespace rapidjson;
+
 namespace impala {
+// Metric key format for rpc call duration metrics.
+const string RPC_QUEUE_OVERFLOW_METRIC_KEY = "rpc.$0.rpcs_queue_overflow";
 
 ImpalaServicePool::ImpalaServicePool(const scoped_refptr<kudu::MetricEntity>& entity,
-    size_t service_queue_length, kudu::rpc::ServiceIf* service,
+    size_t service_queue_length, kudu::rpc::GeneratedServiceIf* service,
     MemTracker* service_mem_tracker)
   : service_mem_tracker_(service_mem_tracker),
     service_(service),
     service_queue_(service_queue_length),
-    unused_histogram_(METRIC_impala_unused.Instantiate(entity)) {
+    incoming_queue_time_(METRIC_impala_incoming_queue_time.Instantiate(entity)) {
   DCHECK(service_mem_tracker_ != nullptr);
+  const TMetricDef& overflow_metric_def =
+      MetricDefs::Get(RPC_QUEUE_OVERFLOW_METRIC_KEY, service_->service_name());
+  rpcs_queue_overflow_ = ExecEnv::GetInstance()->rpc_metrics()->RegisterMetric(
+      new IntCounter(overflow_metric_def, 0L));
+  // Initialize additional histograms for each method of the service.
+  // TODO: Retrieve these from KRPC once KUDU-2313 has been implemented.
+  for (const auto& method : service_->methods_by_name()) {
+    const string& method_name = method.first;
+    string payload_size_name = Substitute("$0-payload-size", method_name);
+    payload_size_histograms_[method_name].reset(new HistogramMetric(
+        MakeTMetricDef(method_name, TMetricKind::HISTOGRAM, TUnit::BYTES),
+        1024 * 1024 * 1024, 3));
+  }
 }
 
 ImpalaServicePool::~ImpalaServicePool() {
@@ -101,7 +120,7 @@ void ImpalaServicePool::RejectTooBusy(kudu::rpc::InboundCall* c) {
                  service_->service_name(),
                  c->remote_address().ToString(),
                  service_queue_.max_size());
-  rpcs_queue_overflow_.Add(1);
+  rpcs_queue_overflow_->Increment(1);
   FailAndReleaseRpc(kudu::rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
                     kudu::Status::ServiceUnavailable(err_msg), c);
   VLOG(1) << err_msg << " Contents of service queue:\n"
@@ -202,12 +221,11 @@ void ImpalaServicePool::RunThread() {
     }
 
     // We need to call RecordHandlingStarted() to update the InboundCall timing.
-    incoming->RecordHandlingStarted(unused_histogram_);
+    incoming->RecordHandlingStarted(incoming_queue_time_);
     ADOPT_TRACE(incoming->trace());
 
     if (UNLIKELY(incoming->ClientTimedOut())) {
       TRACE_TO(incoming->trace(), "Skipping call since client already timed out"); // NOLINT(*)
-      rpcs_timed_out_in_queue_.Add(1);
 
       // Respond as a failure, even though the client will probably ignore
       // the response anyway.
@@ -217,10 +235,13 @@ void ImpalaServicePool::RunThread() {
       continue;
     }
 
-    TRACE_TO(incoming->trace(), "Handling call"); // NOLINT(*)
+    const string& method_name = incoming->remote_method().method_name();
+    int64_t transfer_size = incoming->GetTransferSize();
+    payload_size_histograms_[method_name]->Update(transfer_size);
 
-    // Release the InboundCall pointer -- when the call is responded to,
-    // it will get deleted at that point.
+    TRACE_TO(incoming->trace(), "Handling call"); // NOLINT(*)
+    // Release the InboundCall pointer -- when the call is responded to, it will get
+    // deleted at that point.
     service_->Handle(incoming.release());
   }
 }
@@ -229,4 +250,68 @@ const string ImpalaServicePool::service_name() const {
   return service_->service_name();
 }
 
+// Render a kudu::Histogram into a human readable string representation.
+// TODO: Switch to structured JSON (IMPALA-6545).
+const string KrpcHistogramToString(const kudu::Histogram* histogram) {
+  DCHECK(histogram != nullptr);
+  DCHECK_EQ(histogram->prototype()->unit(), kudu::MetricUnit::kMicroseconds);
+  kudu::HdrHistogram snapshot(*histogram->histogram());
+  return HistogramMetric::HistogramToHumanReadable(&snapshot, TUnit::TIME_US);
+}
+
+// Expose the service pool metrics by storing them as JSON in 'value'.
+void ImpalaServicePool::ToJson(rapidjson::Value* value, rapidjson::Document* document) {
+  // Add pool metrics.
+  Value service_name_val(service_name().c_str(), document->GetAllocator());
+  value->AddMember("service_name", service_name_val, document->GetAllocator());
+  value->AddMember("queue_size", service_queue_.estimated_queue_length(),
+      document->GetAllocator());
+  value->AddMember("idle_threads", service_queue_.estimated_idle_worker_count(),
+      document->GetAllocator());
+  value->AddMember("rpcs_queue_overflow", rpcs_queue_overflow_->GetValue(),
+      document->GetAllocator());
+
+  Value mem_usage(PrettyPrinter::Print(service_mem_tracker_->consumption(),
+      TUnit::BYTES).c_str(), document->GetAllocator());
+  value->AddMember("mem_usage", mem_usage, document->GetAllocator());
+
+  Value mem_peak(PrettyPrinter::Print(service_mem_tracker_->peak_consumption(),
+      TUnit::BYTES).c_str(), document->GetAllocator());
+  value->AddMember("mem_peak", mem_peak, document->GetAllocator());
+
+  Value incoming_queue_time(KrpcHistogramToString(incoming_queue_time_.get()).c_str(),
+      document->GetAllocator());
+  value->AddMember("incoming_queue_time", incoming_queue_time,
+      document->GetAllocator());
+
+  // Add method specific metrics.
+  const kudu::rpc::GeneratedServiceIf::MethodInfoMap& method_infos =
+      service_->methods_by_name();
+  Value rpc_method_metrics(kArrayType);
+  for (const auto& method : method_infos) {
+    Value method_entry(kObjectType);
+
+    const string& method_name = method.first;
+    Value method_name_val(method_name.c_str(), document->GetAllocator());
+    method_entry.AddMember("method_name", method_name_val, document->GetAllocator());
+
+    kudu::rpc::RpcMethodInfo* method_info = method.second.get();
+    kudu::Histogram* handler_latency = method_info->handler_latency_histogram.get();
+    Value handler_latency_val(KrpcHistogramToString(handler_latency).c_str(),
+        document->GetAllocator());
+    method_entry.AddMember("handler_latency", handler_latency_val,
+        document->GetAllocator());
+
+    HistogramMetric* payload_size = payload_size_histograms_[method_name].get();
+    DCHECK(payload_size != nullptr);
+    Value payload_size_val(payload_size->ToHumanReadable().c_str(),
+        document->GetAllocator());
+    method_entry.AddMember("payload_size", payload_size_val, document->GetAllocator());
+
+    rpc_method_metrics.PushBack(method_entry, document->GetAllocator());
+  }
+  value->AddMember("rpc_method_metrics", rpc_method_metrics, document->GetAllocator());
+}
+
+
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/74dd9e9b/be/src/rpc/impala-service-pool.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/impala-service-pool.h b/be/src/rpc/impala-service-pool.h
index 624e937..9d34366 100644
--- a/be/src/rpc/impala-service-pool.h
+++ b/be/src/rpc/impala-service-pool.h
@@ -46,7 +46,7 @@ class ImpalaServicePool : public kudu::rpc::RpcService {
   /// 'service_mem_tracker' is the MemTracker for tracking the memory usage of RPC
   /// payloads in the service queue.
   ImpalaServicePool(const scoped_refptr<kudu::MetricEntity>& entity,
-      size_t service_queue_length, kudu::rpc::ServiceIf* service,
+      size_t service_queue_length, kudu::rpc::GeneratedServiceIf* service,
       MemTracker* service_mem_tracker);
 
   virtual ~ImpalaServicePool();
@@ -57,13 +57,17 @@ class ImpalaServicePool : public kudu::rpc::RpcService {
   /// Shut down the queue and the thread pool.
   virtual void Shutdown();
 
-  kudu::rpc::RpcMethodInfo* LookupMethod(const kudu::rpc::RemoteMethod& method) override;
+  virtual kudu::rpc::RpcMethodInfo* LookupMethod(const kudu::rpc::RemoteMethod& method)
+    override;
 
   virtual kudu::Status
       QueueInboundCall(gscoped_ptr<kudu::rpc::InboundCall> call) OVERRIDE;
 
   const std::string service_name() const;
 
+  /// Expose the service pool metrics by storing them as JSON in 'value'.
+  void ToJson(rapidjson::Value* value, rapidjson::Document* document);
+
  private:
   void RunThread();
   void RejectTooBusy(kudu::rpc::InboundCall* c);
@@ -80,7 +84,7 @@ class ImpalaServicePool : public kudu::rpc::RpcService {
   MemTracker* const service_mem_tracker_;
 
   /// Reference to the implementation of the RPC handlers. Not owned.
-  kudu::rpc::ServiceIf* const service_;
+  kudu::rpc::GeneratedServiceIf* const service_;
 
   /// The set of service threads started to process incoming RPC calls.
   std::vector<std::unique_ptr<Thread>> threads_;
@@ -88,17 +92,15 @@ class ImpalaServicePool : public kudu::rpc::RpcService {
   /// The pending RPCs to be dequeued by the service threads.
   kudu::rpc::LifoServiceQueue service_queue_;
 
-  /// TODO: Display these metrics in the debug webpage. IMPALA-6269
-  /// Number of RPCs that timed out while waiting in the service queue.
-  AtomicInt32 rpcs_timed_out_in_queue_;
+  /// Histogram to track time spent by requests in the krpc incoming requests queue.
+  scoped_refptr<kudu::Histogram> incoming_queue_time_;
 
-  /// Number of RPCs that were rejected due to the queue being full.
-  AtomicInt32 rpcs_queue_overflow_;
+  /// Histogram for incoming request payload size for each method of this service.
+  std::unordered_map<std::string, std::unique_ptr<HistogramMetric>>
+      payload_size_histograms_;
 
-  /// Dummy histogram needed to call InboundCall::RecordHandlingStarted() to set
-  /// appropriate internal KRPC state. Unused otherwise.
-  /// TODO: Consider displaying this histogram in the debug webpage. IMPALA-6269
-  scoped_refptr<kudu::Histogram> unused_histogram_;
+  /// Number of RPCs that were rejected due to the queue being full. Not owned.
+  IntCounter* rpcs_queue_overflow_= nullptr;
 
   /// Protects against concurrent Shutdown() operations.
   /// TODO: This seems implausible given our current usage pattern.

http://git-wip-us.apache.org/repos/asf/impala/blob/74dd9e9b/be/src/rpc/rpc-mgr-kerberized-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-kerberized-test.cc b/be/src/rpc/rpc-mgr-kerberized-test.cc
index 08c1971..bb4b9db 100644
--- a/be/src/rpc/rpc-mgr-kerberized-test.cc
+++ b/be/src/rpc/rpc-mgr-kerberized-test.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "rpc/rpc-mgr-test-base.h"
+#include "service/fe-support.h"
 
 DECLARE_string(ssl_client_ca_certificate);
 DECLARE_string(ssl_server_certificate);
@@ -27,7 +28,7 @@ static int kdc_port = GetServerPort();
 
 class RpcMgrKerberizedTest :
     public RpcMgrTestBase<testing::TestWithParam<KerberosSwitch> > {
-  virtual void SetUp() {
+  virtual void SetUp() override {
     IpAddr ip;
     ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
     string spn = Substitute("impala-test/$0", ip);
@@ -42,7 +43,7 @@ class RpcMgrKerberizedTest :
     RpcMgrTestBase::SetUp();
   }
 
-  virtual void TearDown() {
+  virtual void TearDown() override {
     ASSERT_OK(kdc_wrapper_->TearDownMiniKDC(GetParam()));
     RpcMgrTestBase::TearDown();
   }
@@ -82,7 +83,8 @@ TEST_P(RpcMgrKerberizedTest, MultipleServicesTls) {
 
 int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
-  impala::InitCommonRuntime(argc, argv, false, impala::TestInfo::BE_TEST);
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
 
   // Fill in the path of the current binary for use by the tests.
   CURRENT_EXECUTABLE_PATH = argv[0];

http://git-wip-us.apache.org/repos/asf/impala/blob/74dd9e9b/be/src/rpc/rpc-mgr-test-base.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test-base.h b/be/src/rpc/rpc-mgr-test-base.h
index 4a79040..ce063f8 100644
--- a/be/src/rpc/rpc-mgr-test-base.h
+++ b/be/src/rpc/rpc-mgr-test-base.h
@@ -26,6 +26,7 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 #include "rpc/auth-provider.h"
+#include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "testutil/gtest-util.h"
 #include "testutil/mini-kdc-wrapper.h"
@@ -40,7 +41,7 @@
 
 #include "common/names.h"
 
-using kudu::rpc::ServiceIf;
+using kudu::rpc::GeneratedServiceIf;
 using kudu::rpc::RpcController;
 using kudu::rpc::RpcContext;
 using kudu::rpc::RpcSidecar;
@@ -132,7 +133,7 @@ template <class T> class RpcMgrTestBase : public T {
 
   // Takes over ownership of the newly created 'service' which needs to have a lifetime
   // as long as 'rpc_mgr_' as RpcMgr::Shutdown() will call Shutdown() of 'service'.
-  ServiceIf* TakeOverService(std::unique_ptr<ServiceIf> service) {
+  GeneratedServiceIf* TakeOverService(std::unique_ptr<GeneratedServiceIf> service) {
     services_.emplace_back(move(service));
     return services_.back().get();
   }
@@ -141,14 +142,15 @@ template <class T> class RpcMgrTestBase : public T {
   TNetworkAddress krpc_address_;
   RpcMgr rpc_mgr_;
 
-  virtual void SetUp() {
+  virtual void SetUp() override {
     IpAddr ip;
     ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
     krpc_address_ = MakeNetworkAddress(ip, SERVICE_PORT);
+    exec_env_.reset(new ExecEnv());
     ASSERT_OK(rpc_mgr_.Init());
   }
 
-  virtual void TearDown() {
+  virtual void TearDown() override {
     rpc_mgr_.Shutdown();
   }
 
@@ -156,7 +158,10 @@ template <class T> class RpcMgrTestBase : public T {
   int32_t payload_[PAYLOAD_SIZE];
 
   // Own all the services used by the test.
-  std::vector<std::unique_ptr<ServiceIf>> services_;
+  std::vector<std::unique_ptr<GeneratedServiceIf>> services_;
+
+  // Required to set up the RPC metric groups used by the service pool.
+  std::unique_ptr<ExecEnv> exec_env_;
 };
 
 typedef std::function<void(RpcContext*)> ServiceCB;
@@ -169,8 +174,8 @@ class PingServiceImpl : public PingServiceIf {
       ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); })
     : PingServiceIf(entity, tracker), mem_tracker_(-1, "Ping Service"), cb_(cb) {}
 
-  virtual void Ping(
-      const PingRequestPB* request, PingResponsePB* response, RpcContext* context) {
+  virtual void Ping(const PingRequestPB* request, PingResponsePB* response, RpcContext*
+      context) override {
     response->set_int_response(42);
     // Incoming requests will already be tracked and we need to release the memory.
     mem_tracker_.Release(context->GetTransferSize());
@@ -194,7 +199,7 @@ class ScanMemServiceImpl : public ScanMemServiceIf {
   // The request comes with an int 'pattern' and a payload of int array sent with
   // sidecar. Scan the array to make sure every element matches 'pattern'.
   virtual void ScanMem(const ScanMemRequestPB* request, ScanMemResponsePB* response,
-      RpcContext* context) {
+      RpcContext* context) override {
     int32_t pattern = request->pattern();
     Slice payload;
     ASSERT_OK(
@@ -228,15 +233,16 @@ template <class T>
 Status RunMultipleServicesTestTemplate(RpcMgrTestBase<T>* test_base,
     RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
   // Test that a service can be started, and will respond to requests.
-  ServiceIf* ping_impl = test_base->TakeOverService(make_unique<PingServiceImpl>(
+  GeneratedServiceIf* ping_impl = test_base->TakeOverService(make_unique<PingServiceImpl>(
       rpc_mgr->metric_entity(), rpc_mgr->result_tracker()));
   RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, ping_impl,
       static_cast<PingServiceImpl*>(ping_impl)->mem_tracker()));
 
   // Test that a second service, that verifies the RPC payload is not corrupted,
   // can be started.
-  ServiceIf* scan_mem_impl = test_base->TakeOverService(make_unique<ScanMemServiceImpl>(
-      rpc_mgr->metric_entity(), rpc_mgr->result_tracker()));
+  GeneratedServiceIf* scan_mem_impl = test_base->TakeOverService(
+      make_unique<ScanMemServiceImpl>(rpc_mgr->metric_entity(),
+      rpc_mgr->result_tracker()));
   RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, scan_mem_impl,
       static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));
 

http://git-wip-us.apache.org/repos/asf/impala/blob/74dd9e9b/be/src/rpc/rpc-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index 8d5312f..e90838e 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -16,8 +16,9 @@
 // under the License.
 
 #include "rpc/rpc-mgr-test-base.h"
+#include "service/fe-support.h"
 
-using kudu::rpc::ServiceIf;
+using kudu::rpc::GeneratedServiceIf;
 using kudu::rpc::RpcController;
 using kudu::rpc::RpcContext;
 using kudu::MonoDelta;
@@ -31,11 +32,11 @@ namespace impala {
 
 // For tests that do not require kerberized testing, we use RpcTest.
 class RpcMgrTest : public RpcMgrTestBase<testing::Test> {
-  virtual void SetUp() {
+  virtual void SetUp() override {
     RpcMgrTestBase::SetUp();
   }
 
-  virtual void TearDown() {
+  virtual void TearDown() override {
     RpcMgrTestBase::TearDown();
   }
 };
@@ -178,7 +179,7 @@ TEST_F(RpcMgrTest, SlowCallback) {
   // Test a service which is slow to respond and has a short queue.
   // Set a timeout on the client side. Expect either a client timeout
   // or the service queue filling up.
-  ServiceIf* ping_impl = TakeOverService(make_unique<PingServiceImpl>(
+  GeneratedServiceIf* ping_impl = TakeOverService(make_unique<PingServiceImpl>(
       rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker(), slow_cb));
   const int num_service_threads = 1;
   const int queue_size = 3;
@@ -204,7 +205,7 @@ TEST_F(RpcMgrTest, SlowCallback) {
 }
 
 TEST_F(RpcMgrTest, AsyncCall) {
-  ServiceIf* scan_mem_impl = TakeOverService(make_unique<ScanMemServiceImpl>(
+  GeneratedServiceIf* scan_mem_impl = TakeOverService(make_unique<ScanMemServiceImpl>(
       rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker()));
   ASSERT_OK(rpc_mgr_.RegisterService(10, 10, scan_mem_impl,
       static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));
@@ -257,7 +258,8 @@ TEST_F(RpcMgrTest, NegotiationTimeout) {
 
 int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
-  impala::InitCommonRuntime(argc, argv, false, impala::TestInfo::BE_TEST);
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
 
   // Fill in the path of the current binary for use by the tests.
   CURRENT_EXECUTABLE_PATH = argv[0];

http://git-wip-us.apache.org/repos/asf/impala/blob/74dd9e9b/be/src/rpc/rpc-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index 7e05f38..ed9614e 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -31,14 +31,19 @@
 
 #include "common/names.h"
 
+using namespace rapidjson;
+
 using kudu::HostPort;
 using kudu::MetricEntity;
 using kudu::MonoDelta;
 using kudu::rpc::AcceptorPool;
+using kudu::rpc::DumpRunningRpcsRequestPB;
+using kudu::rpc::DumpRunningRpcsResponsePB;
 using kudu::rpc::MessengerBuilder;
 using kudu::rpc::Messenger;
+using kudu::rpc::RpcConnectionPB;
 using kudu::rpc::RpcController;
-using kudu::rpc::ServiceIf;
+using kudu::rpc::GeneratedServiceIf;
 using kudu::Sockaddr;
 
 DECLARE_string(hostname);
@@ -127,7 +132,7 @@ Status RpcMgr::Init() {
 }
 
 Status RpcMgr::RegisterService(int32_t num_service_threads, int32_t service_queue_depth,
-    ServiceIf* service_ptr, MemTracker* service_mem_tracker) {
+    GeneratedServiceIf* service_ptr, MemTracker* service_mem_tracker) {
   DCHECK(is_inited()) << "Must call Init() before RegisterService()";
   DCHECK(!services_started_) << "Cannot call RegisterService() after StartServices()";
   scoped_refptr<ImpalaServicePool> service_pool = new ImpalaServicePool(
@@ -154,10 +159,9 @@ Status RpcMgr::StartServices(const TNetworkAddress& address) {
   RETURN_IF_ERROR(TNetworkAddressToSockaddr(address, &sockaddr));
 
   // Call the messenger to create an AcceptorPool for us.
-  shared_ptr<AcceptorPool> acceptor_pool;
-  KUDU_RETURN_IF_ERROR(messenger_->AddAcceptorPool(sockaddr, &acceptor_pool),
+  KUDU_RETURN_IF_ERROR(messenger_->AddAcceptorPool(sockaddr, &acceptor_pool_),
       "Failed to add acceptor pool");
-  KUDU_RETURN_IF_ERROR(acceptor_pool->Start(FLAGS_num_acceptor_threads),
+  KUDU_RETURN_IF_ERROR(acceptor_pool_->Start(FLAGS_num_acceptor_threads),
       "Acceptor pool failed to start");
   VLOG_QUERY << "Started " << FLAGS_num_acceptor_threads << " acceptor threads";
   services_started_ = true;
@@ -167,6 +171,7 @@ Status RpcMgr::StartServices(const TNetworkAddress& address) {
 void RpcMgr::Shutdown() {
   if (messenger_.get() == nullptr) return;
   for (auto service_pool : service_pools_) service_pool->Shutdown();
+  acceptor_pool_.reset();
 
   messenger_->UnregisterAllServices();
   messenger_->Shutdown();
@@ -180,4 +185,41 @@ bool RpcMgr::IsServerTooBusy(const RpcController& rpc_controller) {
       err->code() == kudu::rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY;
 }
 
+void RpcMgr::ToJson(Document* document) {
+  if (messenger_.get() == nullptr) return;
+  // Add acceptor metrics.
+  int64_t num_accepted = 0;
+  if (acceptor_pool_.get() != nullptr) {
+    num_accepted = acceptor_pool_->num_rpc_connections_accepted();
+  }
+  document->AddMember("rpc_connections_accepted", num_accepted, document->GetAllocator());
+
+  // Add messenger metrics.
+  DumpRunningRpcsResponsePB response;
+  messenger_->DumpRunningRpcs(DumpRunningRpcsRequestPB(), &response);
+
+  int64_t num_inbound_calls_in_flight = 0;
+  for (const RpcConnectionPB& conn : response.inbound_connections()) {
+    num_inbound_calls_in_flight += conn.calls_in_flight().size();
+  }
+  document->AddMember("num_inbound_calls_in_flight", num_inbound_calls_in_flight,
+      document->GetAllocator());
+
+  int64_t num_outbound_calls_in_flight = 0;
+  for (const RpcConnectionPB& conn : response.outbound_connections()) {
+    num_outbound_calls_in_flight += conn.calls_in_flight().size();
+  }
+  document->AddMember("num_outbound_calls_in_flight", num_outbound_calls_in_flight,
+      document->GetAllocator());
+
+  // Add service pool metrics
+  Value services(kArrayType);
+  for (auto service_pool : service_pools_) {
+    Value service_entry(kObjectType);
+    service_pool->ToJson(&service_entry, document);
+    services.PushBack(service_entry, document->GetAllocator());
+  }
+  document->AddMember("services", services, document->GetAllocator());
+}
+
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/74dd9e9b/be/src/rpc/rpc-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
index e87b559..c7107d2 100644
--- a/be/src/rpc/rpc-mgr.h
+++ b/be/src/rpc/rpc-mgr.h
@@ -29,7 +29,7 @@
 namespace kudu {
 namespace rpc {
 class RpcController;
-class ServiceIf;
+class GeneratedServiceIf;
 } // rpc
 } // kudu
 
@@ -95,6 +95,9 @@ namespace impala {
 /// Inbound connection set-up is handled by a small fixed-size pool of 'acceptor'
 /// threads. The number of threads that accept new TCP connection requests to the service
 /// port is configurable via FLAGS_acceptor_threads.
+///
+/// If 'use_tls' is true, then the underlying messenger is configured with the required
+/// certificates, and encryption is enabled and marked as required.
 class RpcMgr {
  public:
   RpcMgr(bool use_tls = false) : use_tls_(use_tls) {}
@@ -127,7 +130,7 @@ class RpcMgr {
   ///
   /// It is an error to call this after StartServices() has been called.
   Status RegisterService(int32_t num_service_threads, int32_t service_queue_depth,
-      kudu::rpc::ServiceIf* service_ptr, MemTracker* service_mem_tracker)
+      kudu::rpc::GeneratedServiceIf* service_ptr, MemTracker* service_mem_tracker)
       WARN_UNUSED_RESULT;
 
   /// Creates a new proxy for a remote service of type P at location 'address', and places
@@ -157,6 +160,11 @@ class RpcMgr {
 
   std::shared_ptr<kudu::rpc::Messenger> messenger() { return messenger_; }
 
+  /// Writes a JSON representation of the RpcMgr's metrics to a value named 'services' in
+  /// 'document'. It will include the number of RPCs accepted so far, the number of calls
+  /// in flight, and metrics and histograms for each service and their methods.
+  void ToJson(rapidjson::Document* document);
+
   ~RpcMgr() {
     DCHECK_EQ(service_pools_.size(), 0)
         << "Must call Shutdown() before destroying RpcMgr";
@@ -175,6 +183,9 @@ class RpcMgr {
   /// track results for idempotent RPC calls.
   const scoped_refptr<kudu::rpc::ResultTracker> tracker_;
 
+  /// Holds a reference to the acceptor pool. Shared ownership with messenger_.
+  std::shared_ptr<kudu::rpc::AcceptorPool> acceptor_pool_;
+
   /// Container for reactor threads which run event loops for RPC services, plus acceptor
   /// threads which manage connection setup. Has to be a shared_ptr as required by
   /// MessangerBuilder::Build().

http://git-wip-us.apache.org/repos/asf/impala/blob/74dd9e9b/be/src/rpc/rpc-trace.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-trace.cc b/be/src/rpc/rpc-trace.cc
index f4da19b..028f397 100644
--- a/be/src/rpc/rpc-trace.cc
+++ b/be/src/rpc/rpc-trace.cc
@@ -22,6 +22,7 @@
 #include <gutil/strings/substitute.h>
 
 #include "common/logging.h"
+#include "rpc/rpc-mgr.h"
 #include "util/debug-util.h"
 #include "util/time.h"
 #include "util/webserver.h"
@@ -39,6 +40,8 @@ const string RPC_PROCESSING_TIME_DISTRIBUTION_METRIC_KEY = "rpc-method.$0.call_d
 // web-based summary page.
 class RpcEventHandlerManager {
  public:
+  RpcEventHandlerManager(RpcMgr* rpc_mgr) : rpc_mgr_(rpc_mgr) {}
+
   // Adds an event handler to the list of those tracked
   void RegisterEventHandler(RpcEventHandler* event_handler);
 
@@ -64,14 +67,19 @@ class RpcEventHandlerManager {
   // after they are started, event handlers have a lifetime equivalent to the length of
   // the process.
   vector<RpcEventHandler*> event_handlers_;
+
+  // Points to an RpcMgr. If this is not null, then its metrics will be included in the
+  // output of JsonCallback. Not owned, but the object must be guaranteed to live as long
+  // as the process lives.
+  RpcMgr* rpc_mgr_ = nullptr;
 };
 
 // Only instance of RpcEventHandlerManager
 scoped_ptr<RpcEventHandlerManager> handler_manager;
 
-void impala::InitRpcEventTracing(Webserver* webserver) {
-  handler_manager.reset(new RpcEventHandlerManager());
-  if (webserver != NULL) {
+void impala::InitRpcEventTracing(Webserver* webserver, RpcMgr* rpc_mgr) {
+  handler_manager.reset(new RpcEventHandlerManager(rpc_mgr));
+  if (webserver != nullptr) {
     Webserver::UrlCallback json = bind<void>(
         mem_fn(&RpcEventHandlerManager::JsonCallback), handler_manager.get(), _1, _2);
     webserver->RegisterUrlCallback("/rpcz", "rpcz.tmpl", json);
@@ -83,7 +91,7 @@ void impala::InitRpcEventTracing(Webserver* webserver) {
 }
 
 void RpcEventHandlerManager::RegisterEventHandler(RpcEventHandler* event_handler) {
-  DCHECK(event_handler != NULL);
+  DCHECK(event_handler != nullptr);
   lock_guard<mutex> l(lock_);
   event_handlers_.push_back(event_handler);
 }
@@ -98,6 +106,7 @@ void RpcEventHandlerManager::JsonCallback(const Webserver::ArgumentMap& args,
     servers.PushBack(server, document->GetAllocator());
   }
   document->AddMember("servers", servers, document->GetAllocator());
+  if (rpc_mgr_ != nullptr) rpc_mgr_->ToJson(document);
 }
 
 void RpcEventHandlerManager::ResetCallback(const Webserver::ArgumentMap& args,
@@ -137,7 +146,7 @@ void RpcEventHandler::ResetAll() {
 
 RpcEventHandler::RpcEventHandler(const string& server_name, MetricGroup* metrics) :
     server_name_(server_name), metrics_(metrics) {
-  if (handler_manager.get() != NULL) handler_manager->RegisterEventHandler(this);
+  if (handler_manager.get() != nullptr) handler_manager->RegisterEventHandler(this);
 }
 
 void RpcEventHandler::ToJson(Value* server, Document* document) {

http://git-wip-us.apache.org/repos/asf/impala/blob/74dd9e9b/be/src/rpc/rpc-trace.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-trace.h b/be/src/rpc/rpc-trace.h
index 1bd7823..12c7395 100644
--- a/be/src/rpc/rpc-trace.h
+++ b/be/src/rpc/rpc-trace.h
@@ -31,6 +31,7 @@
 namespace impala {
 
 class MetricGroup;
+class RpcMgr;
 class Webserver;
 
 /// An RpcEventHandler is called every time an Rpc is started and completed. There is at
@@ -126,7 +127,7 @@ class RpcEventHandler : public apache::thrift::TProcessorEventHandler {
 };
 
 /// Initialises rpc event tracing, must be called before any RpcEventHandlers are created.
-void InitRpcEventTracing(Webserver* webserver);
+void InitRpcEventTracing(Webserver* webserver, RpcMgr* = nullptr);
 
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/74dd9e9b/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index ce8dae0..8a51d88 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -67,7 +67,7 @@
 #include "common/names.h"
 
 using boost::algorithm::join;
-using kudu::rpc::ServiceIf;
+using kudu::rpc::GeneratedServiceIf;
 using namespace strings;
 
 DEFINE_string(catalog_service_host, "localhost",
@@ -152,6 +152,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int krpc_port,
     frontend_(new Frontend()),
     async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
     query_exec_mgr_(new QueryExecMgr()),
+    rpc_metrics_(metrics_->GetOrCreateChildGroup("rpc")),
     enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
     backend_address_(MakeNetworkAddress(hostname, backend_port)) {
 

http://git-wip-us.apache.org/repos/asf/impala/blob/74dd9e9b/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 3f050c9..7741276 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -83,7 +83,7 @@ class ExecEnv {
       int subscriber_port, int webserver_port, const std::string& statestore_host,
       int statestore_port);
 
-  /// Returns the first created exec env instance. In a normal impalad, this is
+  /// Returns the most recently created exec env instance. In a normal impalad, this is
   /// the only instance. In test setups with multiple ExecEnv's per process,
   /// we return the most recently created instance.
   static ExecEnv* GetInstance() { return exec_env_; }
@@ -123,6 +123,7 @@ class ExecEnv {
   io::DiskIoMgr* disk_io_mgr() { return disk_io_mgr_.get(); }
   Webserver* webserver() { return webserver_.get(); }
   MetricGroup* metrics() { return metrics_.get(); }
+  MetricGroup* rpc_metrics() { return rpc_metrics_; }
   MemTracker* process_mem_tracker() { return mem_tracker_.get(); }
   ThreadResourceMgr* thread_mgr() { return thread_mgr_.get(); }
   HdfsOpThreadPool* hdfs_op_thread_pool() { return hdfs_op_thread_pool_.get(); }
@@ -210,6 +211,7 @@ class ExecEnv {
 
   /// Not owned by this class
   ImpalaServer* impala_server_ = nullptr;
+  MetricGroup* rpc_metrics_ = nullptr;
 
   bool enable_webserver_;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/74dd9e9b/be/src/service/impalad-main.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index 8463467..b4ec3f0 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -75,7 +75,7 @@ int ImpaladMain(int argc, char** argv) {
   ABORT_IF_ERROR(StartMemoryMaintenanceThread()); // Memory metrics are created in Init().
   ABORT_IF_ERROR(
       StartThreadInstrumentation(exec_env.metrics(), exec_env.webserver(), true));
-  InitRpcEventTracing(exec_env.webserver());
+  InitRpcEventTracing(exec_env.webserver(), exec_env.rpc_mgr());
 
   boost::shared_ptr<ImpalaServer> impala_server(new ImpalaServer(&exec_env));
   Status status =

http://git-wip-us.apache.org/repos/asf/impala/blob/74dd9e9b/be/src/util/histogram-metric.h
----------------------------------------------------------------------
diff --git a/be/src/util/histogram-metric.h b/be/src/util/histogram-metric.h
index a520947..d4e09e4 100644
--- a/be/src/util/histogram-metric.h
+++ b/be/src/util/histogram-metric.h
@@ -39,7 +39,7 @@ class HistogramMetric : public Metric {
     DCHECK_EQ(TMetricKind::HISTOGRAM, def.kind);
   }
 
-  virtual void ToJson(rapidjson::Document* document, rapidjson::Value* value) {
+  virtual void ToJson(rapidjson::Document* document, rapidjson::Value* value) override {
     rapidjson::Value container(rapidjson::kObjectType);
     AddStandardFields(document, &container);
 
@@ -84,36 +84,47 @@ class HistogramMetric : public Metric {
     histogram_.reset(new HdrHistogram(highest, digits));
   }
 
-  virtual void ToLegacyJson(rapidjson::Document*) { }
+  virtual void ToLegacyJson(rapidjson::Document*) override {}
 
   const TUnit::type& unit() const { return unit_; }
 
-  virtual std::string ToHumanReadable() {
+  virtual std::string ToHumanReadable() override {
     boost::lock_guard<SpinLock> l(lock_);
+    return HistogramToHumanReadable(histogram_.get(), unit_);
+  }
+
+  /// Render a HdrHistogram into a human readable string representation. The histogram
+  /// type is a template parameter so that it accepts both Impala's and Kudu's
+  /// HdrHistogram classes.
+  template <class T>
+  static std::string HistogramToHumanReadable(T* histogram, TUnit::type unit) {
+    DCHECK(histogram != nullptr);
     std::stringstream out;
-    out << "Count: " << histogram_->TotalCount() << ", "
-        << "min / max: " << PrettyPrinter::Print(histogram_->MinValue(), unit_)
-        << " / " << PrettyPrinter::Print(histogram_->MaxValue(), unit_) << ", "
+    out << "Count: " << histogram->TotalCount() << ", "
+        << "min / max: " << PrettyPrinter::Print(histogram->MinValue(), unit)
+        << " / " << PrettyPrinter::Print(histogram->MaxValue(), unit) << ", "
         << "25th %-ile: "
-        << PrettyPrinter::Print(histogram_->ValueAtPercentile(25), unit_) << ", "
+        << PrettyPrinter::Print(histogram->ValueAtPercentile(25), unit) << ", "
         << "50th %-ile: "
-        << PrettyPrinter::Print(histogram_->ValueAtPercentile(50), unit_) << ", "
+        << PrettyPrinter::Print(histogram->ValueAtPercentile(50), unit) << ", "
         << "75th %-ile: "
-        << PrettyPrinter::Print(histogram_->ValueAtPercentile(75), unit_) << ", "
+        << PrettyPrinter::Print(histogram->ValueAtPercentile(75), unit) << ", "
         << "90th %-ile: "
-        << PrettyPrinter::Print(histogram_->ValueAtPercentile(90), unit_) << ", "
+        << PrettyPrinter::Print(histogram->ValueAtPercentile(90), unit) << ", "
         << "95th %-ile: "
-        << PrettyPrinter::Print(histogram_->ValueAtPercentile(95), unit_) << ", "
+        << PrettyPrinter::Print(histogram->ValueAtPercentile(95), unit) << ", "
         << "99.9th %-ile: "
-        << PrettyPrinter::Print(histogram_->ValueAtPercentile(99.9), unit_);
+        << PrettyPrinter::Print(histogram->ValueAtPercentile(99.9), unit);
     return out.str();
   }
 
  private:
-  // Protects histogram_ pointer itself.
+  /// Protects histogram_ pointer itself.
   SpinLock lock_;
   boost::scoped_ptr<HdrHistogram> histogram_;
   const TUnit::type unit_;
+
+  DISALLOW_COPY_AND_ASSIGN(HistogramMetric);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/74dd9e9b/be/src/util/pretty-printer.h
----------------------------------------------------------------------
diff --git a/be/src/util/pretty-printer.h b/be/src/util/pretty-printer.h
index 5e76db1..03ba99e 100644
--- a/be/src/util/pretty-printer.h
+++ b/be/src/util/pretty-printer.h
@@ -91,22 +91,12 @@ class PrettyPrinter {
       }
 
       case TUnit::TIME_NS: {
-        ss << std::setprecision(TIME_NS_PRECISION);
-        if (value >= BILLION) {
-          /// If the time is over a second, print it up to ms.
-          value /= MILLION;
-          PrintTimeMs(value, &ss);
-        } else if (value >= MILLION) {
-          /// if the time is over a ms, print it up to microsecond in the unit of ms.
-          ss << DOUBLE_TRUNCATE(static_cast<double>(value) / MILLION, TIME_NS_PRECISION)
-             << "ms";
-        } else if (value > 1000) {
-          /// if the time is over a microsecond, print it using unit microsecond
-          ss << DOUBLE_TRUNCATE(static_cast<double>(value) / 1000, TIME_NS_PRECISION)
-             << "us";
-        } else {
-          ss << DOUBLE_TRUNCATE(value, TIME_NS_PRECISION) << "ns";
-        }
+        PrintTimeNs(value, &ss);
+        break;
+      }
+
+      case TUnit::TIME_US: {
+        PrintTimeNs(value * THOUSAND, &ss);
         break;
       }
 
@@ -204,13 +194,13 @@ class PrettyPrinter {
     if (value == 0) {
       *unit = "";
       return value;
-    } else if (value >= GIGABYTE || value <= -GIGABYTE) {
+    } else if (value >= GIGABYTE || (value < 0 && value <= -GIGABYTE)) {
       *unit = "GB";
       return value / (double) GIGABYTE;
-    } else if (value >= MEGABYTE || value <= -MEGABYTE ) {
+    } else if (value >= MEGABYTE || (value < 0 && value <= -MEGABYTE)) {
       *unit = "MB";
       return value / (double) MEGABYTE;
-    } else if (value >= KILOBYTE || value <= -KILOBYTE)  {
+    } else if (value >= KILOBYTE || (value < 0 && value <= -KILOBYTE)) {
       *unit = "KB";
       return value / (double) KILOBYTE;
     } else {
@@ -247,7 +237,28 @@ class PrettyPrinter {
     return fmod(value, 1. * modulus);
   }
 
-  /// Print the value (time in ms) to ss
+  /// Pretty print the value (time in ns) to ss.
+  template <typename T>
+  static void PrintTimeNs(T value, std::stringstream* ss) {
+    *ss << std::setprecision(TIME_NS_PRECISION);
+    if (value >= BILLION) {
+      /// If the time is over a second, print it up to ms.
+      value /= MILLION;
+      PrintTimeMs(value, ss);
+    } else if (value >= MILLION) {
+      /// if the time is over a ms, print it up to microsecond in the unit of ms.
+      *ss << DOUBLE_TRUNCATE(static_cast<double>(value) / MILLION, TIME_NS_PRECISION)
+        << "ms";
+    } else if (value > THOUSAND) {
+      /// if the time is over a microsecond, print it using unit microsecond.
+      *ss << DOUBLE_TRUNCATE(static_cast<double>(value) / THOUSAND, TIME_NS_PRECISION)
+        << "us";
+    } else {
+      *ss << DOUBLE_TRUNCATE(value, TIME_NS_PRECISION) << "ns";
+    }
+  }
+
+  /// Print the value (time in ms) to ss.
   template <typename T>
   static void PrintTimeMs(T value, std::stringstream* ss) {
     DCHECK_GE(value, static_cast<T>(0));

http://git-wip-us.apache.org/repos/asf/impala/blob/74dd9e9b/common/thrift/Metrics.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Metrics.thrift b/common/thrift/Metrics.thrift
index 04f5946..4f2c7f2 100644
--- a/common/thrift/Metrics.thrift
+++ b/common/thrift/Metrics.thrift
@@ -32,7 +32,8 @@ enum TUnit {
   // No units at all, may not be a numerical quantity
   NONE,
   TIME_MS,
-  TIME_S
+  TIME_S,
+  TIME_US
 }
 
 // The kind of value that a metric represents.

http://git-wip-us.apache.org/repos/asf/impala/blob/74dd9e9b/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index b457741..6328cd4 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -1635,5 +1635,15 @@
     "units": "NONE",
     "kind": "PROPERTY",
     "key": "kudu-client.version"
+  },
+  {
+    "description": "Service $0: Total number of incoming RPCs that were rejected due to overflow of the service queue.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Service $0 Incoming RPC Queue Overflows",
+    "units": "UNIT",
+    "kind": "COUNTER",
+    "key": "rpc.$0.rpcs_queue_overflow"
   }
 ]

http://git-wip-us.apache.org/repos/asf/impala/blob/74dd9e9b/tests/custom_cluster/test_krpc_metrics.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_krpc_metrics.py b/tests/custom_cluster/test_krpc_metrics.py
new file mode 100644
index 0000000..ec56f41
--- /dev/null
+++ b/tests/custom_cluster/test_krpc_metrics.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import pytest
+import requests
+import time
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_cluster import ImpalaCluster
+from tests.common.skip import SkipIf, SkipIfBuildType
+from tests.verifiers.mem_usage_verifier import MemUsageVerifier
+
+@SkipIf.not_krpc
+class TestKrpcMetrics(CustomClusterTestSuite):
+  """Test for KRPC metrics that require special arguments during cluster startup."""
+  RPCZ_URL = 'http://localhost:25000/rpcz?json'
+  METRICS_URL = 'http://localhost:25000/metrics?json'
+  TEST_QUERY = 'select count(*) from tpch_parquet.lineitem l1 \
+      join tpch_parquet.lineitem l2 where l1.l_orderkey = l2.l_orderkey;'
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestKrpcMetrics, cls).setup_class()
+
+  def get_debug_page(self, page_url):
+    """Returns the content of the debug page 'page_url' as json."""
+    response = requests.get(page_url)
+    assert response.status_code == requests.codes.ok
+    return json.loads(response.text)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args('-datastream_service_queue_mem_limit=1B \
+                                     -datastream_service_num_svc_threads=1')
+  def test_krpc_queue_overflow_rpcz(self, vector):
+    """Test that rejected RPCs show up on the /rpcz debug web page.
+    """
+    def get_rpc_overflows():
+      rpcz = self.get_debug_page(self.RPCZ_URL)
+      assert len(rpcz['services']) > 0
+      for s in rpcz['services']:
+        if s['service_name'] == 'impala.DataStreamService':
+          return int(s['rpcs_queue_overflow'])
+      assert False, "Could not find DataStreamService metrics"
+
+    before = get_rpc_overflows()
+    assert before == 0
+    self.client.execute(self.TEST_QUERY)
+    after = get_rpc_overflows()
+
+    assert before < after
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args('-datastream_service_queue_mem_limit=1B \
+                                     -datastream_service_num_svc_threads=1')
+  def test_krpc_queue_overflow_metrics(self, vector):
+    """Test that rejected RPCs show up on the /metrics debug web page.
+    """
+    def iter_metrics(group):
+      for m in group['metrics']:
+        yield m
+      for c in group['child_groups']:
+        for m in iter_metrics(c):
+          yield m
+
+    def get_metric(name):
+      metrics = self.get_debug_page(self.METRICS_URL)['metric_group']
+      for m in iter_metrics(metrics):
+        if m['name'] == name:
+          return int(m['value'])
+
+    metric_name = 'rpc.impala.DataStreamService.rpcs_queue_overflow'
+    before = get_metric(metric_name)
+    assert before == 0
+
+    self.client.execute(self.TEST_QUERY)
+    after = get_metric(metric_name)
+    assert before < after

http://git-wip-us.apache.org/repos/asf/impala/blob/74dd9e9b/tests/webserver/test_web_pages.py
----------------------------------------------------------------------
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 8dd17a4..c736cb3 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from tests.common.skip import SkipIf
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite
 import json
@@ -32,6 +33,7 @@ class TestWebPage(ImpalaTestSuite):
   TABLE_METRICS_URL = "http://localhost:{0}/table_metrics"
   QUERY_BACKENDS_URL = "http://localhost:{0}/query_backends"
   QUERY_FINSTANCES_URL = "http://localhost:{0}/query_finstances"
+  RPCZ_URL = "http://localhost:{0}/rpcz"
   THREAD_GROUP_URL = "http://localhost:{0}/thread-group"
   # log4j changes do not apply to the statestore since it doesn't
   # have an embedded JVM. So we make two sets of ports to test the
@@ -72,6 +74,11 @@ class TestWebPage(ImpalaTestSuite):
           and string_to_search in response.text, "Offending url: " + input_url
     return response.text
 
+  def get_debug_page(self, page_url):
+    """Returns the content of the debug page 'page_url' as json."""
+    response = self.get_and_check_status(page_url + "?json", ports_to_test=[25000])
+    return json.loads(response)
+
   def get_and_check_status_jvm(self, url, string_to_search = ""):
     """Calls get_and_check_status() for impalad and catalogd only"""
     return self.get_and_check_status(url, string_to_search,
@@ -229,3 +236,26 @@ class TestWebPage(ImpalaTestSuite):
     for pattern in expected_name_patterns:
       assert any(pattern in t for t in thread_names), \
            "Could not find thread matching '%s'" % pattern
+
+  @SkipIf.not_krpc
+  def test_krpc_rpcz(self):
+    """Test that KRPC metrics are exposed in /rpcz and that they are updated when
+    executing a query."""
+    TEST_QUERY = "select count(c2.string_col) from \
+        functional.alltypestiny join functional.alltypessmall c2"
+    SVC_NAME = 'impala.DataStreamService'
+
+    def get_svc_metrics(svc_name):
+      rpcz = self.get_debug_page(self.RPCZ_URL)
+      assert len(rpcz['services']) > 0
+      for s in rpcz['services']:
+        if s['service_name'] == svc_name:
+          assert len(s['rpc_method_metrics']) > 0, '%s metrics are empty' % svc_name
+          return sorted(s['rpc_method_metrics'], key=lambda m: m['method_name'])
+      assert False, 'Could not find metrics for %s' % svc_name
+
+    before = get_svc_metrics(SVC_NAME)
+    self.client.execute(TEST_QUERY)
+    after = get_svc_metrics(SVC_NAME)
+
+    assert before != after

http://git-wip-us.apache.org/repos/asf/impala/blob/74dd9e9b/www/rpcz.tmpl
----------------------------------------------------------------------
diff --git a/www/rpcz.tmpl b/www/rpcz.tmpl
index e62e662..8ade9ef 100644
--- a/www/rpcz.tmpl
+++ b/www/rpcz.tmpl
@@ -18,11 +18,7 @@ under the License.
 -->
 {{> www/common-header.tmpl }}
 
-<h2>RPC durations
-  <button class="btn btn-warning btn-xs" onClick="reset_all();">
-    Reset all
-  </button>
-</h2>
+<h2>RPC durations</h2>
 
 <p class="lead">This page shows the durations of all RPCs served by this
  <samp>{{__common__.process-name}}</samp> process.
@@ -33,6 +29,70 @@ under the License.
   <span id="refresh_on">Auto-refresh on</span>
 </label>  Last updated: <span id="last-updated"></span>
 
+{{?services}}
+<h2>KRPC Services</h2>
+{{/services}}
+{{#services}}
+<h3><samp>{{service_name}}</samp></h3>
+<table class="table table-hover table-bordered" id="{{service_name}}_metrics">
+  <tbody>
+  <tr>
+    <td>
+      <table class="table table-hover">
+        <tr>
+          <th>Queue Size</th>
+          <th>Idle Threads</th>
+          <th>Current Memory Usage</th>
+          <th>Peak Memory Usage</th>
+          <th>RPCs Rejected due to Queue Overflow</th>
+        </tr>
+        <tr>
+          <td id="{{service_name}}_queue_size">{{queue_size}}</td>
+          <td id="{{service_name}}_idle_threads">{{idle_threads}}</td>
+          <td id="{{service_name}}_mem_usage">{{mem_usage}}</td>
+          <td id="{{service_name}}_mem_peak">{{mem_peak}}</td>
+          <td id="{{service_name}}_rpcs_queue_overflow">{{rpcs_queue_overflow}}</td>
+        </tr>
+      </table>
+      <table class="table table-hover">
+        <tr>
+          <th>Incoming Queueing Time</th>
+          <td id="{{service_name}}_incoming_queue_time" colspan=2>{{incoming_queue_time}}
+          </td>
+        </tr>
+      </table>
+    </td>
+  </tr>
+  <tr>
+    <td>
+    {{#rpc_method_metrics}}
+      <table class="table table-bordered table-hover">
+        <tr><td colspan=2>
+          <strong>Method: <i>{{method_name}}</i></strong>
+        </td></tr>
+        <tr>
+          <td>Handler Latency</td>
+          <td class="{{method_name}}_handler_latency">{{handler_latency}}</td>
+        </tr>
+        <tr>
+          <td>Payload Size</td>
+          <td class="{{method_name}}_payload_size">{{payload_size}}</td>
+        </tr>
+      </table>
+    {{/rpc_method_metrics}}
+    </td>
+  </tr>
+  </tbody>
+</table>
+{{/services}}
+
+{{?servers}}
+<h2>Impala RPC Services
+  <button class="btn btn-warning btn-xs" onClick="reset_all();">
+    Reset all
+  </button>
+</h2>
+{{/servers}}
 {{#servers}}
 
 <h3><samp>{{name}} </samp>
@@ -83,44 +143,84 @@ function reset_server(server) {
   xhr.send();
 }
 
+// Update all metrics for services in "servers", which use Impala's old RPC layer.
+function update_impala_services(json) {
+  for (var i = 0; i < json["servers"].length; ++i) {
+    var tbl_json = json["servers"][i];
+    var table = document.getElementById(tbl_json["name"]);
+    if (!table) continue;
+    // Delete all existing rows, stopping at 1 to save the header
+    for (var j = table.rows.length - 1; j >= 1; --j) table.deleteRow(j);
+    tbl_json["methods"].forEach(function(method) {
+      var row = table.insertRow();
+      row.insertCell().innerHTML = "<samp>" + method.name + "</samp>";
+      row.insertCell().innerHTML = method.summary;
+      row.insertCell().innerHTML = method.in_flight;
+      var reset_cell = row.insertCell();
+      reset_cell.align = "center";
+      var button = document.createElement("button");
+      button.className = "btn btn-warning btn-xs";
+      button.appendChild(document.createTextNode("Reset"));
+      button.onclick = function() { reset_method(method.server_name, method.name); }
+      reset_cell.appendChild(button);
+    });
+  }
+}
+
+// Update all krpc metrics from the "services" member of "json".
+function update_krpc_services(json) {
+  // Update each service
+  for (var i = 0; i < json["services"].length; ++i) {
+    var svc_json = json["services"][i];
+    var svc_name = svc_json["service_name"];
+    var table = document.getElementById(svc_name + "_metrics");
+
+    // Skip updates for unknown services.
+    if (!table) continue;
+
+    // Update service metrics
+    var keys = ["queue_size", "idle_threads", "mem_usage", "mem_peak",
+        "rpcs_queue_overflow", "incoming_queue_time"];
+    for (var j = 0; j < keys.length; ++j) {
+      var key = keys[j];
+      var cell = document.getElementById(svc_name + "_" + key);
+      // Skip update for unknown values.
+      if (!cell) continue;
+      cell.innerHTML = svc_json[key];
+    }
+
+    // Update metrics for individual methods.
+    var num_methods = svc_json["rpc_method_metrics"].length;
+    for (var j = 0; j < num_methods; ++j) {
+      var method_json = svc_json["rpc_method_metrics"][j];
+      var method_name = method_json["method_name"];
+      // Update all metrics for this method.
+      var keys = ["handler_latency", "payload_size"];
+      for (var l = 0; l < keys.length; ++l) {
+        var key = keys[l];
+        var cell = $(table).find("." + method_name + "_" + key)[0];
+        // Skip update for unknown values.
+        if (!cell) continue;
+        cell.innerHTML = method_json[key];
+      }
+    }
+  }
+}
+
 function refresh() {
   var xhr = new XMLHttpRequest();
   xhr.responseType = 'text';
   xhr.timeout = 60000;
   xhr.onload = function(ignored_arg) {
-    if (xhr.status != 200) {
-      return;
-    }
+    if (xhr.status != 200) return;
     var blob = xhr.response;
     json = JSON.parse(blob);
-    for (var i = 0; i < json["servers"].length; ++i) {
-      var tbl_json = json["servers"][i];
-      var table = document.getElementById(tbl_json["name"]);
-      if (!table) continue;
-      // Delete all existing rows, stopping at 1 to save the header
-      for (var j = table.rows.length - 1; j >= 1; --j) {
-        table.deleteRow(j);
-      }
-      tbl_json["methods"].forEach(function(method) {
-        var row = table.insertRow();
-        row.insertCell().innerHTML = "<samp>" + method.name + "</samp>";
-        row.insertCell().innerHTML = method.summary;
-        row.insertCell().innerHTML = method.in_flight;
-        var reset_cell = row.insertCell();
-        reset_cell.align = "center";
-        var button = document.createElement("button");
-        button.className = "btn btn-warning btn-xs";
-        button.appendChild(document.createTextNode("Reset"));
-        button.onclick = function() { reset_method(method.server_name, method.name); }
-
-        reset_cell.appendChild(button);
-      });
-    }
-
+    update_impala_services(json);
+    update_krpc_services(json);
     document.getElementById("last-updated").textContent = new Date();
   }
 
-  xhr.ontimeout = function(){ }
+  xhr.ontimeout = function() {}
   xhr.open('GET', "/rpcz?json", true);
   xhr.send();
 }


[2/4] impala git commit: KUDU-2218. tls_socket: properly handle temporary socket errors in Writev

Posted by ta...@apache.org.
KUDU-2218. tls_socket: properly handle temporary socket errors in Writev

This fixes a bug which caused RaftConsensusITest.TestLargeBatches to
fail when run under stress, as in the following command line:

taskset -c 0-4 \
 build/latest/bin/raft_consensus-itest \
   --gtest_filter=\*LargeBat\* \
   --stress-cpu-threads=8

This would produce an error like:
Network error: failed to write to TLS socket: error:1409F07F:SSL routines:SSL3_WRITE_PENDING:bad write retry:s3_pkt.c:878

This means that we were retrying a write after getting EAGAIN, but with
a different buffer than the first time.

I tracked this down to mishandling of temporary socket errors in
TlsSocket::Writev(). In the case that we successfully write part of the
io vector but hit such an error trying to write a later element in the
vector, we were still propagating the error back up to the caller. The
caller didn't realize that part of the write was successful, and thus it
would retry the write from the beginning.

The fix is to fix the above, but also to enable partial writes in
TlsContext. The new test fails if either of the above two changes are
backed out.

Change-Id: If797f220f42bfb2e6f452b66f15e7a758e883472
Reviewed-on: http://gerrit.cloudera.org:8080/8570
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/9361
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/678bf28e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/678bf28e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/678bf28e

Branch: refs/heads/master
Commit: 678bf28e233e667b05585110422762614840bdc2
Parents: baec8ca
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Nov 15 22:55:44 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 21 04:33:55 2018 +0000

----------------------------------------------------------------------
 be/src/kudu/security/tls_context.cc |   2 +-
 be/src/kudu/security/tls_socket.cc  |  10 +-
 security/tls_socket-test.cc         | 277 +++++++++++++++++++++++--------
 3 files changed, 219 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/678bf28e/be/src/kudu/security/tls_context.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_context.cc b/be/src/kudu/security/tls_context.cc
index b6ec57f..f94e3d2 100644
--- a/be/src/kudu/security/tls_context.cc
+++ b/be/src/kudu/security/tls_context.cc
@@ -96,7 +96,7 @@ Status TlsContext::Init() {
   if (!ctx_) {
     return Status::RuntimeError("failed to create TLS context", GetOpenSSLErrors());
   }
-  SSL_CTX_set_mode(ctx_.get(), SSL_MODE_AUTO_RETRY);
+  SSL_CTX_set_mode(ctx_.get(), SSL_MODE_AUTO_RETRY | SSL_MODE_ENABLE_PARTIAL_WRITE);
 
   // Disable SSLv2 and SSLv3 which are vulnerable to various issues such as POODLE.
   // We support versions back to TLSv1.0 since OpenSSL on RHEL 6.4 and earlier does not

http://git-wip-us.apache.org/repos/asf/impala/blob/678bf28e/be/src/kudu/security/tls_socket.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_socket.cc b/be/src/kudu/security/tls_socket.cc
index f725a49..e68cdce 100644
--- a/be/src/kudu/security/tls_socket.cc
+++ b/be/src/kudu/security/tls_socket.cc
@@ -25,6 +25,7 @@
 #include "kudu/security/cert.h"
 #include "kudu/security/openssl_util.h"
 #include "kudu/util/errno.h"
+#include "kudu/util/net/socket.h"
 
 namespace kudu {
 namespace security {
@@ -42,11 +43,11 @@ Status TlsSocket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
   CHECK(ssl_);
   SCOPED_OPENSSL_NO_PENDING_ERRORS;
 
+  *nwritten = 0;
   if (PREDICT_FALSE(amt == 0)) {
     // Writing an empty buffer is a no-op. This happens occasionally, eg in the
     // case where the response has an empty sidecar. We have to special case
     // it, because SSL_write can return '0' to indicate certain types of errors.
-    *nwritten = 0;
     return Status::OK();
   }
 
@@ -61,7 +62,6 @@ Status TlsSocket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
                                     ErrnoToString(save_errno), save_errno);
       }
       // Socket not ready to write yet.
-      *nwritten = 0;
       return Status::OK();
     }
     return Status::NetworkError("failed to write to TLS socket",
@@ -90,6 +90,12 @@ Status TlsSocket::Writev(const struct ::iovec *iov, int iov_len, int32_t *nwritt
   }
   RETURN_NOT_OK(SetTcpCork(0));
   *nwritten = total_written;
+  // If we did manage to write something, but not everything, due to a temporary socket
+  // error, then we should still return an OK status indicating a successful _partial_
+  // write.
+  if (total_written > 0 && Socket::IsTemporarySocketError(write_status.posix_code())) {
+    return Status::OK();
+  }
   return write_status;
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/678bf28e/security/tls_socket-test.cc
----------------------------------------------------------------------
diff --git a/security/tls_socket-test.cc b/security/tls_socket-test.cc
index a978e68..214d2bf 100644
--- a/security/tls_socket-test.cc
+++ b/security/tls_socket-test.cc
@@ -17,7 +17,11 @@
 
 #include "kudu/security/tls_handshake.h"
 
+#include <algorithm>
 #include <pthread.h>
+#include <sched.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
 
 #include <atomic>
 #include <csignal>
@@ -28,6 +32,7 @@
 #include <memory>
 #include <string>
 #include <thread>
+#include <vector>
 
 #include <glog/logging.h>
 #include <gtest/gtest.h>
@@ -38,6 +43,8 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
@@ -46,25 +53,27 @@
 using std::string;
 using std::thread;
 using std::unique_ptr;
-
+using std::vector;
 
 namespace kudu {
 namespace security {
 
+const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+
+// Size is big enough to not fit into output socket buffer of default size
+// (controlled by setsockopt() with SO_SNDBUF).
+constexpr size_t kEchoChunkSize = 32 * 1024 * 1024;
 
 class TlsSocketTest : public KuduTest {
  public:
   void SetUp() override {
     KuduTest::SetUp();
-
     ASSERT_OK(client_tls_.Init());
-    ASSERT_OK(server_tls_.Init());
-    ASSERT_OK(server_tls_.GenerateSelfSignedCertAndKey());
   }
 
  protected:
+  void ConnectClient(const Sockaddr& addr, unique_ptr<Socket>* sock);
   TlsContext client_tls_;
-  TlsContext server_tls_;
 };
 
 Status DoNegotiationSide(Socket* sock, TlsHandshake* tls, const char* side) {
@@ -101,19 +110,112 @@ Status DoNegotiationSide(Socket* sock, TlsHandshake* tls, const char* side) {
   return Status::OK();
 }
 
+void TlsSocketTest::ConnectClient(const Sockaddr& addr, unique_ptr<Socket>* sock) {
+  unique_ptr<Socket> client_sock(new Socket());
+  ASSERT_OK(client_sock->Init(0));
+  ASSERT_OK(client_sock->Connect(addr));
+
+  TlsHandshake client;
+  ASSERT_OK(client_tls_.InitiateHandshake(TlsHandshakeType::CLIENT, &client));
+  ASSERT_OK(DoNegotiationSide(client_sock.get(), &client, "client"));
+  ASSERT_OK(client.Finish(&client_sock));
+  *sock = std::move(client_sock);
+}
+
+class EchoServer {
+ public:
+  EchoServer()
+      : pthread_sync_(1) {
+  }
+  ~EchoServer() {
+    Stop();
+    Join();
+  }
+
+  void Start() {
+    ASSERT_OK(server_tls_.Init());
+    ASSERT_OK(server_tls_.GenerateSelfSignedCertAndKey());
+    ASSERT_OK(listen_addr_.ParseString("127.0.0.1", 0));
+    ASSERT_OK(listener_.Init(0));
+    ASSERT_OK(listener_.BindAndListen(listen_addr_, /*listen_queue_size=*/10));
+    ASSERT_OK(listener_.GetSocketAddress(&listen_addr_));
+
+    thread_ = thread([&] {
+        pthread_ = pthread_self();
+        pthread_sync_.CountDown();
+        unique_ptr<Socket> sock(new Socket());
+        Sockaddr remote;
+        CHECK_OK(listener_.Accept(sock.get(), &remote, /*flags=*/0));
+
+        TlsHandshake server;
+        CHECK_OK(server_tls_.InitiateHandshake(TlsHandshakeType::SERVER, &server));
+        CHECK_OK(DoNegotiationSide(sock.get(), &server, "server"));
+        CHECK_OK(server.Finish(&sock));
+
+        CHECK_OK(sock->SetRecvTimeout(kTimeout));
+        unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
+        // An "echo" loop for kEchoChunkSize byte buffers.
+        while (!stop_) {
+          size_t n;
+          Status s = sock->BlockingRecv(buf.get(), kEchoChunkSize, &n, MonoTime::Now() + kTimeout);
+          if (!s.ok()) {
+            CHECK(stop_) << "unexpected error reading: " << s.ToString();
+          }
+
+          LOG(INFO) << "server echoing " << n << " bytes";
+          size_t written;
+          s = sock->BlockingWrite(buf.get(), n, &written, MonoTime::Now() + kTimeout);
+          if (!s.ok()) {
+            CHECK(stop_) << "unexpected error writing: " << s.ToString();
+          }
+          if (slow_read_) {
+            SleepFor(MonoDelta::FromMilliseconds(10));
+          }
+        }
+      });
+  }
+
+  void EnableSlowRead() {
+    slow_read_ = true;
+  }
+
+  const Sockaddr& listen_addr() const {
+    return listen_addr_;
+  }
+
+  bool stopped() const {
+    return stop_;
+  }
+
+  void Stop() {
+    stop_ = true;
+  }
+  void Join() {
+    thread_.join();
+  }
+
+  const pthread_t& pthread() {
+    pthread_sync_.Wait();
+    return pthread_;
+  }
+
+ private:
+  TlsContext server_tls_;
+  Socket listener_;
+  Sockaddr listen_addr_;
+  thread thread_;
+  pthread_t pthread_;
+  CountDownLatch pthread_sync_;
+  std::atomic<bool> stop_ { false };
+
+  bool slow_read_ = false;
+};
+
 void handler(int /* signal */) {}
 
 // Test for failures to handle EINTR during TLS connection
 // negotiation and data send/receive.
 TEST_F(TlsSocketTest, TestTlsSocketInterrupted) {
-  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
-  Sockaddr listen_addr;
-  ASSERT_OK(listen_addr.ParseString("127.0.0.1", 0));
-  Socket listener;
-  ASSERT_OK(listener.Init(0));
-  ASSERT_OK(listener.BindAndListen(listen_addr, /*listen_queue_size=*/10));
-  ASSERT_OK(listener.GetSocketAddress(&listen_addr));
-
   // Set up a no-op signal handler for SIGUSR2.
   struct sigaction sa, sa_old;
   memset(&sa, 0, sizeof(sa));
@@ -121,76 +223,117 @@ TEST_F(TlsSocketTest, TestTlsSocketInterrupted) {
   sigaction(SIGUSR2, &sa, &sa_old);
   SCOPED_CLEANUP({ sigaction(SIGUSR2, &sa_old, nullptr); });
 
-  // Size is big enough to not fit into output socket buffer of default size
-  // (controlled by setsockopt() with SO_SNDBUF).
-  constexpr size_t kSize = 32 * 1024 * 1024;
-
-  pthread_t server_tid;
-  CountDownLatch server_tid_sync(1);
-  std::atomic<bool> stop { false };
-  thread server([&] {
-      server_tid = pthread_self();
-      server_tid_sync.CountDown();
-      unique_ptr<Socket> sock(new Socket());
-      Sockaddr remote;
-      CHECK_OK(listener.Accept(sock.get(), &remote, /*flags=*/0));
-
-      TlsHandshake server;
-      CHECK_OK(server_tls_.InitiateHandshake(TlsHandshakeType::SERVER, &server));
-      CHECK_OK(DoNegotiationSide(sock.get(), &server, "server"));
-      CHECK_OK(server.Finish(&sock));
-
-      CHECK_OK(sock->SetRecvTimeout(kTimeout));
-      unique_ptr<uint8_t[]> buf(new uint8_t[kSize]);
-      // An "echo" loop for kSize byte buffers.
-      while (!stop) {
-        size_t n;
-        Status s = sock->BlockingRecv(buf.get(), kSize, &n, MonoTime::Now() + kTimeout);
-        if (s.ok()) {
-          size_t written;
-          s = sock->BlockingWrite(buf.get(), n, &written, MonoTime::Now() + kTimeout);
-        }
-        if (!s.ok()) {
-          CHECK(stop) << "unexpected error: " << s.ToString();
-        }
-      }
-    });
-  SCOPED_CLEANUP({ server.join(); });
+  EchoServer server;
+  NO_FATALS(server.Start());
 
   // Start a thread to send signals to the server thread.
   thread killer([&]() {
-    server_tid_sync.Wait();
-    while (!stop) {
-      PCHECK(pthread_kill(server_tid, SIGUSR2) == 0);
-      SleepFor(MonoDelta::FromMicroseconds(rand() % 10));
-    }
-  });
+      while (!server.stopped()) {
+        PCHECK(pthread_kill(server.pthread(), SIGUSR2) == 0);
+        SleepFor(MonoDelta::FromMicroseconds(rand() % 10));
+      }
+    });
   SCOPED_CLEANUP({ killer.join(); });
 
-  unique_ptr<Socket> client_sock(new Socket());
-  ASSERT_OK(client_sock->Init(0));
-  ASSERT_OK(client_sock->Connect(listen_addr));
+  unique_ptr<Socket> client_sock;
+  NO_FATALS(ConnectClient(server.listen_addr(), &client_sock));
 
-  TlsHandshake client;
-  ASSERT_OK(client_tls_.InitiateHandshake(TlsHandshakeType::CLIENT, &client));
-  ASSERT_OK(DoNegotiationSide(client_sock.get(), &client, "client"));
-  ASSERT_OK(client.Finish(&client_sock));
-
-  unique_ptr<uint8_t[]> buf(new uint8_t[kSize]);
+  unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
   for (int i = 0; i < 10; i++) {
     SleepFor(MonoDelta::FromMilliseconds(1));
     size_t nwritten;
-    ASSERT_OK(client_sock->BlockingWrite(buf.get(), kSize, &nwritten,
+    ASSERT_OK(client_sock->BlockingWrite(buf.get(), kEchoChunkSize, &nwritten,
         MonoTime::Now() + kTimeout));
     size_t n;
-    ASSERT_OK(client_sock->BlockingRecv(buf.get(), kSize, &n,
+    ASSERT_OK(client_sock->BlockingRecv(buf.get(), kEchoChunkSize, &n,
         MonoTime::Now() + kTimeout));
   }
-  stop = true;
+  server.Stop();
   ASSERT_OK(client_sock->Close());
-
   LOG(INFO) << "client done";
 }
 
+// Return an iovec containing the same data as the buffer 'buf' with the length 'len',
+// but split into random-sized chunks. The chunks are sized randomly between 1 and
+// 'max_chunk_size' bytes.
+vector<struct iovec> ChunkIOVec(Random* rng, uint8_t* buf, int len, int max_chunk_size) {
+  vector<struct iovec> ret;
+  uint8_t* p = buf;
+  int rem = len;
+  while (rem > 0) {
+    int len = rng->Uniform(max_chunk_size) + 1;
+    len = std::min(len, rem);
+    ret.push_back({p, static_cast<size_t>(len)});
+    p += len;
+    rem -= len;
+  }
+  return ret;
+}
+
+// Regression test for KUDU-2218, a bug in which Writev would improperly handle
+// partial writes in non-blocking mode.
+TEST_F(TlsSocketTest, TestNonBlockingWritev) {
+  Random rng(GetRandomSeed32());
+
+  EchoServer server;
+  server.EnableSlowRead();
+  NO_FATALS(server.Start());
+
+  unique_ptr<Socket> client_sock;
+  NO_FATALS(ConnectClient(server.listen_addr(), &client_sock));
+
+  int sndbuf = 16 * 1024;
+  CHECK_ERR(setsockopt(client_sock->GetFd(), SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)));
+
+  unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
+  unique_ptr<uint8_t[]> rbuf(new uint8_t[kEchoChunkSize]);
+  RandomString(buf.get(), kEchoChunkSize, &rng);
+
+  for (int i = 0; i < 10; i++) {
+    ASSERT_OK(client_sock->SetNonBlocking(true));
+
+    // Prepare an IOV with the input data split into a bunch of randomly-sized
+    // chunks.
+    vector<struct iovec> iov = ChunkIOVec(&rng, buf.get(), kEchoChunkSize, 1024 * 1024);
+
+    // Loop calling writev until the iov is exhausted
+    int rem = kEchoChunkSize;
+    while (rem > 0) {
+      CHECK(!iov.empty()) << rem;
+      int32_t n;
+      Status s = client_sock->Writev(&iov[0], iov.size(), &n);
+      if (Socket::IsTemporarySocketError(s.posix_code())) {
+        sched_yield();
+        continue;
+      }
+      ASSERT_OK(s);
+      rem -= n;
+      ASSERT_GE(n, 0);
+      while (n > 0) {
+        if (n < iov[0].iov_len) {
+          iov[0].iov_len -= n;
+          iov[0].iov_base = reinterpret_cast<uint8_t*>(iov[0].iov_base) + n;
+          n = 0;
+        } else {
+          n -= iov[0].iov_len;
+          iov.erase(iov.begin());
+        }
+      }
+    }
+    LOG(INFO) << "client waiting";
+
+    size_t n;
+    ASSERT_OK(client_sock->SetNonBlocking(false));
+    ASSERT_OK(client_sock->BlockingRecv(rbuf.get(), kEchoChunkSize, &n,
+        MonoTime::Now() + kTimeout));
+    LOG(INFO) << "client got response";
+
+    ASSERT_EQ(0, memcmp(buf.get(), rbuf.get(), kEchoChunkSize));
+  }
+
+  server.Stop();
+  ASSERT_OK(client_sock->Close());
+}
+
 } // namespace security
 } // namespace kudu