You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/08 14:22:54 UTC
[arrow-rs] branch master updated: refactor: Group metrics into page and column metrics structs (#2363)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 0c296ab61 refactor: Group metrics into page and column metrics structs (#2363)
0c296ab61 is described below
commit 0c296ab619cbdd2479bb8ab30073e1de71f27b57
Author: Markus Westerlind <mw...@influxdata.com>
AuthorDate: Mon Aug 8 16:22:49 2022 +0200
refactor: Group metrics into page and column metrics structs (#2363)
* refactor: Extract page metrics to a struct
* refactor: Extract column metrics to a struct
* refactor: Reset PageMetrics as a whole
---
parquet/src/column/writer/mod.rs | 194 ++++++++++++++++++++++-----------------
1 file changed, 110 insertions(+), 84 deletions(-)
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index ce773c19d..20a3babd8 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -153,6 +153,29 @@ type ColumnCloseResult = (
Option<OffsetIndex>,
);
+// Metrics per page
+#[derive(Default)]
+struct PageMetrics {
+ num_buffered_values: u32,
+ num_buffered_rows: u32,
+ num_page_nulls: u64,
+}
+
+// Metrics per column writer
+struct ColumnMetrics<T> {
+ total_bytes_written: u64,
+ total_rows_written: u64,
+ total_uncompressed_size: u64,
+ total_compressed_size: u64,
+ total_num_values: u64,
+ dictionary_page_offset: Option<u64>,
+ data_page_offset: Option<u64>,
+ min_column_value: Option<T>,
+ max_column_value: Option<T>,
+ num_column_nulls: u64,
+ column_distinct_count: Option<u64>,
+}
+
/// Typed column writer for a primitive column.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
@@ -167,31 +190,13 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
compressor: Option<Box<dyn Codec>>,
encoder: E,
- // Metrics per page
- /// The number of values including nulls in the in-progress data page
- num_buffered_values: u32,
- /// The number of rows in the in-progress data page
- num_buffered_rows: u32,
- /// The number of nulls in the in-progress data page
- num_page_nulls: u64,
-
+ page_metrics: PageMetrics,
// Metrics per column writer
- total_bytes_written: u64,
- total_rows_written: u64,
- total_uncompressed_size: u64,
- total_compressed_size: u64,
- total_num_values: u64,
- dictionary_page_offset: Option<u64>,
- data_page_offset: Option<u64>,
- min_column_value: Option<E::T>,
- max_column_value: Option<E::T>,
- num_column_nulls: u64,
- column_distinct_count: Option<u64>,
+ column_metrics: ColumnMetrics<E::T>,
/// The order of encodings within the generated metadata does not impact its meaning,
/// but we use a BTreeSet so that the output is deterministic
encodings: BTreeSet<Encoding>,
-
// Reused buffers
def_levels_sink: Vec<i16>,
rep_levels_sink: Vec<i16>,
@@ -226,23 +231,27 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
codec,
compressor,
encoder,
- num_buffered_values: 0,
- num_buffered_rows: 0,
- num_page_nulls: 0,
- total_bytes_written: 0,
- total_rows_written: 0,
- total_uncompressed_size: 0,
- total_compressed_size: 0,
- total_num_values: 0,
- dictionary_page_offset: None,
- data_page_offset: None,
def_levels_sink: vec![],
rep_levels_sink: vec![],
data_pages: VecDeque::new(),
- min_column_value: None,
- max_column_value: None,
- num_column_nulls: 0,
- column_distinct_count: None,
+ page_metrics: PageMetrics {
+ num_buffered_values: 0,
+ num_buffered_rows: 0,
+ num_page_nulls: 0,
+ },
+ column_metrics: ColumnMetrics {
+ total_bytes_written: 0,
+ total_rows_written: 0,
+ total_uncompressed_size: 0,
+ total_compressed_size: 0,
+ total_num_values: 0,
+ dictionary_page_offset: None,
+ data_page_offset: None,
+ min_column_value: None,
+ max_column_value: None,
+ num_column_nulls: 0,
+ column_distinct_count: None,
+ },
column_index_builder: ColumnIndexBuilder::new(),
offset_index_builder: OffsetIndexBuilder::new(),
encodings,
@@ -284,8 +293,16 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
if self.statistics_enabled == EnabledStatistics::Chunk {
match (min, max) {
(Some(min), Some(max)) => {
- update_min(&self.descr, min, &mut self.min_column_value);
- update_max(&self.descr, max, &mut self.max_column_value);
+ update_min(
+ &self.descr,
+ min,
+ &mut self.column_metrics.min_column_value,
+ );
+ update_max(
+ &self.descr,
+ max,
+ &mut self.column_metrics.max_column_value,
+ );
}
(None, Some(_)) | (Some(_), None) => {
panic!("min/max should be both set or both None")
@@ -293,8 +310,16 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
(None, None) => {
if let Some((min, max)) = self.encoder.min_max(values, value_indices)
{
- update_min(&self.descr, &min, &mut self.min_column_value);
- update_max(&self.descr, &max, &mut self.max_column_value);
+ update_min(
+ &self.descr,
+ &min,
+ &mut self.column_metrics.min_column_value,
+ );
+ update_max(
+ &self.descr,
+ &max,
+ &mut self.column_metrics.max_column_value,
+ );
}
}
};
@@ -302,9 +327,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// We can only set the distinct count if there are no other writes
if self.encoder.num_values() == 0 {
- self.column_distinct_count = distinct_count;
+ self.column_metrics.column_distinct_count = distinct_count;
} else {
- self.column_distinct_count = None;
+ self.column_metrics.column_distinct_count = None;
}
let mut values_offset = 0;
@@ -385,19 +410,19 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
/// Returns total number of bytes written by this column writer so far.
/// This value is also returned when column writer is closed.
pub fn get_total_bytes_written(&self) -> u64 {
- self.total_bytes_written
+ self.column_metrics.total_bytes_written
}
/// Returns total number of rows written by this column writer so far.
/// This value is also returned when column writer is closed.
pub fn get_total_rows_written(&self) -> u64 {
- self.total_rows_written
+ self.column_metrics.total_rows_written
}
/// Finalises writes and closes the column writer.
/// Returns total bytes written, total rows written and column chunk metadata.
pub fn close(mut self) -> Result<ColumnCloseResult> {
- if self.num_buffered_values > 0 {
+ if self.page_metrics.num_buffered_values > 0 {
self.add_data_page()?;
}
if self.encoder.has_dictionary() {
@@ -417,8 +442,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
};
Ok((
- self.total_bytes_written,
- self.total_rows_written,
+ self.column_metrics.total_bytes_written,
+ self.column_metrics.total_rows_written,
metadata,
column_index,
offset_index,
@@ -464,7 +489,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
values_to_write += 1;
} else {
// We must always compute this as it is used to populate v2 pages
- self.num_page_nulls += 1
+ self.page_metrics.num_page_nulls += 1
}
}
@@ -486,14 +511,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// Count the occasions where we start a new row
for &level in levels {
- self.num_buffered_rows += (level == 0) as u32
+ self.page_metrics.num_buffered_rows += (level == 0) as u32
}
self.rep_levels_sink.extend_from_slice(levels);
} else {
// Each value is exactly one row.
// Equals to the number of values, we count nulls as well.
- self.num_buffered_rows += num_levels as u32;
+ self.page_metrics.num_buffered_rows += num_levels as u32;
}
match value_indices {
@@ -504,7 +529,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
None => self.encoder.write(values, values_offset, values_to_write)?,
}
- self.num_buffered_values += num_levels as u32;
+ self.page_metrics.num_buffered_values += num_levels as u32;
if self.should_add_data_page() {
self.add_data_page()?;
@@ -547,7 +572,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
/// Prepares and writes dictionary and all data pages into page writer.
fn dict_fallback(&mut self) -> Result<()> {
// At this point we know that we need to fall back.
- if self.num_buffered_values > 0 {
+ if self.page_metrics.num_buffered_values > 0 {
self.add_data_page()?;
}
self.write_dictionary_page()?;
@@ -558,7 +583,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
/// Update the column index and offset index when adding the data page
fn update_column_offset_index(&mut self, page_statistics: &Option<Statistics>) {
// update the column index
- let null_page = (self.num_buffered_rows as u64) == self.num_page_nulls;
+ let null_page = (self.page_metrics.num_buffered_rows as u64)
+ == self.page_metrics.num_page_nulls;
// a page contains only null values,
// and writers have to set the corresponding entries in min_values and max_values to byte[0]
if null_page && self.column_index_builder.valid() {
@@ -566,7 +592,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
null_page,
&[0; 1],
&[0; 1],
- self.num_page_nulls as i64,
+ self.page_metrics.num_page_nulls as i64,
);
} else if self.column_index_builder.valid() {
// from page statistics
@@ -580,7 +606,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
null_page,
stat.min_bytes(),
stat.max_bytes(),
- self.num_page_nulls as i64,
+ self.page_metrics.num_page_nulls as i64,
);
}
}
@@ -588,7 +614,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// update the offset index
self.offset_index_builder
- .append_row_count(self.num_buffered_rows as i64);
+ .append_row_count(self.page_metrics.num_buffered_rows as i64);
}
/// Adds data page.
@@ -600,17 +626,17 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
let max_def_level = self.descr.max_def_level();
let max_rep_level = self.descr.max_rep_level();
- self.num_column_nulls += self.num_page_nulls;
+ self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;
let page_statistics = match (values_data.min_value, values_data.max_value) {
(Some(min), Some(max)) => {
- update_min(&self.descr, &min, &mut self.min_column_value);
- update_max(&self.descr, &max, &mut self.max_column_value);
+ update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
+ update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
Some(Statistics::new(
Some(min),
Some(max),
None,
- self.num_page_nulls,
+ self.page_metrics.num_page_nulls,
false,
))
}
@@ -655,7 +681,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
let data_page = Page::DataPage {
buf: ByteBufferPtr::new(buffer),
- num_values: self.num_buffered_values,
+ num_values: self.page_metrics.num_buffered_values,
encoding: values_data.encoding,
def_level_encoding: Encoding::RLE,
rep_level_encoding: Encoding::RLE,
@@ -696,10 +722,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
let data_page = Page::DataPageV2 {
buf: ByteBufferPtr::new(buffer),
- num_values: self.num_buffered_values,
+ num_values: self.page_metrics.num_buffered_values,
encoding: values_data.encoding,
- num_nulls: self.num_page_nulls as u32,
- num_rows: self.num_buffered_rows,
+ num_nulls: self.page_metrics.num_page_nulls as u32,
+ num_rows: self.page_metrics.num_buffered_rows,
def_levels_byte_len: def_levels_byte_len as u32,
rep_levels_byte_len: rep_levels_byte_len as u32,
is_compressed: self.compressor.is_some(),
@@ -718,14 +744,13 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
}
// Update total number of rows.
- self.total_rows_written += self.num_buffered_rows as u64;
+ self.column_metrics.total_rows_written +=
+ self.page_metrics.num_buffered_rows as u64;
// Reset state.
self.rep_levels_sink.clear();
self.def_levels_sink.clear();
- self.num_buffered_values = 0;
- self.num_buffered_rows = 0;
- self.num_page_nulls = 0;
+ self.page_metrics = PageMetrics::default();
Ok(())
}
@@ -735,7 +760,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
#[inline]
fn flush_data_pages(&mut self) -> Result<()> {
// Write all outstanding data to a new page.
- if self.num_buffered_values > 0 {
+ if self.page_metrics.num_buffered_values > 0 {
self.add_data_page()?;
}
@@ -748,12 +773,13 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
/// Assembles and writes column chunk metadata.
fn write_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
- let total_compressed_size = self.total_compressed_size as i64;
- let total_uncompressed_size = self.total_uncompressed_size as i64;
- let num_values = self.total_num_values as i64;
- let dict_page_offset = self.dictionary_page_offset.map(|v| v as i64);
+ let total_compressed_size = self.column_metrics.total_compressed_size as i64;
+ let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
+ let num_values = self.column_metrics.total_num_values as i64;
+ let dict_page_offset =
+ self.column_metrics.dictionary_page_offset.map(|v| v as i64);
// If data page offset is not set, then no pages have been written
- let data_page_offset = self.data_page_offset.unwrap_or(0) as i64;
+ let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;
let file_offset = match dict_page_offset {
Some(dict_offset) => dict_offset + total_compressed_size,
@@ -772,10 +798,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
if self.statistics_enabled != EnabledStatistics::None {
let statistics = Statistics::new(
- self.min_column_value.clone(),
- self.max_column_value.clone(),
- self.column_distinct_count,
- self.num_column_nulls,
+ self.column_metrics.min_column_value.clone(),
+ self.column_metrics.max_column_value.clone(),
+ self.column_metrics.column_distinct_count,
+ self.column_metrics.num_column_nulls,
false,
);
builder = builder.set_statistics(statistics);
@@ -860,23 +886,23 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
/// Updates column writer metrics with each page metadata.
#[inline]
fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
- self.total_uncompressed_size += page_spec.uncompressed_size as u64;
- self.total_compressed_size += page_spec.compressed_size as u64;
- self.total_num_values += page_spec.num_values as u64;
- self.total_bytes_written += page_spec.bytes_written;
+ self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
+ self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
+ self.column_metrics.total_num_values += page_spec.num_values as u64;
+ self.column_metrics.total_bytes_written += page_spec.bytes_written;
match page_spec.page_type {
PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
- if self.data_page_offset.is_none() {
- self.data_page_offset = Some(page_spec.offset);
+ if self.column_metrics.data_page_offset.is_none() {
+ self.column_metrics.data_page_offset = Some(page_spec.offset);
}
}
PageType::DICTIONARY_PAGE => {
assert!(
- self.dictionary_page_offset.is_none(),
+ self.column_metrics.dictionary_page_offset.is_none(),
"Dictionary offset is already set"
);
- self.dictionary_page_offset = Some(page_spec.offset);
+ self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
}
_ => {}
}