You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ga...@apache.org on 2019/04/01 11:54:59 UTC

[impala] 03/04: IMPALA-7982: Add host network usage to profile

This is an automated email from the ASF dual-hosted git repository.

gaborkaszab pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1fcea9e4b677f07dfaf6b0314ac763241a029e66
Author: Lars Volker <lv...@cloudera.com>
AuthorDate: Wed Mar 13 15:04:32 2019 -0700

    IMPALA-7982: Add host network usage to profile
    
    This change adds host network usage to profiles. For each host that
    participates in the query execution it adds the incoming and outgoing
    bandwidth across all interfaces excluding the local loopback interface.
    This includes all data transmitted by the host as part of the execution
    of a query, other queries, and other processes running on the same
    system.
    
    The change adds tests for the added functionality to the backend and end
    to end tests.
    
    Change-Id: I2cc74f87374080a74a13b7fb6e4da44a11d828ef
    Reviewed-on: http://gerrit.cloudera.org:8080/12747
    Reviewed-by: Lars Volker <lv...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/query-state.cc          |   9 +++
 be/src/util/system-state-info-test.cc  |  60 +++++++++++++++--
 be/src/util/system-state-info.cc       | 116 ++++++++++++++++++++++++++++-----
 be/src/util/system-state-info.h        |  81 +++++++++++++++++++++--
 tests/query_test/test_observability.py |  39 ++++++-----
 5 files changed, 263 insertions(+), 42 deletions(-)

diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index beef32c..0d33a12 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -161,6 +161,15 @@ Status QueryState::Init(const TExecQueryFInstancesParams& exec_rpc_params) {
         "HostCpuIoWaitPercentage", TUnit::BASIS_POINTS, [system_state_info] () {
         return system_state_info->GetCpuUsageRatios().iowait;
         });
+    // Add network usage
+    host_profile_->AddChunkedTimeSeriesCounter(
+        "HostNetworkRx", TUnit::BYTES_PER_SECOND, [system_state_info] () {
+        return system_state_info->GetNetworkUsage().rx_rate;
+        });
+    host_profile_->AddChunkedTimeSeriesCounter(
+        "HostNetworkTx", TUnit::BYTES_PER_SECOND, [system_state_info] () {
+        return system_state_info->GetNetworkUsage().tx_rate;
+        });
   }
 
   // Starting a new query creates threads and consumes a non-trivial amount of memory.
diff --git a/be/src/util/system-state-info-test.cc b/be/src/util/system-state-info-test.cc
index 9bae942..c80d577 100644
--- a/be/src/util/system-state-info-test.cc
+++ b/be/src/util/system-state-info-test.cc
@@ -32,23 +32,36 @@ class SystemStateInfoTest : public testing::Test {
 TEST_F(SystemStateInfoTest, FirstCallReturnsZero) {
   const SystemStateInfo::CpuUsageRatios& r = info.GetCpuUsageRatios();
   EXPECT_EQ(0, r.user + r.system + r.iowait);
+
+  const SystemStateInfo::NetworkUsage& n = info.GetNetworkUsage();
+  EXPECT_EQ(0, n.rx_rate + n.tx_rate);
 }
 
 // Smoke test to make sure that we read non-zero values from /proc/stat.
 TEST_F(SystemStateInfoTest, ReadProcStat) {
   info.ReadCurrentProcStat();
-  const SystemStateInfo::CpuValues& state = info.cpu_values_[info.cur_val_idx_];
+  const SystemStateInfo::CpuValues& state = info.cpu_values_[info.cpu_val_idx_];
   EXPECT_GT(state[SystemStateInfo::CPU_USER], 0);
   EXPECT_GT(state[SystemStateInfo::CPU_SYSTEM], 0);
   EXPECT_GT(state[SystemStateInfo::CPU_IDLE], 0);
   EXPECT_GT(state[SystemStateInfo::CPU_IOWAIT], 0);
 }
 
+// Smoke test to make sure that we read non-zero values from /proc/net/dev.
+TEST_F(SystemStateInfoTest, ReadProcNetDev) {
+  info.ReadCurrentProcNetDev();
+  const SystemStateInfo::NetworkValues& state = info.network_values_[info.net_val_idx_];
+  EXPECT_GT(state[SystemStateInfo::NET_RX_BYTES], 0);
+  EXPECT_GT(state[SystemStateInfo::NET_RX_PACKETS], 0);
+  EXPECT_GT(state[SystemStateInfo::NET_TX_BYTES], 0);
+  EXPECT_GT(state[SystemStateInfo::NET_TX_PACKETS], 0);
+}
+
 // Tests parsing a line similar to the first line of /proc/stat.
 TEST_F(SystemStateInfoTest, ParseProcStat) {
   // Fields are: user nice system idle iowait irq softirq steal guest guest_nice
   info.ReadProcStatString("cpu  20 30 10 70 100 0 0 0 0 0");
-  const SystemStateInfo::CpuValues& state = info.cpu_values_[info.cur_val_idx_];
+  const SystemStateInfo::CpuValues& state = info.cpu_values_[info.cpu_val_idx_];
   EXPECT_EQ(state[SystemStateInfo::CPU_USER], 20);
   EXPECT_EQ(state[SystemStateInfo::CPU_SYSTEM], 10);
   EXPECT_EQ(state[SystemStateInfo::CPU_IDLE], 70);
@@ -56,10 +69,26 @@ TEST_F(SystemStateInfoTest, ParseProcStat) {
 
   // Test that values larger than INT_MAX parse without error.
   info.ReadProcStatString("cpu  3000000000 30 10 70 100 0 0 0 0 0");
-  const SystemStateInfo::CpuValues& changed_state = info.cpu_values_[info.cur_val_idx_];
+  const SystemStateInfo::CpuValues& changed_state = info.cpu_values_[info.cpu_val_idx_];
   EXPECT_EQ(changed_state[SystemStateInfo::CPU_USER], 3000000000);
 }
 
+// Tests parsing a string similar to the contents of /proc/net/dev.
+TEST_F(SystemStateInfoTest, ParseProcNetDevString) {
+  // Two header lines, then one line per interface: 8 receive and 8 transmit counters.
+  string dev_net = R"(Inter-|   Receive                                                |  Transmit
+   face |bytes    packets errs drop fifo frame compressed multicast|bytes    packets errs drop fifo colls carrier compressed
+       lo: 12178099975318 24167151765    0    0    0     0          0         0 12178099975318 24167151765    0    0    0     0       0          0
+       br-c4d9b4cafca2: 1025814     409    0    0    0     0          0         0 70616330 1638104    0    0    0     0       0          0
+         eth0: 366039609986 388580561    0    2    0     0          0  62231368 95492744135 174535524    0    0    0     0       0          0)";
+  info.ReadProcNetDevString(dev_net);
+  const SystemStateInfo::NetworkValues& state = info.network_values_[info.net_val_idx_];
+  EXPECT_EQ(state[SystemStateInfo::NET_RX_BYTES], 366040635800);
+  EXPECT_EQ(state[SystemStateInfo::NET_RX_PACKETS], 388580970);
+  EXPECT_EQ(state[SystemStateInfo::NET_TX_BYTES], 95563360465);
+  EXPECT_EQ(state[SystemStateInfo::NET_TX_PACKETS], 176173628);
+}
+
 // Tests that computing CPU ratios doesn't overflow
 TEST_F(SystemStateInfoTest, ComputeCpuRatiosIntOverflow) {
   // Load old and new values for CPU counters. These values are from a system where we
@@ -88,7 +117,7 @@ TEST_F(SystemStateInfoTest, GetCpuUsageRatios) {
   t.join();
 }
 
-// Tests the computation logic.
+// Tests the computation logic for CPU ratios.
 TEST_F(SystemStateInfoTest, ComputeCpuRatios) {
   info.ReadProcStatString("cpu  20 30 10 70 100 0 0 0 0 0");
   info.ReadProcStatString("cpu  30 30 20 70 100 0 0 0 0 0");
@@ -99,5 +128,28 @@ TEST_F(SystemStateInfoTest, ComputeCpuRatios) {
   EXPECT_EQ(r.iowait, 0);
 }
 
+// Tests the computation logic for network usage.
+TEST_F(SystemStateInfoTest, ComputeNetworkUsage) {
+  // Two sets of values in the format of /proc/net/dev
+  string dev_net_1 = R"(Inter-|   Receive                                                |  Transmit
+   face |bytes    packets errs drop fifo frame compressed multicast|bytes    packets errs drop fifo colls carrier compressed
+       lo: 1000 2000    0    0    0     0          0         0 3000 4000    0    0    0     0       0          0
+       br-c4d9b4cafca2: 5000     409    0    0    0     0          0         0 6000 7000    0    0    0     0       0          0
+         eth0: 8000 9000    0    2    0     0          0  10000 11000 12000    0    0    0     0       0          0)";
+  string dev_net_2 = R"(Inter-|   Receive                                                |  Transmit
+   face |bytes    packets errs drop fifo frame compressed multicast|bytes    packets errs drop fifo colls carrier compressed
+       lo: 2000 4000    0    0    0     0          0         0 6000 8000    0    0    0     0       0          0
+       br-c4d9b4cafca2: 7000     609    0    0    0     0          0         0 12000 14000    0    0    0     0       0          0
+         eth0: 10000 11000    0    2    0     0          0  10000 11000 12000    0    0    0     0       0          0)";
+
+  info.ReadProcNetDevString(dev_net_1);
+  info.ReadProcNetDevString(dev_net_2);
+  int period_ms = 500;
+  info.ComputeNetworkUsage(period_ms);
+  const SystemStateInfo::NetworkUsage& n = info.GetNetworkUsage();
+  EXPECT_EQ(n.rx_rate, 8000);
+  EXPECT_EQ(n.tx_rate, 12000);
+}
+
 } // namespace impala
 
diff --git a/be/src/util/system-state-info.cc b/be/src/util/system-state-info.cc
index 62a1104..400a063 100644
--- a/be/src/util/system-state-info.cc
+++ b/be/src/util/system-state-info.cc
@@ -16,9 +16,13 @@
 // under the License.
 
 #include "gutil/strings/split.h"
+#include "gutil/strings/util.h"
+#include "kudu/util/env.h"
+#include "kudu/util/faststring.h"
 #include "util/error-util.h"
 #include "util/string-parser.h"
 #include "util/system-state-info.h"
+#include "util/time.h"
 
 #include <algorithm>
 #include <fstream>
@@ -27,19 +31,31 @@
 #include "common/names.h"
 
 using std::accumulate;
+using kudu::Env;
+using kudu::faststring;
+using kudu::ReadFileToString;
+using strings::Split;
 
 namespace impala {
 
 // Partially initializing cpu_ratios_ will default-initialize the remaining members.
 SystemStateInfo::SystemStateInfo() {
   memset(&cpu_ratios_, 0, sizeof(cpu_ratios_));
+  memset(&network_usage_, 0, sizeof(network_usage_));
   ReadCurrentProcStat();
+  ReadCurrentProcNetDev();
+  last_net_update_ms_ = MonotonicMillis();
 }
 
 void SystemStateInfo::CaptureSystemStateSnapshot() {
   // Capture and compute CPU usage
   ReadCurrentProcStat();
   ComputeCpuRatios();
+  int64_t now = MonotonicMillis();
+  ReadCurrentProcNetDev();
+  DCHECK_GT(last_net_update_ms_, 0L);
+  ComputeNetworkUsage(now - last_net_update_ms_);
+  last_net_update_ms_ = now;
 }
 
 int64_t SystemStateInfo::ParseInt64(const string& val) const {
@@ -49,21 +65,18 @@ int64_t SystemStateInfo::ParseInt64(const string& val) const {
   return -1;
 }
 
-void SystemStateInfo::ReadFirstLineFromFile(const char* path, string* out) const {
-  ifstream proc_file(path);
-  if (!proc_file.is_open()) {
-    LOG(WARNING) << "Could not open " << path << ": " << GetStrErrMsg() << endl;
+void SystemStateInfo::ReadCurrentProcStat() {
+  string path = "/proc/stat";
+  faststring buf;
+  kudu::Status status = ReadFileToString(Env::Default(), path, &buf);
+  if (!status.ok()) {
+    LOG(WARNING) << "Could not open " << path << ": " << status.message() << endl;
     return;
   }
-  DCHECK(proc_file.is_open());
-  DCHECK(out != nullptr);
-  getline(proc_file, *out);
-}
-
-void SystemStateInfo::ReadCurrentProcStat() {
-  string line;
-  ReadFirstLineFromFile("/proc/stat", &line);
-  ReadProcStatString(line);
+  StringPiece buf_sp(reinterpret_cast<const char*>(buf.data()), buf.size());
+  vector<StringPiece> lines = strings::Split(buf_sp, "\n");
+  DCHECK_GT(lines.size(), 0);
+  ReadProcStatString(lines[0].ToString());
 }
 
 void SystemStateInfo::ReadProcStatString(const string& stat_string) {
@@ -72,8 +85,8 @@ void SystemStateInfo::ReadProcStatString(const string& stat_string) {
   // Skip the first value ('cpu  ');
   ss.ignore(5);
 
-  cur_val_idx_ = 1 - cur_val_idx_;
-  CpuValues& next_values = cpu_values_[cur_val_idx_];
+  cpu_val_idx_ = 1 - cpu_val_idx_;
+  CpuValues& next_values = cpu_values_[cpu_val_idx_];
 
   for (int i = 0; i < NUM_CPU_VALUES; ++i) {
     int64_t v = -1;
@@ -85,8 +98,8 @@ void SystemStateInfo::ReadProcStatString(const string& stat_string) {
 }
 
 void SystemStateInfo::ComputeCpuRatios() {
-  const CpuValues& cur = cpu_values_[cur_val_idx_];
-  const CpuValues& old = cpu_values_[1 - cur_val_idx_];
+  const CpuValues& cur = cpu_values_[cpu_val_idx_];
+  const CpuValues& old = cpu_values_[1 - cpu_val_idx_];
 
   // Sum up all counters
   int64_t cur_sum = accumulate(cur.begin(), cur.end(), 0L);
@@ -107,4 +120,73 @@ void SystemStateInfo::ComputeCpuRatios() {
   cpu_ratios_.iowait = ((cur[CPU_IOWAIT] - old[CPU_IOWAIT]) * BASIS_MAX) / total_tics;
 }
 
+void SystemStateInfo::ReadCurrentProcNetDev() {
+  string path = "/proc/net/dev";
+  faststring buf;
+  kudu::Status status = ReadFileToString(Env::Default(), path, &buf);
+  if (!status.ok()) {
+    LOG(WARNING) << "Could not open " << path << ": " << status.message() << endl;
+    return;
+  }
+  ReadProcNetDevString(buf.ToString());
+}
+
+void SystemStateInfo::ReadProcNetDevString(const string& dev_string) {
+  stringstream ss(dev_string);
+  string line;
+
+  // Discard the first two lines that contain the header
+  getline(ss, line);
+  getline(ss, line);
+
+  net_val_idx_ = 1 - net_val_idx_;
+  NetworkValues& sum_values = network_values_[net_val_idx_];
+  memset(&sum_values, 0, sizeof(sum_values));
+
+  while (getline(ss, line)) {
+    NetworkValues line_values;
+    ReadProcNetDevLine(line, &line_values);
+    AddNetworkValues(line_values, &sum_values);
+  }
+}
+
+void SystemStateInfo::ReadProcNetDevLine(
+    const string& dev_string, NetworkValues* result) {
+  stringstream ss(dev_string);
+
+  // Read the interface name
+  string if_name;
+  ss >> if_name;
+
+  // Don't include the loopback interface
+  if (HasPrefixString(if_name, "lo:")) {
+    memset(result, 0, sizeof(*result));
+    return;
+  }
+
+  for (int i = 0; i < NUM_NET_VALUES; ++i) {
+    int64_t v = -1;
+    ss >> v;
+    DCHECK_GE(v, 0) << "Value " << i << ": " << v;
+    // Clamp invalid entries at 0
+    (*result)[i] = max(v, 0L);
+  }
+}
+
+void SystemStateInfo::AddNetworkValues(const NetworkValues& a, NetworkValues* b) {
+  for (int i = 0; i < NUM_NET_VALUES; ++i) (*b)[i] += a[i];
+}
+
+void SystemStateInfo::ComputeNetworkUsage(int64_t period_ms) {
+  if (period_ms == 0) return;
+  // Compute network throughput in bytes per second
+  const NetworkValues& cur = network_values_[net_val_idx_];
+  const NetworkValues& old = network_values_[1 - net_val_idx_];
+  int64_t rx_bytes = cur[NET_RX_BYTES] - old[NET_RX_BYTES];
+  network_usage_.rx_rate = rx_bytes * 1000L / period_ms;
+
+  int64_t tx_bytes = cur[NET_TX_BYTES] - old[NET_TX_BYTES];
+  network_usage_.tx_rate = tx_bytes * 1000L / period_ms;
+}
+
 } // namespace impala
diff --git a/be/src/util/system-state-info.h b/be/src/util/system-state-info.h
index 230f868..1b36e0c 100644
--- a/be/src/util/system-state-info.h
+++ b/be/src/util/system-state-info.h
@@ -46,19 +46,29 @@ class SystemStateInfo {
     int32_t system;
     int32_t iowait;
   };
+
   /// Returns a struct containing the CPU usage ratios for the interval between the last
   /// two calls to CaptureSystemStateSnapshot().
   const CpuUsageRatios& GetCpuUsageRatios() { return cpu_ratios_; }
 
+  /// Network usage rates in bytes per second.
+  struct NetworkUsage {
+    int64_t rx_rate;
+    int64_t tx_rate;
+  };
+
+  /// Returns a struct containing the network usage for the interval between the last two
+  /// calls to CaptureSystemStateSnapshot().
+  const NetworkUsage& GetNetworkUsage() { return network_usage_; }
+
  private:
   int64_t ParseInt64(const std::string& val) const;
-  void ReadFirstLineFromFile(const char* path, std::string* out) const;
 
-  /// Rotates 'cur_val_idx_' and reads the current CPU usage values from /proc/stat into
+  /// Rotates 'cpu_val_idx_' and reads the current CPU usage values from /proc/stat into
   /// the current set of values.
   void ReadCurrentProcStat();
 
-  /// Rotates 'cur_val_idx_' and reads the CPU usage values from 'stat_string' into the
+  /// Rotates 'cpu_val_idx_' and reads the CPU usage values from 'stat_string' into the
   /// current set of values.
   void ReadProcStatString(const string& stat_string);
 
@@ -66,6 +76,7 @@ class SystemStateInfo {
   /// CaptureSystemStateSnapshot() and stores the result in 'cpu_ratios_'.
   void ComputeCpuRatios();
 
+  /// The enum names correspond to the fields of /proc/stat.
   enum PROC_STAT_CPU_VALUES {
     CPU_USER = 0,
     CPU_NICE,
@@ -80,15 +91,79 @@ class SystemStateInfo {
   typedef std::array<int64_t, NUM_CPU_VALUES> CpuValues;
   /// Two buffers to keep the current and previous set of CPU usage values.
   CpuValues cpu_values_[2];
-  int cur_val_idx_ = 0;
+  /// Index into cpu_values_ that points to the current set of values. We maintain a
+  /// separate index for CPU and network to be able to update them independently, e.g. in
+  /// tests.
+  int cpu_val_idx_ = 0;
 
   /// The computed CPU usage ratio between the current and previous snapshots in
   /// cpu_values_. Updated in ComputeCpuRatios().
   CpuUsageRatios cpu_ratios_;
 
+  /// The enum names correspond to the fields of /proc/net/dev, in column order: eight
+  /// receive counters followed by eight transmit counters.
+  enum PROC_NET_DEV_VALUES {
+    NET_RX_BYTES = 0,
+    NET_RX_PACKETS,
+    NET_RX_ERRS,
+    NET_RX_DROP,
+    NET_RX_FIFO,
+    NET_RX_FRAME,
+    NET_RX_COMPRESSED,
+    NET_RX_MULTICAST,
+    NET_TX_BYTES,
+    NET_TX_PACKETS,
+    NET_TX_ERRS,
+    NET_TX_DROP,
+    NET_TX_FIFO,
+    NET_TX_COLLS,
+    NET_TX_CARRIER,
+    NET_TX_COMPRESSED,
+    NUM_NET_VALUES
+  };
+
+  /// We store these values in an array so that we can iterate over them, e.g. when
+  /// reading them from a file or summing them up.
+  typedef std::array<int64_t, NUM_NET_VALUES> NetworkValues;
+  /// Two buffers to keep the current and previous set of network counter values.
+  NetworkValues network_values_[2];
+  /// Index into network_values_ that points to the current set of values. We maintain a
+  /// separate index for CPU and network to be able to update them independently, e.g. in
+  /// tests.
+  int net_val_idx_ = 0;
+
+  /// Rotates net_val_idx_ and reads the current set of values from /proc/net/dev into
+  /// network_values_.
+  void ReadCurrentProcNetDev();
+
+  /// Rotates net_val_idx_ and parses the content of 'dev_string' into network_values_.
+  /// dev_string must be in the format of /proc/net/dev.
+  void ReadProcNetDevString(const string& dev_string);
+
+  /// Parses a single line as it appears in /proc/net/dev into 'result'. Entries are set
+  /// to 0 for the local loopback interface and for invalid entries.
+  void ReadProcNetDevLine(const string& dev_string, NetworkValues* result);
+
+  /// Computes b = b + a.
+  void AddNetworkValues(const NetworkValues& a, NetworkValues* b);
+
+  /// Compute the network usage.
+  void ComputeNetworkUsage(int64_t period_ms);
+
+  /// The computed network usage between the current and previous snapshots in
+  /// network_values_. Updated in ComputeNetworkUsage().
+  NetworkUsage network_usage_;
+
+  /// The last time of reading network usage values from /proc/net/dev. Used by
+  /// CaptureSystemStateSnapshot().
+  int64_t last_net_update_ms_;
+
   FRIEND_TEST(SystemStateInfoTest, ComputeCpuRatios);
   FRIEND_TEST(SystemStateInfoTest, ComputeCpuRatiosIntOverflow);
+  FRIEND_TEST(SystemStateInfoTest, ComputeNetworkUsage);
+  FRIEND_TEST(SystemStateInfoTest, ParseProcNetDevString);
   FRIEND_TEST(SystemStateInfoTest, ParseProcStat);
+  FRIEND_TEST(SystemStateInfoTest, ReadProcNetDev);
   FRIEND_TEST(SystemStateInfoTest, ReadProcStat);
 };
 
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 3c190e2..a1dca7f 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -431,25 +431,29 @@ class TestObservability(ImpalaTestSuite):
     expected_str = "Per Node Profiles:"
     assert any(expected_str in line for line in profile.splitlines())
 
-  def test_query_profile_host_cpu_usage_off(self):
-    """Tests that the query profile does not contain CPU metrics by default or when
-    disabled explicitly."""
+  def test_query_profile_host_resource_metrics_off(self):
+    """Tests that the query profile does not contain resource usage metrics by default or
+       when disabled explicitly."""
     query = "select count(*), sleep(1000) from functional.alltypes"
     for query_opts in [None, {'resource_trace_ratio': 0.0}]:
       profile = self.execute_query(query, query_opts).runtime_profile
-      # Assert that no CPU counters exist in the profile
+      # Assert that no host resource counters exist in the profile
       for line in profile.splitlines():
         assert not re.search("HostCpu.*Percentage", line)
+        assert not re.search("HostNetworkRx", line)
 
-  def test_query_profile_contains_host_cpu_usage(self):
-    """Tests that the query profile contains various CPU metrics."""
+  def test_query_profile_contains_host_resource_metrics(self):
+    """Tests that the query profile contains various CPU and network metrics."""
     query_opts = {'resource_trace_ratio': 1.0}
     query = "select count(*), sleep(1000) from functional.alltypes"
     profile = self.execute_query(query, query_opts).runtime_profile
-    # We check for 500ms because a query with 1s duration won't hit the 64 values limit.
+    # We check for 500ms because a query with 1s duration won't hit the 64 values limit
+    # that would trigger resampling.
     expected_strs = ["HostCpuIoWaitPercentage (500.000ms):",
                      "HostCpuSysPercentage (500.000ms):",
-                     "HostCpuUserPercentage (500.000ms):"]
+                     "HostCpuUserPercentage (500.000ms):",
+                     "HostNetworkRx (500.000ms):",
+                     "HostNetworkTx (500.000ms):"]
 
     # Assert that all expected counters exist in the profile.
     for expected_str in expected_strs:
@@ -482,19 +486,20 @@ class TestObservability(ImpalaTestSuite):
     return thrift_profile
 
   @pytest.mark.execute_serially
-  def test_thrift_profile_contains_cpu_usage(self):
-    """Tests that the thrift profile contains a time series counter for CPU resource
-       usage."""
+  def test_thrift_profile_contains_host_resource_metrics(self):
+    """Tests that the thrift profile contains time series counters for CPU and network
+       resource usage."""
     query_opts = {'resource_trace_ratio': 1.0}
     result = self.execute_query("select sleep(2000)", query_opts)
     thrift_profile = self._get_thrift_profile(result.query_id)
 
-    cpu_key = "HostCpuUserPercentage"
-    cpu_counters = self._find_ts_counters_in_thrift_profile(thrift_profile, cpu_key)
-    # The query will run on a single node, we will only find the counter once.
-    assert len(cpu_counters) == 1
-    cpu_counter = cpu_counters[0]
-    assert len(cpu_counter.values) > 0
+    expected_keys = ["HostCpuUserPercentage", "HostNetworkRx"]
+    for key in expected_keys:
+      counters = self._find_ts_counters_in_thrift_profile(thrift_profile, key)
+      # The query will run on a single node, we will only find the counter once.
+      assert len(counters) == 1
+      counter = counters[0]
+      assert len(counter.values) > 0
 
   @pytest.mark.execute_serially
   def test_query_profile_thrift_timestamps(self):