Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/10/18 18:15:11 UTC

[GitHub] [arrow-rs] alamb commented on a diff in pull request #2890: Respect Page Size Limits in ArrowWriter (#2853)

alamb commented on code in PR #2890:
URL: https://github.com/apache/arrow-rs/pull/2890#discussion_r998555922


##########
parquet/src/column/writer/mod.rs:
##########
@@ -1825,7 +1825,7 @@ mod tests {
         let page_writer = Box::new(SerializedPageWriter::new(&mut writer));
         let props = Arc::new(
             WriterProperties::builder()
-                .set_data_pagesize_limit(15) // actually each page will have size 15-18 bytes
+                .set_data_pagesize_limit(10)

Review Comment:
   I don't understand this change



##########
parquet/src/encodings/encoding/mod.rs:
##########
@@ -888,7 +888,7 @@ mod tests {
         // DICTIONARY
         // NOTE: The final size is almost the same because the dictionary entries are
         // preserved after encoded values have been written.
-        run_test::<Int32Type>(Encoding::RLE_DICTIONARY, -1, &[123, 1024], 11, 68, 66);
+        run_test::<Int32Type>(Encoding::RLE_DICTIONARY, -1, &[123, 1024], 0, 2, 0);

Review Comment:
   can you explain these changes?



##########
parquet/src/encodings/rle.rs:
##########
@@ -101,29 +100,33 @@ impl RleEncoder {
 
     /// Returns the minimum buffer size needed to use the encoder for `bit_width`.
     /// This is the maximum length of a single run for `bit_width`.
+    #[allow(unused)]
     pub fn min_buffer_size(bit_width: u8) -> usize {
-        let max_bit_packed_run_size = 1 + bit_util::ceil(
-            (MAX_VALUES_PER_BIT_PACKED_RUN * bit_width as usize) as i64,
-            8,
-        );
-        let max_rle_run_size =
-            bit_util::MAX_VLQ_BYTE_LEN + bit_util::ceil(bit_width as i64, 8) as usize;
-        std::cmp::max(max_bit_packed_run_size as usize, max_rle_run_size)
+        let b = bit_width as usize;
+        let max_bit_packed_run_size = 1 + MAX_GROUPS_PER_BIT_PACKED_RUN * b;
+        let max_rle_run_size = 1 + bit_util::ceil(b, 8);

Review Comment:
   `max_rle_run_size` seems to have lost `bit_util::MAX_VLQ_BYTE_LEN` -- is that intentional?
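
   For context on where the varint mattered: the RLE run header is the
   ULEB128 varint of `run_length << 1`, so it only stays a single byte while
   the run length is at most 63. A minimal sketch of the varint-aware bound
   (plain Rust, not the PR's code; `max_run_len` stands for whatever cap the
   encoder enforces):

       /// Bytes needed for the ULEB128 header of an RLE (repeated) run.
       fn rle_header_len(run_len: u64) -> usize {
           let header = run_len << 1; // low bit 0 marks a repeated run
           let bits = (64 - header.leading_zeros()).max(1) as usize;
           (bits + 6) / 7 // ULEB128 carries 7 payload bits per byte
       }

       /// Worst-case bytes for one RLE run: varint header plus one repeated
       /// value stored in ceil(bit_width / 8) bytes.
       fn max_rle_run_size(bit_width: u8, max_run_len: u64) -> usize {
           rle_header_len(max_run_len) + (bit_width as usize + 7) / 8
       }

       fn main() {
           assert_eq!(rle_header_len(63), 1); // `1 +` is enough here
           assert_eq!(rle_header_len(64), 2); // but not here
       }

   So dropping `MAX_VLQ_BYTE_LEN` is only safe if runs are capped at 63
   repeated values, or if the bound is enforced somewhere upstream.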



##########
parquet/src/encodings/levels.rs:
##########
@@ -38,13 +38,8 @@ pub fn max_buffer_size(
 ) -> usize {
     let bit_width = num_required_bits(max_level as u64);
     match encoding {
-        Encoding::RLE => {
-            RleEncoder::max_buffer_size(bit_width, num_buffered_values)
-                + RleEncoder::min_buffer_size(bit_width)
-        }
-        Encoding::BIT_PACKED => {
-            ceil((num_buffered_values * bit_width as usize) as i64, 8) as usize
-        }
+        Encoding::RLE => RleEncoder::max_buffer_size(bit_width, num_buffered_values),

Review Comment:
   why is this different than the other estimated sizes?
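
   To make the comparison concrete: the deleted `BIT_PACKED` arm was the pure
   packing size with no run headers, while the old `RLE` arm padded
   `max_buffer_size` with one extra worst-case run. The removed arithmetic,
   restated as a standalone sketch (plain Rust, not the PR's code):

       // Every level packed back to back: ceil(num_values * bit_width / 8)
       // bytes, with no run headers at all.
       fn bit_packed_size(num_values: usize, bit_width: u8) -> usize {
           (num_values * bit_width as usize + 7) / 8
       }

       fn main() {
           // e.g. 1024 one-bit definition levels fit in exactly 128 bytes
           assert_eq!(bit_packed_size(1024, 1), 128);
       }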



##########
parquet/src/encodings/rle.rs:
##########
@@ -101,29 +100,33 @@ impl RleEncoder {
 
     /// Returns the minimum buffer size needed to use the encoder for `bit_width`.
     /// This is the maximum length of a single run for `bit_width`.
+    #[allow(unused)]

Review Comment:
   why is this unused?



##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -487,7 +484,10 @@ impl ColumnValueEncoder for ByteArrayEncoder {
     }
 
     fn num_values(&self) -> usize {
-        self.num_values
+        match &self.dict_encoder {

Review Comment:
   maybe it is worth a comment
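
   Something along these lines, say (wording illustrative -- it assumes the
   new arm simply asks the dictionary encoder for its buffered count when one
   is active):

       /// Returns the number of values currently buffered by this encoder.
       ///
       /// When a dictionary encoder is active it owns the buffered keys, so
       /// the count comes from it rather than from a separate counter.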



##########
parquet/src/encodings/rle.rs:
##########
@@ -101,29 +100,33 @@ impl RleEncoder {
 
     /// Returns the minimum buffer size needed to use the encoder for `bit_width`.
     /// This is the maximum length of a single run for `bit_width`.
+    #[allow(unused)]
     pub fn min_buffer_size(bit_width: u8) -> usize {
-        let max_bit_packed_run_size = 1 + bit_util::ceil(
-            (MAX_VALUES_PER_BIT_PACKED_RUN * bit_width as usize) as i64,
-            8,
-        );
-        let max_rle_run_size =
-            bit_util::MAX_VLQ_BYTE_LEN + bit_util::ceil(bit_width as i64, 8) as usize;
-        std::cmp::max(max_bit_packed_run_size as usize, max_rle_run_size)
+        let b = bit_width as usize;
+        let max_bit_packed_run_size = 1 + MAX_GROUPS_PER_BIT_PACKED_RUN * b;
+        let max_rle_run_size = 1 + bit_util::ceil(b, 8);

Review Comment:
   I think it would also help either to name the magic `1` constants or to add comments explaining where they came from
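
   Something like this, for instance (a sketch only -- the constant name is
   made up, and the group-count constant is taken as a parameter here to
   avoid restating its value):

       /// Size of a run header when the length fits in one ULEB128 byte.
       const SINGLE_BYTE_RUN_HEADER: usize = 1;

       /// Worst-case bytes for a single run of either kind.
       fn min_buffer_size(bit_width: u8, max_groups_per_run: usize) -> usize {
           let b = bit_width as usize;
           // each bit-packed group holds 8 values, i.e. exactly `b` bytes
           let max_bit_packed = SINGLE_BYTE_RUN_HEADER + max_groups_per_run * b;
           // one repeated value stored in ceil(b / 8) bytes
           let max_rle = SINGLE_BYTE_RUN_HEADER + (b + 7) / 8;
           max_bit_packed.max(max_rle)
       }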



##########
parquet/tests/arrow_writer_layout.rs:
##########
@@ -0,0 +1,472 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests that the ArrowWriter correctly lays out values into multiple pages
+
+use arrow::array::{Int32Array, StringArray};
+use arrow::record_batch::RecordBatch;
+use bytes::Bytes;
+use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
+use parquet::arrow::ArrowWriter;
+use parquet::basic::{Encoding, PageType};
+use parquet::file::metadata::ParquetMetaData;
+use parquet::file::properties::WriterProperties;
+use parquet::file::reader::SerializedPageReader;
+use std::sync::Arc;
+
+struct Layout {
+    row_groups: Vec<RowGroup>,
+}
+
+struct RowGroup {
+    columns: Vec<ColumnChunk>,
+}
+
+struct ColumnChunk {
+    pages: Vec<Page>,
+    dictionary_page: Option<Page>,
+}
+
+struct Page {
+    rows: usize,
+    compressed_size: usize,
+    page_header_size: usize,
+    encoding: Encoding,
+    page_type: PageType,
+}
+
+struct LayoutTest {
+    props: WriterProperties,
+    batches: Vec<RecordBatch>,
+    layout: Layout,
+}
+
+fn do_test(test: LayoutTest) {
+    let mut buf = Vec::with_capacity(1024);
+
+    let mut writer =
+        ArrowWriter::try_new(&mut buf, test.batches[0].schema(), Some(test.props))
+            .unwrap();
+    for batch in test.batches {
+        writer.write(&batch).unwrap();
+    }
+    writer.close().unwrap();
+    let b = Bytes::from(buf);
+
+    // Re-read file to decode column index
+    let read_options = ArrowReaderOptions::new().with_page_index(true);
+    let reader =
+        ParquetRecordBatchReaderBuilder::try_new_with_options(b.clone(), read_options)
+            .unwrap();
+
+    assert_layout(&b, reader.metadata().as_ref(), &test.layout);
+}
+
+fn assert_layout(file_reader: &Bytes, meta: &ParquetMetaData, layout: &Layout) {
+    assert_eq!(meta.row_groups().len(), layout.row_groups.len());
+    for (row_group, row_group_layout) in meta.row_groups().iter().zip(&layout.row_groups)
+    {
+        // Check against offset index
+        let offset_index = row_group.page_offset_index().as_ref().unwrap();
+        assert_eq!(offset_index.len(), row_group_layout.columns.len());
+
+        for (column_index, column_layout) in
+            offset_index.iter().zip(&row_group_layout.columns)
+        {
+            assert_eq!(
+                column_index.len(),
+                column_layout.pages.len(),
+                "index page count mismatch"
+            );
+            for (idx, (page, page_layout)) in
+                column_index.iter().zip(&column_layout.pages).enumerate()
+            {
+                assert_eq!(
+                    page.compressed_page_size as usize,
+                    page_layout.compressed_size + page_layout.page_header_size,
+                    "index page {} size mismatch",
+                    idx
+                );
+                let next_first_row_index = column_index
+                    .get(idx + 1)
+                    .map(|x| x.first_row_index)
+                    .unwrap_or_else(|| row_group.num_rows());
+
+                let num_rows = next_first_row_index - page.first_row_index;
+                assert_eq!(
+                    num_rows as usize, page_layout.rows,
+                    "index page {} row count",
+                    idx
+                );
+            }
+        }
+
+        // Check against page data
+        assert_eq!(
+            row_group.columns().len(),
+            row_group_layout.columns.len(),
+            "column count mismatch"
+        );
+
+        let iter = row_group
+            .columns()
+            .iter()
+            .zip(&row_group_layout.columns)
+            .enumerate();
+
+        for (idx, (column, column_layout)) in iter {
+            let page_reader = SerializedPageReader::new(
+                Arc::new(file_reader.clone()),
+                column,
+                row_group.num_rows() as usize,
+                None,
+            )
+            .unwrap();
+
+            let pages = page_reader.collect::<Result<Vec<_>, _>>().unwrap();
+            assert_eq!(
+                pages.len(),
+                column_layout.pages.len()
+                    + column_layout.dictionary_page.is_some() as usize,
+                "page {} count mismatch",
+                idx
+            );
+
+            let page_layouts = column_layout
+                .dictionary_page
+                .iter()
+                .chain(&column_layout.pages);
+
+            for (page, page_layout) in pages.iter().zip(page_layouts) {
+                assert_eq!(page.encoding(), page_layout.encoding);
+                assert_eq!(
+                    page.buffer().len(),
+                    page_layout.compressed_size,
+                    "page {} size mismatch",
+                    idx
+                );
+                assert_eq!(page.page_type(), page_layout.page_type);
+            }
+        }
+    }
+}
+
+#[test]
+fn test_primitive() {
+    let array = Arc::new(Int32Array::from_iter_values(0..2000)) as _;
+    let batch = RecordBatch::try_from_iter([("col", array)]).unwrap();
+    let props = WriterProperties::builder()
+        .set_dictionary_enabled(false)
+        .set_data_pagesize_limit(1000)
+        .set_write_batch_size(10)
+        .build();
+
+    // Test spill plain encoding pages
+    do_test(LayoutTest {
+        props,
+        batches: vec![batch.clone()],
+        layout: Layout {
+            row_groups: vec![RowGroup {
+                columns: vec![ColumnChunk {
+                    pages: (0..8)
+                        .map(|_| Page {
+                            rows: 250,
+                            page_header_size: 34,
+                            compressed_size: 1000,
+                            encoding: Encoding::PLAIN,
+                            page_type: PageType::DATA_PAGE,
+                        })
+                        .collect(),
+                    dictionary_page: None,
+                }],
+            }],
+        },
+    });
+
+    // Test spill dictionary
+    let props = WriterProperties::builder()
+        .set_dictionary_enabled(true)
+        .set_dictionary_pagesize_limit(1000)
+        .set_data_pagesize_limit(10000)
+        .set_write_batch_size(10)
+        .build();
+
+    do_test(LayoutTest {
+        props,
+        batches: vec![batch.clone()],
+        layout: Layout {
+            row_groups: vec![RowGroup {
+                columns: vec![ColumnChunk {
+                    pages: vec![
+                        Page {
+                            rows: 250,
+                            page_header_size: 34,
+                            compressed_size: 258,
+                            encoding: Encoding::RLE_DICTIONARY,
+                            page_type: PageType::DATA_PAGE,
+                        },
+                        Page {
+                            rows: 1750,
+                            page_header_size: 34,
+                            compressed_size: 7000,
+                            encoding: Encoding::PLAIN,
+                            page_type: PageType::DATA_PAGE,
+                        },
+                    ],
+                    dictionary_page: Some(Page {
+                        rows: 250,
+                        page_header_size: 34,
+                        compressed_size: 1000,
+                        encoding: Encoding::PLAIN,
+                        page_type: PageType::DICTIONARY_PAGE,
+                    }),
+                }],
+            }],
+        },
+    });
+
+    // Test spill dictionary encoded pages
+    let props = WriterProperties::builder()
+        .set_dictionary_enabled(true)
+        .set_dictionary_pagesize_limit(10000)
+        .set_data_pagesize_limit(500)
+        .set_write_batch_size(10)
+        .build();
+
+    do_test(LayoutTest {
+        props,
+        batches: vec![batch],
+        layout: Layout {
+            row_groups: vec![RowGroup {
+                columns: vec![ColumnChunk {
+                    pages: vec![
+                        Page {
+                            rows: 400,
+                            page_header_size: 34,
+                            compressed_size: 452,
+                            encoding: Encoding::RLE_DICTIONARY,
+                            page_type: PageType::DATA_PAGE,
+                        },
+                        Page {
+                            rows: 370,
+                            page_header_size: 34,
+                            compressed_size: 472,
+                            encoding: Encoding::RLE_DICTIONARY,
+                            page_type: PageType::DATA_PAGE,
+                        },
+                        Page {
+                            rows: 330,
+                            page_header_size: 34,
+                            compressed_size: 464,
+                            encoding: Encoding::RLE_DICTIONARY,
+                            page_type: PageType::DATA_PAGE,
+                        },
+                        Page {
+                            rows: 330,
+                            page_header_size: 34,
+                            compressed_size: 464,
+                            encoding: Encoding::RLE_DICTIONARY,
+                            page_type: PageType::DATA_PAGE,
+                        },
+                        Page {
+                            rows: 330,
+                            page_header_size: 34,
+                            compressed_size: 464,
+                            encoding: Encoding::RLE_DICTIONARY,
+                            page_type: PageType::DATA_PAGE,
+                        },
+                        Page {
+                            rows: 240,
+                            page_header_size: 34,
+                            compressed_size: 332,
+                            encoding: Encoding::RLE_DICTIONARY,
+                            page_type: PageType::DATA_PAGE,
+                        },
+                    ],
+                    dictionary_page: Some(Page {
+                        rows: 2000,
+                        page_header_size: 34,
+                        compressed_size: 8000,
+                        encoding: Encoding::PLAIN,
+                        page_type: PageType::DICTIONARY_PAGE,
+                    }),
+                }],
+            }],
+        },
+    });
+}
+
+#[test]
+fn test_string() {
+    let array = Arc::new(StringArray::from_iter_values(

Review Comment:
   does this cover the code in `parquet/src/arrow/arrow_writer/byte_array.rs`?



##########
parquet/tests/arrow_writer_layout.rs:
##########
@@ -0,0 +1,472 @@
[... quoted diff context elided: identical to the first hunk of this file shown above ...]
+    // Test spill dictionary encoded pages
+    let props = WriterProperties::builder()
+        .set_dictionary_enabled(true)
+        .set_dictionary_pagesize_limit(10000)
+        .set_data_pagesize_limit(500)
+        .set_write_batch_size(10)
+        .build();
+
+    do_test(LayoutTest {
+        props,
+        batches: vec![batch],
+        layout: Layout {
+            row_groups: vec![RowGroup {
+                columns: vec![ColumnChunk {
+                    pages: vec![
+                        Page {
+                            rows: 400,
+                            page_header_size: 34,
+                            compressed_size: 452,
+                            encoding: Encoding::RLE_DICTIONARY,
+                            page_type: PageType::DATA_PAGE,
+                        },
+                        Page {
+                            rows: 370,
+                            page_header_size: 34,
+                            compressed_size: 472,
+                            encoding: Encoding::RLE_DICTIONARY,
+                            page_type: PageType::DATA_PAGE,
+                        },
+                        Page {
+                            rows: 330,
+                            page_header_size: 34,
+                            compressed_size: 464,
+                            encoding: Encoding::RLE_DICTIONARY,
+                            page_type: PageType::DATA_PAGE,
+                        },
+                        Page {
+                            rows: 330,
+                            page_header_size: 34,
+                            compressed_size: 464,

Review Comment:
   is it ok / expected that the compressed sizes are lower than the limit (500)?
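
   Back-of-envelope that seems consistent with the expected numbers (a
   reading aid, not a claim about the implementation): the key bit width
   tracks the dictionary size at the moment a page is cut, so the first page
   packs 400 keys at 9 bits = 50 groups * 9 bytes + 1 bit-width byte + 1 run
   header = 452 bytes, the second 370 keys at 10 bits = 47 * 10 + 2 = 472,
   and the later pages 330 keys at 11 bits = 42 * 11 + 2 = 464. If the writer
   now cuts a page *before* the limit would be crossed, checking every
   `write_batch_size = 10` values, then landing a batch or two under 500
   looks expected rather than accidental.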



##########
parquet/tests/arrow_writer_layout.rs:
##########
@@ -0,0 +1,472 @@
[... quoted diff context elided: identical to the first hunk of this file shown above ...]
+fn assert_layout(file_reader: &Bytes, meta: &ParquetMetaData, layout: &Layout) {

Review Comment:
   😮  -- very nice 🏆 



##########
parquet/tests/arrow_writer_layout.rs:
##########
@@ -0,0 +1,472 @@
[... quoted diff context elided: identical to the first hunk of this file shown above ...]
+#[test]
+fn test_string() {
+    let array = Arc::new(StringArray::from_iter_values(
+        (0..2000).map(|x| format!("{:04}", x)),
+    )) as _;
+    let batch = RecordBatch::try_from_iter([("col", array)]).unwrap();
+    let props = WriterProperties::builder()
+        .set_dictionary_enabled(false)
+        .set_data_pagesize_limit(1000)
+        .set_write_batch_size(10)
+        .build();
+
+    // Test spill plain encoding pages
+    do_test(LayoutTest {
+        props,
+        batches: vec![batch.clone()],
+        layout: Layout {
+            row_groups: vec![RowGroup {
+                columns: vec![ColumnChunk {
+                    pages: (0..15)
+                        .map(|_| Page {
+                            rows: 130,
+                            page_header_size: 34,
+                            compressed_size: 1040,
+                            encoding: Encoding::PLAIN,
+                            page_type: PageType::DATA_PAGE,
+                        })
+                        .chain(std::iter::once(Page {
+                            rows: 50,
+                            page_header_size: 33,
+                            compressed_size: 400,
+                            encoding: Encoding::PLAIN,
+                            page_type: PageType::DATA_PAGE,
+                        }))
+                        .collect(),
+                    dictionary_page: None,
+                }],
+            }],
+        },
+    });
+
+    // Test spill dictionary
+    let props = WriterProperties::builder()
+        .set_dictionary_enabled(true)
+        .set_dictionary_pagesize_limit(1000)
+        .set_data_pagesize_limit(10000)
+        .set_write_batch_size(10)
+        .build();
+
+    do_test(LayoutTest {
+        props,
+        batches: vec![batch.clone()],
+        layout: Layout {
+            row_groups: vec![RowGroup {
+                columns: vec![ColumnChunk {
+                    pages: vec![
+                        Page {

Review Comment:
   why did this dictionary spill? It wasn't at the dictionary page limit (1000) or the data pagesize limit (10000) that I can tell
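
   For scale: PLAIN-encoded BYTE_ARRAY dictionary entries are a 4-byte length
   prefix plus the bytes, so 2000 distinct 4-character strings come to
   2000 * (4 + 4) = 16000 dictionary bytes, and a 1000-byte dictionary limit
   would already be reached after roughly 125 entries. If that reading is
   right, this is the dictionary page limit after all, just hit far earlier
   than the row count suggests.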



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org