You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/07/11 21:35:59 UTC

[impala] 01/02: IMPALA-8543: More diagnostics for TAcceptQueueServer

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e83f67e4e3ad24711703224c8efc42e291a0d89c
Author: Michael Ho <kw...@cloudera.com>
AuthorDate: Tue Jun 25 19:13:41 2019 -0700

    IMPALA-8543: More diagnostics for TAcceptQueueServer
    
    This change adds more logging information in TAcceptQueueServer
    to help diagnose issues at various stages of client connections
    establishment.
    
    Two new metrics are also added to measure the connection setup time
    and the wait time for service threads to be available.
    
    Change-Id: I33b32352b457a2c8ec7bae6da46bb9c555dc9c36
    Reviewed-on: http://gerrit.cloudera.org:8080/13790
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/rpc/TAcceptQueueServer.cpp |  70 +++++++++++++++------
 be/src/rpc/TAcceptQueueServer.h   |  21 ++++---
 common/thrift/metrics.json        | 126 +++++++++++++++++++++++++++++++++++++-
 3 files changed, 189 insertions(+), 28 deletions(-)

diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp
index 09d7954..fa04d87 100644
--- a/be/src/rpc/TAcceptQueueServer.cpp
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -19,17 +19,21 @@
 // This file was copied from apache::thrift::server::TThreadedServer.cpp v0.9.0, with the
 // significant changes noted inline below.
 
-#include <algorithm>
 #include "rpc/TAcceptQueueServer.h"
 
+#include <gutil/walltime.h>
 #include <thrift/concurrency/PlatformThreadFactory.h>
 #include <thrift/transport/TSocket.h>
 
+#include "util/histogram-metric.h"
 #include "util/metrics.h"
+#include "util/stopwatch.h"
 #include "rpc/thrift-util.h"
 #include "rpc/thrift-server.h"
 #include "util/thread-pool.h"
 
+#include "common/names.h"
+
 DEFINE_int32(accepted_cnxn_queue_depth, 10000,
     "(Advanced) The size of the post-accept, pre-setup connection queue in each thrift "
     "server set up to service Impala internal and external connections.");
@@ -99,13 +103,13 @@ class TAcceptQueueServer::Task : public Runnable {
 
     try {
       input_->getTransport()->close();
-    } catch (TTransportException& ttx) {
+    } catch (const TTransportException& ttx) {
       string errStr = string("TAcceptQueueServer input close failed: ") + ttx.what();
       GlobalOutput(errStr.c_str());
     }
     try {
       output_->getTransport()->close();
-    } catch (TTransportException& ttx) {
+    } catch (const TTransportException& ttx) {
       string errStr = string("TAcceptQueueServer output close failed: ") + ttx.what();
       GlobalOutput(errStr.c_str());
     }
@@ -191,10 +195,6 @@ TAcceptQueueServer::TAcceptQueueServer(const boost::shared_ptr<TProcessor>& proc
 }
 
 void TAcceptQueueServer::init() {
-  stop_ = false;
-  metrics_enabled_ = false;
-  queue_size_metric_ = nullptr;
-
   if (!threadFactory_) {
     threadFactory_.reset(new PlatformThreadFactory);
   }
@@ -221,17 +221,28 @@ void TAcceptQueueServer::SetupConnection(shared_ptr<TAcceptQueueEntry> entry) {
   shared_ptr<TTransport> inputTransport;
   shared_ptr<TTransport> outputTransport;
   shared_ptr<TTransport> client = entry->client_;
+  const string& socket_info = reinterpret_cast<TSocket*>(client.get())->getSocketInfo();
+  VLOG(1) << Substitute("TAcceptQueueServer: $0 started connection setup for client $1",
+      name_, socket_info);
   try {
+    MonotonicStopWatch timer;
+    // Start timing for connection setup.
+    timer.Start();
     inputTransport = inputTransportFactory_->getTransport(client);
     outputTransport = outputTransportFactory_->getTransport(client);
     shared_ptr<TProtocol> inputProtocol =
         inputProtocolFactory_->getProtocol(inputTransport);
     shared_ptr<TProtocol> outputProtocol =
         outputProtocolFactory_->getProtocol(outputTransport);
-
     shared_ptr<TProcessor> processor =
         getProcessor(inputProtocol, outputProtocol, client);
 
+    if (metrics_enabled_) {
+      cnxns_setup_time_us_metric_->Update(timer.ElapsedTime() / NANOS_PER_MICRO);
+    }
+    VLOG(1) << Substitute("TAcceptQueueServer: $0 finished connection setup for "
+        "client $1", name_, socket_info);
+
     TAcceptQueueServer::Task* task = new TAcceptQueueServer::Task(
         *this, processor, inputProtocol, outputProtocol, client);
 
@@ -241,7 +252,9 @@ void TAcceptQueueServer::SetupConnection(shared_ptr<TAcceptQueueEntry> entry) {
     // Create a thread for this task
     shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(runnable));
 
-    // Insert thread into the set of threads
+    // Insert thread into the set of threads.
+    // Start timing the wait duration for service threads.
+    timer.Reset();
     {
       Synchronized s(tasksMonitor_);
       int64_t wait_time_ms = 0;
@@ -257,7 +270,10 @@ void TAcceptQueueServer::SetupConnection(shared_ptr<TAcceptQueueEntry> entry) {
                   << "Waiting for " << wait_time_ms << " milliseconds.";
         int wait_result = tasksMonitor_.waitForTimeRelative(wait_time_ms);
         if (wait_result == THRIFT_ETIMEDOUT) {
-          if (metrics_enabled_) timedout_cnxns_metric_->Increment(1);
+          if (metrics_enabled_) {
+            thread_wait_time_us_metric_->Update(timer.ElapsedTime() / NANOS_PER_MICRO);
+            timedout_cnxns_metric_->Increment(1);
+          }
           LOG(INFO) << name_ << ": Server busy. Timing out connection request.";
           string errStr = "TAcceptQueueServer: " + name_ + " server busy";
           CleanupAndClose(errStr, inputTransport, outputTransport, client);
@@ -266,14 +282,19 @@ void TAcceptQueueServer::SetupConnection(shared_ptr<TAcceptQueueEntry> entry) {
       }
       tasks_.insert(task);
     }
+    if (metrics_enabled_) {
+      thread_wait_time_us_metric_->Update(timer.ElapsedTime() / NANOS_PER_MICRO);
+    }
 
     // Start the thread!
     thread->start();
-  } catch (TException& tx) {
-    string errStr = string("TAcceptQueueServer: Caught TException: ") + tx.what();
+  } catch (const TException& tx) {
+    string errStr = Substitute("TAcceptQueueServer: $0 connection setup failed for "
+        "client $1. Caught TException: $2", name_, socket_info, string(tx.what()));
     CleanupAndClose(errStr, inputTransport, outputTransport, client);
-  } catch (string s) {
-    string errStr = "TAcceptQueueServer: Unknown exception: " + s;
+  } catch (const string& s) {
+    string errStr = Substitute("TAcceptQueueServer: $0 connection setup failed for "
+        "client $1. Unknown exception: $2", name_, socket_info, s);
     CleanupAndClose(errStr, inputTransport, outputTransport, client);
   }
 }
@@ -313,6 +334,10 @@ void TAcceptQueueServer::serve() {
       // Fetch client from server
       shared_ptr<TTransport> client = serverTransport_->accept();
 
+      TSocket* socket = reinterpret_cast<TSocket*>(client.get());
+      VLOG(1) << Substitute("New connection to server $0 from client $1",
+          name_, socket->getSocketInfo());
+
       shared_ptr<TAcceptQueueEntry> entry{new TAcceptQueueEntry};
       entry->client_ = client;
       if (queue_timeout_ms_ > 0) {
@@ -328,18 +353,18 @@ void TAcceptQueueServer::serve() {
         break;
       }
       if (metrics_enabled_) queue_size_metric_->Increment(1);
-    } catch (TTransportException& ttx) {
+    } catch (const TTransportException& ttx) {
       if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
         string errStr =
             string("TAcceptQueueServer: TServerTransport died on accept: ") + ttx.what();
         GlobalOutput(errStr.c_str());
       }
       continue;
-    } catch (TException& tx) {
+    } catch (const TException& tx) {
       string errStr = string("TAcceptQueueServer: Caught TException: ") + tx.what();
       GlobalOutput(errStr.c_str());
       continue;
-    } catch (string s) {
+    } catch (const string& s) {
       string errStr = "TAcceptQueueServer: Unknown exception: " + s;
       GlobalOutput(errStr.c_str());
       break;
@@ -360,7 +385,7 @@ void TAcceptQueueServer::serve() {
       while (!tasks_.empty()) {
         tasksMonitor_.wait();
       }
-    } catch (TException& tx) {
+    } catch (const TException& tx) {
       string errStr =
           string("TAcceptQueueServer: Exception joining workers: ") + tx.what();
       GlobalOutput(errStr.c_str());
@@ -377,6 +402,15 @@ void TAcceptQueueServer::InitMetrics(MetricGroup* metrics, const string& key_pre
   stringstream timedout_cnxns_ss;
   timedout_cnxns_ss << key_prefix << ".timedout-cnxn-requests";
   timedout_cnxns_metric_ = metrics->AddGauge(timedout_cnxns_ss.str(), 0);
+  const int max_histogram_value_us = 5 * 60 * MICROS_PER_SEC;
+  stringstream cnxns_setup_time_ss;
+  cnxns_setup_time_ss << key_prefix << ".connection-setup-time";
+  cnxns_setup_time_us_metric_ = metrics->RegisterMetric(new HistogramMetric(
+      MetricDefs::Get(cnxns_setup_time_ss.str()), max_histogram_value_us, 1));
+  stringstream thread_wait_time_ss;
+  thread_wait_time_ss << key_prefix << ".svc-thread-wait-time";
+  thread_wait_time_us_metric_ = metrics->RegisterMetric(new HistogramMetric(
+      MetricDefs::Get(thread_wait_time_ss.str()), max_histogram_value_us, 1));
   metrics_enabled_ = true;
 }
 
diff --git a/be/src/rpc/TAcceptQueueServer.h b/be/src/rpc/TAcceptQueueServer.h
index 08a7244..c82177b 100644
--- a/be/src/rpc/TAcceptQueueServer.h
+++ b/be/src/rpc/TAcceptQueueServer.h
@@ -93,7 +93,7 @@ class TAcceptQueueServer : public TServer {
       boost::shared_ptr<TTransport> client);
 
   boost::shared_ptr<ThreadFactory> threadFactory_;
-  volatile bool stop_;
+  volatile bool stop_ = false;
 
   /// Name of the thrift server.
   const std::string name_;
@@ -105,20 +105,27 @@ class TAcceptQueueServer : public TServer {
   // The maximum number of running tasks allowed at a time.
   const int32_t maxTasks_;
 
-  /// New - True if metrics are enabled
-  bool metrics_enabled_;
+  /// True if metrics are enabled
+  bool metrics_enabled_ = false;
 
-  /// New - Number of connections that have been accepted and are waiting to be setup.
-  impala::IntGauge* queue_size_metric_;
+  /// Number of connections that have been accepted and are waiting to be setup.
+  impala::IntGauge* queue_size_metric_ = nullptr;
 
   /// Number of connections rejected due to timeout.
-  impala::IntGauge* timedout_cnxns_metric_;
+  impala::IntGauge* timedout_cnxns_metric_ = nullptr;
+
+  /// Distribution of connection setup time in microseconds. This does not include the
+  /// time spent waiting for a service thread to be available.
+  impala::HistogramMetric* cnxns_setup_time_us_metric_ = nullptr;
+
+  /// Distribution of wait time in microseconds for service threads to be available.
+  impala::HistogramMetric* thread_wait_time_us_metric_ = nullptr;
 
   /// Amount of time in milliseconds after which a connection request will be timed out.
   /// Default value is 0, which means no timeout.
   int64_t queue_timeout_ms_;
 
-  /// Amount of time, in milliseconds, of client's inactivity before the service thread
+  /// Amount of time in milliseconds of client's inactivity before the service thread
   /// wakes up to check if the connection should be closed due to inactivity. If 0, no
   /// polling happens.
   int64_t idle_poll_period_ms_;
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 0cc7490..4a93c80 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -772,6 +772,26 @@
     "key": "impala.thrift-server.CatalogService.connections-in-use"
   },
   {
+    "description": "Amount of time clients of Catalog Service spent waiting for connection to be set up",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "Catalog Service client connection setup time",
+    "units": "TIME_US",
+    "kind": "HISTOGRAM",
+    "key": "impala.thrift-server.CatalogService.connection-setup-time"
+  },
+  {
+    "description": "Amount of time clients of Catalog Service spent waiting for service threads",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "Catalog Service clients' wait time for service threads",
+    "units": "TIME_US",
+    "kind": "HISTOGRAM",
+    "key": "impala.thrift-server.CatalogService.svc-thread-wait-time"
+  },
+  {
     "description": "The total number of connections made to this Catalog Server's catalog service  over its lifetime.",
     "contexts": [
       "CATALOGSERVER"
@@ -784,7 +804,7 @@
   {
     "description": "The number of connections to the Catalog Service that have been accepted and are waiting to be setup.",
     "contexts": [
-      "IMPALAD"
+      "CATALOGSERVER"
     ],
     "label": "Catalog Service Connections Queued for Setup",
     "units": "NONE",
@@ -794,7 +814,7 @@
   {
     "description": "The number of connection requests to the Catalog Service that have been timed out waiting to be setup.",
     "contexts": [
-      "IMPALAD"
+      "CATALOGSERVER"
     ],
     "label": "Catalog Service Connection Requests Timed Out",
     "units": "NONE",
@@ -802,7 +822,7 @@
     "key": "impala.thrift-server.CatalogService.timedout-cnxn-requests"
   },
   {
-    "description": "The number of active connections to this StateStore's StateStore service.",
+    "description": "The number of active connections to this StateStore's service.",
     "contexts": [
       "STATESTORE"
     ],
@@ -812,6 +832,26 @@
     "key": "impala.thrift-server.StatestoreService.connections-in-use"
   },
   {
+    "description": "Amount of time clients of StateStore Service spent waiting for connection to be set up",
+    "contexts": [
+      "STATESTORE"
+    ],
+    "label": "StateStore Service client connection setup time",
+    "units": "TIME_US",
+    "kind": "HISTOGRAM",
+    "key": "impala.thrift-server.StatestoreService.connection-setup-time"
+  },
+  {
+    "description": "Amount of time clients of StateStore Service spent waiting for service threads",
+    "contexts": [
+      "STATESTORE"
+    ],
+    "label": "StateStore Service clients' wait time for service threads",
+    "units": "TIME_US",
+    "kind": "HISTOGRAM",
+    "key": "impala.thrift-server.StatestoreService.svc-thread-wait-time"
+  },
+  {
     "description": "The total number of connections made to this StateStore's StateStore service over its lifetime.",
     "contexts": [
       "STATESTORE"
@@ -852,6 +892,26 @@
     "key": "impala.thrift-server.backend.connections-in-use"
   },
   {
+    "description": "Amount of time clients of Impala Backend Server spent waiting for connection to be set up",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Backend client connections setup time",
+    "units": "TIME_US",
+    "kind": "HISTOGRAM",
+    "key": "impala.thrift-server.backend.connection-setup-time"
+  },
+  {
+    "description": "Amount of time clients of Impala Backend Server spent waiting for service threads",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Backend Server clients' wait time for service threads",
+    "units": "TIME_US",
+    "kind": "HISTOGRAM",
+    "key": "impala.thrift-server.backend.svc-thread-wait-time"
+  },
+  {
     "description": "The total number of Impala Backend client connections made to this Impala Daemon over its lifetime.",
     "contexts": [
       "IMPALAD"
@@ -892,6 +952,26 @@
     "key": "impala.thrift-server.beeswax-frontend.connections-in-use"
   },
   {
+    "description": "Amount of time clients of Beeswax API spent waiting for connection to be set up",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Beeswax API client connection setup time",
+    "units": "TIME_US",
+    "kind": "HISTOGRAM",
+    "key": "impala.thrift-server.beeswax-frontend.connection-setup-time"
+  },
+  {
+    "description": "Amount of time clients of Beeswax API spent waiting for service threads",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Beeswax API clients' wait time for service threads",
+    "units": "TIME_US",
+    "kind": "HISTOGRAM",
+    "key": "impala.thrift-server.beeswax-frontend.svc-thread-wait-time"
+  },
+  {
     "description": "The total number of Beeswax API connections made to this Impala Daemon over its lifetime.",
     "contexts": [
       "IMPALAD"
@@ -932,6 +1012,26 @@
     "key": "impala.thrift-server.hiveserver2-frontend.connections-in-use"
   },
   {
+    "description": "Amount of time clients of HiveServer2 API spent waiting for connection to be set up",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HiveServer2 API client connection setup time",
+    "units": "TIME_US",
+    "kind": "HISTOGRAM",
+    "key": "impala.thrift-server.hiveserver2-frontend.connection-setup-time"
+  },
+  {
+    "description": "Amount of time clients of HiveServer2 API spent waiting for service threads",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HiveServer2 API clients' wait time for service threads",
+    "units": "TIME_US",
+    "kind": "HISTOGRAM",
+    "key": "impala.thrift-server.hiveserver2-frontend.svc-thread-wait-time"
+  },
+  {
     "description": "The total number of HiveServer2 API connections made to this Impala Daemon over its lifetime.",
     "contexts": [
       "IMPALAD"
@@ -972,6 +1072,26 @@
     "key": "impala.thrift-server.hiveserver2-http-frontend.connections-in-use"
   },
   {
+    "description": "Amount of time clients of HiveServer2 HTTP API spent waiting for connection to be set up",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HiveServer2 HTTP API client connection setup time",
+    "units": "TIME_US",
+    "kind": "HISTOGRAM",
+    "key": "impala.thrift-server.hiveserver2-http-frontend.connection-setup-time"
+  },
+  {
+    "description": "Amount of time clients of HiveServer2 HTTP API spent waiting for service threads",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HiveServer2 HTTP API clients' wait time for service threads",
+    "units": "TIME_US",
+    "kind": "HISTOGRAM",
+    "key": "impala.thrift-server.hiveserver2-http-frontend.svc-thread-wait-time"
+  },
+  {
     "description": "The total number of HiveServer2 HTTP API connections made to this Impala Daemon over its lifetime.",
     "contexts": [
       "IMPALAD"