Posted to commits@impala.apache.org by ta...@apache.org on 2019/12/12 16:54:26 UTC

[impala] branch master updated (6bfa86f -> 313d758)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 6bfa86f  IMPALA-6894: Use an internal representation of query states in ClientRequestState
     new acddf76  [DOCS] Impala is not optimized for the IN operator when accessing HBASE
     new c7d2af1  IMPALA-9122 : Ignore FileNotFoundException when loading a table
     new 313d758  IMPALA-9235: add more per-connection stats to /rpcz

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/kudu/rpc/connection.cc                      | 226 ++++++++++++++++++++-
 be/src/kudu/rpc/connection.h                       |   7 +-
 be/src/kudu/rpc/inbound_call.cc                    |   2 +-
 be/src/kudu/rpc/inbound_call.h                     |   4 +-
 be/src/kudu/rpc/messenger.cc                       |   6 +-
 be/src/kudu/rpc/messenger.h                        |  10 +-
 be/src/kudu/rpc/outbound_call.cc                   |   2 +-
 be/src/kudu/rpc/outbound_call.h                    |   4 +-
 be/src/kudu/rpc/reactor.cc                         |  12 +-
 be/src/kudu/rpc/reactor.h                          |  12 +-
 be/src/kudu/rpc/rpc_introspection.proto            |  34 +++-
 be/src/kudu/rpc/rpc_stub-test.cc                   |   8 +-
 be/src/rpc/rpc-mgr.cc                              |  67 +++++-
 be/src/util/CMakeLists.txt                         |   1 +
 be/src/util/json-util.cc                           | 164 +++++++++++++++
 be/src/util/json-util.h                            |  20 +-
 docs/topics/impala_hbase.xml                       |  31 +--
 .../org/apache/impala/common/FileSystemUtil.java   |  21 +-
 tests/custom_cluster/test_event_processing.py      |   2 +-
 tests/webserver/test_web_pages.py                  |  36 +++-
 www/rpcz.tmpl                                      | 106 +++++++++-
 21 files changed, 699 insertions(+), 76 deletions(-)
 create mode 100644 be/src/util/json-util.cc


[impala] 02/03: IMPALA-9122 : Ignore FileNotFoundException when loading a table

Posted by ta...@apache.org.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c7d2af1bd7320cdbc9b738350cb9a038467dc493
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Wed Nov 27 10:37:17 2019 -0800

    IMPALA-9122 : Ignore FileNotFoundException when loading a table
    
    It is possible that when the file metadata of a table or partition is
    being loaded, some temporary files (like the ones in .hive-staging
    directory) are deleted by external engines like Hive. This causes a
    FileNotFoundException during the load and it fails the reload command.
    In general, this should not be a problem since users are careful not to
    modify the table from Hive or Spark while Impala is reading them. In
    the worst case, currently the refresh command fails which can be
    retried by the user. However, this does not work well when event
    processing is turned on. EventProcessor tries to reload the table as
    soon as it sees an INSERT_EVENT from the metastore. Hive may still be
    cleaning up the staging directories when EventProcessor issues a reload,
    causing it to go into an error state.
    
    Ideally, we should have some sort of inter-engine synchronization
    semantics to avoid such issues, but that is a much more complex
    architectural change. For now, we should ignore such errors and skip
    loading the deleted file.
    
    Testing: Unfortunately, this error is hard to reproduce locally. I
    tried creating multiple threads which delete some files while multiple
    FileMetadataLoaders are loading concurrently but it didn't fail for me.
    Ran TestEventProcessing.test_insert_events in a loop for more than an
    hour and didn't see any failure.
    
    Change-Id: Iecf6b193b0d57de27d41ad6ef6e1719005d9e908
    Reviewed-on: http://gerrit.cloudera.org:8080/14806
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/common/FileSystemUtil.java    | 21 ++++++++++++++++-----
 tests/custom_cluster/test_event_processing.py       |  2 +-
 2 files changed, 17 insertions(+), 6 deletions(-)
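
The skip-and-continue behaviour described in the commit message can be
sketched independently of Hadoop. The block below is a minimal analog written
in C++ rather than Java, so it does not use the `RemoteIterator` or
`FileStatus` APIs touched by this patch. `FileVanished`, `ListingSource`,
`ListSurvivingFiles` and `MakeExampleSource` are hypothetical names introduced
only for illustration.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical exception standing in for java.io.FileNotFoundException.
struct FileVanished : std::runtime_error {
  explicit FileVanished(const std::string& path) : std::runtime_error(path) {}
};

// Hypothetical listing source: stores the next path in '*out' and returns
// true, returns false when the listing is exhausted, or throws FileVanished
// if an entry was deleted while the listing was in progress.
using ListingSource = std::function<bool(std::string*)>;

// Drains 'source', skipping entries that vanished mid-listing instead of
// failing the whole load.
std::vector<std::string> ListSurvivingFiles(const ListingSource& source) {
  std::vector<std::string> files;
  while (true) {
    std::string next;
    try {
      if (!source(&next)) break;
    } catch (const FileVanished&) {
      // A real loader would log a warning here before moving on.
      continue;
    }
    files.push_back(next);
  }
  return files;
}

// Example source: one file, one vanished temporary file, one more file.
ListingSource MakeExampleSource() {
  auto step = std::make_shared<int>(0);
  return [step](std::string* out) -> bool {
    switch ((*step)++) {
      case 0: *out = "part-0000"; return true;
      case 1: throw FileVanished(".hive-staging/tmp-file");
      case 2: *out = "part-0001"; return true;
      default: return false;
    }
  };
}
```

Calling `ListSurvivingFiles(MakeExampleSource())` yields the two surviving
paths and silently drops the vanished one. As in the patch, a source that
throws on every call would keep the loop spinning, so this approach assumes
the underlying iterator advances past the missing entry.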

diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index f2de972..7eccd13 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -676,9 +676,20 @@ public class FileSystemUtil {
       // we don't need to do anything (extra calls to hasNext() must not affect
       // state)
       while (curFile_ == null) {
-        if (!baseIterator_.hasNext()) return false;
-        // if the next fileStatus is in ignored directory skip it
-        FileStatus next = baseIterator_.next();
+        FileStatus next;
+        try {
+          if (!baseIterator_.hasNext()) return false;
+          // if the next fileStatus is in ignored directory skip it
+           next = baseIterator_.next();
+        } catch (FileNotFoundException ex) {
+          // in case of concurrent operations by multiple engines it is possible that
+          // some temporary files are deleted while Impala is loading the table. For
+          // instance, hive deletes the temporary files in the .hive-staging directory
+          // after an insert query from Hive completes. If we are loading the table at
+          // the same time, we may get a FileNotFoundException which is safe to ignore.
+          LOG.warn(ex.getMessage());
+          continue;
+        }
         if (!isInIgnoredDirectory(startPath_, next)) {
           curFile_ = next;
           return true;
@@ -702,13 +713,13 @@ public class FileSystemUtil {
    * Iterator which recursively visits directories on a FileSystem, yielding
    * files in an unspecified order.
    */
-  static class RecursingIterator implements RemoteIterator<FileStatus> {
+  private static class RecursingIterator implements RemoteIterator<FileStatus> {
     private final FileSystem fs_;
     private final Stack<RemoteIterator<FileStatus>> iters_ = new Stack<>();
     private RemoteIterator<FileStatus> curIter_;
     private FileStatus curFile_;
 
-    RecursingIterator(FileSystem fs, Path startPath) throws IOException {
+    private RecursingIterator(FileSystem fs, Path startPath) throws IOException {
       this.fs_ = Preconditions.checkNotNull(fs);
       curIter_ = fs.listStatusIterator(Preconditions.checkNotNull(startPath));
     }
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index 432d421..0b7103b 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -69,7 +69,7 @@ class TestEventProcessing(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=2")
   @SkipIfHive2.acid
-  def test_insert_events_transactional(self):
+  def test_transactional_insert_events(self):
     """Executes 'run_test_insert_events' for transactional tables.
     """
     self.run_test_insert_events(is_transactional=True)


[impala] 03/03: IMPALA-9235: add more per-connection stats to /rpcz

Posted by ta...@apache.org.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 313d758a89f389dc69ef5b8b3e8a07cf27ec2105
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Tue Dec 10 16:03:28 2019 -0800

    IMPALA-9235: add more per-connection stats to /rpcz
    
    This backports commit 0f6d33b4a29873197952335a5777ccf9163fc307
    from Kudu and makes corresponding Impala changes.
    
    It also exposes some additional information returned by KRPC,
    including per-connection information for inbound connections
    and in-flight RPCs. Some of these are in the JSON only, others
    are exposed in the data tables on the debug page.
    
    This adds some Protobuf -> JSON utilities similar to those
    used in Kudu, except instead of outputting strings, they
    append to rapidjson documents.
    
    Testing:
    Added a sanity test to test_web_pages.
    
    Manually checked the /rpcz page when running queries.
    
    Backport notes:
    Mostly this was a clean cherry-pick. I omitted the changes
    to be/src/kudu/rpc/rpc-test.cc, since those seem to depend
    on previous Kudu changes and we don't run that test anyway.
    I also omitted the changes to Kudu's server code, which
    we don't have copied here.
    
    The Kudu commit message is reproduced here:
    =========================================
    rpc: add TCP socket statistics to /rpcz
    
    This adds the ability to fetch various bits of socket-level information
    for each RPC connection and publish the info into /rpcz. The information
    itself is fetched using getsockopt(TCP_INFO) as well as ioctls to check
    the current send and receive queue lengths.
    
    This data can help resolve whether a use case is network bound or bound
    by the application itself. For example, a high number of retransmitted
    packets can indicate that the network path to the receiver is
    overloaded.
    
    Eventually we may want to expose some of this information on a per-call
    basis. However, doing so is quite tricky, since 'send()' completes when
    the data has been placed into the outbound packet queue and doesn't wait
    until the data is ACKed. We'd need to defer checking for retransmissions
    until all of the data has been ACKed, which is at some indeterminate
    point in the future. The very newest kernels allow subscribing to such
    notifications (along with lots of interesting stats) but, given none of
    that is available in el7, it's probably not worth tackling at this
    point.
    =========================================
    
    Change-Id: I3696463e22123fe81073af4aa495a96b7d4f7ee2
    Reviewed-on: http://gerrit.cloudera.org:8080/14884
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/kudu/rpc/connection.cc           | 226 +++++++++++++++++++++++++++++++-
 be/src/kudu/rpc/connection.h            |   7 +-
 be/src/kudu/rpc/inbound_call.cc         |   2 +-
 be/src/kudu/rpc/inbound_call.h          |   4 +-
 be/src/kudu/rpc/messenger.cc            |   6 +-
 be/src/kudu/rpc/messenger.h             |  10 +-
 be/src/kudu/rpc/outbound_call.cc        |   2 +-
 be/src/kudu/rpc/outbound_call.h         |   4 +-
 be/src/kudu/rpc/reactor.cc              |  12 +-
 be/src/kudu/rpc/reactor.h               |  12 +-
 be/src/kudu/rpc/rpc_introspection.proto |  34 ++++-
 be/src/kudu/rpc/rpc_stub-test.cc        |   8 +-
 be/src/rpc/rpc-mgr.cc                   |  67 ++++++++--
 be/src/util/CMakeLists.txt              |   1 +
 be/src/util/json-util.cc                | 164 +++++++++++++++++++++++
 be/src/util/json-util.h                 |  20 ++-
 tests/webserver/test_web_pages.py       |  36 ++++-
 www/rpcz.tmpl                           | 106 ++++++++++++++-
 18 files changed, 666 insertions(+), 55 deletions(-)
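
The Kudu commit message above mentions ioctls for checking the current send
and receive queue lengths. The following is a minimal sketch of that idea, not
the code from this patch: it swaps the TCP connection for an AF_UNIX
`socketpair` so that it runs without a network peer. `ReceiveQueueBytes`,
`SendQueueBytes` and `DemoQueuedBytes` are hypothetical helper names.

```cpp
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cassert>

// Returns the number of bytes waiting to be read on 'fd', or -1 on error.
int ReceiveQueueBytes(int fd) {
  int queue_len = 0;
  if (ioctl(fd, FIONREAD, &queue_len) != 0) return -1;
  return queue_len;
}

// Returns the number of bytes still held in the send queue of 'fd', or -1
// if the ioctl is unavailable or fails. The exact accounting depends on the
// socket type and kernel, so treat the value as indicative only.
int SendQueueBytes(int fd) {
#ifdef TIOCOUTQ
  int queue_len = 0;
  if (ioctl(fd, TIOCOUTQ, &queue_len) != 0) return -1;
  return queue_len;
#else
  (void)fd;
  return -1;
#endif
}

// Writes five bytes into one end of a socket pair and reports how many
// bytes are queued for reading on the other end. Returns -1 on error.
int DemoQueuedBytes() {
  int fds[2];
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return -1;
  const char msg[] = "hello";
  ssize_t written = write(fds[0], msg, 5);
  int queued = (written == 5) ? ReceiveQueueBytes(fds[1]) : -1;
  close(fds[0]);
  close(fds[1]);
  return queued;
}
```

On a TCP socket the same two ioctls report kernel-side queue depth, which is
what helps distinguish a slow reader from a slow network.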

diff --git a/be/src/kudu/rpc/connection.cc b/be/src/kudu/rpc/connection.cc
index c9a7576..2b79464 100644
--- a/be/src/kudu/rpc/connection.cc
+++ b/be/src/kudu/rpc/connection.cc
@@ -17,6 +17,9 @@
 
 #include "kudu/rpc/connection.h"
 
+#include <netinet/in.h>
+#include <string.h>
+
 #include <algorithm>
 #include <cerrno>
 #include <iostream>
@@ -31,7 +34,6 @@
 #include <glog/logging.h>
 
 #include "kudu/gutil/map-util.h"
-#include "kudu/util/slice.h"
 #include "kudu/gutil/strings/human_readable.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/inbound_call.h"
@@ -44,8 +46,15 @@
 #include "kudu/rpc/transfer.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
+#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
+#ifdef __linux__
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <linux/tcp.h>
+#endif
+
 using std::includes;
 using std::set;
 using std::shared_ptr;
@@ -57,6 +66,132 @@ namespace rpc {
 
 typedef OutboundCall::Phase Phase;
 
+namespace {
+
+// tcp_info struct duplicated from linux/tcp.h.
+//
+// This allows us to decouple the compile-time Linux headers from the
+// runtime Linux kernel. The compile-time headers (and kernel) might be
+// older than the runtime kernel, in which case an ifdef-based approach
+// wouldn't allow us to get all of the info available.
+//
+// NOTE: this struct has been annotated with some local notes about the
+// contents of each field.
+struct tcp_info {
+  // Various state-tracking information.
+  // ------------------------------------------------------------
+  uint8_t    tcpi_state;
+  uint8_t    tcpi_ca_state;
+  uint8_t    tcpi_retransmits;
+  uint8_t    tcpi_probes;
+  uint8_t    tcpi_backoff;
+  uint8_t    tcpi_options;
+  uint8_t    tcpi_snd_wscale : 4, tcpi_rcv_wscale : 4;
+  uint8_t    tcpi_delivery_rate_app_limited:1;
+
+  // Configurations.
+  // ------------------------------------------------------------
+  uint32_t   tcpi_rto;
+  uint32_t   tcpi_ato;
+  uint32_t   tcpi_snd_mss;
+  uint32_t   tcpi_rcv_mss;
+
+  // Counts of packets in various states in the outbound queue.
+  // At first glance one might think these are monotonic counters, but
+  // in fact they are instantaneous counts of queued packets and thus
+  // not very useful for our purposes.
+  // ------------------------------------------------------------
+  // Number of packets outstanding that haven't been acked.
+  uint32_t   tcpi_unacked;
+
+  // Number of packets outstanding that have been selective-acked.
+  uint32_t   tcpi_sacked;
+
+  // Number of packets outstanding that have been deemed lost (a SACK arrived
+  // for a later packet)
+  uint32_t   tcpi_lost;
+
+  // Number of packets in the queue that have been retransmitted.
+  uint32_t   tcpi_retrans;
+
+  // The number of packets towards the highest SACKed sequence number
+  // (some measure of reordering, removed in later Linux versions by
+  // 737ff314563ca27f044f9a3a041e9d42491ef7ce)
+  uint32_t   tcpi_fackets;
+
+  // Times when various events occurred.
+  // ------------------------------------------------------------
+  uint32_t   tcpi_last_data_sent;
+  uint32_t   tcpi_last_ack_sent;     /* Not remembered, sorry. */
+  uint32_t   tcpi_last_data_recv;
+  uint32_t   tcpi_last_ack_recv;
+
+  // Path MTU.
+  uint32_t   tcpi_pmtu;
+
+  // Receiver slow start threshold.
+  uint32_t   tcpi_rcv_ssthresh;
+
+  // Smoothed RTT estimate and variance based on the time between sending data and receiving
+  // corresponding ACK. See https://tools.ietf.org/html/rfc2988 for details.
+  uint32_t   tcpi_rtt;
+  uint32_t   tcpi_rttvar;
+
+  // Slow start threshold.
+  uint32_t   tcpi_snd_ssthresh;
+  // Sender congestion window (in number of MSS-sized packets)
+  uint32_t   tcpi_snd_cwnd;
+  // Advertised MSS.
+  uint32_t   tcpi_advmss;
+  // Amount of packet reordering allowed.
+  uint32_t   tcpi_reordering;
+
+  // Receiver-side RTT estimate per the Dynamic Right Sizing algorithm:
+  //
+  // "A system that is only transmitting acknowledgements can still estimate the round-trip
+  // time by observing the time between when a byte is first acknowledged and the receipt of
+  // data that is at least one window beyond the sequence number that was acknowledged. If the
+  // sender is being throttled by the network, this estimate will be valid. However, if the
+  // sending application did not have any data to send, the measured time could be much larger
+  // than the actual round-trip time. Thus this measurement acts only as an upper-bound on the
+  // round-trip time and should be used only when it is the only source of round-trip time
+  // information."
+  uint32_t   tcpi_rcv_rtt;
+  uint32_t   tcpi_rcv_space;
+
+  // Total number of retransmitted packets.
+  uint32_t   tcpi_total_retrans;
+
+  // Pacing-related metrics.
+  uint64_t   tcpi_pacing_rate;
+  uint64_t   tcpi_max_pacing_rate;
+
+  // Total bytes ACKed by remote peer.
+  uint64_t   tcpi_bytes_acked;    /* RFC4898 tcpEStatsAppHCThruOctetsAcked */
+  // Total bytes received (for which ACKs have been sent out).
+  uint64_t   tcpi_bytes_received; /* RFC4898 tcpEStatsAppHCThruOctetsReceived */
+  // Segments sent and received.
+  uint32_t   tcpi_segs_out;       /* RFC4898 tcpEStatsPerfSegsOut */
+  uint32_t   tcpi_segs_in;        /* RFC4898 tcpEStatsPerfSegsIn */
+
+  // The following metrics are quite new and not in el7.
+  // ------------------------------------------------------------
+  uint32_t   tcpi_notsent_bytes;
+  uint32_t   tcpi_min_rtt;
+  uint32_t   tcpi_data_segs_in;      /* RFC4898 tcpEStatsDataSegsIn */
+  uint32_t   tcpi_data_segs_out;     /* RFC4898 tcpEStatsDataSegsOut */
+
+  // Calculated rate at which data was delivered.
+  uint64_t   tcpi_delivery_rate;
+
+  // Timers for various states.
+  uint64_t   tcpi_busy_time;      /* Time (usec) busy sending data */
+  uint64_t   tcpi_rwnd_limited;   /* Time (usec) limited by receive window */
+  uint64_t   tcpi_sndbuf_limited; /* Time (usec) limited by send buffer */
+};
+
+} // anonymous namespace
+
 ///
 /// Connection
 ///
@@ -736,7 +871,7 @@ void Connection::MarkNegotiationComplete() {
   negotiation_complete_ = true;
 }
 
-Status Connection::DumpPB(const DumpRunningRpcsRequestPB& req,
+Status Connection::DumpPB(const DumpConnectionsRequestPB& req,
                           RpcConnectionPB* resp) {
   DCHECK(reactor_thread_->IsCurrentThread());
   resp->set_remote_ip(remote_.ToString());
@@ -768,8 +903,95 @@ Status Connection::DumpPB(const DumpRunningRpcsRequestPB& req,
   } else {
     LOG(FATAL);
   }
+#ifdef __linux__
+  if (negotiation_complete_) {
+    // TODO(todd): it's a little strange to not set socket level stats during
+    // negotiation, but we don't have access to the socket here until negotiation
+    // is complete.
+    WARN_NOT_OK(GetSocketStatsPB(resp->mutable_socket_stats()),
+                "could not fill in TCP info for RPC connection");
+  }
+#endif // __linux__
+  return Status::OK();
+}
+
+#ifdef __linux__
+Status Connection::GetSocketStatsPB(SocketStatsPB* pb) const {
+  DCHECK(reactor_thread_->IsCurrentThread());
+  int fd = socket_->GetFd();
+  CHECK_GE(fd, 0);
+
+  // Fetch TCP_INFO statistics from the kernel.
+  tcp_info ti;
+  memset(&ti, 0, sizeof(ti));
+  socklen_t len = sizeof(ti);
+  int rc = getsockopt(fd, IPPROTO_TCP, TCP_INFO, &ti, &len);
+  if (rc == 0) {
+#   define HAS_FIELD(field_name) \
+        (len >= offsetof(tcp_info, field_name) + sizeof(ti.field_name))
+    if (!HAS_FIELD(tcpi_total_retrans)) {
+      // All the fields up through tcpi_total_retrans have been present since
+      // kernel versions older than the minimum we support, so we can just
+      // bail if we don't get sufficient data back.
+      return Status::NotSupported("bad length returned for TCP_INFO");
+    }
+
+    pb->set_rtt(ti.tcpi_rtt);
+    pb->set_rttvar(ti.tcpi_rttvar);
+    pb->set_snd_cwnd(ti.tcpi_snd_cwnd);
+    pb->set_total_retrans(ti.tcpi_total_retrans);
+
+    // The following fields were added later in kernel development history.
+    // In RHEL6 they were backported starting in 6.8. Even though they were
+    // backported all together as a group, we'll just be safe and check for
+    // each individually.
+    if (HAS_FIELD(tcpi_pacing_rate)) {
+      pb->set_pacing_rate(ti.tcpi_pacing_rate);
+    }
+    if (HAS_FIELD(tcpi_max_pacing_rate)) {
+      pb->set_max_pacing_rate(ti.tcpi_max_pacing_rate);
+    }
+    if (HAS_FIELD(tcpi_bytes_acked)) {
+      pb->set_bytes_acked(ti.tcpi_bytes_acked);
+    }
+    if (HAS_FIELD(tcpi_bytes_received)) {
+      pb->set_bytes_received(ti.tcpi_bytes_received);
+    }
+    if (HAS_FIELD(tcpi_segs_out)) {
+      pb->set_segs_out(ti.tcpi_segs_out);
+    }
+    if (HAS_FIELD(tcpi_segs_in)) {
+      pb->set_segs_in(ti.tcpi_segs_in);
+    }
+
+    // Calculate sender bandwidth based on the same logic used by the 'ss' utility.
+    if (ti.tcpi_rtt > 0 && ti.tcpi_snd_mss && ti.tcpi_snd_cwnd) {
+      // Units:
+      //  rtt = usec
+      //  cwnd = number of MSS-size packets
+      //  mss = bytes / packet
+      //
+      // Dimensional analysis:
+      //   packets * bytes/packet * usecs/sec / usec -> bytes/sec
+      static constexpr int kUsecsPerSec = 1000000;
+      pb->set_send_bytes_per_sec(static_cast<int64_t>(ti.tcpi_snd_cwnd) *
+                                 ti.tcpi_snd_mss * kUsecsPerSec / ti.tcpi_rtt);
+    }
+  }
+
+  // Fetch the queue sizes.
+  int queue_len = 0;
+  rc = ioctl(fd, TIOCOUTQ, &queue_len);
+  if (rc == 0) {
+    pb->set_send_queue_bytes(queue_len);
+  }
+  rc = ioctl(fd, FIONREAD, &queue_len);
+  if (rc == 0) {
+    pb->set_receive_queue_bytes(queue_len);
+  }
   return Status::OK();
 }
+#endif // __linux__
 
 } // namespace rpc
 } // namespace kudu
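
The HAS_FIELD check in GetSocketStatsPB() relies on the kernel reporting how
many bytes of the struct it actually filled in. A self-contained sketch of the
same length-based test is shown here on a small, made-up struct, so it does not
call getsockopt() or depend on the real tcp_info layout. `ExampleInfo`,
`FieldFits` and `EXAMPLE_HAS_FIELD` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical stand-in for a kernel struct that grew over time: newer
// kernels append fields, older kernels return only a shorter prefix.
struct ExampleInfo {
  uint32_t rtt;            // present in every version
  uint32_t total_retrans;  // present in every version
  uint64_t pacing_rate;    // appended by a later version
  uint64_t bytes_acked;    // appended by an even later version
};

// True if a buffer with 'len' valid bytes fully contains the member that
// starts at 'offset' and occupies 'size' bytes.
constexpr bool FieldFits(size_t len, size_t offset, size_t size) {
  return len >= offset + size;
}

// Same shape as the HAS_FIELD macro in the patch, but with the returned
// length passed in explicitly.
#define EXAMPLE_HAS_FIELD(len, field) \
  FieldFits((len), offsetof(ExampleInfo, field), sizeof(ExampleInfo::field))
```

With this layout, a reported length of 8 covers `rtt` and `total_retrans`
only, 16 additionally covers `pacing_rate`, and 24 covers the whole struct.
Checking each field separately, as the patch does, stays correct even if a
kernel backports only some of the later fields.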
diff --git a/be/src/kudu/rpc/connection.h b/be/src/kudu/rpc/connection.h
index 84b325d..8f80b5a 100644
--- a/be/src/kudu/rpc/connection.h
+++ b/be/src/kudu/rpc/connection.h
@@ -50,12 +50,13 @@ namespace kudu {
 
 namespace rpc {
 
-class DumpRunningRpcsRequestPB;
+class DumpConnectionsRequestPB;
 class InboundCall;
 class OutboundCall;
 class RpcConnectionPB;
 class ReactorThread;
 class RpczStore;
+class SocketStatsPB;
 enum class CredentialsPolicy;
 
 //
@@ -200,7 +201,7 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // Indicate that negotiation is complete and that the Reactor is now in control of the socket.
   void MarkNegotiationComplete();
 
-  Status DumpPB(const DumpRunningRpcsRequestPB& req,
+  Status DumpPB(const DumpConnectionsRequestPB& req,
                 RpcConnectionPB* resp);
 
   ReactorThread* reactor_thread() const { return reactor_thread_; }
@@ -305,6 +306,8 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // reaches state specified in 'FLAGS_rpc_inject_cancellation_state'.
   void MaybeInjectCancellation(const std::shared_ptr<OutboundCall> &call);
 
+  Status GetSocketStatsPB(SocketStatsPB* pb) const;
+
   // The reactor thread that created this connection.
   ReactorThread* const reactor_thread_;
 
diff --git a/be/src/kudu/rpc/inbound_call.cc b/be/src/kudu/rpc/inbound_call.cc
index 6920071..655c453 100644
--- a/be/src/kudu/rpc/inbound_call.cc
+++ b/be/src/kudu/rpc/inbound_call.cc
@@ -250,7 +250,7 @@ string InboundCall::ToString() const {
                       header_.call_id());
 }
 
-void InboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
+void InboundCall::DumpPB(const DumpConnectionsRequestPB& req,
                          RpcCallInProgressPB* resp) {
   resp->mutable_header()->CopyFrom(header_);
   if (req.include_traces() && trace_) {
diff --git a/be/src/kudu/rpc/inbound_call.h b/be/src/kudu/rpc/inbound_call.h
index 0db4c37..07c57dc 100644
--- a/be/src/kudu/rpc/inbound_call.h
+++ b/be/src/kudu/rpc/inbound_call.h
@@ -54,7 +54,7 @@ class Trace;
 namespace rpc {
 
 class Connection;
-class DumpRunningRpcsRequestPB;
+class DumpConnectionsRequestPB;
 class RemoteUser;
 class RpcCallInProgressPB;
 class RpcSidecar;
@@ -135,7 +135,7 @@ class InboundCall {
 
   std::string ToString() const;
 
-  void DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp);
+  void DumpPB(const DumpConnectionsRequestPB& req, RpcCallInProgressPB* resp);
 
   const RemoteUser& remote_user() const;
 
diff --git a/be/src/kudu/rpc/messenger.cc b/be/src/kudu/rpc/messenger.cc
index 17ac0c5..4129172 100644
--- a/be/src/kudu/rpc/messenger.cc
+++ b/be/src/kudu/rpc/messenger.cc
@@ -449,11 +449,11 @@ Status Messenger::Init() {
   return Status::OK();
 }
 
-Status Messenger::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
-                                  DumpRunningRpcsResponsePB* resp) {
+Status Messenger::DumpConnections(const DumpConnectionsRequestPB& req,
+                                  DumpConnectionsResponsePB* resp) {
   shared_lock<rw_spinlock> guard(lock_.get_lock());
   for (Reactor* reactor : reactors_) {
-    RETURN_NOT_OK(reactor->DumpRunningRpcs(req, resp));
+    RETURN_NOT_OK(reactor->DumpConnections(req, resp));
   }
   return Status::OK();
 }
diff --git a/be/src/kudu/rpc/messenger.h b/be/src/kudu/rpc/messenger.h
index b3a78e0..56b087c 100644
--- a/be/src/kudu/rpc/messenger.h
+++ b/be/src/kudu/rpc/messenger.h
@@ -60,8 +60,8 @@ using security::RpcAuthentication;
 using security::RpcEncryption;
 
 class AcceptorPool;
-class DumpRunningRpcsRequestPB;
-class DumpRunningRpcsResponsePB;
+class DumpConnectionsRequestPB;
+class DumpConnectionsResponsePB;
 class InboundCall;
 class Messenger;
 class OutboundCall;
@@ -265,9 +265,9 @@ class Messenger {
   // Take ownership of the socket via Socket::Release
   void RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote);
 
-  // Dump the current RPCs into the given protobuf.
-  Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
-                         DumpRunningRpcsResponsePB* resp);
+  // Dump info on related TCP connections into the given protobuf.
+  Status DumpConnections(const DumpConnectionsRequestPB& req,
+                         DumpConnectionsResponsePB* resp);
 
   // Run 'func' on a reactor thread after 'when' time elapses.
   //
diff --git a/be/src/kudu/rpc/outbound_call.cc b/be/src/kudu/rpc/outbound_call.cc
index 37d02ac..17761f5 100644
--- a/be/src/kudu/rpc/outbound_call.cc
+++ b/be/src/kudu/rpc/outbound_call.cc
@@ -449,7 +449,7 @@ string OutboundCall::ToString() const {
   return Substitute("RPC call $0 -> $1", remote_method_.ToString(), conn_id_.ToString());
 }
 
-void OutboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
+void OutboundCall::DumpPB(const DumpConnectionsRequestPB& req,
                           RpcCallInProgressPB* resp) {
   std::lock_guard<simple_spinlock> l(lock_);
   resp->mutable_header()->CopyFrom(header_);
diff --git a/be/src/kudu/rpc/outbound_call.h b/be/src/kudu/rpc/outbound_call.h
index 8d43891..c48e496 100644
--- a/be/src/kudu/rpc/outbound_call.h
+++ b/be/src/kudu/rpc/outbound_call.h
@@ -55,7 +55,7 @@ namespace kudu {
 namespace rpc {
 
 class CallResponse;
-class DumpRunningRpcsRequestPB;
+class DumpConnectionsRequestPB;
 class RpcCallInProgressPB;
 class RpcController;
 class RpcSidecar;
@@ -154,7 +154,7 @@ class OutboundCall {
 
   std::string ToString() const;
 
-  void DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp);
+  void DumpPB(const DumpConnectionsRequestPB& req, RpcCallInProgressPB* resp);
 
   ////////////////////////////////////////////////////////////
   // Getters
diff --git a/be/src/kudu/rpc/reactor.cc b/be/src/kudu/rpc/reactor.cc
index fd88afb..a3e56f7 100644
--- a/be/src/kudu/rpc/reactor.cc
+++ b/be/src/kudu/rpc/reactor.cc
@@ -290,8 +290,8 @@ Status ReactorThread::GetMetrics(ReactorMetrics* metrics) {
   return Status::OK();
 }
 
-Status ReactorThread::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
-                                      DumpRunningRpcsResponsePB* resp) {
+Status ReactorThread::DumpConnections(const DumpConnectionsRequestPB& req,
+                                      DumpConnectionsResponsePB* resp) {
   DCHECK(IsCurrentThread());
   for (const scoped_refptr<Connection>& conn : server_conns_) {
     RETURN_NOT_OK(conn->DumpPB(req, resp->add_inbound_connections()));
@@ -831,10 +831,10 @@ Status Reactor::RunOnReactorThread(const boost::function<Status()>& f) {
   return task.Wait();
 }
 
-Status Reactor::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
-                                DumpRunningRpcsResponsePB* resp) {
-  return RunOnReactorThread(boost::bind(&ReactorThread::DumpRunningRpcs, &thread_,
-                                        boost::ref(req), resp));
+Status Reactor::DumpConnections(const DumpConnectionsRequestPB& req,
+                                DumpConnectionsResponsePB* resp) {
+  return RunOnReactorThread(boost::bind(&ReactorThread::DumpConnections,
+                                        &thread_, boost::ref(req), resp));
 }
 
 class RegisterConnectionTask : public ReactorTask {
diff --git a/be/src/kudu/rpc/reactor.h b/be/src/kudu/rpc/reactor.h
index e01f71e..4ab7ef6 100644
--- a/be/src/kudu/rpc/reactor.h
+++ b/be/src/kudu/rpc/reactor.h
@@ -50,8 +50,8 @@ namespace rpc {
 
 typedef std::list<scoped_refptr<Connection>> conn_list_t;
 
-class DumpRunningRpcsRequestPB;
-class DumpRunningRpcsResponsePB;
+class DumpConnectionsRequestPB;
+class DumpConnectionsResponsePB;
 class OutboundCall;
 class Reactor;
 class ReactorThread;
@@ -150,8 +150,8 @@ class ReactorThread {
   Status Init();
 
   // Add any connections on this reactor thread into the given status dump.
-  Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
-                         DumpRunningRpcsResponsePB* resp);
+  Status DumpConnections(const DumpConnectionsRequestPB& req,
+                         DumpConnectionsResponsePB* resp);
 
   // Shuts down a reactor thread, optionally waiting for it to exit.
   // Reactor::Shutdown() must have been called already.
@@ -360,8 +360,8 @@ class Reactor {
   Status GetMetrics(ReactorMetrics *metrics);
 
   // Add any connections on this reactor thread into the given status dump.
-  Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
-                         DumpRunningRpcsResponsePB* resp);
+  Status DumpConnections(const DumpConnectionsRequestPB& req,
+                         DumpConnectionsResponsePB* resp);
 
   // Queue a new incoming connection. Takes ownership of the underlying fd from
   // 'socket', but not the Socket object itself.
diff --git a/be/src/kudu/rpc/rpc_introspection.proto b/be/src/kudu/rpc/rpc_introspection.proto
index 7685903..05be722 100644
--- a/be/src/kudu/rpc/rpc_introspection.proto
+++ b/be/src/kudu/rpc/rpc_introspection.proto
@@ -50,6 +50,33 @@ message RpcCallInProgressPB {
   optional State state = 4;
 }
 
+// The SocketStatsPB message is used to report on socket-level information
+// for RPC-related TCP connections (Linux-only). Essentially, the message
+// contains some metrics and counters from the Linux-specific 'tcp_info'
+// structure defined in /usr/include/linux/tcp.h. For more information on
+// TCP on Linux, see http://man7.org/linux/man-pages/man7/tcp.7.html
+message SocketStatsPB {
+  optional uint32 rtt = 1;
+  optional uint32 rttvar = 2;
+  optional uint32 snd_cwnd = 3;
+  optional uint32 total_retrans = 4;
+
+  optional uint32 pacing_rate = 5;
+  optional uint32 max_pacing_rate = 6;
+
+  optional uint64 bytes_acked = 7;
+  optional uint64 bytes_received = 8;
+  optional uint32 segs_out = 9;
+  optional uint32 segs_in = 10;
+
+  optional uint64 send_queue_bytes = 11;
+  optional uint64 receive_queue_bytes = 12;
+
+  // Calculated sender throughput.
+  optional int32 send_bytes_per_sec = 13;
+};
+
+// Debugging information about a currently-open RPC connection.
 message RpcConnectionPB {
   enum StateType {
     UNKNOWN = 999;
@@ -63,13 +90,16 @@ message RpcConnectionPB {
   optional string remote_user_credentials = 3;
   repeated RpcCallInProgressPB calls_in_flight = 4;
   optional int64 outbound_queue_size = 5;
+
+  // Information on the actual TCP connection as reported by the kernel.
+  optional SocketStatsPB socket_stats = 6;
 }
 
-message DumpRunningRpcsRequestPB {
+message DumpConnectionsRequestPB {
   optional bool include_traces = 1 [ default = false ];
 }
 
-message DumpRunningRpcsResponsePB {
+message DumpConnectionsResponsePB {
   repeated RpcConnectionPB inbound_connections = 1;
   repeated RpcConnectionPB outbound_connections = 2;
 }
diff --git a/be/src/kudu/rpc/rpc_stub-test.cc b/be/src/kudu/rpc/rpc_stub-test.cc
index e626276..ce26241 100644
--- a/be/src/kudu/rpc/rpc_stub-test.cc
+++ b/be/src/kudu/rpc/rpc_stub-test.cc
@@ -571,11 +571,11 @@ TEST_F(RpcStubTest, TestDumpCallsInFlight) {
                boost::bind(&CountDownLatch::CountDown, &sleep.latch));
 
   // Check the running RPC status on the client messenger.
-  DumpRunningRpcsRequestPB dump_req;
-  DumpRunningRpcsResponsePB dump_resp;
+  DumpConnectionsRequestPB dump_req;
+  DumpConnectionsResponsePB dump_resp;
   dump_req.set_include_traces(true);
 
-  ASSERT_OK(client_messenger_->DumpRunningRpcs(dump_req, &dump_resp));
+  ASSERT_OK(client_messenger_->DumpConnections(dump_req, &dump_resp));
   LOG(INFO) << "client messenger: " << SecureDebugString(dump_resp);
   ASSERT_EQ(1, dump_resp.outbound_connections_size());
   ASSERT_EQ(1, dump_resp.outbound_connections(0).calls_in_flight_size());
@@ -588,7 +588,7 @@ TEST_F(RpcStubTest, TestDumpCallsInFlight) {
   // asynchronously off of the main thread (ie the server may not be handling it yet)
   for (int i = 0; i < 100; i++) {
     dump_resp.Clear();
-    ASSERT_OK(server_messenger_->DumpRunningRpcs(dump_req, &dump_resp));
+    ASSERT_OK(server_messenger_->DumpConnections(dump_req, &dump_resp));
     if (dump_resp.inbound_connections_size() > 0 &&
         dump_resp.inbound_connections(0).calls_in_flight_size() > 0) {
       break;
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index 09893e0..1e16f67 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -17,6 +17,10 @@
 
 #include "rpc/rpc-mgr.h"
 
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/descriptor.pb.h>
+#include <google/protobuf/message.h>
+
 #include "exec/kudu-util.h"
 #include "kudu/rpc/acceptor_pool.h"
 #include "kudu/rpc/remote_user.h"
@@ -30,6 +34,7 @@
 #include "runtime/mem-tracker.h"
 #include "util/auth-util.h"
 #include "util/cpu-info.h"
+#include "util/json-util.h"
 #include "util/network-util.h"
 #include "util/openssl-util.h"
 
@@ -41,12 +46,13 @@ using kudu::HostPort;
 using kudu::MetricEntity;
 using kudu::MonoDelta;
 using kudu::rpc::AcceptorPool;
-using kudu::rpc::DumpRunningRpcsRequestPB;
-using kudu::rpc::DumpRunningRpcsResponsePB;
+using kudu::rpc::DumpConnectionsRequestPB;
+using kudu::rpc::DumpConnectionsResponsePB;
 using kudu::rpc::GeneratedServiceIf;
 using kudu::rpc::MessengerBuilder;
 using kudu::rpc::Messenger;
 using kudu::rpc::RemoteUser;
+using kudu::rpc::RpcCallInProgressPB;
 using kudu::rpc::RpcConnectionPB;
 using kudu::rpc::RpcContext;
 using kudu::rpc::RpcController;
@@ -227,8 +233,8 @@ void RpcMgr::Shutdown() {
 bool RpcMgr::IsServerTooBusy(const RpcController& rpc_controller) {
   const kudu::Status status = rpc_controller.status();
   const kudu::rpc::ErrorStatusPB* err = rpc_controller.error_response();
-  return status.IsRemoteError() && err != nullptr && err->has_code() &&
-      err->code() == kudu::rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY;
+  return status.IsRemoteError() && err != nullptr && err->has_code()
+      && err->code() == kudu::rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY;
 }
 
 void RpcMgr::ToJson(Document* document) {
@@ -241,18 +247,42 @@ void RpcMgr::ToJson(Document* document) {
   document->AddMember("rpc_connections_accepted", num_accepted, document->GetAllocator());
 
   // Add messenger metrics.
-  DumpRunningRpcsResponsePB response;
-  messenger_->DumpRunningRpcs(DumpRunningRpcsRequestPB(), &response);
+  DumpConnectionsResponsePB response;
+  messenger_->DumpConnections(DumpConnectionsRequestPB(), &response);
 
   int64_t num_inbound_calls_in_flight = 0;
+  // Add per connection metrics for inbound connections.
+  Value inbound_per_conn_metrics(kArrayType);
   for (const RpcConnectionPB& conn : response.inbound_connections()) {
+    Value per_conn_metrics_entry(kObjectType);
+    Value remote_ip_str(conn.remote_ip().c_str(), document->GetAllocator());
+    per_conn_metrics_entry.AddMember(
+        "remote_ip", remote_ip_str, document->GetAllocator());
+    per_conn_metrics_entry.AddMember(
+        "num_calls_in_flight", conn.calls_in_flight().size(), document->GetAllocator());
     num_inbound_calls_in_flight += conn.calls_in_flight().size();
+    Value socket_stats_entry(kObjectType);
+    ProtobufToJson(conn.socket_stats(), document, &socket_stats_entry);
+    per_conn_metrics_entry.AddMember(
+        "socket_stats", socket_stats_entry, document->GetAllocator());
+
+    Value calls_in_flight(kArrayType);
+    for (const RpcCallInProgressPB& call : conn.calls_in_flight()) {
+      Value call_in_flight(kObjectType);
+      ProtobufToJson(call, document, &call_in_flight);
+      calls_in_flight.PushBack(call_in_flight, document->GetAllocator());
+    }
+    per_conn_metrics_entry.AddMember(
+        "calls_in_flight", calls_in_flight, document->GetAllocator());
+    inbound_per_conn_metrics.PushBack(per_conn_metrics_entry, document->GetAllocator());
   }
+  document->AddMember(
+      "inbound_per_conn_metrics", inbound_per_conn_metrics, document->GetAllocator());
   document->AddMember("num_inbound_calls_in_flight", num_inbound_calls_in_flight,
       document->GetAllocator());
 
-  // Add per connection metrics.
-  Value per_conn_metrics(kArrayType);
+  // Add per connection metrics for outbound connections.
+  Value outbound_per_conn_metrics(kArrayType);
   int64_t num_outbound_calls_in_flight = 0;
   for (const RpcConnectionPB& conn : response.outbound_connections()) {
     num_outbound_calls_in_flight += conn.calls_in_flight().size();
@@ -263,10 +293,27 @@ void RpcMgr::ToJson(Document* document) {
     per_conn_metrics_entry.AddMember(
         "remote_ip", remote_ip_str, document->GetAllocator());
     per_conn_metrics_entry.AddMember(
+        "num_calls_in_flight", conn.calls_in_flight().size(), document->GetAllocator());
+    per_conn_metrics_entry.AddMember(
         "outbound_queue_size", conn.outbound_queue_size(), document->GetAllocator());
-    per_conn_metrics.PushBack(per_conn_metrics_entry, document->GetAllocator());
+
+    Value socket_stats_entry(kObjectType);
+    ProtobufToJson(conn.socket_stats(), document, &socket_stats_entry);
+    per_conn_metrics_entry.AddMember(
+        "socket_stats", socket_stats_entry, document->GetAllocator());
+
+    Value calls_in_flight(kArrayType);
+    for (const RpcCallInProgressPB& call : conn.calls_in_flight()) {
+      Value call_in_flight(kObjectType);
+      ProtobufToJson(call, document, &call_in_flight);
+      calls_in_flight.PushBack(call_in_flight, document->GetAllocator());
+    }
+    per_conn_metrics_entry.AddMember(
+        "calls_in_flight", calls_in_flight, document->GetAllocator());
+    outbound_per_conn_metrics.PushBack(per_conn_metrics_entry, document->GetAllocator());
   }
-  document->AddMember("per_conn_metrics", per_conn_metrics, document->GetAllocator());
+  document->AddMember(
+      "per_conn_metrics", outbound_per_conn_metrics, document->GetAllocator());
   document->AddMember("num_outbound_calls_in_flight", num_outbound_calls_in_flight,
       document->GetAllocator());
 
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index ef3fbb2..a90bea8 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -58,6 +58,7 @@ add_library(Util
   histogram-metric.cc
   impalad-metrics.cc
   jni-util.cc
+  json-util.cc
   logging-support.cc
   mem-info.cc
   memory-metrics.cc
diff --git a/be/src/util/json-util.cc b/be/src/util/json-util.cc
new file mode 100644
index 0000000..eae1dae
--- /dev/null
+++ b/be/src/util/json-util.cc
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/json-util.h"
+
+#include <vector>
+
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/descriptor.pb.h>
+#include <google/protobuf/message.h>
+#include <rapidjson/document.h>
+#include <rapidjson/rapidjson.h>
+#include <rapidjson/writer.h>
+
+#include "util/redactor.h"
+
+#include "common/names.h"
+
+using rapidjson::Document;
+using rapidjson::Value;
+
+namespace impala {
+
+// Convert a repeated field to a JSON array and add it to 'obj'.
+static void RepeatedFieldToJson(const google::protobuf::Message& pb,
+    const google::protobuf::Reflection* reflection,
+    const google::protobuf::FieldDescriptor* field, Document* document, Value* obj) {
+  Value arr(rapidjson::kArrayType);
+  int size = reflection->FieldSize(pb, field);
+  for (int i = 0; i < size; i++) {
+    switch (field->cpp_type()) {
+      case google::protobuf::FieldDescriptor::CPPTYPE_INT32:
+        arr.PushBack(
+            reflection->GetRepeatedInt32(pb, field, i), document->GetAllocator());
+        break;
+      case google::protobuf::FieldDescriptor::CPPTYPE_INT64:
+        arr.PushBack(
+            reflection->GetRepeatedInt64(pb, field, i), document->GetAllocator());
+        break;
+      case google::protobuf::FieldDescriptor::CPPTYPE_UINT32:
+        arr.PushBack(
+            reflection->GetRepeatedUInt32(pb, field, i), document->GetAllocator());
+        break;
+      case google::protobuf::FieldDescriptor::CPPTYPE_UINT64:
+        arr.PushBack(
+            reflection->GetRepeatedUInt64(pb, field, i), document->GetAllocator());
+        break;
+      case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT:
+        arr.PushBack(
+            reflection->GetRepeatedFloat(pb, field, i), document->GetAllocator());
+        break;
+      case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE:
+        arr.PushBack(
+            reflection->GetRepeatedDouble(pb, field, i), document->GetAllocator());
+        break;
+      case google::protobuf::FieldDescriptor::CPPTYPE_BOOL:
+        arr.PushBack(reflection->GetRepeatedBool(pb, field, i), document->GetAllocator());
+        break;
+      case google::protobuf::FieldDescriptor::CPPTYPE_ENUM: {
+        Value enum_str(reflection->GetRepeatedEnum(pb, field, i)->name().c_str(),
+            document->GetAllocator());
+        arr.PushBack(enum_str, document->GetAllocator());
+        break;
+      }
+      case google::protobuf::FieldDescriptor::CPPTYPE_STRING: {
+        string str = reflection->GetRepeatedString(pb, field, i);
+        Redact(&str, nullptr);
+        Value val_str(str.c_str(), document->GetAllocator());
+        arr.PushBack(val_str, document->GetAllocator());
+        break;
+      }
+      case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: {
+        Value child_obj(rapidjson::kObjectType);
+        ProtobufToJson(
+            reflection->GetRepeatedMessage(pb, field, i), document, &child_obj);
+        arr.PushBack(child_obj, document->GetAllocator());
+        break;
+      }
+      default:
+        DCHECK(false) << "Type NYI: " << field->cpp_type() << " " << field->name();
+    }
+  }
+  Value field_name(field->name().c_str(), document->GetAllocator());
+  obj->AddMember(field_name, arr, document->GetAllocator());
+}
+
+void ProtobufToJson(const google::protobuf::Message& pb, Document* document, Value* obj) {
+  const google::protobuf::Reflection* reflection = pb.GetReflection();
+  vector<const google::protobuf::FieldDescriptor*> fields;
+  reflection->ListFields(pb, &fields);
+  for (const google::protobuf::FieldDescriptor* field : fields) {
+    if (field->is_repeated()) {
+      RepeatedFieldToJson(pb, reflection, field, document, obj);
+      continue;
+    }
+    Value field_name(field->name().c_str(), document->GetAllocator());
+    switch (field->cpp_type()) {
+      case google::protobuf::FieldDescriptor::CPPTYPE_INT32:
+        obj->AddMember(
+            field_name, reflection->GetInt32(pb, field), document->GetAllocator());
+        break;
+      case google::protobuf::FieldDescriptor::CPPTYPE_INT64:
+        obj->AddMember(
+            field_name, reflection->GetInt64(pb, field), document->GetAllocator());
+        break;
+      case google::protobuf::FieldDescriptor::CPPTYPE_UINT32:
+        obj->AddMember(
+            field_name, reflection->GetUInt32(pb, field), document->GetAllocator());
+        break;
+      case google::protobuf::FieldDescriptor::CPPTYPE_UINT64:
+        obj->AddMember(
+            field_name, reflection->GetUInt64(pb, field), document->GetAllocator());
+        break;
+      case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT:
+        obj->AddMember(
+            field_name, reflection->GetFloat(pb, field), document->GetAllocator());
+        break;
+      case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE:
+        obj->AddMember(
+            field_name, reflection->GetDouble(pb, field), document->GetAllocator());
+        break;
+      case google::protobuf::FieldDescriptor::CPPTYPE_BOOL:
+        obj->AddMember(
+            field_name, reflection->GetBool(pb, field), document->GetAllocator());
+        break;
+      case google::protobuf::FieldDescriptor::CPPTYPE_ENUM: {
+        Value enum_str(
+            reflection->GetEnum(pb, field)->name().c_str(), document->GetAllocator());
+        obj->AddMember(field_name, enum_str, document->GetAllocator());
+        break;
+      }
+      case google::protobuf::FieldDescriptor::CPPTYPE_STRING: {
+        string str = reflection->GetString(pb, field);
+        Redact(&str, nullptr);
+        Value val_str(str.c_str(), document->GetAllocator());
+        obj->AddMember(field_name, val_str, document->GetAllocator());
+        break;
+      }
+      case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: {
+        Value child_obj(rapidjson::kObjectType);
+        ProtobufToJson(reflection->GetMessage(pb, field), document, &child_obj);
+        obj->AddMember(field_name, child_obj, document->GetAllocator());
+        break;
+      }
+      default:
+        DCHECK(false) << "Type NYI: " << field->cpp_type() << " " << field->name();
+    }
+  }
+}
+} // namespace impala
diff --git a/be/src/util/json-util.h b/be/src/util/json-util.h
index cf6845a..deabe97 100644
--- a/be/src/util/json-util.h
+++ b/be/src/util/json-util.h
@@ -18,11 +18,17 @@
 #ifndef IMPALA_UTIL_JSON_UTIL_H
 #define IMPALA_UTIL_JSON_UTIL_H
 
-#include <rapidjson/rapidjson.h>
 #include <rapidjson/document.h>
+#include <rapidjson/rapidjson.h>
 
-#include "util/template-util.h"
 #include "util/pretty-printer.h"
+#include "util/template-util.h"
+
+namespace google {
+namespace protobuf {
+class Message;
+}
+} // namespace google
 
 namespace impala {
 
@@ -58,6 +64,14 @@ ToJsonValue(const T& value, const TUnit::type unit, rapidjson::Document* documen
   }
 }
 
-}
+/// Uses reflection to append the fields from the protobuf 'pb' to the JSON object 'obj'.
+/// Recursively adds nested arrays and objects, i.e. makes a verbatim copy of 'pb'.
+/// 'document' must be the document containing 'obj'.
+/// Care must be taken when converting protobufs that may contain sensitive data, e.g.
+/// strings, so as to not leak it. Strings are automatically redacted if redaction
+/// is enabled.
+void ProtobufToJson(const google::protobuf::Message& pb, rapidjson::Document* document,
+    rapidjson::Value* obj);
+} // namespace impala
 
 #endif
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 47110c0..d6f80d3 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -20,6 +20,7 @@ from tests.common.file_utils import grep_dir
 from tests.common.skip import SkipIfBuildType
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite
+import itertools
 import json
 import os
 import pytest
@@ -529,6 +530,16 @@ class TestWebPage(ImpalaTestSuite):
         functional.alltypestiny join functional.alltypessmall c2"
     SVC_NAME = 'impala.DataStreamService'
 
+    def get_per_conn_metrics(inbound):
+      """Get inbound or outbound per-connection metrics"""
+      rpcz = self.get_debug_page(self.RPCZ_URL)
+      if inbound:
+        key = "inbound_per_conn_metrics"
+      else:
+        key = "per_conn_metrics"
+      conns = rpcz[key]
+      return conns
+
     def get_svc_metrics(svc_name):
       rpcz = self.get_debug_page(self.RPCZ_URL)
       assert len(rpcz['services']) > 0
@@ -538,11 +549,28 @@ class TestWebPage(ImpalaTestSuite):
           return sorted(s['rpc_method_metrics'], key=lambda m: m['method_name'])
       assert False, 'Could not find metrics for %s' % svc_name
 
-    before = get_svc_metrics(SVC_NAME)
+    svc_before = get_svc_metrics(SVC_NAME)
+    inbound_before = get_per_conn_metrics(True)
+    outbound_before = get_per_conn_metrics(False)
     self.client.execute(TEST_QUERY)
-    after = get_svc_metrics(SVC_NAME)
-
-    assert before != after
+    svc_after = get_svc_metrics(SVC_NAME)
+    inbound_after = get_per_conn_metrics(True)
+    outbound_after = get_per_conn_metrics(False)
+
+    assert svc_before != svc_after
+    assert inbound_before != inbound_after
+    assert outbound_before != outbound_after
+
+    # Some connections should have metrics after executing query
+    assert len(inbound_after) > 0
+    assert len(outbound_after) > 0
+    # Spot-check some fields, including socket stats.
+    for conn in itertools.chain(inbound_after, outbound_after):
+      assert conn["remote_ip"] != ""
+      assert conn["num_calls_in_flight"] >= 0
+      assert conn["num_calls_in_flight"] == len(conn["calls_in_flight"])
+      assert conn["socket_stats"]["bytes_acked"] > 0, conn
+      assert conn["socket_stats"]["send_queue_bytes"] >= 0, conn
 
   @pytest.mark.execute_serially
   def test_admission_page(self):
diff --git a/www/rpcz.tmpl b/www/rpcz.tmpl
index 42d1f46..4dc5a7a 100644
--- a/www/rpcz.tmpl
+++ b/www/rpcz.tmpl
@@ -86,24 +86,92 @@ under the License.
 </table>
 {{/services}}
 
-<h3>Per connection metrics for KRPC</h3>
+<h3>Per connection metrics for KRPC outbound connections</h3>
 <table class="table table-bordered table-hover" id="per_conn_metrics">
   <thead>
     <tr>
       <th>Remote IP</th>
+      <th># Calls in Flight</th>
       <th>Outbound Queue Size (count)</th>
+      <th>Socket RTT (us)</th>
+      <th>Socket RTT Variance (us)</th>
+      <th>Sender Congestion Window</th>
+      <th># Retransmitted Packets</th>
+      <th>Pacing Rate</th>
+      <th>Max Pacing Rate</th>
+      <th>Bytes Acked</th>
+      <th>Bytes Received</th>
+      <th>Segments Sent</th>
+      <th>Segments Received</th>
+      <th>Send Queue Size (bytes)</th>
+      <th>Receive Queue Size (bytes)</th>
     </tr>
   </thead>
   <tbody>
     {{#per_conn_metrics}}
     <tr>
       <td>{{remote_ip}}</td>
+      <td>{{num_calls_in_flight}}</td>
       <td>{{outbound_queue_size}}</td>
+      <td>{{socket_stats.rtt}}</td>
+      <td>{{socket_stats.rttvar}}</td>
+      <td>{{socket_stats.snd_cwnd}}</td>
+      <td>{{socket_stats.total_retrans}}</td>
+      <td>{{socket_stats.pacing_rate}}</td>
+      <td>{{socket_stats.max_pacing_rate}}</td>
+      <td>{{socket_stats.bytes_acked}}</td>
+      <td>{{socket_stats.bytes_received}}</td>
+      <td>{{socket_stats.segs_out}}</td>
+      <td>{{socket_stats.segs_in}}</td>
+      <td>{{socket_stats.send_queue_bytes}}</td>
+      <td>{{socket_stats.receive_queue_bytes}}</td>
     </tr>
     {{/per_conn_metrics}}
   </tbody>
 </table>
 
+<h3>Per connection metrics for KRPC inbound connections</h3>
+<table class="table table-bordered table-hover" id="inbound_per_conn_metrics">
+  <thead>
+    <tr>
+      <th>Remote IP</th>
+      <th># Calls in Flight</th>
+      <th>Socket RTT (us)</th>
+      <th>Socket RTT Variance (us)</th>
+      <th>Sender Congestion Window</th>
+      <th># Retransmitted Packets</th>
+      <th>Pacing Rate</th>
+      <th>Max Pacing Rate</th>
+      <th>Bytes Acked</th>
+      <th>Bytes Received</th>
+      <th>Segments Sent</th>
+      <th>Segments Received</th>
+      <th>Send Queue Size (bytes)</th>
+      <th>Receive Queue Size (bytes)</th>
+    </tr>
+  </thead>
+  <tbody>
+    {{#inbound_per_conn_metrics}}
+    <tr>
+      <td>{{remote_ip}}</td>
+      <td>{{num_calls_in_flight}}</td>
+      <td>{{socket_stats.rtt}}</td>
+      <td>{{socket_stats.rttvar}}</td>
+      <td>{{socket_stats.snd_cwnd}}</td>
+      <td>{{socket_stats.total_retrans}}</td>
+      <td>{{socket_stats.pacing_rate}}</td>
+      <td>{{socket_stats.max_pacing_rate}}</td>
+      <td>{{socket_stats.bytes_acked}}</td>
+      <td>{{socket_stats.bytes_received}}</td>
+      <td>{{socket_stats.segs_out}}</td>
+      <td>{{socket_stats.segs_in}}</td>
+      <td>{{socket_stats.send_queue_bytes}}</td>
+      <td>{{socket_stats.receive_queue_bytes}}</td>
+    </tr>
+    {{/inbound_per_conn_metrics}}
+  </tbody>
+</table>
+
 {{?servers}}
 <h2>Impala Thrift RPC Services
   <button class="btn btn-warning btn-xs" onClick="reset_all();">
@@ -229,7 +297,40 @@ function update_krpc_services(json) {
 function update_krpc_conn_metrics_datatable(json) {
   var table = $('#per_conn_metrics').DataTable();
   var rows = $.map(json["per_conn_metrics"], function(row) {
-    return [[row["remote_ip"], row["outbound_queue_size"]]];
+    return [[row["remote_ip"], row["num_calls_in_flight"], row["outbound_queue_size"],
+             row["socket_stats"]["rtt"],
+             row["socket_stats"]["rttvar"],
+             row["socket_stats"]["snd_cwnd"],
+             row["socket_stats"]["total_retrans"],
+             row["socket_stats"]["pacing_rate"],
+             row["socket_stats"]["max_pacing_rate"],
+             row["socket_stats"]["bytes_acked"],
+             row["socket_stats"]["bytes_received"],
+             row["socket_stats"]["segs_out"],
+             row["socket_stats"]["segs_in"],
+             row["socket_stats"]["send_queue_bytes"],
+             row["socket_stats"]["receive_queue_bytes"]]];
+  });
+
+  table.clear().rows.add(rows).draw();
+}
+
+function update_krpc_inbound_conn_metrics_datatable(json) {
+  var table = $('#inbound_per_conn_metrics').DataTable();
+  var rows = $.map(json["inbound_per_conn_metrics"], function(row) {
+    return [[row["remote_ip"], row["num_calls_in_flight"],
+             row["socket_stats"]["rtt"],
+             row["socket_stats"]["rttvar"],
+             row["socket_stats"]["snd_cwnd"],
+             row["socket_stats"]["total_retrans"],
+             row["socket_stats"]["pacing_rate"],
+             row["socket_stats"]["max_pacing_rate"],
+             row["socket_stats"]["bytes_acked"],
+             row["socket_stats"]["bytes_received"],
+             row["socket_stats"]["segs_out"],
+             row["socket_stats"]["segs_in"],
+             row["socket_stats"]["send_queue_bytes"],
+             row["socket_stats"]["receive_queue_bytes"]]];
   });
 
   table.clear().rows.add(rows).draw();
@@ -253,6 +354,7 @@ function refresh() {
     update_impala_services(json);
     update_krpc_services(json);
     update_krpc_conn_metrics_datatable(json);
+    update_krpc_inbound_conn_metrics_datatable(json);
     document.getElementById("last-updated").textContent = new Date();
   }
 

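As a rough illustration of the JSON shape that the rpc-mgr.cc changes above produce for /rpcz, the following Python sketch flattens one per-connection entry into a table row, in the same column order as the tables in www/rpcz.tmpl. The key names are taken from the diff; the helper name conn_to_row and the sample document are hypothetical and exist only for this example. Since ProtobufToJson() walks the fields returned by ListFields(), which reports only fields that are set, some socket stats may be absent from an entry, so the sketch reads them with a default rather than indexing directly.

```python
# Column order mirrors the socket_stats columns in www/rpcz.tmpl.
SOCKET_STAT_KEYS = [
    "rtt", "rttvar", "snd_cwnd", "total_retrans", "pacing_rate",
    "max_pacing_rate", "bytes_acked", "bytes_received", "segs_out",
    "segs_in", "send_queue_bytes", "receive_queue_bytes",
]


def conn_to_row(conn, outbound):
    """Flatten one per-connection entry into a list of table cells.

    Hypothetical helper: 'conn' is one element of "per_conn_metrics"
    (outbound) or "inbound_per_conn_metrics" (inbound).
    """
    row = [conn["remote_ip"], conn["num_calls_in_flight"]]
    if outbound:
        # Only outbound entries carry "outbound_queue_size" in the diff.
        row.append(conn["outbound_queue_size"])
    stats = conn.get("socket_stats", {})
    # Stats that were not emitted become None instead of raising KeyError.
    row.extend(stats.get(key) for key in SOCKET_STAT_KEYS)
    return row


# Made-up sample data for illustration only; not real /rpcz output.
sample_rpcz = {
    "per_conn_metrics": [{
        "remote_ip": "127.0.0.1:27000",
        "num_calls_in_flight": 0,
        "outbound_queue_size": 0,
        "socket_stats": {"rtt": 50, "snd_cwnd": 10, "bytes_acked": 1024},
        "calls_in_flight": [],
    }],
    "inbound_per_conn_metrics": [],
}

rows = [conn_to_row(c, outbound=True) for c in sample_rpcz["per_conn_metrics"]]
print(rows)
```

The same defaulting concern would apply to the JavaScript in rpcz.tmpl, where a missing stat shows up as undefined in the DataTable row.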

[impala] 01/03: [DOCS] Impala is not optimized for the IN operator when accessing HBASE

Posted by ta...@apache.org.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit acddf76bcb3fe8b5f8cc1c60e8d3f15c6e164d5b
Author: Alex Rodoni <ar...@cloudera.com>
AuthorDate: Wed Dec 11 11:31:16 2019 -0800

    [DOCS] Impala is not optimized for the IN operator when accessing HBASE
    
    Change-Id: I37337a18c7add3c64795b3b2e49670493a9a8e44
    Reviewed-on: http://gerrit.cloudera.org:8080/14891
    Reviewed-by: Lars Volker <lv...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 docs/topics/impala_hbase.xml | 31 ++++++++++++++++---------------
 1 file changed, 16 insertions(+), 15 deletions(-)

diff --git a/docs/topics/impala_hbase.xml b/docs/topics/impala_hbase.xml
index 63f14af..aaf451c 100644
--- a/docs/topics/impala_hbase.xml
+++ b/docs/topics/impala_hbase.xml
@@ -110,12 +110,12 @@ under the License.
           the new table.)
         </li>
 
-        <li>
-          You issue queries against the Impala tables. For efficient queries, use <codeph>WHERE</codeph> clauses to
-          find a single key value or a range of key values wherever practical, by testing the Impala column
-          corresponding to the HBase row key. Avoid queries that do full-table scans, which are efficient for
-          regular Impala tables but inefficient in HBase.
-        </li>
+        <li> You issue queries against the Impala tables. For efficient queries,
+          use the <codeph>WHERE</codeph> clause to find a single key value or a
+          range of key values wherever practical, by testing the Impala column
+          corresponding to the HBase row key. Avoid queries that do full-table
+          scans, which are efficient for regular Impala tables but inefficient
+          in HBase. </li>
       </ul>
 
       <p>
@@ -180,15 +180,16 @@ under the License.
         key or value fields. All the type enforcement is done on the Impala side.
       </p>
 
-      <p>
-        For best performance of Impala queries against HBase tables, most queries will perform comparisons in the
-        <codeph>WHERE</codeph> against the column that corresponds to the HBase row key. When creating the table
-        through the Hive shell, use the <codeph>STRING</codeph> data type for the column that corresponds to the
-        HBase row key. Impala can translate conditional tests (through operators such as <codeph>=</codeph>,
-        <codeph>&lt;</codeph>, <codeph>BETWEEN</codeph>, and <codeph>IN</codeph>) against this column into fast
-        lookups in HBase, but this optimization (<q>predicate pushdown</q>) only works when that column is
-        defined as <codeph>STRING</codeph>.
-      </p>
+      <p> For best performance of Impala queries against HBase tables, most
+        queries will perform comparisons in the <codeph>WHERE</codeph> clause
+        against the column that corresponds to the HBase row key. When creating
+        the table through the Hive shell, use the <codeph>STRING</codeph> data
+        type for the column that corresponds to the HBase row key. Impala can
+        translate predicates (through operators such as <codeph>=</codeph>,
+          <codeph>&lt;</codeph>, and <codeph>BETWEEN</codeph>) against this
+        column into fast lookups in HBase, but this optimization (<q>predicate
+          pushdown</q>) only works when that column is defined as
+          <codeph>STRING</codeph>. </p>
 
       <p>
         Starting in Impala 1.1, Impala also supports reading and writing to columns that are defined in the Hive