You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/06/28 10:41:53 UTC

[GitHub] [arrow-rs] tustvold commented on a diff in pull request #1935: add column index writer for parquet

tustvold commented on code in PR #1935:
URL: https://github.com/apache/arrow-rs/pull/1935#discussion_r908320788


##########
parquet/src/file/metadata.rs:
##########
@@ -386,6 +394,9 @@ pub struct ColumnChunkMetaData {
     offset_index_length: Option<i32>,
     column_index_offset: Option<i64>,
     column_index_length: Option<i32>,
+    // column index and offset index
+    column_index: Option<ColumnIndex>,
+    offset_index: Option<OffsetIndex>,

Review Comment:
   I'm not sure about this, as it conflates the file-level metadata with the index metadata, which is stored separately. It also leaks a type from parquet-format into the public interface, something this crate typically avoids.
   
   I'm not entirely sure why this needs to be here.



##########
parquet/src/file/writer.rs:
##########
@@ -177,10 +182,66 @@ impl<W: Write> SerializedFileWriter<W> {
         Ok(())
     }
 
+    /// Serialize all the offset index to the file
+    fn write_offset_indexes(&mut self) -> Result<()> {
+        // iter row group
+        // iter each column
+        // write offset index to the file
+        for row_group in &mut self.row_groups {
+            for column_metdata in row_group.columns_mut() {
+                match column_metdata.offset_index() {
+                    Some(offset_index) => {
+                        let start_offset = self.buf.bytes_written();
+                        let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
+                        offset_index.write_to_out_protocol(&mut protocol)?;
+                        protocol.flush()?;
+                        let end_offset = self.buf.bytes_written();
+                        // set offset and index for offset index
+                        column_metdata.set_offset_index_offset(start_offset as i64);
+                        column_metdata
+                            .set_offset_index_length((end_offset - start_offset) as i32);

Review Comment:
   As part of `SerializedFileWriter::write_metadata` we call `RowGroupMetaData::to_thrift`. Perhaps we could store these offsets and apply them directly to the returned thrift message, instead of having to clone `RowGroupMetaData`?



##########
parquet/src/column/writer.rs:
##########
@@ -2068,6 +2129,80 @@ mod tests {
         ),);
     }
 
+    #[test]
+    fn test_column_offset_index_metadata() {
+        // write data
+        // and check the offset index and column index
+        let page_writer = get_test_page_writer();
+        let props = Arc::new(WriterProperties::builder().build());
+        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
+        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
+        // first page
+        writer.flush_data_pages().unwrap();
+        // second page
+        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
+
+        let (_, rows_written, metadata) = writer.close().unwrap();
+        let column_index = match metadata.column_index() {
+            None => {
+                panic!("Can't fine the column index");
+            }
+            Some(column_index) => column_index,
+        };
+        let offset_index = match metadata.offset_index() {
+            None => {
+                panic!("Can't find the offset index");
+            }
+            Some(offset_index) => offset_index,
+        };
+
+        assert_eq!(8, rows_written);
+
+        // column index
+        assert_eq!(2, column_index.null_pages.len());
+        assert_eq!(2, offset_index.page_locations.len());
+        assert_eq!(BoundaryOrder::Unordered, column_index.boundary_order);
+        for idx in 0..2 {
+            assert_eq!(&false, column_index.null_pages.get(idx).unwrap());
+            assert_eq!(
+                &0,
+                column_index.null_counts.as_ref().unwrap().get(idx).unwrap()
+            );
+        }
+
+        if let Some(stats) = metadata.statistics() {
+            assert!(stats.has_min_max_set());
+            assert_eq!(stats.null_count(), 0);
+            assert_eq!(stats.distinct_count(), None);
+            if let Statistics::Int32(stats) = stats {
+                // first page is [1,2,3,4]
+                // second page is [-5,2,4,8]
+                assert_eq!(
+                    stats.min_bytes(),
+                    column_index.min_values.get(1).unwrap().as_slice()

Review Comment:
   ```suggestion
                       column_index.min_values[1].as_slice()
   ```



##########
parquet/src/column/writer.rs:
##########
@@ -2068,6 +2129,80 @@ mod tests {
         ),);
     }
 
+    #[test]
+    fn test_column_offset_index_metadata() {
+        // write data
+        // and check the offset index and column index
+        let page_writer = get_test_page_writer();
+        let props = Arc::new(WriterProperties::builder().build());
+        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
+        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
+        // first page
+        writer.flush_data_pages().unwrap();
+        // second page
+        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
+
+        let (_, rows_written, metadata) = writer.close().unwrap();
+        let column_index = match metadata.column_index() {
+            None => {
+                panic!("Can't fine the column index");
+            }
+            Some(column_index) => column_index,
+        };
+        let offset_index = match metadata.offset_index() {
+            None => {
+                panic!("Can't find the offset index");
+            }
+            Some(offset_index) => offset_index,
+        };
+
+        assert_eq!(8, rows_written);
+
+        // column index
+        assert_eq!(2, column_index.null_pages.len());
+        assert_eq!(2, offset_index.page_locations.len());
+        assert_eq!(BoundaryOrder::Unordered, column_index.boundary_order);
+        for idx in 0..2 {
+            assert_eq!(&false, column_index.null_pages.get(idx).unwrap());

Review Comment:
   ```suggestion
               assert!(!column_index.null_pages[idx]);
   ```



##########
parquet/src/file/writer.rs:
##########
@@ -339,11 +400,11 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
                 .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
                 .build()?;
 
-            let metadata = Arc::new(row_group_metadata);
-            self.row_group_metadata = Some(metadata.clone());
+            let clone_row_group_metadata = row_group_metadata.clone();
+            self.row_group_metadata = Some(Arc::new(row_group_metadata));

Review Comment:
   I think I would prefer we clone at the location where the mutation happens, i.e. ParquetWriter, as opposed to modifying this interface.



##########
parquet/src/column/writer.rs:
##########
@@ -2068,6 +2129,80 @@ mod tests {
         ),);
     }
 
+    #[test]
+    fn test_column_offset_index_metadata() {
+        // write data
+        // and check the offset index and column index
+        let page_writer = get_test_page_writer();
+        let props = Arc::new(WriterProperties::builder().build());
+        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
+        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
+        // first page
+        writer.flush_data_pages().unwrap();
+        // second page
+        writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
+
+        let (_, rows_written, metadata) = writer.close().unwrap();
+        let column_index = match metadata.column_index() {
+            None => {
+                panic!("Can't fine the column index");
+            }
+            Some(column_index) => column_index,
+        };
+        let offset_index = match metadata.offset_index() {
+            None => {
+                panic!("Can't find the offset index");
+            }
+            Some(offset_index) => offset_index,
+        };
+
+        assert_eq!(8, rows_written);
+
+        // column index
+        assert_eq!(2, column_index.null_pages.len());
+        assert_eq!(2, offset_index.page_locations.len());
+        assert_eq!(BoundaryOrder::Unordered, column_index.boundary_order);
+        for idx in 0..2 {
+            assert_eq!(&false, column_index.null_pages.get(idx).unwrap());
+            assert_eq!(
+                &0,
+                column_index.null_counts.as_ref().unwrap().get(idx).unwrap()
+            );

Review Comment:
   ```suggestion
               assert_eq!(
                   0,
                   column_index.null_counts.as_ref().unwrap()[idx]
               );
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org