You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by cs...@apache.org on 2019/01/17 15:06:08 UTC
[impala] 03/03: IMPALA-6964: Track stats about column and page
sizes in Parquet reader
This is an automated email from the ASF dual-hosted git repository.
csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 8da44ce16bb190dadab2ff3d22e5df726d1128e3
Author: stakiar <ta...@gmail.com>
AuthorDate: Fri Oct 19 07:48:16 2018 -0500
IMPALA-6964: Track stats about column and page sizes in Parquet reader
Adds the following new stats:
* ParquetCompressedPageSize - a summary (average, min, max) counter that
tracks the size of compressed pages read; if no compressed pages are
read, this counter is empty
* ParquetUncompressedPageSize - a summary counter that tracks the size
of uncompressed pages read, it is updated in two places: (1) when a
compressed page is de-compressed, and (2) when a page that is not
compressed is read
* ParquetCompressedBytesReadPerColumn - a summary counter that tracks the
amount of compressed data read per column for a scan node
* ParquetUncompressedBytesReadPerColumn - a summary counter that tracks
the amount of uncompressed data read per column for a scan node
The PerColumn counters are calculated by aggregating the number of bytes
read for each column across all scan ranges processed by a scan node.
Each sample in the counter is the size of a single column; a column for
which zero bytes of the given kind were read contributes no sample.
Here is an example of what the updated HDFS scan profile looks like:
- ParquetCompressedBytesReadPerColumn: (Avg: 227.56 KB (233018) ;
Min: 225.14 KB (230540) ; Max: 229.98 KB (235496) ; Number of samples: 2)
- ParquetUncompressedBytesReadPerColumn: (Avg: 227.96 KB (233426) ;
Min: 224.91 KB (230306) ; Max: 231.00 KB (236547) ; Number of samples: 2)
- ParquetCompressedPageSize: (Avg: 4.46 KB (4568) ; Min: 3.86 KB (3955) ;
Max: 5.19 KB (5315) ; Number of samples: 102)
- ParquetUncompressedPageSize: (Avg: 4.47 KB (4576) ; Min: 3.86 KB (3950)
; Max: 5.22 KB (5349) ; Number of samples: 102)
Testing:
* Added new tests to test_scanners.py that do some basic validation of
the new counters above
Change-Id: I322f9b324b6828df28e5caf79529085c43d7c817
Reviewed-on: http://gerrit.cloudera.org:8080/11575
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exec/hdfs-scan-node-base.cc | 45 ++++++++++++
be/src/exec/hdfs-scan-node-base.h | 33 +++++++++
be/src/exec/parquet/hdfs-parquet-scanner.cc | 15 ++++
be/src/exec/parquet/hdfs-parquet-scanner.h | 18 +++++
be/src/exec/parquet/parquet-column-readers.cc | 11 +++
be/src/util/runtime-profile.cc | 2 +-
tests/infra/test_utils.py | 17 +++++
tests/query_test/test_scanners.py | 101 +++++++++++++++++++++++---
tests/util/parse_util.py | 51 +++++++++++++
9 files changed, 283 insertions(+), 10 deletions(-)
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 1e627b4..3470124 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -373,6 +373,11 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
initial_range_actual_reservation_stats_ = ADD_SUMMARY_STATS_COUNTER(runtime_profile(),
"InitialRangeActualReservation", TUnit::BYTES);
+ uncompressed_bytes_read_per_column_counter_ = ADD_SUMMARY_STATS_COUNTER(
+ runtime_profile(), "ParquetUncompressedBytesReadPerColumn", TUnit::BYTES);
+ compressed_bytes_read_per_column_counter_ = ADD_SUMMARY_STATS_COUNTER(
+ runtime_profile(), "ParquetCompressedBytesReadPerColumn", TUnit::BYTES);
+
bytes_read_local_ = ADD_COUNTER(runtime_profile(), "BytesReadLocal",
TUnit::BYTES);
bytes_read_short_circuit_ = ADD_COUNTER(runtime_profile(), "BytesReadShortCircuit",
@@ -892,6 +897,25 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
runtime_profile()->AppendExecOption(
Substitute("Codegen enabled: $0 out of $1", num_enabled, total));
+ // Locking here should not be necessary since bytes_read_per_col_ is only updated inside
+ // column readers, and all column readers should have completed at this point; however,
+ // we acquire a read lock in case the update semantics of bytes_read_per_col_ change
+ {
+ shared_lock<shared_mutex> bytes_read_per_col_guard_read_lock(
+ bytes_read_per_col_lock_);
+ for (const auto& bytes_read : bytes_read_per_col_) {
+ int64_t uncompressed_bytes_read = bytes_read.second.uncompressed_bytes_read.Load();
+ if (uncompressed_bytes_read > 0) {
+ uncompressed_bytes_read_per_column_counter_->UpdateCounter(
+ uncompressed_bytes_read);
+ }
+ int64_t compressed_bytes_read = bytes_read.second.compressed_bytes_read.Load();
+ if (compressed_bytes_read > 0) {
+ compressed_bytes_read_per_column_counter_->UpdateCounter(compressed_bytes_read);
+ }
+ }
+ }
+
if (reader_context_ != nullptr) {
bytes_read_local_->Set(reader_context_->bytes_read_local());
bytes_read_short_circuit_->Set(reader_context_->bytes_read_short_circuit());
@@ -926,3 +950,24 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
Status HdfsScanNodeBase::ScanNodeDebugAction(TExecNodePhase::type phase) {
return ExecDebugAction(phase, runtime_state_);
}
+
+void HdfsScanNodeBase::UpdateBytesRead(
+ SlotId slot_id, int64_t uncompressed_bytes_read, int64_t compressed_bytes_read) {
+ // Fast path: under the shared (read) lock, find slot_id and, if present, add to its
+ // atomic counters. Otherwise drop the read lock, take the exclusive (write) lock and
+ // go through operator[], which default-constructs the entry if slot_id is still
+ // missing; another thread may have inserted it after the read lock was released.
+ shared_lock<shared_mutex> bytes_read_per_col_guard_read_lock(
+ bytes_read_per_col_lock_);
+ auto bytes_read_itr = bytes_read_per_col_.find(slot_id);
+ if (bytes_read_itr != bytes_read_per_col_.end()) {
+ bytes_read_itr->second.uncompressed_bytes_read.Add(uncompressed_bytes_read);
+ bytes_read_itr->second.compressed_bytes_read.Add(compressed_bytes_read);
+ } else {
+ bytes_read_per_col_guard_read_lock.unlock();
+ lock_guard<shared_mutex> bytes_read_per_col_guard_write_lock(
+ bytes_read_per_col_lock_);
+ bytes_read_per_col_[slot_id].uncompressed_bytes_read.Add(uncompressed_bytes_read);
+ bytes_read_per_col_[slot_id].compressed_bytes_read.Add(compressed_bytes_read);
+ }
+}
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 7da66cb..168bd18 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -363,6 +363,12 @@ class HdfsScanNodeBase : public ScanNode {
int64_t IncreaseReservationIncrementally(int64_t curr_reservation,
int64_t ideal_reservation);
+ /// Adds to the number of [un]compressed bytes read for the given SlotId. Used to track
+ /// bytes read per column; meant to be called by scanners, possibly concurrently (the
+ /// per-column map is guarded by bytes_read_per_col_lock_).
+ void UpdateBytesRead(
+ SlotId slot_id, int64_t uncompressed_bytes_read, int64_t compressed_bytes_read);
+
protected:
friend class ScannerContext;
friend class HdfsScanner;
@@ -545,6 +551,15 @@ class HdfsScanNodeBase : public ScanNode {
RuntimeProfile::SummaryStatsCounter* initial_range_ideal_reservation_stats_ = nullptr;
RuntimeProfile::SummaryStatsCounter* initial_range_actual_reservation_stats_ = nullptr;
+ /// Track stats about the number of bytes read per column. Each sample in the counter is
+ /// the size of a single column that is scanned by the scan node. The scan node tracks
+ /// the number of bytes read for each column it processes, and when the scan node is
+ /// closed, it updates these counters with the size of each column.
+ RuntimeProfile::SummaryStatsCounter* compressed_bytes_read_per_column_counter_ =
+ nullptr;
+ RuntimeProfile::SummaryStatsCounter* uncompressed_bytes_read_per_column_counter_ =
+ nullptr;
+
/// Pool for allocating some amounts of memory that is shared between scanners.
/// e.g. partition key tuple and their string buffers
boost::scoped_ptr<MemPool> scan_node_pool_;
@@ -554,6 +569,24 @@ class HdfsScanNodeBase : public ScanNode {
/// scanner threads.
Status status_;
+ /// Struct that tracks the uncompressed and compressed bytes read. Used by the map
+ /// bytes_read_per_col_ to track the [un]compressed bytes read per column. Fields are
+ /// atomic because scanners may update the same entry holding only the shared lock.
+ struct BytesRead {
+ AtomicInt64 uncompressed_bytes_read;
+ AtomicInt64 compressed_bytes_read;
+ };
+
+ /// Map from SlotId (column identifier) to a BytesRead struct holding the number of
+ /// uncompressed and compressed bytes read for that column. It is used to update the
+ /// [un]compressed_bytes_read_per_column counters when the scan node finalizes its
+ /// counters.
+ std::unordered_map<SlotId, BytesRead> bytes_read_per_col_;
+
+ /// Guards the structure of bytes_read_per_col_: updating an existing entry takes it
+ /// shared, inserting a new SlotId takes it exclusive (see UpdateBytesRead()).
+ boost::shared_mutex bytes_read_per_col_lock_;
+
/// Performs dynamic partition pruning, i.e., applies runtime filters to files, and
/// issues initial ranges for all file types. Waits for runtime filters if necessary.
/// Only valid to call if !initial_ranges_issued_. Sets initial_ranges_issued_ to true.
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 9cfcdb8..4fe9914 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -100,6 +100,8 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState
num_row_groups_counter_(nullptr),
num_scanners_with_no_reads_counter_(nullptr),
num_dict_filtered_row_groups_counter_(nullptr),
+ parquet_compressed_page_size_counter_(nullptr),
+ parquet_uncompressed_page_size_counter_(nullptr),
coll_items_read_counter_(0),
codegend_process_scratch_batch_fn_(nullptr) {
assemble_rows_timer_.Stop();
@@ -121,6 +123,10 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
ADD_COUNTER(scan_node_->runtime_profile(), "NumDictFilteredRowGroups", TUnit::UNIT);
process_footer_timer_stats_ =
ADD_SUMMARY_STATS_TIMER(scan_node_->runtime_profile(), "FooterProcessingTime");
+ parquet_compressed_page_size_counter_ = ADD_SUMMARY_STATS_COUNTER(
+ scan_node_->runtime_profile(), "ParquetCompressedPageSize", TUnit::BYTES);
+ parquet_uncompressed_page_size_counter_ = ADD_SUMMARY_STATS_COUNTER(
+ scan_node_->runtime_profile(), "ParquetUncompressedPageSize", TUnit::BYTES);
codegend_process_scratch_batch_fn_ = reinterpret_cast<ProcessScratchBatchFn>(
scan_node_->GetCodegenFn(THdfsFileFormat::PARQUET));
@@ -1681,4 +1687,13 @@ ParquetTimestampDecoder HdfsParquetScanner::CreateTimestampDecoder(
return ParquetTimestampDecoder(element, &state_->local_time_zone(),
timestamp_conversion_needed_for_int96_timestamps);
}
+
+// Adds one sample (a data page's compressed size) to ParquetCompressedPageSize.
+void HdfsParquetScanner::UpdateCompressedPageSizeCounter(int64_t compressed_page_size) {
+ parquet_compressed_page_size_counter_->UpdateCounter(compressed_page_size);
+}
+
+// Adds one sample (a data page's uncompressed size) to ParquetUncompressedPageSize.
+void HdfsParquetScanner::UpdateUncompressedPageSizeCounter(
+ int64_t uncompressed_page_size) {
+ parquet_uncompressed_page_size_counter_->UpdateCounter(uncompressed_page_size);
+}
}
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h
index 94d4714..09da2bb 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -452,6 +452,16 @@ class HdfsParquetScanner : public HdfsScanner {
/// Number of row groups skipped due to dictionary filter
RuntimeProfile::Counter* num_dict_filtered_row_groups_counter_;
+ /// Tracks the compressed size of each compressed data page read. If no compressed
+ /// pages are read, this counter stays empty.
+ RuntimeProfile::SummaryStatsCounter* parquet_compressed_page_size_counter_;
+
+ /// Tracks the uncompressed size of each data page read. It is updated in two places:
+ /// (1) when a compressed page is decompressed, the decompressed size is added to this
+ /// counter; (2) when a page that is not compressed is read, its size is added to this
+ /// counter.
+ RuntimeProfile::SummaryStatsCounter* parquet_uncompressed_page_size_counter_;
+
/// Number of collection items read in current row batch. It is a scanner-local counter
/// used to reduce the frequency of updating HdfsScanNode counter. It is updated by the
/// callees of AssembleRows() and is merged into the HdfsScanNode counter at the end of
@@ -640,6 +650,14 @@ class HdfsParquetScanner : public HdfsScanner {
/// no values that pass the relevant conjuncts, then the row group can be skipped.
Status EvalDictionaryFilters(const parquet::RowGroup& row_group,
bool* skip_row_group) WARN_UNUSED_RESULT;
+
+ /// Updates the counter parquet_compressed_page_size_counter_ with the given compressed
+ /// page size. Called by BaseScalarColumnReader::ReadDataPage() for compressed pages.
+ void UpdateCompressedPageSizeCounter(int64_t compressed_page_size);
+
+ /// Updates the counter parquet_uncompressed_page_size_counter_ with the given
+ /// uncompressed page size. Called by BaseScalarColumnReader::ReadDataPage() per page.
+ void UpdateUncompressedPageSizeCounter(int64_t uncompressed_page_size);
};
} // namespace impala
diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc
index fce14b1..f6ad734 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -1355,6 +1355,13 @@ Status BaseScalarColumnReader::ReadDataPage() {
data_ = decompressed_buffer;
data_size = current_page_header_.uncompressed_page_size;
data_end_ = data_ + data_size;
+ if (slot_desc() != nullptr) {
+ parent_->scan_node_->UpdateBytesRead(slot_desc()->id(), uncompressed_size,
+ current_page_header_.compressed_page_size);
+ parent_->UpdateUncompressedPageSizeCounter(uncompressed_size);
+ parent_->UpdateCompressedPageSizeCounter(
+ current_page_header_.compressed_page_size);
+ }
} else {
DCHECK_EQ(metadata_->codec, parquet::CompressionCodec::UNCOMPRESSED);
if (current_page_header_.compressed_page_size != uncompressed_size) {
@@ -1373,6 +1380,10 @@ Status BaseScalarColumnReader::ReadDataPage() {
data_ = copy_buffer;
data_end_ = data_ + uncompressed_size;
}
+ if (slot_desc() != nullptr) {
+ parent_->scan_node_->UpdateBytesRead(slot_desc()->id(), uncompressed_size, 0);
+ parent_->UpdateUncompressedPageSizeCounter(uncompressed_size);
+ }
}
// Initialize the repetition level data
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index cdb9dd9..9892c28 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -795,7 +795,7 @@ void RuntimeProfile::PrettyPrint(ostream* s, const string& prefix) const {
for (const SummaryStatsCounterMap::value_type& v: summary_stats_map_) {
if (v.second->TotalNumValues() == 0) {
// No point printing all the stats if number of samples is zero.
- stream << prefix << " - " << v.first << ": "
+ stream << prefix << " - " << v.first << ": "
<< PrettyPrinter::Print(v.second->value(), v.second->unit(), true)
<< " (Number of samples: " << v.second->TotalNumValues() << ")" << endl;
} else {
diff --git a/tests/infra/test_utils.py b/tests/infra/test_utils.py
index c998703..1c2e282 100644
--- a/tests/infra/test_utils.py
+++ b/tests/infra/test_utils.py
@@ -18,6 +18,8 @@
# This module contains tests for some of the tests/util code.
from tests.util.filesystem_utils import prepend_with_fs
+from tests.util.parse_util import get_bytes_summary_stats_counter
+
def test_filesystem_utils():
# Verify that empty FS prefix gives back the same path.
@@ -29,3 +31,18 @@ def test_filesystem_utils():
path = "/fake-warehouse"
assert prepend_with_fs(fs, path) == fs + path
assert prepend_with_fs(fs, prepend_with_fs(fs, path)) == fs + path
+
+
+def test_get_bytes_summary_stats_counter():
+ """Test get_bytes_summary_stats_counter(counter_name, runtime_profile) on a dummy
+ one-line runtime profile; 'sum' is expected to be avg * samples (8192 * 4 = 32768).
+ """
+ runtime_profile = "- ExampleCounter: (Avg: 8.00 KB (8192) ; " \
+ "Min: 6.00 KB (6144) ; " \
+ "Max: 10.00 KB (10240) ; " \
+ "Number of samples: 4)"
+ summary_stats = get_bytes_summary_stats_counter("ExampleCounter",
+ runtime_profile)
+ assert len(summary_stats) == 1
+ assert summary_stats[0].sum == 32768 and summary_stats[0].min_value == 6144 and \
+ summary_stats[0].max_value == 10240 and summary_stats[0].total_num_values == 4
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 35fc4d8..6d03269 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -55,6 +55,7 @@ from tests.common.test_vector import ImpalaTestDimension
from tests.util.filesystem_utils import WAREHOUSE, get_fs_path
from tests.util.hdfs_util import NAMENODE
from tests.util.get_parquet_metadata import get_parquet_metadata
+from tests.util.parse_util import get_bytes_summary_stats_counter
from tests.util.test_file_parser import QueryTestSectionReader
# Test scanners with denial of reservations at varying frequency. This will affect the
@@ -535,9 +536,8 @@ class TestParquet(ImpalaTestSuite):
assert (not result.log and not log_prefix) or \
(log_prefix and result.log.startswith(log_prefix))
- runtime_profile = str(result.runtime_profile)
num_scanners_with_no_reads_list = re.findall(
- 'NumScannersWithNoReads: ([0-9]*)', runtime_profile)
+ 'NumScannersWithNoReads: ([0-9]*)', result.runtime_profile)
# This will fail if the number of impalads != 3
# The fourth fragment is the "Averaged Fragment"
@@ -594,11 +594,11 @@ class TestParquet(ImpalaTestSuite):
assert len(result.data) == 1
assert result.data[0] == str(rows_in_table)
- runtime_profile = str(result.runtime_profile)
- num_row_groups_list = re.findall('NumRowGroups: ([0-9]*)', runtime_profile)
+ num_row_groups_list = re.findall('NumRowGroups: ([0-9]*)', result.runtime_profile)
scan_ranges_complete_list = re.findall(
- 'ScanRangesComplete: ([0-9]*)', runtime_profile)
- num_rows_read_list = re.findall('RowsRead: [0-9.K]* \(([0-9]*)\)', runtime_profile)
+ 'ScanRangesComplete: ([0-9]*)', result.runtime_profile)
+ num_rows_read_list = re.findall('RowsRead: [0-9.K]* \(([0-9]*)\)',
+ result.runtime_profile)
REGEX_UNIT_SECOND = "[0-9]*[s]*[0-9]*[.]*[0-9]*[nm]*[s]*"
REGEX_MIN_MAX_FOOTER_PROCESSING_TIME = \
@@ -606,7 +606,7 @@ class TestParquet(ImpalaTestSuite):
"Number of samples: %s\)" % (REGEX_UNIT_SECOND, REGEX_UNIT_SECOND,
REGEX_UNIT_SECOND, "[0-9]*"))
footer_processing_time_list = re.findall(
- REGEX_MIN_MAX_FOOTER_PROCESSING_TIME, runtime_profile)
+ REGEX_MIN_MAX_FOOTER_PROCESSING_TIME, result.runtime_profile)
# This will fail if the number of impalads != 3
# The fourth fragment is the "Averaged Fragment"
@@ -807,6 +807,90 @@ class TestParquet(ImpalaTestSuite):
self.run_test_case(
'QueryTest/parquet-int64-timestamps', vector, unique_database)
+ def _is_summary_stats_counter_empty(self, counter):
+ """Returns True if all fields of the given TSummaryStatsCounter are 0, else False."""
+ return counter.max_value == counter.min_value == counter.sum ==\
+ counter.total_num_values == 0
+
+ def test_page_size_counters(self, vector):
+ """IMPALA-6964: Test that ParquetUncompressedPageSize is updated when reading both
+ compressed and uncompressed Parquet files, and that ParquetCompressedPageSize is
+ updated only for compressed files (it stays empty for uncompressed ones)."""
+ # lineitem_sixblocks is not compressed so ParquetCompressedPageSize should be empty,
+ # but ParquetUncompressedPageSize should have been updated
+ result = self.client.execute("select * from functional_parquet.lineitem_sixblocks"
+ " limit 10")
+
+ compressed_page_size_summaries = get_bytes_summary_stats_counter(
+ "ParquetCompressedPageSize", result.runtime_profile)
+
+ assert len(compressed_page_size_summaries) > 0
+ for summary in compressed_page_size_summaries:
+ assert self._is_summary_stats_counter_empty(summary)
+
+ uncompressed_page_size_summaries = get_bytes_summary_stats_counter(
+ "ParquetUncompressedPageSize", result.runtime_profile)
+
+ # validate that some uncompressed data has been read; we don't validate the exact
+ # amount as the value can change depending on Parquet format optimizations, Impala
+ # scanner optimizations, etc.
+ assert len(uncompressed_page_size_summaries) > 0
+ for summary in uncompressed_page_size_summaries:
+ assert not self._is_summary_stats_counter_empty(summary)
+
+ # alltypestiny is compressed so both ParquetCompressedPageSize and
+ # ParquetUncompressedPageSize should have been updated
+ result = self.client.execute("select * from functional_parquet.alltypestiny"
+ " limit 10")
+
+ for summary_name in ("ParquetCompressedPageSize", "ParquetUncompressedPageSize"):
+ page_size_summaries = get_bytes_summary_stats_counter(
+ summary_name, result.runtime_profile)
+ assert len(page_size_summaries) > 0
+ for summary in page_size_summaries:
+ assert not self._is_summary_stats_counter_empty(summary)
+
+ def test_bytes_read_per_column(self, vector):
+ """IMPALA-6964: Test that ParquetUncompressedBytesReadPerColumn is updated when
+ reading both compressed and uncompressed Parquet files, and that
+ ParquetCompressedBytesReadPerColumn is updated only for compressed files."""
+ # lineitem_sixblocks is not compressed so ParquetCompressedBytesReadPerColumn should
+ # be empty, but ParquetUncompressedBytesReadPerColumn should have been updated
+ result = self.client.execute("select * from functional_parquet.lineitem_sixblocks"
+ " limit 10")
+
+ compressed_bytes_read_per_col_summaries = get_bytes_summary_stats_counter(
+ "ParquetCompressedBytesReadPerColumn", result.runtime_profile)
+
+ assert len(compressed_bytes_read_per_col_summaries) > 0
+ for summary in compressed_bytes_read_per_col_summaries:
+ assert self._is_summary_stats_counter_empty(summary)
+
+ uncompressed_bytes_read_per_col_summaries = get_bytes_summary_stats_counter(
+ "ParquetUncompressedBytesReadPerColumn", result.runtime_profile)
+
+ assert len(uncompressed_bytes_read_per_col_summaries) > 0
+ for summary in uncompressed_bytes_read_per_col_summaries:
+ assert not self._is_summary_stats_counter_empty(summary)
+ # There are 16 columns in lineitem_sixblocks so there should be 16 samples
+ assert summary.total_num_values == 16
+
+ # alltypestiny is compressed so both ParquetCompressedBytesReadPerColumn and
+ # ParquetUncompressedBytesReadPerColumn should have been updated
+ result = self.client.execute("select * from functional_parquet.alltypestiny"
+ " limit 10")
+
+ for summary_name in ("ParquetCompressedBytesReadPerColumn",
+ "ParquetUncompressedBytesReadPerColumn"):
+ bytes_read_per_col_summaries = get_bytes_summary_stats_counter(summary_name,
+ result.runtime_profile)
+ assert len(bytes_read_per_col_summaries) > 0
+ for summary in bytes_read_per_col_summaries:
+ assert not self._is_summary_stats_counter_empty(summary)
+ # There are 11 columns in alltypestiny so there should be 11 samples
+ assert summary.total_num_values == 11
+
+
# We use various scan range lengths to exercise corner cases in the HDFS scanner more
# thoroughly. In particular, it will exercise:
# 1. default scan range
@@ -1121,9 +1205,8 @@ class TestOrc(ImpalaTestSuite):
result = self.client.execute(query)
assert len(result.data) == rows_in_table
- runtime_profile = str(result.runtime_profile)
num_scanners_with_no_reads_list = re.findall(
- 'NumScannersWithNoReads: ([0-9]*)', runtime_profile)
+ 'NumScannersWithNoReads: ([0-9]*)', result.runtime_profile)
# This will fail if the number of impalads != 3
# The fourth fragment is the "Averaged Fragment"
diff --git a/tests/util/parse_util.py b/tests/util/parse_util.py
index 3d3c950..5dbaff0 100644
--- a/tests/util/parse_util.py
+++ b/tests/util/parse_util.py
@@ -17,6 +17,7 @@
import re
from datetime import datetime
+from RuntimeProfile.ttypes import TSummaryStatsCounter
# IMPALA-6715: Every so often the stress test or the TPC workload directories get
# changed, and the stress test loses the ability to run the full set of queries. Set
@@ -125,3 +126,53 @@ def match_memory_estimate(explain_lines):
if None in (mem_limit, units):
raise Exception('could not parse explain string:\n' + '\n'.join(explain_lines))
return mem_limit, units
+
+
+def get_bytes_summary_stats_counter(counter_name, runtime_profile):
+ """Extracts a list of TSummaryStatsCounters from a given runtime profile where the units
+ are in bytes. Each entry in the returned list corresponds to a single occurrence of
+ the counter in the profile. If the counter is present but has never been updated, an
+ all-zero TSummaryStatsCounter is returned for that entry; if the counter is not in the
+ given profile, an empty list is returned. 'sum' is rebuilt as avg * number of samples
+ because the profile does not print it. Here is an example of how to use this method:
+
+ # A single line in a runtime profile used for example purposes.
+ runtime_profile = "- ExampleCounter: (Avg: 8.00 KB (8192) ; " \
+ "Min: 8.00 KB (8192) ; " \
+ "Max: 8.00 KB (8192) ; " \
+ "Number of samples: 4)"
+ summary_stats = get_bytes_summary_stats_counter("ExampleCounter",
+ runtime_profile)
+ assert len(summary_stats) == 1
+ assert summary_stats[0].sum == 32768 and \
+ summary_stats[0].min_value == summary_stats[0].max_value == 8192 and \
+ summary_stats[0].total_num_values == 4
+ """
+
+ regex_summary_stat = re.compile(r"""\(
+ Avg:[^\(]*\((?P<avg>[0-9]+)\)\s;\s # Matches Avg: [?].[?] [?]B (?)
+ Min:[^\(]*\((?P<min>[0-9]+)\)\s;\s # Matches Min: [?].[?] [?]B (?)
+ Max:[^\(]*\((?P<max>[0-9]+)\)\s;\s # Matches Max: [?].[?] [?]B (?)
+ Number\sof\ssamples:\s(?P<samples>[0-9]+)\) # Matches Number of samples: ?)""",
+ re.VERBOSE)
+
+ # Find every occurrence of counter_name (used as an unescaped regex, so it also matches
+ # inside longer counter names) and parse the summary stats from the rest of that line.
+ # Lines that do not parse must be the empty-counter form and are recorded as all zeros.
+ summary_stats = []
+ for counter in re.findall(counter_name + ".*", runtime_profile):
+ summary_stat = re.search(regex_summary_stat, counter)
+ # We need to special-case when the counter has not been updated at all because empty
+ # summary counters have a different format than updated ones.
+ if not summary_stat:
+ assert "0 (Number of samples: 0)" in counter
+ summary_stats.append(TSummaryStatsCounter(sum=0, total_num_values=0, min_value=0,
+ max_value=0))
+ else:
+ summary_stat = summary_stat.groupdict()
+ num_samples = int(summary_stat['samples'])
+ summary_stats.append(TSummaryStatsCounter(sum=num_samples *
+ int(summary_stat['avg']), total_num_values=num_samples,
+ min_value=int(summary_stat['min']), max_value=int(summary_stat['max'])))
+
+ return summary_stats