Posted to commits@impala.apache.org by ga...@apache.org on 2019/04/01 11:54:56 UTC

[impala] branch master updated (7adb411 -> eb483dd)

This is an automated email from the ASF dual-hosted git repository.

gaborkaszab pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 7adb411  IMPALA-8330: Impala shell config file should support flag names
     new a2a4de0  Update version to 3.3.0-SNAPSHOT
     new b2a8779  IMPALA-8312 : Alter database operations have race condition
     new 1fcea9e  IMPALA-7982: Add host network usage to profile
     new eb483dd  IMPALA-7800: Time out new connections after --fe_service_threads

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/rpc/TAcceptQueueServer.cpp                  |  89 +++++---
 be/src/rpc/TAcceptQueueServer.h                    |  25 ++-
 be/src/rpc/thrift-server.cc                        |   7 +-
 be/src/rpc/thrift-server.h                         |  18 +-
 be/src/runtime/query-state.cc                      |   9 +
 be/src/service/impala-server.cc                    |   7 +
 be/src/util/system-state-info-test.cc              |  60 ++++-
 be/src/util/system-state-info.cc                   | 116 ++++++++--
 be/src/util/system-state-info.h                    |  81 ++++++-
 bin/save-version.sh                                |   2 +-
 common/thrift/metrics.json                         |  50 +++++
 .../impala/catalog/CatalogServiceCatalog.java      |  29 ++-
 fe/src/main/java/org/apache/impala/catalog/Db.java |  35 ++-
 .../apache/impala/service/CatalogOpExecutor.java   |  50 ++---
 .../apache/impala/catalog/AlterDatabaseTest.java   | 241 +++++++++++++++++++++
 .../test_frontend_connection_limit.py              |  89 ++++++++
 tests/query_test/test_observability.py             |  39 ++--
 17 files changed, 827 insertions(+), 120 deletions(-)
 create mode 100644 fe/src/test/java/org/apache/impala/catalog/AlterDatabaseTest.java
 create mode 100644 tests/custom_cluster/test_frontend_connection_limit.py


[impala] 03/04: IMPALA-7982: Add host network usage to profile

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gaborkaszab pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1fcea9e4b677f07dfaf6b0314ac763241a029e66
Author: Lars Volker <lv...@cloudera.com>
AuthorDate: Wed Mar 13 15:04:32 2019 -0700

    IMPALA-7982: Add host network usage to profile
    
    This change adds host network usage to profiles. For each host that
    participates in the query execution, it adds the incoming and outgoing
    bandwidth across all interfaces, excluding the local loopback
    interface. This includes all data transmitted by the host as part of
    the execution of the query, other queries, and other processes running
    on the same system.
    
    The change adds backend and end-to-end tests for the new
    functionality.
    
    Change-Id: I2cc74f87374080a74a13b7fb6e4da44a11d828ef
    Reviewed-on: http://gerrit.cloudera.org:8080/12747
    Reviewed-by: Lars Volker <lv...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
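
For illustration, here is a minimal Python sketch of the approach, assuming
the usual /proc/net/dev column layout (8 receive fields followed by 8
transmit fields per interface). The helper names are hypothetical and do not
mirror the C++ API in the diff below:

    import time

    def parse_proc_net_dev(text):
        """Sums rx/tx byte counters over all interfaces except loopback."""
        rx_total, tx_total = 0, 0
        for line in text.splitlines()[2:]:       # skip the two header lines
            if_name, _, counters = line.partition(':')
            if if_name.strip() == 'lo':          # exclude local loopback
                continue
            fields = counters.split()
            rx_total += int(fields[0])           # received bytes
            tx_total += int(fields[8])           # transmitted bytes
        return rx_total, tx_total

    def network_rates(period_ms=500):
        """Takes two snapshots 'period_ms' apart and returns B/s rates."""
        with open('/proc/net/dev') as f:
            old_rx, old_tx = parse_proc_net_dev(f.read())
        time.sleep(period_ms / 1000.0)
        with open('/proc/net/dev') as f:
            cur_rx, cur_tx = parse_proc_net_dev(f.read())
        return ((cur_rx - old_rx) * 1000 // period_ms,
                (cur_tx - old_tx) * 1000 // period_ms)
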
---
 be/src/runtime/query-state.cc          |   9 +++
 be/src/util/system-state-info-test.cc  |  60 +++++++++++++++--
 be/src/util/system-state-info.cc       | 116 ++++++++++++++++++++++++++++-----
 be/src/util/system-state-info.h        |  81 +++++++++++++++++++++--
 tests/query_test/test_observability.py |  39 ++++++-----
 5 files changed, 263 insertions(+), 42 deletions(-)

diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index beef32c..0d33a12 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -161,6 +161,15 @@ Status QueryState::Init(const TExecQueryFInstancesParams& exec_rpc_params) {
         "HostCpuIoWaitPercentage", TUnit::BASIS_POINTS, [system_state_info] () {
         return system_state_info->GetCpuUsageRatios().iowait;
         });
+    // Add network usage
+    host_profile_->AddChunkedTimeSeriesCounter(
+        "HostNetworkRx", TUnit::BYTES_PER_SECOND, [system_state_info] () {
+        return system_state_info->GetNetworkUsage().rx_rate;
+        });
+    host_profile_->AddChunkedTimeSeriesCounter(
+        "HostNetworkTx", TUnit::BYTES_PER_SECOND, [system_state_info] () {
+        return system_state_info->GetNetworkUsage().tx_rate;
+        });
   }
 
   // Starting a new query creates threads and consumes a non-trivial amount of memory.
diff --git a/be/src/util/system-state-info-test.cc b/be/src/util/system-state-info-test.cc
index 9bae942..c80d577 100644
--- a/be/src/util/system-state-info-test.cc
+++ b/be/src/util/system-state-info-test.cc
@@ -32,23 +32,36 @@ class SystemStateInfoTest : public testing::Test {
 TEST_F(SystemStateInfoTest, FirstCallReturnsZero) {
   const SystemStateInfo::CpuUsageRatios& r = info.GetCpuUsageRatios();
   EXPECT_EQ(0, r.user + r.system + r.iowait);
+
+  const SystemStateInfo::NetworkUsage& n = info.GetNetworkUsage();
+  EXPECT_EQ(0, n.rx_rate + n.tx_rate);
 }
 
 // Smoke test to make sure that we read non-zero values from /proc/stat.
 TEST_F(SystemStateInfoTest, ReadProcStat) {
   info.ReadCurrentProcStat();
-  const SystemStateInfo::CpuValues& state = info.cpu_values_[info.cur_val_idx_];
+  const SystemStateInfo::CpuValues& state = info.cpu_values_[info.cpu_val_idx_];
   EXPECT_GT(state[SystemStateInfo::CPU_USER], 0);
   EXPECT_GT(state[SystemStateInfo::CPU_SYSTEM], 0);
   EXPECT_GT(state[SystemStateInfo::CPU_IDLE], 0);
   EXPECT_GT(state[SystemStateInfo::CPU_IOWAIT], 0);
 }
 
+// Smoke test to make sure that we read non-zero values from /proc/net/dev.
+TEST_F(SystemStateInfoTest, ReadProcNetDev) {
+  info.ReadCurrentProcNetDev();
+  const SystemStateInfo::NetworkValues& state = info.network_values_[info.net_val_idx_];
+  EXPECT_GT(state[SystemStateInfo::NET_RX_BYTES], 0);
+  EXPECT_GT(state[SystemStateInfo::NET_RX_PACKETS], 0);
+  EXPECT_GT(state[SystemStateInfo::NET_TX_BYTES], 0);
+  EXPECT_GT(state[SystemStateInfo::NET_TX_PACKETS], 0);
+}
+
 // Tests parsing a line similar to the first line of /proc/stat.
 TEST_F(SystemStateInfoTest, ParseProcStat) {
   // Fields are: user nice system idle iowait irq softirq steal guest guest_nice
   info.ReadProcStatString("cpu  20 30 10 70 100 0 0 0 0 0");
-  const SystemStateInfo::CpuValues& state = info.cpu_values_[info.cur_val_idx_];
+  const SystemStateInfo::CpuValues& state = info.cpu_values_[info.cpu_val_idx_];
   EXPECT_EQ(state[SystemStateInfo::CPU_USER], 20);
   EXPECT_EQ(state[SystemStateInfo::CPU_SYSTEM], 10);
   EXPECT_EQ(state[SystemStateInfo::CPU_IDLE], 70);
@@ -56,10 +69,26 @@ TEST_F(SystemStateInfoTest, ParseProcStat) {
 
   // Test that values larger than INT_MAX parse without error.
   info.ReadProcStatString("cpu  3000000000 30 10 70 100 0 0 0 0 0");
-  const SystemStateInfo::CpuValues& changed_state = info.cpu_values_[info.cur_val_idx_];
+  const SystemStateInfo::CpuValues& changed_state = info.cpu_values_[info.cpu_val_idx_];
   EXPECT_EQ(changed_state[SystemStateInfo::CPU_USER], 3000000000);
 }
 
+// Tests parsing a string similar to the contents of /proc/net/dev.
+TEST_F(SystemStateInfoTest, ParseProcNetDevString) {
+  // A multi-line string with three interfaces in the format of /proc/net/dev; the
+  // loopback interface 'lo' is excluded from the sums.
+  string dev_net = R"(Inter-|   Receive                                                |  Transmit
+   face |bytes    packets errs drop fifo frame compressed multicast|bytes    packets errs drop fifo colls carrier compressed
+       lo: 12178099975318 24167151765    0    0    0     0          0         0 12178099975318 24167151765    0    0    0     0       0          0
+       br-c4d9b4cafca2: 1025814     409    0    0    0     0          0         0 70616330 1638104    0    0    0     0       0          0
+         eth0: 366039609986 388580561    0    2    0     0          0  62231368 95492744135 174535524    0    0    0     0       0          0)";
+  info.ReadProcNetDevString(dev_net);
+  const SystemStateInfo::NetworkValues& state = info.network_values_[info.net_val_idx_];
+  EXPECT_EQ(state[SystemStateInfo::NET_RX_BYTES], 366040635800);
+  EXPECT_EQ(state[SystemStateInfo::NET_RX_PACKETS], 388580970);
+  EXPECT_EQ(state[SystemStateInfo::NET_TX_BYTES], 95563360465);
+  EXPECT_EQ(state[SystemStateInfo::NET_TX_PACKETS], 176173628);
+}
+
 // Tests that computing CPU ratios doesn't overflow
 TEST_F(SystemStateInfoTest, ComputeCpuRatiosIntOverflow) {
   // Load old and new values for CPU counters. These values are from a system where we
@@ -88,7 +117,7 @@ TEST_F(SystemStateInfoTest, GetCpuUsageRatios) {
   t.join();
 }
 
-// Tests the computation logic.
+// Tests the computation logic for CPU ratios.
 TEST_F(SystemStateInfoTest, ComputeCpuRatios) {
   info.ReadProcStatString("cpu  20 30 10 70 100 0 0 0 0 0");
   info.ReadProcStatString("cpu  30 30 20 70 100 0 0 0 0 0");
@@ -99,5 +128,28 @@ TEST_F(SystemStateInfoTest, ComputeCpuRatios) {
   EXPECT_EQ(r.iowait, 0);
 }
 
+// Tests the computation logic for network usage.
+TEST_F(SystemStateInfoTest, ComputeNetworkUsage) {
+  // Two sets of values in the format of /proc/net/dev
+  string dev_net_1 = R"(Inter-|   Receive                                                |  Transmit
+   face |bytes    packets errs drop fifo frame compressed multicast|bytes    packets errs drop fifo colls carrier compressed
+       lo: 1000 2000    0    0    0     0          0         0 3000 4000    0    0    0     0       0          0
+       br-c4d9b4cafca2: 5000     409    0    0    0     0          0         0 6000 7000    0    0    0     0       0          0
+         eth0: 8000 9000    0    2    0     0          0  10000 11000 12000    0    0    0     0       0          0)";
+  string dev_net_2 = R"(Inter-|   Receive                                                |  Transmit
+   face |bytes    packets errs drop fifo frame compressed multicast|bytes    packets errs drop fifo colls carrier compressed
+       lo: 2000 4000    0    0    0     0          0         0 6000 8000    0    0    0     0       0          0
+       br-c4d9b4cafca2: 7000     609    0    0    0     0          0         0 12000 14000    0    0    0     0       0          0
+         eth0: 10000 11000    0    2    0     0          0  10000 11000 12000    0    0    0     0       0          0)";
+
+  info.ReadProcNetDevString(dev_net_1);
+  info.ReadProcNetDevString(dev_net_2);
+  int period_ms = 500;
+  info.ComputeNetworkUsage(period_ms);
+  const SystemStateInfo::NetworkUsage& n = info.GetNetworkUsage();
+  EXPECT_EQ(n.rx_rate, 8000);
+  EXPECT_EQ(n.tx_rate, 12000);
+}
+
 } // namespace impala
 
diff --git a/be/src/util/system-state-info.cc b/be/src/util/system-state-info.cc
index 62a1104..400a063 100644
--- a/be/src/util/system-state-info.cc
+++ b/be/src/util/system-state-info.cc
@@ -16,9 +16,13 @@
 // under the License.
 
 #include "gutil/strings/split.h"
+#include "gutil/strings/util.h"
+#include "kudu/util/env.h"
+#include "kudu/util/faststring.h"
 #include "util/error-util.h"
 #include "util/string-parser.h"
 #include "util/system-state-info.h"
+#include "util/time.h"
 
 #include <algorithm>
 #include <fstream>
@@ -27,19 +31,31 @@
 #include "common/names.h"
 
 using std::accumulate;
+using kudu::Env;
+using kudu::faststring;
+using kudu::ReadFileToString;
+using strings::Split;
 
 namespace impala {
 
 // Partially initializing cpu_ratios_ will default-initialize the remaining members.
 SystemStateInfo::SystemStateInfo() {
   memset(&cpu_ratios_, 0, sizeof(cpu_ratios_));
+  memset(&network_usage_, 0, sizeof(network_usage_));
   ReadCurrentProcStat();
+  ReadCurrentProcNetDev();
+  last_net_update_ms_ = MonotonicMillis();
 }
 
 void SystemStateInfo::CaptureSystemStateSnapshot() {
   // Capture and compute CPU usage
   ReadCurrentProcStat();
   ComputeCpuRatios();
+  int64_t now = MonotonicMillis();
+  ReadCurrentProcNetDev();
+  DCHECK_GT(last_net_update_ms_, 0L);
+  ComputeNetworkUsage(now - last_net_update_ms_);
+  last_net_update_ms_ = now;
 }
 
 int64_t SystemStateInfo::ParseInt64(const string& val) const {
@@ -49,21 +65,18 @@ int64_t SystemStateInfo::ParseInt64(const string& val) const {
   return -1;
 }
 
-void SystemStateInfo::ReadFirstLineFromFile(const char* path, string* out) const {
-  ifstream proc_file(path);
-  if (!proc_file.is_open()) {
-    LOG(WARNING) << "Could not open " << path << ": " << GetStrErrMsg() << endl;
+void SystemStateInfo::ReadCurrentProcStat() {
+  string path = "/proc/stat";
+  faststring buf;
+  kudu::Status status = ReadFileToString(Env::Default(), path, &buf);
+  if (!status.ok()) {
+    LOG(WARNING) << "Could not open " << path << ": " << status.message() << endl;
     return;
   }
-  DCHECK(proc_file.is_open());
-  DCHECK(out != nullptr);
-  getline(proc_file, *out);
-}
-
-void SystemStateInfo::ReadCurrentProcStat() {
-  string line;
-  ReadFirstLineFromFile("/proc/stat", &line);
-  ReadProcStatString(line);
+  StringPiece buf_sp(reinterpret_cast<const char*>(buf.data()), buf.size());
+  vector<StringPiece> lines = strings::Split(buf_sp, "\n");
+  DCHECK_GT(lines.size(), 0);
+  ReadProcStatString(lines[0].ToString());
 }
 
 void SystemStateInfo::ReadProcStatString(const string& stat_string) {
@@ -72,8 +85,8 @@ void SystemStateInfo::ReadProcStatString(const string& stat_string) {
   // Skip the first value ('cpu  ');
   ss.ignore(5);
 
-  cur_val_idx_ = 1 - cur_val_idx_;
-  CpuValues& next_values = cpu_values_[cur_val_idx_];
+  cpu_val_idx_ = 1 - cpu_val_idx_;
+  CpuValues& next_values = cpu_values_[cpu_val_idx_];
 
   for (int i = 0; i < NUM_CPU_VALUES; ++i) {
     int64_t v = -1;
@@ -85,8 +98,8 @@ void SystemStateInfo::ReadProcStatString(const string& stat_string) {
 }
 
 void SystemStateInfo::ComputeCpuRatios() {
-  const CpuValues& cur = cpu_values_[cur_val_idx_];
-  const CpuValues& old = cpu_values_[1 - cur_val_idx_];
+  const CpuValues& cur = cpu_values_[cpu_val_idx_];
+  const CpuValues& old = cpu_values_[1 - cpu_val_idx_];
 
   // Sum up all counters
   int64_t cur_sum = accumulate(cur.begin(), cur.end(), 0L);
@@ -107,4 +120,73 @@ void SystemStateInfo::ComputeCpuRatios() {
   cpu_ratios_.iowait = ((cur[CPU_IOWAIT] - old[CPU_IOWAIT]) * BASIS_MAX) / total_tics;
 }
 
+void SystemStateInfo::ReadCurrentProcNetDev() {
+  string path = "/proc/net/dev";
+  faststring buf;
+  kudu::Status status = ReadFileToString(Env::Default(), path, &buf);
+  if (!status.ok()) {
+    LOG(WARNING) << "Could not open " << path << ": " << status.message() << endl;
+    return;
+  }
+  ReadProcNetDevString(buf.ToString());
+}
+
+void SystemStateInfo::ReadProcNetDevString(const string& dev_string) {
+  stringstream ss(dev_string);
+  string line;
+
+  // Discard the first two lines that contain the header
+  getline(ss, line);
+  getline(ss, line);
+
+  net_val_idx_ = 1 - net_val_idx_;
+  NetworkValues& sum_values = network_values_[net_val_idx_];
+  memset(&sum_values, 0, sizeof(sum_values));
+
+  while (getline(ss, line)) {
+    NetworkValues line_values;
+    ReadProcNetDevLine(line, &line_values);
+    AddNetworkValues(line_values, &sum_values);
+  }
+}
+
+void SystemStateInfo::ReadProcNetDevLine(
+    const string& dev_string, NetworkValues* result) {
+  stringstream ss(dev_string);
+
+  // Read the interface name
+  string if_name;
+  ss >> if_name;
+
+  // Don't include the loopback interface
+  if (HasPrefixString(if_name, "lo:")) {
+    memset(result, 0, sizeof(*result));
+    return;
+  }
+
+  for (int i = 0; i < NUM_NET_VALUES; ++i) {
+    int64_t v = -1;
+    ss >> v;
+    DCHECK_GE(v, 0) << "Value " << i << ": " << v;
+    // Clamp invalid entries at 0
+    (*result)[i] = max(v, 0L);
+  }
+}
+
+void SystemStateInfo::AddNetworkValues(const NetworkValues& a, NetworkValues* b) {
+  for (int i = 0; i < NUM_NET_VALUES; ++i) (*b)[i] += a[i];
+}
+
+void SystemStateInfo::ComputeNetworkUsage(int64_t period_ms) {
+  if (period_ms == 0) return;
+  // Compute network throughput in bytes per second
+  const NetworkValues& cur = network_values_[net_val_idx_];
+  const NetworkValues& old = network_values_[1 - net_val_idx_];
+  int64_t rx_bytes = cur[NET_RX_BYTES] - old[NET_RX_BYTES];
+  network_usage_.rx_rate = rx_bytes * 1000L / period_ms;
+
+  int64_t tx_bytes = cur[NET_TX_BYTES] - old[NET_TX_BYTES];
+  network_usage_.tx_rate = tx_bytes * 1000L / period_ms;
+}
+
 } // namespace impala
diff --git a/be/src/util/system-state-info.h b/be/src/util/system-state-info.h
index 230f868..1b36e0c 100644
--- a/be/src/util/system-state-info.h
+++ b/be/src/util/system-state-info.h
@@ -46,19 +46,29 @@ class SystemStateInfo {
     int32_t system;
     int32_t iowait;
   };
+
   /// Returns a struct containing the CPU usage ratios for the interval between the last
   /// two calls to CaptureSystemStateSnapshot().
   const CpuUsageRatios& GetCpuUsageRatios() { return cpu_ratios_; }
 
+  /// Network usage rates in bytes per second.
+  struct NetworkUsage {
+    int64_t rx_rate;
+    int64_t tx_rate;
+  };
+
+  /// Returns a struct containing the network usage for the interval between the last two
+  /// calls to CaptureSystemStateSnapshot().
+  const NetworkUsage& GetNetworkUsage() { return network_usage_; }
+
  private:
   int64_t ParseInt64(const std::string& val) const;
-  void ReadFirstLineFromFile(const char* path, std::string* out) const;
 
-  /// Rotates 'cur_val_idx_' and reads the current CPU usage values from /proc/stat into
+  /// Rotates 'cpu_val_idx_' and reads the current CPU usage values from /proc/stat into
   /// the current set of values.
   void ReadCurrentProcStat();
 
-  /// Rotates 'cur_val_idx_' and reads the CPU usage values from 'stat_string' into the
+  /// Rotates 'cpu_val_idx_' and reads the CPU usage values from 'stat_string' into the
   /// current set of values.
   void ReadProcStatString(const string& stat_string);
 
@@ -66,6 +76,7 @@ class SystemStateInfo {
   /// CaptureSystemStateSnapshot() and stores the result in 'cpu_ratios_'.
   void ComputeCpuRatios();
 
+  /// The enum names correspond to the fields of /proc/stat.
   enum PROC_STAT_CPU_VALUES {
     CPU_USER = 0,
     CPU_NICE,
@@ -80,15 +91,77 @@ class SystemStateInfo {
   typedef std::array<int64_t, NUM_CPU_VALUES> CpuValues;
   /// Two buffers to keep the current and previous set of CPU usage values.
   CpuValues cpu_values_[2];
-  int cur_val_idx_ = 0;
+  /// Index into cpu_values_ that points to the current set of values. We maintain a
+  /// separate index for CPU and network to be able to update them independently, e.g. in
+  /// tests.
+  int cpu_val_idx_ = 0;
 
   /// The computed CPU usage ratio between the current and previous snapshots in
   /// cpu_values_. Updated in ComputeCpuRatios().
   CpuUsageRatios cpu_ratios_;
 
+  /// The enum names correspond to the fields of /proc/net/dev
+  enum PROC_NET_DEV_VALUES {
+    NET_RX_BYTES = 0,
+    NET_RX_PACKETS,
+    NET_RX_ERRS,
+    NET_RX_DROP,
+    NET_RX_FIFO,
+    NET_RX_FRAME,
+    NET_RX_COMPRESSED,
+    NET_RX_MULTICAST,
+    NET_TX_BYTES,
+    NET_TX_PACKETS,
+    NET_TX_ERRS,
+    NET_TX_DROP,
+    NET_TX_COLLS,
+    NET_TX_CARRIER,
+    NET_TX_COMPRESSED,
+    NUM_NET_VALUES
+  };
+
+  /// We store these values in an array so that we can iterate over them, e.g. when
+  /// reading them from a file or summing them up.
+  typedef std::array<int64_t, NUM_NET_VALUES> NetworkValues;
+  /// Two buffers to keep the current and previous set of network counter values.
+  NetworkValues network_values_[2];
+  /// Index into network_values_ that points to the current set of values. We maintain a
+  /// separate index for CPU and network to be able to update them independently, e.g. in
+  /// tests.
+  int net_val_idx_ = 0;
+
+  /// Rotates net_val_idx_ and reads the current set of values from /proc/net/dev into
+  /// network_values_.
+  void ReadCurrentProcNetDev();
+
+  /// Rotates net_val_idx_ and parses the content of 'dev_string' into network_values_.
+  /// dev_string must be in the format of /proc/net/dev.
+  void ReadProcNetDevString(const string& dev_string);
+
+  /// Parses a single line as it appears in /proc/net/dev into 'result'. Entries are
+  /// set to 0 for the local loopback interface and for invalid entries.
+  void ReadProcNetDevLine(const string& dev_string, NetworkValues* result);
+
+  /// Computes b = b + a.
+  void AddNetworkValues(const NetworkValues& a, NetworkValues* b);
+
+  /// Computes the network usage over the given period in milliseconds and stores the
+  /// result in 'network_usage_'.
+  void ComputeNetworkUsage(int64_t period_ms);
+
+  /// The computed network usage between the current and previous snapshots in
+  /// network_values_. Updated in ComputeNetworkUsage().
+  NetworkUsage network_usage_;
+
+  /// The time at which network usage values were last read from /proc/net/dev. Used
+  /// by CaptureSystemStateSnapshot().
+  int64_t last_net_update_ms_;
+
   FRIEND_TEST(SystemStateInfoTest, ComputeCpuRatios);
   FRIEND_TEST(SystemStateInfoTest, ComputeCpuRatiosIntOverflow);
+  FRIEND_TEST(SystemStateInfoTest, ComputeNetworkUsage);
+  FRIEND_TEST(SystemStateInfoTest, ParseProcNetDevString);
   FRIEND_TEST(SystemStateInfoTest, ParseProcStat);
+  FRIEND_TEST(SystemStateInfoTest, ReadProcNetDev);
   FRIEND_TEST(SystemStateInfoTest, ReadProcStat);
 };
 
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 3c190e2..a1dca7f 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -431,25 +431,29 @@ class TestObservability(ImpalaTestSuite):
     expected_str = "Per Node Profiles:"
     assert any(expected_str in line for line in profile.splitlines())
 
-  def test_query_profile_host_cpu_usage_off(self):
-    """Tests that the query profile does not contain CPU metrics by default or when
-    disabled explicitly."""
+  def test_query_profile_host_resource_metrics_off(self):
+    """Tests that the query profile does not contain resource usage metrics by default or
+       when disabled explicitly."""
     query = "select count(*), sleep(1000) from functional.alltypes"
     for query_opts in [None, {'resource_trace_ratio': 0.0}]:
       profile = self.execute_query(query, query_opts).runtime_profile
-      # Assert that no CPU counters exist in the profile
+      # Assert that no host resource counters exist in the profile
       for line in profile.splitlines():
         assert not re.search("HostCpu.*Percentage", line)
+        assert not re.search("HostNetworkRx", line)
 
-  def test_query_profile_contains_host_cpu_usage(self):
-    """Tests that the query profile contains various CPU metrics."""
+  def test_query_profile_contains_host_resource_metrics(self):
+    """Tests that the query profile contains various CPU and network metrics."""
     query_opts = {'resource_trace_ratio': 1.0}
     query = "select count(*), sleep(1000) from functional.alltypes"
     profile = self.execute_query(query, query_opts).runtime_profile
-    # We check for 500ms because a query with 1s duration won't hit the 64 values limit.
+    # We check for 500ms because a query with 1s duration won't hit the 64 values limit
+    # that would trigger resampling.
     expected_strs = ["HostCpuIoWaitPercentage (500.000ms):",
                      "HostCpuSysPercentage (500.000ms):",
-                     "HostCpuUserPercentage (500.000ms):"]
+                     "HostCpuUserPercentage (500.000ms):",
+                     "HostNetworkRx (500.000ms):",
+                     "HostNetworkTx (500.000ms):"]
 
     # Assert that all expected counters exist in the profile.
     for expected_str in expected_strs:
@@ -482,19 +486,20 @@ class TestObservability(ImpalaTestSuite):
     return thrift_profile
 
   @pytest.mark.execute_serially
-  def test_thrift_profile_contains_cpu_usage(self):
-    """Tests that the thrift profile contains a time series counter for CPU resource
-       usage."""
+  def test_thrift_profile_contains_host_resource_metrics(self):
+    """Tests that the thrift profile contains time series counters for CPU and network
+       resource usage."""
     query_opts = {'resource_trace_ratio': 1.0}
     result = self.execute_query("select sleep(2000)", query_opts)
     thrift_profile = self._get_thrift_profile(result.query_id)
 
-    cpu_key = "HostCpuUserPercentage"
-    cpu_counters = self._find_ts_counters_in_thrift_profile(thrift_profile, cpu_key)
-    # The query will run on a single node, we will only find the counter once.
-    assert len(cpu_counters) == 1
-    cpu_counter = cpu_counters[0]
-    assert len(cpu_counter.values) > 0
+    expected_keys = ["HostCpuUserPercentage", "HostNetworkRx"]
+    for key in expected_keys:
+      counters = self._find_ts_counters_in_thrift_profile(thrift_profile, key)
+      # The query will run on a single node, we will only find the counter once.
+      assert len(counters) == 1
+      counter = counters[0]
+      assert len(counter.values) > 0
 
   @pytest.mark.execute_serially
   def test_query_profile_thrift_timestamps(self):


[impala] 04/04: IMPALA-7800: Time out new connections after --fe_service_threads

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gaborkaszab pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit eb483dddaf8fa8b9143deac23a583a6cb1a6aa27
Author: Zoram Thanga <zo...@cloudera.com>
AuthorDate: Mon Feb 25 11:15:50 2019 -0800

    IMPALA-7800: Time out new connections after --fe_service_threads
    
    The current implementation of the FE thrift server waits
    indefinitely to open a new session if the maximum number of
    FE service threads specified by --fe_service_threads has been
    allocated.
    
    This patch introduces a startup flag to control how the server
    should treat new connection requests if we have run out of the
    configured number of server threads.
    
    If --accepted_client_cnxn_timeout > 0, new connection requests are
    rejected by the server if we can't get a server thread within
    the specified timeout.
    
    The default timeout is 5 minutes. The old behavior can be
    restored by setting --accepted_client_cnxn_timeout=0,
    i.e., no timeout. The timeout applies only to client-facing
    thrift servers, i.e., the HS2 and Beeswax servers.
    
    Testing:
    
    Added a new custom cluster test suite to exercise the
    new code.
    
    Ran core and exhaustive tests.
    
    Change-Id: Idb345c1d84cc2f691f54ded467f253e758f87e64
    Reviewed-on: http://gerrit.cloudera.org:8080/12579
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
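
The gist of the change is a bounded wait for a free service thread. Below is
a rough Python analogue of that control flow using threading.Condition; the
class and method names are illustrative, not Impala's API:

    import threading
    import time

    class AcceptQueueServer(object):
        """Sketch of a bounded wait for a free server thread."""

        def __init__(self, max_tasks, queue_timeout_ms=300000):
            self.max_tasks = max_tasks                # --fe_service_threads
            self.queue_timeout_ms = queue_timeout_ms  # 0 means wait forever
            self.tasks = set()
            self.monitor = threading.Condition()

        def setup_connection(self, conn):
            # Fix the absolute deadline when connection setup starts.
            deadline = (time.monotonic() + self.queue_timeout_ms / 1000.0
                        if self.queue_timeout_ms > 0 else None)
            with self.monitor:
                while self.max_tasks > 0 and len(self.tasks) >= self.max_tasks:
                    if deadline is None:
                        self.monitor.wait()       # old behavior: block forever
                        continue
                    # Wait at least 1ms so the remaining time never goes
                    # non-positive, as in the C++ change below.
                    self.monitor.wait(max(0.001, deadline - time.monotonic()))
                    if time.monotonic() >= deadline:
                        conn.close()              # server busy: reject request
                        return
                self.tasks.add(conn)
            # ...start a handler thread for 'conn' here...
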
---
 be/src/rpc/TAcceptQueueServer.cpp                  | 89 ++++++++++++++--------
 be/src/rpc/TAcceptQueueServer.h                    | 25 +++++-
 be/src/rpc/thrift-server.cc                        |  7 +-
 be/src/rpc/thrift-server.h                         | 18 ++++-
 be/src/service/impala-server.cc                    |  7 ++
 common/thrift/metrics.json                         | 50 ++++++++++++
 .../test_frontend_connection_limit.py              | 89 ++++++++++++++++++++++
 7 files changed, 249 insertions(+), 36 deletions(-)

diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp
index 8f4a01a..8d7babe 100644
--- a/be/src/rpc/TAcceptQueueServer.cpp
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -19,6 +19,7 @@
 // This file was copied from apache::thrift::server::TThreadedServer.cpp v0.9.0, with the
 // significant changes noted inline below.
 
+#include <algorithm>
 #include "rpc/TAcceptQueueServer.h"
 
 #include <thrift/concurrency/PlatformThreadFactory.h>
@@ -125,10 +126,11 @@ TAcceptQueueServer::TAcceptQueueServer(const boost::shared_ptr<TProcessor>& proc
     const boost::shared_ptr<TServerTransport>& serverTransport,
     const boost::shared_ptr<TTransportFactory>& transportFactory,
     const boost::shared_ptr<TProtocolFactory>& protocolFactory,
-    const boost::shared_ptr<ThreadFactory>& threadFactory,
-    int32_t maxTasks)
+    const boost::shared_ptr<ThreadFactory>& threadFactory, const string& name,
+    int32_t maxTasks, int64_t timeout_ms)
     : TServer(processor, serverTransport, transportFactory, protocolFactory),
-      threadFactory_(threadFactory), maxTasks_(maxTasks) {
+      threadFactory_(threadFactory), name_(name), maxTasks_(maxTasks),
+      queue_timeout_ms_(timeout_ms) {
   init();
 }
 
@@ -142,11 +144,27 @@ void TAcceptQueueServer::init() {
   }
 }
 
+void TAcceptQueueServer::CleanupAndClose(const string& error,
+    shared_ptr<TTransport> input, shared_ptr<TTransport> output,
+    shared_ptr<TTransport> client) {
+  if (input != nullptr) {
+    input->close();
+  }
+  if (output != nullptr) {
+    output->close();
+  }
+  if (client != nullptr) {
+    client->close();
+  }
+  GlobalOutput(error.c_str());
+}
+
 // New.
-void TAcceptQueueServer::SetupConnection(boost::shared_ptr<TTransport> client) {
+void TAcceptQueueServer::SetupConnection(shared_ptr<TAcceptQueueEntry> entry) {
   if (metrics_enabled_) queue_size_metric_->Increment(-1);
   shared_ptr<TTransport> inputTransport;
   shared_ptr<TTransport> outputTransport;
+  shared_ptr<TTransport> client = entry->client_;
   try {
     inputTransport = inputTransportFactory_->getTransport(client);
     outputTransport = outputTransportFactory_->getTransport(client);
@@ -170,8 +188,25 @@ void TAcceptQueueServer::SetupConnection(boost::shared_ptr<TTransport> client) {
     // Insert thread into the set of threads
     {
       Synchronized s(tasksMonitor_);
+      int64_t wait_time_ms = 0;
+
       while (maxTasks_ > 0 && tasks_.size() >= maxTasks_) {
-        tasksMonitor_.wait();
+        if (entry->expiration_time_ != 0) {
+          // We don't want wait_time to 'accidentally' go non-positive,
+          // so wait for at least 1ms.
+          wait_time_ms = std::max(1L, entry->expiration_time_ - MonotonicMillis());
+        }
+        LOG_EVERY_N(INFO, 10) << name_ << ": All " << maxTasks_
+                  << " server threads are in use. "
+                  << "Waiting for " << wait_time_ms << " milliseconds.";
+        int wait_result = tasksMonitor_.waitForTimeRelative(wait_time_ms);
+        if (wait_result == THRIFT_ETIMEDOUT) {
+          if (metrics_enabled_) timedout_cnxns_metric_->Increment(1);
+          LOG(INFO) << name_ << ": Server busy. Timing out connection request.";
+          string errStr = "TAcceptQueueServer: " + name_ + " server busy";
+          CleanupAndClose(errStr, inputTransport, outputTransport, client);
+          return;
+        }
       }
       tasks_.insert(task);
     }
@@ -179,29 +214,11 @@ void TAcceptQueueServer::SetupConnection(boost::shared_ptr<TTransport> client) {
     // Start the thread!
     thread->start();
   } catch (TException& tx) {
-    if (inputTransport != nullptr) {
-      inputTransport->close();
-    }
-    if (outputTransport != nullptr) {
-      outputTransport->close();
-    }
-    if (client != nullptr) {
-      client->close();
-    }
     string errStr = string("TAcceptQueueServer: Caught TException: ") + tx.what();
-    GlobalOutput(errStr.c_str());
+    CleanupAndClose(errStr, inputTransport, outputTransport, client);
   } catch (string s) {
-    if (inputTransport != nullptr) {
-      inputTransport->close();
-    }
-    if (outputTransport != nullptr) {
-      outputTransport->close();
-    }
-    if (client != nullptr) {
-      client->close();
-    }
     string errStr = "TAcceptQueueServer: Unknown exception: " + s;
-    GlobalOutput(errStr.c_str());
+    CleanupAndClose(errStr, inputTransport, outputTransport, client);
   }
 }
 
@@ -218,10 +235,12 @@ void TAcceptQueueServer::serve() {
     LOG(INFO) << "connection_setup_thread_pool_size is set to "
               << FLAGS_accepted_cnxn_setup_thread_pool_size;
   }
+
   // New - this is the thread pool used to process the internal accept queue.
-  ThreadPool<shared_ptr<TTransport>> connection_setup_pool("setup-server", "setup-worker",
-      FLAGS_accepted_cnxn_setup_thread_pool_size, FLAGS_accepted_cnxn_queue_depth,
-      [this](int tid, const shared_ptr<TTransport>& item) {
+  ThreadPool<shared_ptr<TAcceptQueueEntry>> connection_setup_pool("setup-server",
+      "setup-worker", FLAGS_accepted_cnxn_setup_thread_pool_size,
+      FLAGS_accepted_cnxn_queue_depth,
+      [this](int tid, const shared_ptr<TAcceptQueueEntry>& item) {
         this->SetupConnection(item);
       });
   // Initialize the thread pool
@@ -238,8 +257,15 @@ void TAcceptQueueServer::serve() {
       // Fetch client from server
       shared_ptr<TTransport> client = serverTransport_->accept();
 
-      // New - the work done to setup the connection has been moved to SetupConnection.
-      if (!connection_setup_pool.Offer(std::move(client))) {
+      shared_ptr<TAcceptQueueEntry> entry{new TAcceptQueueEntry};
+      entry->client_ = client;
+      if (queue_timeout_ms_ > 0) {
+        entry->expiration_time_ = MonotonicMillis() + queue_timeout_ms_;
+      }
+
+      // New - the work done to set up the connection has been moved to SetupConnection.
+      // Note that we move() entry so it's owned by SetupConnection thread.
+      if (!connection_setup_pool.Offer(std::move(entry))) {
         string errStr = string("TAcceptQueueServer: thread pool unexpectedly shut down.");
         GlobalOutput(errStr.c_str());
         stop_ = true;
@@ -292,6 +318,9 @@ void TAcceptQueueServer::InitMetrics(MetricGroup* metrics, const string& key_pre
   stringstream queue_size_ss;
   queue_size_ss << key_prefix << ".connection-setup-queue-size";
   queue_size_metric_ = metrics->AddGauge(queue_size_ss.str(), 0);
+  stringstream timedout_cnxns_ss;
+  timedout_cnxns_ss << key_prefix << ".timedout-cnxn-requests";
+  timedout_cnxns_metric_ = metrics->AddGauge(timedout_cnxns_ss.str(), 0);
   metrics_enabled_ = true;
 }
 
diff --git a/be/src/rpc/TAcceptQueueServer.h b/be/src/rpc/TAcceptQueueServer.h
index 4d066ca..f5f281f 100644
--- a/be/src/rpc/TAcceptQueueServer.h
+++ b/be/src/rpc/TAcceptQueueServer.h
@@ -41,6 +41,11 @@ using apache::thrift::transport::TTransportFactory;
 using apache::thrift::concurrency::Monitor;
 using apache::thrift::concurrency::ThreadFactory;
 
+struct TAcceptQueueEntry {
+  boost::shared_ptr<TTransport> client_;
+  int64_t expiration_time_ = 0LL;
+};
+
 /**
  * In TAcceptQueueServer, the main server thread calls accept() and then immediately
  * places the returned TTransport on a queue to be processed by a separate thread,
@@ -57,7 +62,7 @@ class TAcceptQueueServer : public TServer {
       const boost::shared_ptr<TTransportFactory>& transportFactory,
       const boost::shared_ptr<TProtocolFactory>& protocolFactory,
       const boost::shared_ptr<ThreadFactory>& threadFactory,
-      int32_t maxTasks = 0);
+      const std::string& name, int32_t maxTasks = 0, int64_t timeout_ms = 0);
 
   ~TAcceptQueueServer() override = default;
 
@@ -78,11 +83,20 @@ class TAcceptQueueServer : public TServer {
   // This is the work function for the thread pool, which does the work of setting up the
   // connection and starting a thread to handle it. Will block if there are currently
   // maxTasks_ connections and maxTasks_ is non-zero.
-  void SetupConnection(boost::shared_ptr<TTransport> client);
+  void SetupConnection(boost::shared_ptr<TAcceptQueueEntry> entry);
+
+  // Helper function to close a client connection in case of server side errors.
+  void CleanupAndClose(const std::string& error,
+      boost::shared_ptr<TTransport> input,
+      boost::shared_ptr<TTransport> output,
+      boost::shared_ptr<TTransport> client);
 
   boost::shared_ptr<ThreadFactory> threadFactory_;
   volatile bool stop_;
 
+  /// Name of the thrift server.
+  const std::string name_;
+
   // Monitor protecting tasks_, notified on removal.
   Monitor tasksMonitor_;
   std::set<Task*> tasks_;
@@ -95,6 +109,13 @@ class TAcceptQueueServer : public TServer {
 
   /// New - Number of connections that have been accepted and are waiting to be setup.
   impala::IntGauge* queue_size_metric_;
+
+  /// Number of connections rejected due to timeout.
+  impala::IntGauge* timedout_cnxns_metric_;
+
+  /// Amount of time in milliseconds after which a connection request will be timed out.
+  /// Default value is 0, which means no timeout.
+  int64_t queue_timeout_ms_;
 };
 
 } // namespace server
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index 8542493..070e665 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -321,11 +321,12 @@ void ThriftServer::ThriftServerEventProcessor::deleteContext(void* serverContext
 
 ThriftServer::ThriftServer(const string& name,
     const boost::shared_ptr<TProcessor>& processor, int port, AuthProvider* auth_provider,
-    MetricGroup* metrics, int max_concurrent_connections)
+    MetricGroup* metrics, int max_concurrent_connections, int64_t queue_timeout_ms)
   : started_(false),
     port_(port),
     ssl_enabled_(false),
     max_concurrent_connections_(max_concurrent_connections),
+    queue_timeout_ms_(queue_timeout_ms),
     name_(name),
     server_(NULL),
     processor_(processor),
@@ -480,8 +481,10 @@ Status ThriftServer::Start() {
   boost::shared_ptr<TTransportFactory> transport_factory;
   RETURN_IF_ERROR(CreateSocket(&server_socket));
   RETURN_IF_ERROR(auth_provider_->GetServerTransportFactory(&transport_factory));
+
   server_.reset(new TAcceptQueueServer(processor_, server_socket, transport_factory,
-      protocol_factory, thread_factory, max_concurrent_connections_));
+      protocol_factory, thread_factory, name_, max_concurrent_connections_,
+      queue_timeout_ms_));
   if (metrics_ != NULL) {
     (static_cast<TAcceptQueueServer*>(server_.get()))->InitMetrics(metrics_,
         Substitute("impala.thrift-server.$0", name_));
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index 866d9c5..e7d861b 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -142,10 +142,13 @@ class ThriftServer {
   ///  - metrics: if not nullptr, the server will register metrics on this object
   ///  - max_concurrent_connections: The maximum number of concurrent connections allowed.
   ///    If 0, there will be no enforced limit on the number of concurrent connections.
+  ///  - queue_timeout_ms: amount of time in milliseconds an accepted client
+  ///    connection will be held in the accepted queue, after which the request will
+  ///    be rejected if a server thread can't be found. If 0, no timeout is enforced.
   ThriftServer(const std::string& name,
       const boost::shared_ptr<apache::thrift::TProcessor>& processor, int port,
       AuthProvider* auth_provider = nullptr, MetricGroup* metrics = nullptr,
-      int max_concurrent_connections = 0);
+      int max_concurrent_connections = 0, int64_t queue_timeout_ms = 0);
 
   /// Enables secure access over SSL. Must be called before Start(). The first three
   /// arguments are the minimum SSL/TLS version, and paths to certificate and private key
@@ -193,6 +196,11 @@ class ThriftServer {
   /// limit.
   int max_concurrent_connections_;
 
+  /// Amount of time in milliseconds an accepted client connection will be kept in the
+  /// accept queue before it is timed out. If 0, there is no timeout.
+  /// Used in TAcceptQueueServer.
+  int64_t queue_timeout_ms_;
+
   /// User-specified identifier that shows up in logs
   const std::string name_;
 
@@ -266,6 +274,11 @@ class ThriftServerBuilder {
     return *this;
   }
 
+  ThriftServerBuilder& queue_timeout(int64_t timeout_ms) {
+    queue_timeout_ms_ = timeout_ms;
+    return *this;
+  }
+
   /// Enables SSL for this server.
   ThriftServerBuilder& ssl(
       const std::string& certificate, const std::string& private_key) {
@@ -301,7 +314,7 @@ class ThriftServerBuilder {
   /// '*server'.
   Status Build(ThriftServer** server) {
     std::unique_ptr<ThriftServer> ptr(new ThriftServer(name_, processor_, port_,
-        auth_provider_, metrics_, max_concurrent_connections_));
+        auth_provider_, metrics_, max_concurrent_connections_, queue_timeout_ms_));
     if (enable_ssl_) {
       RETURN_IF_ERROR(ptr->EnableSsl(
           version_, certificate_, private_key_, pem_password_cmd_, ciphers_));
@@ -311,6 +324,7 @@ class ThriftServerBuilder {
   }
 
  private:
+  int64_t queue_timeout_ms_ = 0;
   int max_concurrent_connections_ = 0;
   std::string name_;
   boost::shared_ptr<apache::thrift::TProcessor> processor_;
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index fda10bf..5434282 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -236,6 +236,11 @@ DEFINE_int64(shutdown_deadline_s, 60 * 60, "Default time limit in seconds for th
       "milliseconds. Only used for testing.");
 #endif
 
+DEFINE_int64(accepted_client_cnxn_timeout, 300000,
+    "(Advanced) The amount of time in milliseconds an accepted connection will wait in "
+    "the post-accept, pre-setup connection queue before it is timed out and the "
+    "connection request is rejected. A value of 0 means there is no timeout.");
+
 DECLARE_bool(compact_catalog_topic);
 
 namespace impala {
@@ -2324,6 +2329,7 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
           builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
           .metrics(exec_env_->metrics())
           .max_concurrent_connections(FLAGS_fe_service_threads)
+          .queue_timeout(FLAGS_accepted_client_cnxn_timeout)
           .Build(&server));
       beeswax_server_.reset(server);
       beeswax_server_->SetConnectionHandler(this);
@@ -2351,6 +2357,7 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
           builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
           .metrics(exec_env_->metrics())
           .max_concurrent_connections(FLAGS_fe_service_threads)
+          .queue_timeout(FLAGS_accepted_client_cnxn_timeout)
           .Build(&server));
       hs2_server_.reset(server);
       hs2_server_->SetConnectionHandler(this);
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index b18bc1f..1cdca7b 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -692,6 +692,16 @@
     "key": "impala.thrift-server.CatalogService.connection-setup-queue-size"
   },
   {
+    "description": "The number of connection requests to the Catalog Service that have been timed out waiting to be setup.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Catalog Service Connection Requests Timed Out",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.CatalogService.timedout-cnxn-requests"
+  },
+  {
     "description": "The number of active connections to this StateStore's StateStore service.",
     "contexts": [
       "STATESTORE"
@@ -722,6 +732,16 @@
     "key": "impala.thrift-server.StatestoreService.connection-setup-queue-size"
   },
   {
+    "description": "The number of connection requests to the Statestore Service that have been timed out waiting to be setup.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Statestore Service Connection Requests Timed Out",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.StatestoreService.timedout-cnxn-requests"
+  },
+  {
     "description": "The number of active Impala Backend client connections to this Impala Daemon.",
     "contexts": [
       "IMPALAD"
@@ -752,6 +772,16 @@
     "key": "impala.thrift-server.backend.connection-setup-queue-size"
   },
   {
+    "description": "The number of connection requests to the Impala Backend Server that have been timed out waiting to be setup.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Backend Server Connection Requests Timed Out",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.backend.timedout-cnxn-requests"
+  },
+  {
     "description": "The number of active Beeswax API connections to this Impala Daemon.",
     "contexts": [
       "IMPALAD"
@@ -782,6 +812,16 @@
     "key": "impala.thrift-server.beeswax-frontend.connection-setup-queue-size"
   },
   {
+    "description": "The number of Beeswax API connection requests to this Impala Daemon that have been timed out waiting to be setup.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Beeswax API Connection Requests Timed Out",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.beeswax-frontend.timedout-cnxn-requests"
+  },
+  {
     "description": "The number of active HiveServer2 API connections to this Impala Daemon.",
     "contexts": [
       "IMPALAD"
@@ -812,6 +852,16 @@
     "key": "impala.thrift-server.hiveserver2-frontend.connection-setup-queue-size"
   },
   {
+    "description": "The number of HiveServer2 API connection requests to this Impala Daemon that have been timed out waiting to be setup.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HiveServer2 API Connection Requests Timed Out",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.hiveserver2-frontend.timedout-cnxn-requests"
+  },
+  {
     "description": "The amount of memory freed by the last memory tracker garbage collection.",
     "contexts": [
       "IMPALAD"
diff --git a/tests/custom_cluster/test_frontend_connection_limit.py b/tests/custom_cluster/test_frontend_connection_limit.py
new file mode 100644
index 0000000..54d1a7f
--- /dev/null
+++ b/tests/custom_cluster/test_frontend_connection_limit.py
@@ -0,0 +1,89 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from threading import Thread
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+
+# This custom cluster test exercises how the frontend thrift server handles
+# new client connection requests after the maximum number of frontend
+# service threads (--fe_service_threads) has been allocated. If
+# "--accepted_client_cnxn_timeout" > 0, new connection requests are
+# rejected if they wait in the accepted queue for more than the specified
+# timeout.
+# See IMPALA-7800.
+
+
+class TestFrontendConnectionLimit(CustomClusterTestSuite):
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestFrontendConnectionLimit, cls).add_test_dimensions()
+
+  def _connect_and_query(self, query, impalad):
+    client = impalad.service.create_beeswax_client()
+    try:
+      client.execute(query)
+    except Exception as e:
+      client.close()
+      raise ImpalaBeeswaxException(e.message)
+    client.close()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--fe_service_threads=1 --accepted_client_cnxn_timeout=0")
+  def test_no_connection_is_rejected(self):
+    """ IMPALA-7800: New connection request should not be rejected if
+        --accepted_client_cnxn_timeout=0"""
+
+    query = "select sleep(2000)"
+    impalad = self.cluster.get_any_impalad()
+    q1 = Thread(target=self._connect_and_query, args=(query, impalad,))
+    q2 = Thread(target=self._connect_and_query, args=(query, impalad,))
+    q1.start()
+    q2.start()
+    q1.join()
+    q2.join()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--fe_service_threads=1 --accepted_client_cnxn_timeout=5000")
+  def test_server_busy(self):
+    """ IMPALA-7800: Reject new incoming connections if --accepted_client_cnxn_timeout > 0
+        and the request spent too much time waiting in the accepted queue."""
+
+    client = self.create_impala_client()
+    client.execute_async("select sleep(7000)")
+
+    # This step should fail to open a session.
+    # create_impala_client() does not throw an error on connection failure;
+    # the only way to detect that the connection is invalid is to run a
+    # query on it.
+    client1 = self.create_impala_client()
+    caught_exception = False
+    try:
+      client1.execute("select sleep(8000)")
+    except Exception:
+      caught_exception = True
+    client.close()
+    assert caught_exception, 'Query on client1 did not fail as expected'


[impala] 02/04: IMPALA-8312 : Alter database operations have race condition

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gaborkaszab pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b2a87797a8be076a8e57a91e8db2692ca693e2f3
Author: Vihang Karajgaonkar <vi...@cloudera.com>
AuthorDate: Thu Mar 14 16:56:26 2019 -0700

    IMPALA-8312 : Alter database operations have race condition
    
    This patch fixes a race condition in the alter database implementation
    in the CatalogOpExecutor. The original implementation did an in-place
    modification of the metastore database object in the Db. This could
    lead to a partially updated database object becoming visible to a
    reading thread, causing potential problems. To fix the race, the
    patch makes a copy of the existing database object, modifies the copy,
    and then atomically swaps the actual database object with the
    modified copy. This is done while holding the metastoreDdlLock and
    then taking the write lock on the catalog version object, which makes
    it consistent with the other catalog write operations.
    
    Added a test which consistently reproduces the race. The test creates
    many reader threads and a writer thread that continuously changes the
    owner name and owner type by issuing alter database operations. The
    test fails without the patch and passes with it. The race also applies
    to the alter database set comment operation, although it is hard to
    write a test for that code path.
    
    Change-Id: I32c8c96a6029bf9d9db37ea8315f6c9603b5a2fc
    Reviewed-on: http://gerrit.cloudera.org:8080/12789
    Reviewed-by: Fredy Wijaya <fw...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
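
The fix follows a copy-modify-swap pattern: build the replacement object
completely, then publish it with a single reference update so that readers
see either the old or the new object, never a partially modified one. A
minimal Python sketch of the idea (the real code uses an AtomicReference in
Db.java below; the names here are illustrative):

    import copy
    import threading

    class Db(object):
        """Holds the metastore database object behind a single reference."""

        def __init__(self, ms_db):
            self._ms_db = ms_db      # readers see old or new, never partial

        def get_metastore_db(self):
            return self._ms_db

        def set_metastore_db(self, ms_db):
            # Publishing is a single reference assignment, hence atomic.
            self._ms_db = ms_db

    ddl_lock = threading.Lock()      # serializes writers (metastoreDdlLock_)

    def alter_db_set_owner(db, new_owner):
        with ddl_lock:
            ms_db = copy.deepcopy(db.get_metastore_db())  # 1. copy
            ms_db['owner'] = new_owner                    # 2. modify the copy
            db.set_metastore_db(ms_db)                    # 3. swap in
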
---
 .../impala/catalog/CatalogServiceCatalog.java      |  29 ++-
 fe/src/main/java/org/apache/impala/catalog/Db.java |  35 ++-
 .../apache/impala/service/CatalogOpExecutor.java   |  50 ++---
 .../apache/impala/catalog/AlterDatabaseTest.java   | 241 +++++++++++++++++++++
 4 files changed, 314 insertions(+), 41 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 25b0b96..99689e4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.impala.analysis.TableName;
@@ -972,6 +973,32 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * Updates the Db with the given metastore database object. Useful for doing
+   * in-place updates to the HMS db, such as changing the owner, adding a comment,
+   * or setting certain properties.
+   * @param msDb The HMS database object used for the update
+   * @return The updated Db object
+   * @throws DatabaseNotFoundException if a Db with the name of the given Database
+   * is not found in the Catalog
+   */
+  public Db updateDb(Database msDb) throws DatabaseNotFoundException {
+    Preconditions.checkNotNull(msDb);
+    Preconditions.checkNotNull(msDb.getName());
+    versionLock_.writeLock().lock();
+    try {
+      Db db = getDb(msDb.getName());
+      if (db == null) {
+        throw new DatabaseNotFoundException("Database " + msDb.getName() + " not found");
+      }
+      db.setMetastoreDb(msDb.getName(), msDb);
+      db.setCatalogVersion(incrementAndGetCatalogVersion());
+      return db;
+    } finally {
+      versionLock_.writeLock().unlock();
+    }
+  }
+
+  /**
    * Adds a table in the topic update if its version is in the range
    * ('ctx.fromVersion', 'ctx.toVersion']. If the table's version is larger than
    * 'ctx.toVersion' and the table has skipped a topic update
@@ -1219,7 +1246,7 @@ public class CatalogServiceCatalog extends Catalog {
       // Contains native functions in it's params map.
       org.apache.hadoop.hive.metastore.api.Database msDb =
           msClient.getHiveClient().getDatabase(dbName);
-      tmpDb = new Db(dbName, null);
+      tmpDb = new Db(dbName, msDb);
       // Load native UDFs into the temporary db.
       loadFunctionsFromDbParams(tmpDb, msDb);
       // Load Java UDFs from HMS into the temporary db.
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index 5878b95..cc1a569 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -24,6 +24,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.impala.analysis.ColumnDef;
 import org.apache.impala.analysis.KuduPartitionParam;
 import org.apache.impala.common.ImpalaException;
@@ -66,7 +68,10 @@ import com.google.common.collect.Lists;
  */
 public class Db extends CatalogObjectImpl implements FeDb {
   private static final Logger LOG = LoggerFactory.getLogger(Db.class);
-  private final TDatabase thriftDb_;
+  // TODO: We should have a consistent synchronization model for Db and Table.
+  // Right now, we synchronize functions and the thriftDb_ object in-place and do
+  // not take a read lock on the catalog version. See IMPALA-8366 for details.
+  private final AtomicReference<TDatabase> thriftDb_ = new AtomicReference<>();
 
   public static final String FUNCTION_INDEX_PREFIX = "impala_registered_function_";
 
@@ -91,9 +96,8 @@ public class Db extends CatalogObjectImpl implements FeDb {
   private boolean isSystemDb_ = false;
 
   public Db(String name, org.apache.hadoop.hive.metastore.api.Database msDb) {
-    thriftDb_ = new TDatabase(name.toLowerCase());
-    thriftDb_.setMetastore_db(msDb);
-    tableCache_ = new CatalogObjectCache<Table>();
+    setMetastoreDb(name, msDb);
+    tableCache_ = new CatalogObjectCache<>();
     functions_ = new HashMap<>();
   }
 
@@ -110,7 +114,7 @@ public class Db extends CatalogObjectImpl implements FeDb {
    * Updates the hms parameters map by adding the input <k,v> pair.
    */
   private void putToHmsParameters(String k, String v) {
-    org.apache.hadoop.hive.metastore.api.Database msDb = thriftDb_.metastore_db;
+    org.apache.hadoop.hive.metastore.api.Database msDb = thriftDb_.get().metastore_db;
     Preconditions.checkNotNull(msDb);
     Map<String, String> hmsParams = msDb.getParameters();
     if (hmsParams == null) hmsParams = new HashMap<>();
@@ -124,7 +128,7 @@ public class Db extends CatalogObjectImpl implements FeDb {
    * corresponding to input k and it is removed, false otherwise.
    */
   private boolean removeFromHmsParameters(String k) {
-    org.apache.hadoop.hive.metastore.api.Database msDb = thriftDb_.metastore_db;
+    org.apache.hadoop.hive.metastore.api.Database msDb = thriftDb_.get().metastore_db;
     Preconditions.checkNotNull(msDb);
     if (msDb.getParameters() == null) return false;
     return msDb.getParameters().remove(k) != null;
@@ -133,9 +137,9 @@ public class Db extends CatalogObjectImpl implements FeDb {
   @Override // FeDb
   public boolean isSystemDb() { return isSystemDb_; }
   @Override // FeDb
-  public TDatabase toThrift() { return thriftDb_; }
+  public TDatabase toThrift() { return thriftDb_.get(); }
   @Override // FeDb
-  public String getName() { return thriftDb_.getDb_name(); }
+  public String getName() { return thriftDb_.get().getDb_name(); }
   @Override
   public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.DATABASE; }
 
@@ -193,7 +197,7 @@ public class Db extends CatalogObjectImpl implements FeDb {
 
   @Override // FeDb
   public org.apache.hadoop.hive.metastore.api.Database getMetaStoreDb() {
-    return thriftDb_.getMetastore_db();
+    return thriftDb_.get().getMetastore_db();
   }
 
   @Override // FeDb
@@ -465,4 +469,17 @@ public class Db extends CatalogObjectImpl implements FeDb {
     }
     return resp;
   }
+
+  /**
+   * Atomically replaces the metastore db object of this Db with the given metastore
+   * Database object.
+   * @param name the database name; it is lower-cased before being stored
+   * @param msDb the new metastore Database object; must not be null
+   */
+  public void setMetastoreDb(String name, Database msDb) {
+    Preconditions.checkNotNull(name);
+    Preconditions.checkNotNull(msDb);
+    // Create the TDatabase first, then atomically swap it into thriftDb_.
+    TDatabase tDatabase = new TDatabase(name.toLowerCase());
+    tDatabase.setMetastore_db(msDb);
+    thriftDb_.set(tDatabase);
+  }
 }
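
A minimal standalone sketch of the publication pattern the Db change above adopts
(hypothetical class and field names, not Impala's actual API): the writer builds a
fully-initialized immutable snapshot and publishes it with a single
AtomicReference.set(), so a concurrent reader can never observe a half-updated
object.

    import java.util.concurrent.atomic.AtomicReference;

    // Sketch only: Snapshot, owner and comment are made-up stand-ins for the
    // TDatabase object that Db now stores behind an AtomicReference.
    final class DbMetadata {
      // Immutable snapshot: all fields are final, so a reader holding a
      // reference can never see a partially-updated state.
      private static final class Snapshot {
        final String owner;
        final String comment;
        Snapshot(String owner, String comment) {
          this.owner = owner;
          this.comment = comment;
        }
      }

      private final AtomicReference<Snapshot> current =
          new AtomicReference<>(new Snapshot("user_1", ""));

      // Writer: construct the replacement completely, then publish it with one
      // atomic set(). Readers see the old snapshot or the new one, never a mix.
      void update(String owner, String comment) {
        current.set(new Snapshot(owner, comment));
      }

      // Reader: a single get() yields an internally consistent snapshot.
      String describe() {
        Snapshot s = current.get();
        return s.owner + ": " + s.comment;
      }
    }
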
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 705aa88..9b1e7ee 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1212,7 +1212,7 @@ public class CatalogOpExecutor {
       } else {
         if (catalog_.addFunction(fn)) {
           // Flush DB changes to metastore
-          applyAlterDatabase(catalog_.getDb(fn.dbName()));
+          applyAlterDatabase(db.getMetaStoreDb());
           addedFunctions.add(fn.toTCatalogObject());
         }
       }
@@ -1745,7 +1745,7 @@ public class CatalogOpExecutor {
           }
         } else {
           // Flush DB changes to metastore
-          applyAlterDatabase(catalog_.getDb(fn.dbName()));
+          applyAlterDatabase(db.getMetaStoreDb());
           removedFunctions.add(fn.toTCatalogObject());
         }
       }
@@ -3122,10 +3122,10 @@ public class CatalogOpExecutor {
   /**
    * Updates the database object in the metastore.
    */
-  private void applyAlterDatabase(Db db)
+  private void applyAlterDatabase(Database msDb)
       throws ImpalaRuntimeException {
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      msClient.getHiveClient().alterDatabase(db.getName(), db.getMetaStoreDb());
+      msClient.getHiveClient().alterDatabase(msDb.getName(), msDb);
     } catch (TException e) {
       throw new ImpalaRuntimeException(
           String.format(HMS_RPC_ERROR_FORMAT_STR, "alterDatabase"), e);
@@ -3753,17 +3753,16 @@ public class CatalogOpExecutor {
       throw new CatalogException("Database: " + dbName + " does not exist.");
     }
     synchronized (metastoreDdlLock_) {
-      Database msDb = db.getMetaStoreDb();
-      String originalComment = msDb.getDescription();
+      Database msDb = db.getMetaStoreDb().deepCopy();
       msDb.setDescription(comment);
       try {
-        applyAlterDatabase(db);
+        applyAlterDatabase(msDb);
       } catch (ImpalaRuntimeException e) {
-        msDb.setDescription(originalComment);
         throw e;
       }
+      Db updatedDb = catalog_.updateDb(msDb);
+      addDbToCatalogUpdate(updatedDb, response.result);
     }
-    addDbToCatalogUpdate(db, response.result);
     addSummary(response, "Updated database.");
   }
 
@@ -3788,45 +3787,34 @@ public class CatalogOpExecutor {
     Preconditions.checkNotNull(params.owner_name);
     Preconditions.checkNotNull(params.owner_type);
     synchronized (metastoreDdlLock_) {
-      Database msDb = db.getMetaStoreDb();
+      Database msDb = db.getMetaStoreDb().deepCopy();
       String originalOwnerName = msDb.getOwnerName();
       PrincipalType originalOwnerType = msDb.getOwnerType();
       msDb.setOwnerName(params.owner_name);
       msDb.setOwnerType(PrincipalType.valueOf(params.owner_type.name()));
       try {
-        applyAlterDatabase(db);
+        applyAlterDatabase(msDb);
       } catch (ImpalaRuntimeException e) {
-        msDb.setOwnerName(originalOwnerName);
-        msDb.setOwnerType(originalOwnerType);
         throw e;
       }
       if (authzConfig_.isEnabled()) {
         authzManager_.updateDatabaseOwnerPrivilege(params.server_name, db.getName(),
-            originalOwnerName, originalOwnerType, db.getMetaStoreDb().getOwnerName(),
-            db.getMetaStoreDb().getOwnerType(), response);
+            originalOwnerName, originalOwnerType, msDb.getOwnerName(),
+            msDb.getOwnerType(), response);
       }
+      Db updatedDb = catalog_.updateDb(msDb);
+      addDbToCatalogUpdate(updatedDb, response.result);
     }
-    addDbToCatalogUpdate(db, response.result);
     addSummary(response, "Updated database.");
   }
 
   private void addDbToCatalogUpdate(Db db, TCatalogUpdateResult result) {
     Preconditions.checkNotNull(db);
-    // Updating the new catalog version and setting it to the DB catalog version while
-    // holding the catalog version lock for an atomic operation. Most DB operations are
-    // short-lived. It is unnecessary to have a fine-grained DB lock.
-    catalog_.getLock().writeLock().lock();
-    try {
-      long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
-      db.setCatalogVersion(newCatalogVersion);
-      TCatalogObject updatedCatalogObject = db.toTCatalogObject();
-      updatedCatalogObject.setCatalog_version(newCatalogVersion);
-      // TODO(todd): if client is a 'v2' impalad, only send back invalidation
-      result.addToUpdated_catalog_objects(updatedCatalogObject);
-      result.setVersion(updatedCatalogObject.getCatalog_version());
-    } finally {
-      catalog_.getLock().writeLock().unlock();
-    }
+    TCatalogObject updatedCatalogObject = db.toTCatalogObject();
+    updatedCatalogObject.setCatalog_version(updatedCatalogObject.getCatalog_version());
+    // TODO(todd): if client is a 'v2' impalad, only send back invalidation
+    result.addToUpdated_catalog_objects(updatedCatalogObject);
+    result.setVersion(updatedCatalogObject.getCatalog_version());
   }
 
   private void alterCommentOnTableOrView(TableName tableName, String comment,
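
The alter flow above follows a copy-on-write discipline: deep-copy the current
metastore object, mutate only the copy, apply the copy to HMS, and swap it into the
catalog only after HMS has accepted it. This is why the old rollback code (restoring
the original comment or owner on failure) could be deleted. A hedged sketch of that
sequence, with MetaDb and Hms as made-up stand-ins for the HMS Database object and
client, not the actual CatalogOpExecutor API:

    import java.util.concurrent.atomic.AtomicReference;

    // Sketch only: illustrates the deepCopy-mutate-publish sequence used by
    // alterDbSetOwner()/alterCommentOnDb() above.
    final class AlterFlowSketch {
      static final class MetaDb {
        String owner;
        MetaDb deepCopy() { MetaDb c = new MetaDb(); c.owner = owner; return c; }
      }
      interface Hms { void alterDatabase(MetaDb db) throws Exception; }

      private final Object ddlLock = new Object();
      private final AtomicReference<MetaDb> cache =
          new AtomicReference<>(new MetaDb());

      void setOwner(Hms hms, String newOwner) throws Exception {
        synchronized (ddlLock) {                // serialize writers
          MetaDb copy = cache.get().deepCopy(); // never mutate the shared object
          copy.owner = newOwner;                // change the private copy only
          hms.alterDatabase(copy);              // may throw; shared state untouched
          cache.set(copy);                      // publish only after HMS succeeded
        }
      }
    }
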
diff --git a/fe/src/test/java/org/apache/impala/catalog/AlterDatabaseTest.java b/fe/src/test/java/org/apache/impala/catalog/AlterDatabaseTest.java
new file mode 100644
index 0000000..5313d22
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/catalog/AlterDatabaseTest.java
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.impala.authorization.NoneAuthorizationFactory;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.service.CatalogOpExecutor;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.testutil.ImpaladTestCatalog;
+import org.apache.impala.thrift.TAlterDbParams;
+import org.apache.impala.thrift.TAlterDbSetOwnerParams;
+import org.apache.impala.thrift.TAlterDbType;
+import org.apache.impala.thrift.TCreateDbParams;
+import org.apache.impala.thrift.TDdlExecRequest;
+import org.apache.impala.thrift.TDdlType;
+import org.apache.impala.thrift.TDropDbParams;
+import org.apache.impala.thrift.TOwnerType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test issues concurrent alter database operations to reproduce the race described in
+ * IMPALA-8312.
+ */
+public class AlterDatabaseTest {
+  private static final String TEST_OWNER_1 = "user_1";
+  private static final PrincipalType TEST_TYPE_1 = PrincipalType.USER;
+
+  private static final String TEST_OWNER_2 = "user_2";
+  private static final PrincipalType TEST_TYPE_2 = PrincipalType.ROLE;
+
+  private static ImpaladTestCatalog catalog_;
+  private static CatalogOpExecutor catalogOpExecutor_;
+  private static final String TEST_ALTER_DB = "testAlterdb";
+  // number of reader threads which query the test database
+  private static final int NUM_READERS = 10;
+  // number of writer threads which change the database. We only need one currently
+  // since all the alterDatabase calls are serialized by metastoreDdlLock_ in
+  // CatalogOpExecutor
+  private static final int NUM_WRITERS = 1;
+
+  // barrier to make sure that readers and writers start at the same time
+  private final CyclicBarrier barrier_ =
+      new CyclicBarrier(NUM_READERS + NUM_WRITERS);
+  // toggle switch to change the database owner from user_1 to user_2 in each
+  // consecutive alter database call
+  private final AtomicBoolean toggler_ = new AtomicBoolean(false);
+
+  /**
+   * Sets up the test class by instantiating the catalog service.
+   */
+  @BeforeClass
+  public static void setUpTest() {
+    catalog_ = new ImpaladTestCatalog(CatalogServiceTestCatalog.create());
+    catalogOpExecutor_ =
+        new CatalogOpExecutor(catalog_.getSrcCatalog(), new NoneAuthorizationFactory());
+  }
+
+  /**
+   * Cleans up the database once the test completes.
+   * @throws ImpalaException if dropping the test database fails
+   */
+  @After
+  public void cleanUp() throws ImpalaException {
+    catalogOpExecutor_.execDdlRequest(dropDbRequest());
+  }
+
+  @Before
+  public void setUpDatabase() throws ImpalaException {
+    // Drop any pre-existing test db and recreate it.
+    catalogOpExecutor_.execDdlRequest(dropDbRequest());
+    catalogOpExecutor_.execDdlRequest(createDbRequest());
+    Db db = catalog_.getDb(TEST_ALTER_DB);
+    assertNotNull(db);
+    catalogOpExecutor_.execDdlRequest(getNextDdlRequest());
+    assertNotNull(catalog_.getDb(TEST_ALTER_DB));
+    String owner = db.getMetaStoreDb().getOwnerName();
+    assertTrue(owner.equals(TEST_OWNER_1) || owner.equals(TEST_OWNER_2));
+  }
+
+  /**
+   * Drops the test db from the test catalog.
+   */
+  private static TDdlExecRequest dropDbRequest() {
+    TDdlExecRequest request = new TDdlExecRequest();
+    request.setDdl_type(TDdlType.DROP_DATABASE);
+    TDropDbParams dropDbParams = new TDropDbParams();
+    dropDbParams.setDb(TEST_ALTER_DB);
+    dropDbParams.setIf_exists(true);
+    dropDbParams.setCascade(true);
+    request.setDrop_db_params(dropDbParams);
+    return request;
+  }
+
+  /**
+   * Creates the test db in the catalog. Sets the owner to <code>TEST_OWNER_1</code>.
+   */
+  private static TDdlExecRequest createDbRequest() {
+    TDdlExecRequest request = new TDdlExecRequest();
+    request.setDdl_type(TDdlType.CREATE_DATABASE);
+    TCreateDbParams createDbParams = new TCreateDbParams();
+    createDbParams.setDb(TEST_ALTER_DB);
+    createDbParams.setComment("test comment");
+    createDbParams.setOwner(TEST_OWNER_1);
+    request.setCreate_db_params(createDbParams);
+    return request;
+  }
+
+  /**
+   * Reader task to be used by the reader threads. Fetches the Db from the catalog and
+   * validates that its owner and owner type are consistent.
+   */
+  private class ValidateDbOwnerTask implements Callable<Void> {
+    @Override
+    public Void call() throws Exception {
+      barrier_.await();
+      for (int i = 0; i < 100; i++) {
+        Db testDb = catalog_.getDb(TEST_ALTER_DB);
+        validateOwner(testDb.getMetaStoreDb());
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Writer task to be used by the write threads. The task loops and issues many alter
+   * database set owner requests. Each call flips the owner and owner type from user_1
+   * (PrincipalType.USER) to user_2 (PrincipalType.ROLE) and vice versa.
+   */
+  private class SetOwnerTask implements Callable<Void> {
+    @Override
+    public Void call() throws Exception {
+      barrier_.await();
+      for (int i = 0; i < 100; i++) {
+        catalogOpExecutor_.execDdlRequest(getNextDdlRequest());
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Test creates multiple reader and writer threads which operate on the test database.
+   * The readers fetch the Db object and validate its owner information while the writer
+   * thread changes the owner of the database concurrently.
+   */
+  @Test
+  public void testConcurrentAlterDbOps() throws Exception {
+    ExecutorService threadPool = Executors.newFixedThreadPool(NUM_READERS + NUM_WRITERS);
+    List<Future<Void>> results = new ArrayList<>(NUM_READERS + NUM_WRITERS);
+    for (int i = 0; i < NUM_WRITERS; i++) {
+      results.add(threadPool.submit(new SetOwnerTask()));
+    }
+    for (int i = 0; i < NUM_READERS; i++) {
+      results.add(threadPool.submit(new ValidateDbOwnerTask()));
+    }
+    try {
+      for (Future<Void> result : results) {
+        result.get(100, TimeUnit.SECONDS);
+      }
+    } finally {
+      threadPool.shutdownNow();
+    }
+  }
+
+  /**
+   * Creates a DDL request to alter the database owner. Each invocation flips the owner
+   * from user_1 to user_2 and vice versa.
+   */
+  private TDdlExecRequest getNextDdlRequest() {
+    TAlterDbSetOwnerParams alterDbSetOwnerParams = new TAlterDbSetOwnerParams();
+    if (toggler_.get()) {
+      alterDbSetOwnerParams.setOwner_name(TEST_OWNER_1);
+      alterDbSetOwnerParams.setOwner_type(TOwnerType.findByValue(0));
+      assertTrue(toggler_.compareAndSet(true, false));
+    } else {
+      alterDbSetOwnerParams.setOwner_name(TEST_OWNER_2);
+      alterDbSetOwnerParams.setOwner_type(TOwnerType.findByValue(1));
+      assertTrue(toggler_.compareAndSet(false, true));
+    }
+    TAlterDbParams alterDbParams = new TAlterDbParams();
+    alterDbParams.setDb(TEST_ALTER_DB);
+    alterDbParams.setAlter_type(TAlterDbType.SET_OWNER);
+    alterDbParams.setSet_owner_params(alterDbSetOwnerParams);
+    TDdlExecRequest request = new TDdlExecRequest();
+    request.setDdl_type(TDdlType.ALTER_DATABASE);
+    request.setAlter_db_params(alterDbParams);
+    return request;
+  }
+
+  /**
+   * Validates the owner information of the database. Makes sure that if the owner is
+   * user_1 its type is USER and if the owner is user_2 its type is ROLE.
+   */
+  private void validateOwner(Database msDb) {
+    assertNotNull(msDb.getOwnerName());
+    assertNotNull(msDb.getOwnerType());
+    if (TEST_OWNER_1.equals(msDb.getOwnerName())) {
+      assertEquals("Owner " + TEST_OWNER_1 + " should have the type " + TEST_TYPE_1,
+          msDb.getOwnerType(), TEST_TYPE_1);
+    } else if (TEST_OWNER_2.equals(msDb.getOwnerName())) {
+      assertEquals("Owner " + TEST_OWNER_2 + " should have the type " + TEST_TYPE_2,
+          msDb.getOwnerType(), TEST_TYPE_2);
+    } else {
+      fail("Unknown owner for the database " + msDb.getOwnerName());
+    }
+  }
+}
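
For contrast, a minimal sketch (made-up names, not part of the patch) of the torn
read this test is designed to catch: when a writer mutates a shared object
field-by-field, a concurrent reader can pair the owner from one update with the
owner type from another.

    // Sketch only: demonstrates the kind of inconsistent (owner, type) pair the
    // old in-place mutation allowed and ValidateDbOwnerTask now checks for.
    final class TornReadDemo {
      static volatile String owner = "user_1";
      static volatile String ownerType = "USER";

      public static void main(String[] args) throws InterruptedException {
        Thread writer = new Thread(() -> {
          for (int i = 0; i < 1_000_000; i++) {
            boolean even = (i % 2 == 0);
            owner = even ? "user_2" : "user_1";  // first write
            ownerType = even ? "ROLE" : "USER";  // second write: race window
          }
        });
        writer.start();
        while (writer.isAlive()) {
          String o = owner;
          String t = ownerType;                  // may come from a later update
          if (("user_1".equals(o) && "ROLE".equals(t))
              || ("user_2".equals(o) && "USER".equals(t))) {
            System.out.println("Torn read observed: " + o + "/" + t);
            break;
          }
        }
        writer.join();
      }
    }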


[impala] 01/04: Update version to 3.3.0-SNAPSHOT

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gaborkaszab pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a2a4de00145742ceb380510c9c53e6e22e221988
Author: Gabor Kaszab <ga...@cloudera.com>
AuthorDate: Thu Mar 28 21:03:27 2019 +0100

    Update version to 3.3.0-SNAPSHOT
    
    Change-Id: I91ecdb584645fd723faad4fe0d4f76245b644ef9
    Reviewed-on: http://gerrit.cloudera.org:8080/12880
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 bin/save-version.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bin/save-version.sh b/bin/save-version.sh
index 6024dfa..f170db3 100755
--- a/bin/save-version.sh
+++ b/bin/save-version.sh
@@ -21,7 +21,7 @@
 # Note: for internal (aka pre-release) versions, the version should have
 # "-INTERNAL" appended. Parts of the code will look for this to distinguish
 # between released and internal versions.
-VERSION=3.2.0-SNAPSHOT
+VERSION=3.3.0-SNAPSHOT
 GIT_HASH=$(git rev-parse HEAD 2> /dev/null)
 if [ -z $GIT_HASH ]
 then