Posted to commits@impala.apache.org by cs...@apache.org on 2019/01/17 15:06:08 UTC

[impala] 03/03: IMPALA-6964: Track stats about column and page sizes in Parquet reader

This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8da44ce16bb190dadab2ff3d22e5df726d1128e3
Author: stakiar <ta...@gmail.com>
AuthorDate: Fri Oct 19 07:48:16 2018 -0500

    IMPALA-6964: Track stats about column and page sizes in Parquet reader
    
    Adds the following new stats:
    
    * ParquetCompressedPageSize - a summary (average, min, max) counter that
    tracks the size of compressed pages read; if no compressed pages are
    read, this counter is empty
    * ParquetUncompressedPageSize - a summary counter that tracks the size
    of uncompressed pages read; it is updated in two places: (1) when a
    compressed page is decompressed, and (2) when an uncompressed page is
    read
    * ParquetCompressedBytesReadPerColumn - a summary counter that tracks
    the amount of compressed data read per column for a scan node
    * ParquetUncompressedBytesReadPerColumn - a summary counter that tracks
    the amount of uncompressed data read per column for a scan node
    
    The PerColumn counters are calculated by aggregating the number of bytes
    read for each column across all scan ranges processed by a scan node.
    Each sample in the counter is the size of a single column.
    
    Here is an example of what the updated HDFS scan profile looks like:
    
    - ParquetCompressedBytesReadPerColumn: (Avg: 227.56 KB (233018) ;
    Min: 225.14 KB (230540) ; Max: 229.98 KB (235496) ; Number of samples: 2)
    - ParquetUncompressedBytesReadPerColumn: (Avg: 227.96 KB (233426) ;
    Min: 224.91 KB (230306) ; Max: 231.00 KB (236547) ; Number of samples: 2)
    - ParquetCompressedPageSize: (Avg: 4.46 KB (4568) ; Min: 3.86 KB (3955) ;
    Max: 5.19 KB (5315) ; Number of samples: 102)
    - ParquetUncompressedPageSize: (Avg: 4.47 KB (4576) ; Min: 3.86 KB (3950) ;
    Max: 5.22 KB (5349) ; Number of samples: 102)
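    
    For reference, the total number of bytes behind each of these summary lines
    can be recovered by multiplying the average by the number of samples; this
    is how the new get_bytes_summary_stats_counter() helper in
    tests/util/parse_util.py (added below) reconstructs its 'sum' field. A
    minimal standalone sketch of that calculation (the regex and variable names
    here are illustrative only, not part of this commit):
    
        import re
    
        # One summary line copied from the example profile above.
        line = ("- ParquetCompressedPageSize: (Avg: 4.46 KB (4568) ; "
                "Min: 3.86 KB (3955) ; Max: 5.19 KB (5315) ; Number of samples: 102)")
        # Extract the raw average in bytes and the number of samples.
        m = re.search(r"Avg:[^(]*\((\d+)\).*Number of samples: (\d+)", line)
        avg_bytes, num_samples = int(m.group(1)), int(m.group(2))
        total_bytes = avg_bytes * num_samples  # 4568 * 102 = 465936 bytes read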
    
    Testing:
    * Added new tests to test_scanners.py that do some basic validation of
    the new counters above
    
    Change-Id: I322f9b324b6828df28e5caf79529085c43d7c817
    Reviewed-on: http://gerrit.cloudera.org:8080/11575
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/hdfs-scan-node-base.cc            |  45 ++++++++++++
 be/src/exec/hdfs-scan-node-base.h             |  33 +++++++++
 be/src/exec/parquet/hdfs-parquet-scanner.cc   |  15 ++++
 be/src/exec/parquet/hdfs-parquet-scanner.h    |  18 +++++
 be/src/exec/parquet/parquet-column-readers.cc |  11 +++
 be/src/util/runtime-profile.cc                |   2 +-
 tests/infra/test_utils.py                     |  17 +++++
 tests/query_test/test_scanners.py             | 101 +++++++++++++++++++++++---
 tests/util/parse_util.py                      |  51 +++++++++++++
 9 files changed, 283 insertions(+), 10 deletions(-)

diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 1e627b4..3470124 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -373,6 +373,11 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   initial_range_actual_reservation_stats_ = ADD_SUMMARY_STATS_COUNTER(runtime_profile(),
       "InitialRangeActualReservation", TUnit::BYTES);
 
+  uncompressed_bytes_read_per_column_counter_ = ADD_SUMMARY_STATS_COUNTER(
+      runtime_profile(), "ParquetUncompressedBytesReadPerColumn", TUnit::BYTES);
+  compressed_bytes_read_per_column_counter_ = ADD_SUMMARY_STATS_COUNTER(
+      runtime_profile(), "ParquetCompressedBytesReadPerColumn", TUnit::BYTES);
+
   bytes_read_local_ = ADD_COUNTER(runtime_profile(), "BytesReadLocal",
       TUnit::BYTES);
   bytes_read_short_circuit_ = ADD_COUNTER(runtime_profile(), "BytesReadShortCircuit",
@@ -892,6 +897,25 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
   runtime_profile()->AppendExecOption(
       Substitute("Codegen enabled: $0 out of $1", num_enabled, total));
 
+  // Locking here should not be necessary since bytes_read_per_col_ is only updated inside
+  // column readers, and all column readers should have completed at this point; however,
+  // we acquire a read lock in case the update semantics of bytes_read_per_col_ change
+  {
+    shared_lock<shared_mutex> bytes_read_per_col_guard_read_lock(
+        bytes_read_per_col_lock_);
+    for (const auto& bytes_read : bytes_read_per_col_) {
+      int64_t uncompressed_bytes_read = bytes_read.second.uncompressed_bytes_read.Load();
+      if (uncompressed_bytes_read > 0) {
+        uncompressed_bytes_read_per_column_counter_->UpdateCounter(
+            uncompressed_bytes_read);
+      }
+      int64_t compressed_bytes_read = bytes_read.second.compressed_bytes_read.Load();
+      if (compressed_bytes_read > 0) {
+        compressed_bytes_read_per_column_counter_->UpdateCounter(compressed_bytes_read);
+      }
+    }
+  }
+
   if (reader_context_ != nullptr) {
     bytes_read_local_->Set(reader_context_->bytes_read_local());
     bytes_read_short_circuit_->Set(reader_context_->bytes_read_short_circuit());
@@ -926,3 +950,24 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
 Status HdfsScanNodeBase::ScanNodeDebugAction(TExecNodePhase::type phase) {
   return ExecDebugAction(phase, runtime_state_);
 }
+
+void HdfsScanNodeBase::UpdateBytesRead(
+    SlotId slot_id, int64_t uncompressed_bytes_read, int64_t compressed_bytes_read) {
+  // Acquire a read lock first and check if the slot_id is in bytes_read_per_col_; if it
+  // is, update the value and release the read lock. If it is not, release the read lock,
+  // acquire the write lock, and then initialize the slot_id with the given value for
+  // bytes_read.
+  shared_lock<shared_mutex> bytes_read_per_col_guard_read_lock(
+      bytes_read_per_col_lock_);
+  auto bytes_read_itr = bytes_read_per_col_.find(slot_id);
+  if (bytes_read_itr != bytes_read_per_col_.end()) {
+    bytes_read_itr->second.uncompressed_bytes_read.Add(uncompressed_bytes_read);
+    bytes_read_itr->second.compressed_bytes_read.Add(compressed_bytes_read);
+  } else {
+    bytes_read_per_col_guard_read_lock.unlock();
+    lock_guard<shared_mutex> bytes_read_per_col_guard_write_lock(
+        bytes_read_per_col_lock_);
+    bytes_read_per_col_[slot_id].uncompressed_bytes_read.Add(uncompressed_bytes_read);
+    bytes_read_per_col_[slot_id].compressed_bytes_read.Add(compressed_bytes_read);
+  }
+}
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 7da66cb..168bd18 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -363,6 +363,12 @@ class HdfsScanNodeBase : public ScanNode {
   int64_t IncreaseReservationIncrementally(int64_t curr_reservation,
       int64_t ideal_reservation);
 
+  /// Update the number of [un]compressed bytes read for the given SlotId. This is used
+  /// to track the number of bytes read per column and is meant to be called by
+  /// individual scanner classes.
+  void UpdateBytesRead(
+      SlotId slot_id, int64_t uncompressed_bytes_read, int64_t compressed_bytes_read);
+
  protected:
   friend class ScannerContext;
   friend class HdfsScanner;
@@ -545,6 +551,15 @@ class HdfsScanNodeBase : public ScanNode {
   RuntimeProfile::SummaryStatsCounter* initial_range_ideal_reservation_stats_ = nullptr;
   RuntimeProfile::SummaryStatsCounter* initial_range_actual_reservation_stats_ = nullptr;
 
+  /// Track stats about the number of bytes read per column. Each sample in the counter is
+  /// the size of a single column that is scanned by the scan node. The scan node tracks
+  /// the number of bytes read for each column it processes, and when the scan node is
+  /// closed, it updates these counters with the size of each column.
+  RuntimeProfile::SummaryStatsCounter* compressed_bytes_read_per_column_counter_ =
+      nullptr;
+  RuntimeProfile::SummaryStatsCounter* uncompressed_bytes_read_per_column_counter_ =
+      nullptr;
+
   /// Pool for allocating some amounts of memory that is shared between scanners.
   /// e.g. partition key tuple and their string buffers
   boost::scoped_ptr<MemPool> scan_node_pool_;
@@ -554,6 +569,24 @@ class HdfsScanNodeBase : public ScanNode {
   /// scanner threads.
   Status status_;
 
+  /// Struct that tracks the uncompressed and compressed bytes read. Used by the map
+  /// bytes_read_per_col_ to track the [un]compressed bytes read per column. Types are
+  /// atomic as the struct may be updated concurrently.
+  struct BytesRead {
+    AtomicInt64 uncompressed_bytes_read;
+    AtomicInt64 compressed_bytes_read;
+  };
+
+  /// Map from SlotId (column identifier) to a BytesRead struct, where the struct tracks
+  /// the number of uncompressed bytes read and the number of compressed bytes read for
+  /// the column. This map is used to update the
+  /// [un]compressed_bytes_read_per_column counters.
+  std::unordered_map<SlotId, BytesRead> bytes_read_per_col_;
+
+  /// Lock that controls access to bytes_read_per_col_ so that multiple scanners
+  /// can update the map concurrently
+  boost::shared_mutex bytes_read_per_col_lock_;
+
   /// Performs dynamic partition pruning, i.e., applies runtime filters to files, and
   /// issues initial ranges for all file types. Waits for runtime filters if necessary.
   /// Only valid to call if !initial_ranges_issued_. Sets initial_ranges_issued_ to true.
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 9cfcdb8..4fe9914 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -100,6 +100,8 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState
     num_row_groups_counter_(nullptr),
     num_scanners_with_no_reads_counter_(nullptr),
     num_dict_filtered_row_groups_counter_(nullptr),
+    parquet_compressed_page_size_counter_(nullptr),
+    parquet_uncompressed_page_size_counter_(nullptr),
     coll_items_read_counter_(0),
     codegend_process_scratch_batch_fn_(nullptr) {
   assemble_rows_timer_.Stop();
@@ -121,6 +123,10 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
       ADD_COUNTER(scan_node_->runtime_profile(), "NumDictFilteredRowGroups", TUnit::UNIT);
   process_footer_timer_stats_ =
       ADD_SUMMARY_STATS_TIMER(scan_node_->runtime_profile(), "FooterProcessingTime");
+  parquet_compressed_page_size_counter_ = ADD_SUMMARY_STATS_COUNTER(
+      scan_node_->runtime_profile(), "ParquetCompressedPageSize", TUnit::BYTES);
+  parquet_uncompressed_page_size_counter_ = ADD_SUMMARY_STATS_COUNTER(
+      scan_node_->runtime_profile(), "ParquetUncompressedPageSize", TUnit::BYTES);
 
   codegend_process_scratch_batch_fn_ = reinterpret_cast<ProcessScratchBatchFn>(
       scan_node_->GetCodegenFn(THdfsFileFormat::PARQUET));
@@ -1681,4 +1687,13 @@ ParquetTimestampDecoder HdfsParquetScanner::CreateTimestampDecoder(
   return ParquetTimestampDecoder(element, &state_->local_time_zone(),
       timestamp_conversion_needed_for_int96_timestamps);
 }
+
+void HdfsParquetScanner::UpdateCompressedPageSizeCounter(int64_t compressed_page_size) {
+  parquet_compressed_page_size_counter_->UpdateCounter(compressed_page_size);
+}
+
+void HdfsParquetScanner::UpdateUncompressedPageSizeCounter(
+    int64_t uncompressed_page_size) {
+  parquet_uncompressed_page_size_counter_->UpdateCounter(uncompressed_page_size);
+}
 }
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h
index 94d4714..09da2bb 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -452,6 +452,16 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Number of row groups skipped due to dictionary filter
   RuntimeProfile::Counter* num_dict_filtered_row_groups_counter_;
 
+  /// Tracks the size of any compressed pages read. If no compressed pages are read, this
+  /// counter is empty
+  RuntimeProfile::SummaryStatsCounter* parquet_compressed_page_size_counter_;
+
+  /// Tracks the size of any decompressed pages read. This counter is updated in two
+  /// places: (1) when a compressed page is decompressed, the decompressed size is added
+  /// to this counter, and (2) when a page that is not compressed is read, its size is
+  /// added to this counter.
+  RuntimeProfile::SummaryStatsCounter* parquet_uncompressed_page_size_counter_;
+
   /// Number of collection items read in current row batch. It is a scanner-local counter
   /// used to reduce the frequency of updating HdfsScanNode counter. It is updated by the
   /// callees of AssembleRows() and is merged into the HdfsScanNode counter at the end of
@@ -640,6 +650,14 @@ class HdfsParquetScanner : public HdfsScanner {
   /// no values that pass the relevant conjuncts, then the row group can be skipped.
   Status EvalDictionaryFilters(const parquet::RowGroup& row_group,
       bool* skip_row_group) WARN_UNUSED_RESULT;
+
+  /// Updates the counter parquet_compressed_page_size_counter_ with the given compressed
+  /// page size. Called by ParquetColumnReader for each page read.
+  void UpdateCompressedPageSizeCounter(int64_t compressed_page_size);
+
+  /// Updates the counter parquet_uncompressed_page_size_counter_ with the given
+  /// uncompressed page size. Called by ParquetColumnReader for each page read.
+  void UpdateUncompressedPageSizeCounter(int64_t uncompressed_page_size);
 };
 
 } // namespace impala
diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc
index fce14b1..f6ad734 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -1355,6 +1355,13 @@ Status BaseScalarColumnReader::ReadDataPage() {
       data_ = decompressed_buffer;
       data_size = current_page_header_.uncompressed_page_size;
       data_end_ = data_ + data_size;
+      if (slot_desc() != nullptr) {
+        parent_->scan_node_->UpdateBytesRead(slot_desc()->id(), uncompressed_size,
+            current_page_header_.compressed_page_size);
+        parent_->UpdateUncompressedPageSizeCounter(uncompressed_size);
+        parent_->UpdateCompressedPageSizeCounter(
+            current_page_header_.compressed_page_size);
+      }
     } else {
       DCHECK_EQ(metadata_->codec, parquet::CompressionCodec::UNCOMPRESSED);
       if (current_page_header_.compressed_page_size != uncompressed_size) {
@@ -1373,6 +1380,10 @@ Status BaseScalarColumnReader::ReadDataPage() {
         data_ = copy_buffer;
         data_end_ = data_ + uncompressed_size;
       }
+      if (slot_desc() != nullptr) {
+        parent_->scan_node_->UpdateBytesRead(slot_desc()->id(), uncompressed_size, 0);
+        parent_->UpdateUncompressedPageSizeCounter(uncompressed_size);
+      }
     }
 
     // Initialize the repetition level data
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index cdb9dd9..9892c28 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -795,7 +795,7 @@ void RuntimeProfile::PrettyPrint(ostream* s, const string& prefix) const {
     for (const SummaryStatsCounterMap::value_type& v: summary_stats_map_) {
       if (v.second->TotalNumValues() == 0) {
         // No point printing all the stats if number of samples is zero.
-        stream << prefix << "  - " << v.first << ": "
+        stream << prefix << "   - " << v.first << ": "
                << PrettyPrinter::Print(v.second->value(), v.second->unit(), true)
                << " (Number of samples: " << v.second->TotalNumValues() << ")" << endl;
       } else {
diff --git a/tests/infra/test_utils.py b/tests/infra/test_utils.py
index c998703..1c2e282 100644
--- a/tests/infra/test_utils.py
+++ b/tests/infra/test_utils.py
@@ -18,6 +18,8 @@
 # This module contains tests for some of the tests/util code.
 
 from tests.util.filesystem_utils import prepend_with_fs
+from tests.util.parse_util import get_bytes_summary_stats_counter
+
 
 def test_filesystem_utils():
   # Verify that empty FS prefix gives back the same path.
@@ -29,3 +31,18 @@ def test_filesystem_utils():
   path = "/fake-warehouse"
   assert prepend_with_fs(fs, path) == fs + path
   assert prepend_with_fs(fs, prepend_with_fs(fs, path)) == fs + path
+
+
+def test_get_bytes_summary_stats_counter():
+  """Test get_bytes_summary_stats_counter(counter_name, runtime_profile) using a dummy
+     runtime profile.
+  """
+  runtime_profile = "- ExampleCounter: (Avg: 8.00 KB (8192) ; " \
+                    "Min: 6.00 KB (6144) ; " \
+                    "Max: 10.00 KB (10240) ; " \
+                    "Number of samples: 4)"
+  summary_stats = get_bytes_summary_stats_counter("ExampleCounter",
+                                                  runtime_profile)
+  assert len(summary_stats) == 1
+  assert summary_stats[0].sum == 32768 and summary_stats[0].min_value == 6144 and \
+         summary_stats[0].max_value == 10240 and summary_stats[0].total_num_values == 4
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 35fc4d8..6d03269 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -55,6 +55,7 @@ from tests.common.test_vector import ImpalaTestDimension
 from tests.util.filesystem_utils import WAREHOUSE, get_fs_path
 from tests.util.hdfs_util import NAMENODE
 from tests.util.get_parquet_metadata import get_parquet_metadata
+from tests.util.parse_util import get_bytes_summary_stats_counter
 from tests.util.test_file_parser import QueryTestSectionReader
 
 # Test scanners with denial of reservations at varying frequency. This will affect the
@@ -535,9 +536,8 @@ class TestParquet(ImpalaTestSuite):
     assert (not result.log and not log_prefix) or \
         (log_prefix and result.log.startswith(log_prefix))
 
-    runtime_profile = str(result.runtime_profile)
     num_scanners_with_no_reads_list = re.findall(
-        'NumScannersWithNoReads: ([0-9]*)', runtime_profile)
+        'NumScannersWithNoReads: ([0-9]*)', result.runtime_profile)
 
     # This will fail if the number of impalads != 3
     # The fourth fragment is the "Averaged Fragment"
@@ -594,11 +594,11 @@ class TestParquet(ImpalaTestSuite):
     assert len(result.data) == 1
     assert result.data[0] == str(rows_in_table)
 
-    runtime_profile = str(result.runtime_profile)
-    num_row_groups_list = re.findall('NumRowGroups: ([0-9]*)', runtime_profile)
+    num_row_groups_list = re.findall('NumRowGroups: ([0-9]*)', result.runtime_profile)
     scan_ranges_complete_list = re.findall(
-        'ScanRangesComplete: ([0-9]*)', runtime_profile)
-    num_rows_read_list = re.findall('RowsRead: [0-9.K]* \(([0-9]*)\)', runtime_profile)
+        'ScanRangesComplete: ([0-9]*)', result.runtime_profile)
+    num_rows_read_list = re.findall('RowsRead: [0-9.K]* \(([0-9]*)\)',
+        result.runtime_profile)
 
     REGEX_UNIT_SECOND = "[0-9]*[s]*[0-9]*[.]*[0-9]*[nm]*[s]*"
     REGEX_MIN_MAX_FOOTER_PROCESSING_TIME = \
@@ -606,7 +606,7 @@ class TestParquet(ImpalaTestSuite):
             "Number of samples: %s\)" % (REGEX_UNIT_SECOND, REGEX_UNIT_SECOND,
             REGEX_UNIT_SECOND, "[0-9]*"))
     footer_processing_time_list = re.findall(
-        REGEX_MIN_MAX_FOOTER_PROCESSING_TIME, runtime_profile)
+        REGEX_MIN_MAX_FOOTER_PROCESSING_TIME, result.runtime_profile)
 
     # This will fail if the number of impalads != 3
     # The fourth fragment is the "Averaged Fragment"
@@ -807,6 +807,90 @@ class TestParquet(ImpalaTestSuite):
     self.run_test_case(
         'QueryTest/parquet-int64-timestamps', vector, unique_database)
 
+  def _is_summary_stats_counter_empty(self, counter):
+    """Returns true if the given TSummaryStatCounter is empty, false otherwise"""
+    return counter.max_value == counter.min_value == counter.sum ==\
+           counter.total_num_values == 0
+
+  def test_page_size_counters(self, vector):
+    """IMPALA-6964: Test that the counter Parquet[Un]compressedPageSize is updated
+       when reading [un]compressed Parquet files, and that the counter
+       Parquet[Un]compressedPageSize is not updated."""
+    # lineitem_sixblocks is not compressed so ParquetCompressedPageSize should be empty,
+    # but ParquetUncompressedPageSize should have been updated
+    result = self.client.execute("select * from functional_parquet.lineitem_sixblocks"
+                                 " limit 10")
+
+    compressed_page_size_summaries = get_bytes_summary_stats_counter(
+        "ParquetCompressedPageSize", result.runtime_profile)
+
+    assert len(compressed_page_size_summaries) > 0
+    for summary in compressed_page_size_summaries:
+      assert self._is_summary_stats_counter_empty(summary)
+
+    uncompressed_page_size_summaries = get_bytes_summary_stats_counter(
+        "ParquetUncompressedPageSize", result.runtime_profile)
+
+    # validate that some uncompressed data has been read; we don't validate the exact
+    # amount as the value can change depending on Parquet format optimizations, Impala
+    # scanner optimizations, etc.
+    assert len(uncompressed_page_size_summaries) > 0
+    for summary in uncompressed_page_size_summaries:
+      assert not self._is_summary_stats_counter_empty(summary)
+
+    # alltypestiny is compressed so both ParquetCompressedPageSize and
+    # ParquetUncompressedPageSize should have been updated
+    result = self.client.execute("select * from functional_parquet.alltypestiny"
+                                 " limit 10")
+
+    for summary_name in ("ParquetCompressedPageSize", "ParquetUncompressedPageSize"):
+      page_size_summaries = get_bytes_summary_stats_counter(
+          summary_name, result.runtime_profile)
+      assert len(page_size_summaries) > 0
+      for summary in page_size_summaries:
+        assert not self._is_summary_stats_counter_empty(summary)
+
+  def test_bytes_read_per_column(self, vector):
+    """IMPALA-6964: Test that the counter Parquet[Un]compressedBytesReadPerColumn is
+       updated when reading [un]compressed Parquet files, and that the counter
+       Parquet[Un]CompressedBytesReadPerColumn is not updated."""
+    # lineitem_sixblocks is not compressed so ParquetCompressedBytesReadPerColumn should
+    # be empty, but ParquetUncompressedBytesReadPerColumn should have been updated
+    result = self.client.execute("select * from functional_parquet.lineitem_sixblocks"
+                                 " limit 10")
+
+    compressed_bytes_read_per_col_summaries = get_bytes_summary_stats_counter(
+        "ParquetCompressedBytesReadPerColumn", result.runtime_profile)
+
+    assert len(compressed_bytes_read_per_col_summaries) > 0
+    for summary in compressed_bytes_read_per_col_summaries:
+      assert self._is_summary_stats_counter_empty(summary)
+
+    uncompressed_bytes_read_per_col_summaries = get_bytes_summary_stats_counter(
+        "ParquetUncompressedBytesReadPerColumn", result.runtime_profile)
+
+    assert len(uncompressed_bytes_read_per_col_summaries) > 0
+    for summary in uncompressed_bytes_read_per_col_summaries:
+      assert not self._is_summary_stats_counter_empty(summary)
+      # There are 16 columns in lineitem_sixblocks so there should be 16 samples
+      assert summary.total_num_values == 16
+
+    # alltypestiny is compressed so both ParquetCompressedBytesReadPerColumn and
+    # ParquetUncompressedBytesReadPerColumn should have been updated
+    result = self.client.execute("select * from functional_parquet.alltypestiny"
+                                 " limit 10")
+
+    for summary_name in ("ParquetCompressedBytesReadPerColumn",
+                         "ParquetUncompressedBytesReadPerColumn"):
+      bytes_read_per_col_summaries = get_bytes_summary_stats_counter(summary_name,
+          result.runtime_profile)
+      assert len(bytes_read_per_col_summaries) > 0
+      for summary in bytes_read_per_col_summaries:
+        assert not self._is_summary_stats_counter_empty(summary)
+        # There are 11 columns in alltypestiny so there should be 11 samples
+        assert summary.total_num_values == 11
+
+
 # We use various scan range lengths to exercise corner cases in the HDFS scanner more
 # thoroughly. In particular, it will exercise:
 # 1. default scan range
@@ -1121,9 +1205,8 @@ class TestOrc(ImpalaTestSuite):
     result = self.client.execute(query)
     assert len(result.data) == rows_in_table
 
-    runtime_profile = str(result.runtime_profile)
     num_scanners_with_no_reads_list = re.findall(
-      'NumScannersWithNoReads: ([0-9]*)', runtime_profile)
+      'NumScannersWithNoReads: ([0-9]*)', result.runtime_profile)
 
     # This will fail if the number of impalads != 3
     # The fourth fragment is the "Averaged Fragment"
diff --git a/tests/util/parse_util.py b/tests/util/parse_util.py
index 3d3c950..5dbaff0 100644
--- a/tests/util/parse_util.py
+++ b/tests/util/parse_util.py
@@ -17,6 +17,7 @@
 
 import re
 from datetime import datetime
+from RuntimeProfile.ttypes import TSummaryStatsCounter
 
 # IMPALA-6715: Every so often the stress test or the TPC workload directories get
 # changed, and the stress test loses the ability to run the full set of queries. Set
@@ -125,3 +126,53 @@ def match_memory_estimate(explain_lines):
   if None in (mem_limit, units):
     raise Exception('could not parse explain string:\n' + '\n'.join(explain_lines))
   return mem_limit, units
+
+
+def get_bytes_summary_stats_counter(counter_name, runtime_profile):
+  """Extracts a list of TSummaryStatsCounters from a given runtime profile where the units
+     are in bytes. Each entry in the returned list corresponds to a single occurrence of
+     the counter in the profile. If the counter is present, but it has not been updated,
+     an empty TSummaryStatsCounter is returned for that entry. If the counter is not in
+     the given profile, an empty list is returned. Here is an example of how this method
+     should be used:
+
+       # A single line in a runtime profile used for example purposes.
+       runtime_profile = "- ExampleCounter: (Avg: 8.00 KB (8192) ; " \
+                                            "Min: 8.00 KB (8192) ; " \
+                                            "Max: 8.00 KB (8192) ; " \
+                                            "Number of samples: 4)"
+       summary_stats = get_bytes_summary_stats_counter("ExampleCounter",
+                                                      runtime_profile)
+       assert len(summary_stats) == 1
+       assert summary_stats[0].min_value == summary_stats[0].max_value == 8192 and \
+              summary_stats[0].sum == 32768 and \
+              summary_stats[0].total_num_values == 4
+  """
+
+  regex_summary_stat = re.compile(r"""\(
+    Avg:[^\(]*\((?P<avg>[0-9]+)\)\s;\s # Matches Avg: [?].[?] [?]B (?)
+    Min:[^\(]*\((?P<min>[0-9]+)\)\s;\s # Matches Min: [?].[?] [?]B (?)
+    Max:[^\(]*\((?P<max>[0-9]+)\)\s;\s # Matches Max: [?].[?] [?]B (?)
+    Number\sof\ssamples:\s(?P<samples>[0-9]+)\) # Matches Number of samples: ?)""",
+                                  re.VERBOSE)
+
+  # First, find all lines that contain the counter name, and then extract the summary
+  # stats from each line. If the summary stats cannot be extracted from a line (i.e. the
+  # counter was never updated), append a TSummaryStatsCounter with all values set to 0.
+  summary_stats = []
+  for counter in re.findall(counter_name + ".*", runtime_profile):
+    summary_stat = re.search(regex_summary_stat, counter)
+    # We need to special-case when the counter has not been updated at all because empty
+    # summary counters have a different format than updated ones.
+    if not summary_stat:
+      assert "0 (Number of samples: 0)" in counter
+      summary_stats.append(TSummaryStatsCounter(sum=0, total_num_values=0, min_value=0,
+                                                max_value=0))
+    else:
+      summary_stat = summary_stat.groupdict()
+      num_samples = int(summary_stat['samples'])
+      summary_stats.append(TSummaryStatsCounter(sum=num_samples *
+          int(summary_stat['avg']), total_num_values=num_samples,
+          min_value=int(summary_stat['min']), max_value=int(summary_stat['max'])))
+
+  return summary_stats