You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/08 14:22:54 UTC

[arrow-rs] branch master updated: refactor: Group metrics into page and column metrics structs (#2363)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c296ab61 refactor: Group metrics into page and column metrics structs (#2363)
0c296ab61 is described below

commit 0c296ab619cbdd2479bb8ab30073e1de71f27b57
Author: Markus Westerlind <mw...@influxdata.com>
AuthorDate: Mon Aug 8 16:22:49 2022 +0200

    refactor: Group metrics into page and column metrics structs (#2363)
    
    * refactor: Extract page metrics to a struct
    
    * refactor: Extract column metrics to a struct
    
    * refactor: Reset PageMetrics as a whole
---
 parquet/src/column/writer/mod.rs | 194 ++++++++++++++++++++++-----------------
 1 file changed, 110 insertions(+), 84 deletions(-)

diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index ce773c19d..20a3babd8 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -153,6 +153,29 @@ type ColumnCloseResult = (
     Option<OffsetIndex>,
 );
 
+/// Metrics for the in-progress data page; reset to `Default` once the page is written.
+#[derive(Default)]
+struct PageMetrics {
+    num_buffered_values: u32, // values (including nulls) buffered in the current page
+    num_buffered_rows: u32,   // rows buffered in the current page
+    num_page_nulls: u64,      // nulls buffered in the current page
+}
+
+/// Metrics accumulated over the lifetime of a column writer.
+struct ColumnMetrics<T> {
+    total_bytes_written: u64,     // bytes written by this column writer so far
+    total_rows_written: u64,      // rows written by this column writer so far
+    total_uncompressed_size: u64, // sum of written pages' uncompressed sizes
+    total_compressed_size: u64,   // sum of written pages' compressed sizes
+    total_num_values: u64,        // sum of written pages' value counts
+    dictionary_page_offset: Option<u64>, // offset of the dictionary page, if written
+    data_page_offset: Option<u64>,       // offset of the first data page, if written
+    min_column_value: Option<T>,         // column-chunk min statistic
+    max_column_value: Option<T>,         // column-chunk max statistic
+    num_column_nulls: u64,               // sum of per-page null counts
+    column_distinct_count: Option<u64>,  // caller-supplied; None if values already written
+}
+
 /// Typed column writer for a primitive column.
 pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
 
@@ -167,31 +190,13 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
     compressor: Option<Box<dyn Codec>>,
     encoder: E,
 
-    // Metrics per page
-    /// The number of values including nulls in the in-progress data page
-    num_buffered_values: u32,
-    /// The number of rows in the in-progress data page
-    num_buffered_rows: u32,
-    /// The number of nulls in the in-progress data page
-    num_page_nulls: u64,
-
+    page_metrics: PageMetrics,
     // Metrics per column writer
-    total_bytes_written: u64,
-    total_rows_written: u64,
-    total_uncompressed_size: u64,
-    total_compressed_size: u64,
-    total_num_values: u64,
-    dictionary_page_offset: Option<u64>,
-    data_page_offset: Option<u64>,
-    min_column_value: Option<E::T>,
-    max_column_value: Option<E::T>,
-    num_column_nulls: u64,
-    column_distinct_count: Option<u64>,
+    column_metrics: ColumnMetrics<E::T>,
 
     /// The order of encodings within the generated metadata does not impact its meaning,
     /// but we use a BTreeSet so that the output is deterministic
     encodings: BTreeSet<Encoding>,
-
     // Reused buffers
     def_levels_sink: Vec<i16>,
     rep_levels_sink: Vec<i16>,
@@ -226,23 +231,27 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             codec,
             compressor,
             encoder,
-            num_buffered_values: 0,
-            num_buffered_rows: 0,
-            num_page_nulls: 0,
-            total_bytes_written: 0,
-            total_rows_written: 0,
-            total_uncompressed_size: 0,
-            total_compressed_size: 0,
-            total_num_values: 0,
-            dictionary_page_offset: None,
-            data_page_offset: None,
             def_levels_sink: vec![],
             rep_levels_sink: vec![],
             data_pages: VecDeque::new(),
-            min_column_value: None,
-            max_column_value: None,
-            num_column_nulls: 0,
-            column_distinct_count: None,
+            page_metrics: PageMetrics {
+                num_buffered_values: 0,
+                num_buffered_rows: 0,
+                num_page_nulls: 0,
+            },
+            column_metrics: ColumnMetrics {
+                total_bytes_written: 0,
+                total_rows_written: 0,
+                total_uncompressed_size: 0,
+                total_compressed_size: 0,
+                total_num_values: 0,
+                dictionary_page_offset: None,
+                data_page_offset: None,
+                min_column_value: None,
+                max_column_value: None,
+                num_column_nulls: 0,
+                column_distinct_count: None,
+            },
             column_index_builder: ColumnIndexBuilder::new(),
             offset_index_builder: OffsetIndexBuilder::new(),
             encodings,
@@ -284,8 +293,16 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         if self.statistics_enabled == EnabledStatistics::Chunk {
             match (min, max) {
                 (Some(min), Some(max)) => {
-                    update_min(&self.descr, min, &mut self.min_column_value);
-                    update_max(&self.descr, max, &mut self.max_column_value);
+                    update_min(
+                        &self.descr,
+                        min,
+                        &mut self.column_metrics.min_column_value,
+                    );
+                    update_max(
+                        &self.descr,
+                        max,
+                        &mut self.column_metrics.max_column_value,
+                    );
                 }
                 (None, Some(_)) | (Some(_), None) => {
                     panic!("min/max should be both set or both None")
@@ -293,8 +310,16 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 (None, None) => {
                     if let Some((min, max)) = self.encoder.min_max(values, value_indices)
                     {
-                        update_min(&self.descr, &min, &mut self.min_column_value);
-                        update_max(&self.descr, &max, &mut self.max_column_value);
+                        update_min(
+                            &self.descr,
+                            &min,
+                            &mut self.column_metrics.min_column_value,
+                        );
+                        update_max(
+                            &self.descr,
+                            &max,
+                            &mut self.column_metrics.max_column_value,
+                        );
                     }
                 }
             };
@@ -302,9 +327,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
 
         // We can only set the distinct count if there are no other writes
         if self.encoder.num_values() == 0 {
-            self.column_distinct_count = distinct_count;
+            self.column_metrics.column_distinct_count = distinct_count;
         } else {
-            self.column_distinct_count = None;
+            self.column_metrics.column_distinct_count = None;
         }
 
         let mut values_offset = 0;
@@ -385,19 +410,19 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
     /// Returns total number of bytes written by this column writer so far.
     /// This value is also returned when column writer is closed.
     pub fn get_total_bytes_written(&self) -> u64 {
-        self.total_bytes_written
+        self.column_metrics.total_bytes_written
     }
 
     /// Returns total number of rows written by this column writer so far.
     /// This value is also returned when column writer is closed.
     pub fn get_total_rows_written(&self) -> u64 {
-        self.total_rows_written
+        self.column_metrics.total_rows_written
     }
 
     /// Finalises writes and closes the column writer.
     /// Returns total bytes written, total rows written and column chunk metadata.
     pub fn close(mut self) -> Result<ColumnCloseResult> {
-        if self.num_buffered_values > 0 {
+        if self.page_metrics.num_buffered_values > 0 {
             self.add_data_page()?;
         }
         if self.encoder.has_dictionary() {
@@ -417,8 +442,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         };
 
         Ok((
-            self.total_bytes_written,
-            self.total_rows_written,
+            self.column_metrics.total_bytes_written,
+            self.column_metrics.total_rows_written,
             metadata,
             column_index,
             offset_index,
@@ -464,7 +489,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                     values_to_write += 1;
                 } else {
                     // We must always compute this as it is used to populate v2 pages
-                    self.num_page_nulls += 1
+                    self.page_metrics.num_page_nulls += 1
                 }
             }
 
@@ -486,14 +511,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
 
             // Count the occasions where we start a new row
             for &level in levels {
-                self.num_buffered_rows += (level == 0) as u32
+                self.page_metrics.num_buffered_rows += (level == 0) as u32
             }
 
             self.rep_levels_sink.extend_from_slice(levels);
         } else {
             // Each value is exactly one row.
             // Equals to the number of values, we count nulls as well.
-            self.num_buffered_rows += num_levels as u32;
+            self.page_metrics.num_buffered_rows += num_levels as u32;
         }
 
         match value_indices {
@@ -504,7 +529,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             None => self.encoder.write(values, values_offset, values_to_write)?,
         }
 
-        self.num_buffered_values += num_levels as u32;
+        self.page_metrics.num_buffered_values += num_levels as u32;
 
         if self.should_add_data_page() {
             self.add_data_page()?;
@@ -547,7 +572,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
     /// Prepares and writes dictionary and all data pages into page writer.
     fn dict_fallback(&mut self) -> Result<()> {
         // At this point we know that we need to fall back.
-        if self.num_buffered_values > 0 {
+        if self.page_metrics.num_buffered_values > 0 {
             self.add_data_page()?;
         }
         self.write_dictionary_page()?;
@@ -558,7 +583,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
     /// Update the column index and offset index when adding the data page
     fn update_column_offset_index(&mut self, page_statistics: &Option<Statistics>) {
         // update the column index
-        let null_page = (self.num_buffered_rows as u64) == self.num_page_nulls;
+        let null_page = (self.page_metrics.num_buffered_rows as u64)
+            == self.page_metrics.num_page_nulls;
         // a page contains only null values,
         // and writers have to set the corresponding entries in min_values and max_values to byte[0]
         if null_page && self.column_index_builder.valid() {
@@ -566,7 +592,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 null_page,
                 &[0; 1],
                 &[0; 1],
-                self.num_page_nulls as i64,
+                self.page_metrics.num_page_nulls as i64,
             );
         } else if self.column_index_builder.valid() {
             // from page statistics
@@ -580,7 +606,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                         null_page,
                         stat.min_bytes(),
                         stat.max_bytes(),
-                        self.num_page_nulls as i64,
+                        self.page_metrics.num_page_nulls as i64,
                     );
                 }
             }
@@ -588,7 +614,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
 
         // update the offset index
         self.offset_index_builder
-            .append_row_count(self.num_buffered_rows as i64);
+            .append_row_count(self.page_metrics.num_buffered_rows as i64);
     }
 
     /// Adds data page.
@@ -600,17 +626,17 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         let max_def_level = self.descr.max_def_level();
         let max_rep_level = self.descr.max_rep_level();
 
-        self.num_column_nulls += self.num_page_nulls;
+        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;
 
         let page_statistics = match (values_data.min_value, values_data.max_value) {
             (Some(min), Some(max)) => {
-                update_min(&self.descr, &min, &mut self.min_column_value);
-                update_max(&self.descr, &max, &mut self.max_column_value);
+                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
+                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
                 Some(Statistics::new(
                     Some(min),
                     Some(max),
                     None,
-                    self.num_page_nulls,
+                    self.page_metrics.num_page_nulls,
                     false,
                 ))
             }
@@ -655,7 +681,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
 
                 let data_page = Page::DataPage {
                     buf: ByteBufferPtr::new(buffer),
-                    num_values: self.num_buffered_values,
+                    num_values: self.page_metrics.num_buffered_values,
                     encoding: values_data.encoding,
                     def_level_encoding: Encoding::RLE,
                     rep_level_encoding: Encoding::RLE,
@@ -696,10 +722,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
 
                 let data_page = Page::DataPageV2 {
                     buf: ByteBufferPtr::new(buffer),
-                    num_values: self.num_buffered_values,
+                    num_values: self.page_metrics.num_buffered_values,
                     encoding: values_data.encoding,
-                    num_nulls: self.num_page_nulls as u32,
-                    num_rows: self.num_buffered_rows,
+                    num_nulls: self.page_metrics.num_page_nulls as u32,
+                    num_rows: self.page_metrics.num_buffered_rows,
                     def_levels_byte_len: def_levels_byte_len as u32,
                     rep_levels_byte_len: rep_levels_byte_len as u32,
                     is_compressed: self.compressor.is_some(),
@@ -718,14 +744,13 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         }
 
         // Update total number of rows.
-        self.total_rows_written += self.num_buffered_rows as u64;
+        self.column_metrics.total_rows_written +=
+            self.page_metrics.num_buffered_rows as u64;
 
         // Reset state.
         self.rep_levels_sink.clear();
         self.def_levels_sink.clear();
-        self.num_buffered_values = 0;
-        self.num_buffered_rows = 0;
-        self.num_page_nulls = 0;
+        self.page_metrics = PageMetrics::default();
 
         Ok(())
     }
@@ -735,7 +760,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
     #[inline]
     fn flush_data_pages(&mut self) -> Result<()> {
         // Write all outstanding data to a new page.
-        if self.num_buffered_values > 0 {
+        if self.page_metrics.num_buffered_values > 0 {
             self.add_data_page()?;
         }
 
@@ -748,12 +773,13 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
 
     /// Assembles and writes column chunk metadata.
     fn write_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
-        let total_compressed_size = self.total_compressed_size as i64;
-        let total_uncompressed_size = self.total_uncompressed_size as i64;
-        let num_values = self.total_num_values as i64;
-        let dict_page_offset = self.dictionary_page_offset.map(|v| v as i64);
+        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
+        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
+        let num_values = self.column_metrics.total_num_values as i64;
+        let dict_page_offset =
+            self.column_metrics.dictionary_page_offset.map(|v| v as i64);
         // If data page offset is not set, then no pages have been written
-        let data_page_offset = self.data_page_offset.unwrap_or(0) as i64;
+        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;
 
         let file_offset = match dict_page_offset {
             Some(dict_offset) => dict_offset + total_compressed_size,
@@ -772,10 +798,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
 
         if self.statistics_enabled != EnabledStatistics::None {
             let statistics = Statistics::new(
-                self.min_column_value.clone(),
-                self.max_column_value.clone(),
-                self.column_distinct_count,
-                self.num_column_nulls,
+                self.column_metrics.min_column_value.clone(),
+                self.column_metrics.max_column_value.clone(),
+                self.column_metrics.column_distinct_count,
+                self.column_metrics.num_column_nulls,
                 false,
             );
             builder = builder.set_statistics(statistics);
@@ -860,23 +886,23 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
     /// Updates column writer metrics with each page metadata.
     #[inline]
     fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
-        self.total_uncompressed_size += page_spec.uncompressed_size as u64;
-        self.total_compressed_size += page_spec.compressed_size as u64;
-        self.total_num_values += page_spec.num_values as u64;
-        self.total_bytes_written += page_spec.bytes_written;
+        self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
+        self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
+        self.column_metrics.total_num_values += page_spec.num_values as u64;
+        self.column_metrics.total_bytes_written += page_spec.bytes_written;
 
         match page_spec.page_type {
             PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
-                if self.data_page_offset.is_none() {
-                    self.data_page_offset = Some(page_spec.offset);
+                if self.column_metrics.data_page_offset.is_none() {
+                    self.column_metrics.data_page_offset = Some(page_spec.offset);
                 }
             }
             PageType::DICTIONARY_PAGE => {
                 assert!(
-                    self.dictionary_page_offset.is_none(),
+                    self.column_metrics.dictionary_page_offset.is_none(),
                     "Dictionary offset is already set"
                 );
-                self.dictionary_page_offset = Some(page_spec.offset);
+                self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
             }
             _ => {}
         }