Posted to commits@arrow.apache.org by al...@apache.org on 2022/02/01 14:28:22 UTC
[arrow-rs] branch master updated: Batch multiple records in ArrowWriter (#1214)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 891b8d0 Batch multiple records in ArrowWriter (#1214)
891b8d0 is described below
commit 891b8d0dbdcf970863bf4b337c10d41274a1ecdd
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue Feb 1 14:28:15 2022 +0000
Batch multiple records in ArrowWriter (#1214)
* Batch multiple records in ArrowWriter
* Document max_group_size and reduce default (#1213)
* Review feedback
* Write multiple arrays without concat
* Clippy
* Test aggregating complex types
* Test complex slice
* Clippy
---
arrow/src/util/pretty.rs | 10 +-
parquet/Cargo.toml | 2 +-
parquet/src/arrow/arrow_writer.rs | 434 ++++++++++++++++++++++++++++++++------
parquet/src/arrow/levels.rs | 9 +-
parquet/src/file/properties.rs | 6 +-
5 files changed, 380 insertions(+), 81 deletions(-)
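For context, the sketch below illustrates the writer behaviour this commit introduces: incoming batches are buffered and re-chunked into row groups of exactly `max_row_group_size` rows, with any remainder flushed on close. It is adapted from the test code in this diff; the `main` wrapper, the `tempfile` dev-dependency, and the 200-row limit are illustrative choices, not part of the API.

use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("int", DataType::Int32, false)]));

    // Cap row groups at 200 rows; the writer re-chunks whatever batch sizes it is given
    let props = WriterProperties::builder()
        .set_max_row_group_size(200)
        .build();

    let file = tempfile::tempfile().unwrap();
    let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props)).unwrap();

    // 100 + 50 + 300 = 450 rows arrive in three unevenly sized batches
    for array in [
        Int32Array::from((0..100).collect::<Vec<_>>()),
        Int32Array::from((0..50).collect::<Vec<_>>()),
        Int32Array::from((200..500).collect::<Vec<_>>()),
    ] {
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
        writer.write(&batch).unwrap();
    }

    // close() flushes the 50 leftover buffered rows: row groups are [200, 200, 50]
    writer.close().unwrap();
}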
diff --git a/arrow/src/util/pretty.rs b/arrow/src/util/pretty.rs
index 91343ec..4b67f3d 100644
--- a/arrow/src/util/pretty.rs
+++ b/arrow/src/util/pretty.rs
@@ -74,7 +74,7 @@ fn create_table(results: &[RecordBatch]) -> Result<Table> {
let mut cells = Vec::new();
for col in 0..batch.num_columns() {
let column = batch.column(col);
- cells.push(Cell::new(&array_value_to_string(&column, row)?));
+ cells.push(Cell::new(&array_value_to_string(column, row)?));
}
table.add_row(cells);
}
@@ -96,7 +96,7 @@ fn create_column(field: &str, columns: &[ArrayRef]) -> Result<Table> {
for col in columns {
for row in 0..col.len() {
- let cells = vec![Cell::new(&array_value_to_string(&col, row)?)];
+ let cells = vec![Cell::new(&array_value_to_string(col, row)?)];
table.add_row(cells);
}
}
@@ -320,7 +320,7 @@ mod tests {
let mut builder = FixedSizeBinaryBuilder::new(3, 3);
builder.append_value(&[1, 2, 3]).unwrap();
- builder.append_null();
+ builder.append_null().unwrap();
builder.append_value(&[7, 8, 9]).unwrap();
let array = Arc::new(builder.finish());
@@ -677,7 +677,7 @@ mod tests {
)?;
let mut buf = String::new();
- write!(&mut buf, "{}", pretty_format_batches(&[batch])?.to_string()).unwrap();
+ write!(&mut buf, "{}", pretty_format_batches(&[batch])?).unwrap();
let s = vec![
"+---+-----+",
@@ -689,7 +689,7 @@ mod tests {
"| d | 100 |",
"+---+-----+",
];
- let expected = String::from(s.join("\n"));
+ let expected = s.join("\n");
assert_eq!(expected, buf);
Ok(())
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 0765da1..3dd65f6 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -55,7 +55,7 @@ brotli = "3.3"
flate2 = "1.0"
lz4 = "1.23"
serde_json = { version = "1.0", features = ["preserve_order"] }
-arrow = { path = "../arrow", version = "8.0.0", default-features = false, features = ["test_utils"] }
+arrow = { path = "../arrow", version = "8.0.0", default-features = false, features = ["test_utils", "prettyprint"] }
[features]
default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs
index fc3a567..c0205c4 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer.rs
@@ -17,9 +17,11 @@
//! Contains writer which writes arrow data into parquet data.
+use std::collections::VecDeque;
use std::sync::Arc;
use arrow::array as arrow_array;
+use arrow::array::ArrayRef;
use arrow::datatypes::{DataType as ArrowDataType, IntervalUnit, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow_array::Array;
@@ -40,14 +42,25 @@ use crate::{
/// Arrow writer
///
-/// Writes Arrow `RecordBatch`es to a Parquet writer
+/// Writes Arrow `RecordBatch`es to a Parquet writer, buffering up `RecordBatch`es in
+/// order to produce row groups with `max_row_group_size` rows. Any remaining rows are
+/// flushed on close, so the final row group in the output file may contain fewer than
+/// `max_row_group_size` rows.
pub struct ArrowWriter<W: ParquetWriter> {
/// Underlying Parquet writer
writer: SerializedFileWriter<W>,
+
+ /// For each column, maintain an ordered queue of arrays to write
+ buffer: Vec<VecDeque<ArrayRef>>,
+
+ /// The total number of rows currently buffered
+ buffered_rows: usize,
+
/// A copy of the Arrow schema.
///
/// The schema is used to verify that each record batch written has the correct schema
arrow_schema: SchemaRef,
+
/// The length of arrays to write to each row group
max_row_group_size: usize,
}
@@ -75,18 +88,18 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
Ok(Self {
writer: file_writer,
+ buffer: vec![Default::default(); arrow_schema.fields().len()],
+ buffered_rows: 0,
arrow_schema,
max_row_group_size,
})
}
- /// Write a RecordBatch to writer
+ /// Enqueues the provided `RecordBatch` to be written
///
- /// The writer will slice the `batch` into `max_row_group_size`,
- /// but if a batch has left-over rows less than the row group size,
- /// the last row group will have fewer records.
- /// This is currently a limitation because we close the row group
- /// instead of keeping it open for the next batch.
+ /// If this results in more than `max_row_group_size` rows being buffered, this will
+ /// flush out one or more row groups of `max_row_group_size` rows each, and drop any
+ /// fully written `RecordBatch`es
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
// validate batch schema against writer's supplied schema
if self.arrow_schema != batch.schema() {
@@ -94,58 +107,118 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
"Record batch schema does not match writer schema".to_string(),
));
}
- // Track the number of rows being written in the batch.
- // We currently do not have a way of slicing nested arrays, thus we
- // track this manually.
- let num_rows = batch.num_rows();
- let batches = (num_rows + self.max_row_group_size - 1) / self.max_row_group_size;
- let min_batch = num_rows.min(self.max_row_group_size);
- for batch_index in 0..batches {
- // Determine the offset and length of arrays
- let offset = batch_index * min_batch;
- let length = (num_rows - offset).min(self.max_row_group_size);
-
- // Compute the definition and repetition levels of the batch
- let batch_level = LevelInfo::new(offset, length);
- let mut row_group_writer = self.writer.next_row_group()?;
- for (array, field) in batch.columns().iter().zip(batch.schema().fields()) {
- let mut levels = batch_level.calculate_array_levels(array, field);
- // Reverse levels as we pop() them when writing arrays
- levels.reverse();
- write_leaves(&mut row_group_writer, array, &mut levels)?;
+
+ for (buffer, column) in self.buffer.iter_mut().zip(batch.columns()) {
+ buffer.push_back(column.clone())
+ }
+
+ self.buffered_rows += batch.num_rows();
+
+ self.flush_completed()?;
+
+ Ok(())
+ }
+
+ /// Flushes buffered data until fewer than `max_row_group_size` rows remain buffered
+ fn flush_completed(&mut self) -> Result<()> {
+ while self.buffered_rows >= self.max_row_group_size {
+ self.flush_row_group(self.max_row_group_size)?;
+ }
+ Ok(())
+ }
+
+ /// Flushes `num_rows` from the buffer into a new row group
+ fn flush_row_group(&mut self, num_rows: usize) -> Result<()> {
+ if num_rows == 0 {
+ return Ok(());
+ }
+
+ assert!(
+ num_rows <= self.buffered_rows,
+ "cannot flush {} rows only have {}",
+ num_rows,
+ self.buffered_rows
+ );
+
+ assert!(
+ num_rows <= self.max_row_group_size,
+ "cannot flush {} rows would exceed max row group size of {}",
+ num_rows,
+ self.max_row_group_size
+ );
+
+ let mut row_group_writer = self.writer.next_row_group()?;
+
+ for (col_buffer, field) in self.buffer.iter_mut().zip(self.arrow_schema.fields())
+ {
+ // Collect enough arrays from the buffer to cover `num_rows` rows
+ let mut remaining = num_rows;
+ let mut arrays = Vec::with_capacity(col_buffer.len());
+ while remaining != 0 {
+ match col_buffer.pop_front() {
+ Some(next) if next.len() > remaining => {
+ col_buffer
+ .push_front(next.slice(remaining, next.len() - remaining));
+ arrays.push(next.slice(0, remaining));
+ remaining = 0;
+ }
+ Some(next) => {
+ remaining -= next.len();
+ arrays.push(next);
+ }
+ _ => break,
+ }
}
- self.writer.close_row_group(row_group_writer)?;
+ let mut levels: Vec<_> = arrays
+ .iter()
+ .map(|array| {
+ let batch_level = LevelInfo::new(0, array.len());
+ let mut levels = batch_level.calculate_array_levels(array, field);
+ // Reverse levels as we pop() them when writing arrays
+ levels.reverse();
+ levels
+ })
+ .collect();
+
+ write_leaves(row_group_writer.as_mut(), &arrays, &mut levels)?;
}
+ self.writer.close_row_group(row_group_writer)?;
+ self.buffered_rows -= num_rows;
+
Ok(())
}
/// Close and finalize the underlying Parquet writer
pub fn close(&mut self) -> Result<parquet_format::FileMetaData> {
+ self.flush_completed()?;
+ self.flush_row_group(self.buffered_rows)?;
self.writer.close()
}
}
/// Convenience method to get the next ColumnWriter from the RowGroupWriter
#[inline]
-#[allow(clippy::borrowed_box)]
-fn get_col_writer(
- row_group_writer: &mut Box<dyn RowGroupWriter>,
-) -> Result<ColumnWriter> {
+fn get_col_writer(row_group_writer: &mut dyn RowGroupWriter) -> Result<ColumnWriter> {
let col_writer = row_group_writer
.next_column()?
.expect("Unable to get column writer");
Ok(col_writer)
}
-#[allow(clippy::borrowed_box)]
fn write_leaves(
- row_group_writer: &mut Box<dyn RowGroupWriter>,
- array: &arrow_array::ArrayRef,
- levels: &mut Vec<LevelInfo>,
+ row_group_writer: &mut dyn RowGroupWriter,
+ arrays: &[ArrayRef],
+ levels: &mut [Vec<LevelInfo>],
) -> Result<()> {
- match array.data_type() {
+ assert_eq!(arrays.len(), levels.len());
+ assert!(!arrays.is_empty());
+
+ let data_type = arrays.first().unwrap().data_type().clone();
+ assert!(arrays.iter().all(|a| a.data_type() == &data_type));
+
+ match &data_type {
ArrowDataType::Null
| ArrowDataType::Boolean
| ArrowDataType::Int8
@@ -172,50 +245,76 @@ fn write_leaves(
| ArrowDataType::Decimal(_, _)
| ArrowDataType::FixedSizeBinary(_) => {
let mut col_writer = get_col_writer(row_group_writer)?;
- write_leaf(
- &mut col_writer,
- array,
- levels.pop().expect("Levels exhausted"),
- )?;
+ for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
+ write_leaf(
+ &mut col_writer,
+ array,
+ levels.pop().expect("Levels exhausted"),
+ )?;
+ }
row_group_writer.close_column(col_writer)?;
Ok(())
}
ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
- // write the child list
- let data = array.data();
- let child_array = arrow_array::make_array(data.child_data()[0].clone());
- write_leaves(row_group_writer, &child_array, levels)?;
+ let arrays: Vec<_> = arrays.iter().map(|array|{
+ // write the child list
+ let data = array.data();
+ arrow_array::make_array(data.child_data()[0].clone())
+ }).collect();
+
+ write_leaves(row_group_writer, &arrays, levels)?;
Ok(())
}
- ArrowDataType::Struct(_) => {
- let struct_array: &arrow_array::StructArray = array
- .as_any()
- .downcast_ref::<arrow_array::StructArray>()
- .expect("Unable to get struct array");
- for field in struct_array.columns() {
- write_leaves(row_group_writer, field, levels)?;
+ ArrowDataType::Struct(fields) => {
+ // Groups child arrays by field
+ let mut field_arrays = vec![Vec::with_capacity(arrays.len()); fields.len()];
+
+ for array in arrays {
+ let struct_array: &arrow_array::StructArray = array
+ .as_any()
+ .downcast_ref::<arrow_array::StructArray>()
+ .expect("Unable to get struct array");
+
+ assert_eq!(struct_array.columns().len(), fields.len());
+
+ for (child_array, field) in field_arrays.iter_mut().zip(struct_array.columns()) {
+ child_array.push(field.clone())
+ }
+ }
+
+ for field in field_arrays {
+ write_leaves(row_group_writer, &field, levels)?;
}
+
Ok(())
}
ArrowDataType::Map(_, _) => {
- let map_array: &arrow_array::MapArray = array
- .as_any()
- .downcast_ref::<arrow_array::MapArray>()
- .expect("Unable to get map array");
- write_leaves(row_group_writer, &map_array.keys(), levels)?;
- write_leaves(row_group_writer, &map_array.values(), levels)?;
+ let mut keys = Vec::with_capacity(arrays.len());
+ let mut values = Vec::with_capacity(arrays.len());
+ for array in arrays {
+ let map_array: &arrow_array::MapArray = array
+ .as_any()
+ .downcast_ref::<arrow_array::MapArray>()
+ .expect("Unable to get map array");
+ keys.push(map_array.keys());
+ values.push(map_array.values());
+ }
+
+ write_leaves(row_group_writer, &keys, levels)?;
+ write_leaves(row_group_writer, &values, levels)?;
Ok(())
}
ArrowDataType::Dictionary(_, value_type) => {
- // cast dictionary to a primitive
- let array = arrow::compute::cast(array, value_type)?;
-
let mut col_writer = get_col_writer(row_group_writer)?;
- write_leaf(
- &mut col_writer,
- &array,
- levels.pop().expect("Levels exhausted"),
- )?;
+ for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
+ // cast dictionary to a primitive
+ let array = arrow::compute::cast(array, value_type)?;
+ write_leaf(
+ &mut col_writer,
+ &array,
+ levels.pop().expect("Levels exhausted"),
+ )?;
+ }
row_group_writer.close_column(col_writer)?;
Ok(())
}
@@ -226,7 +325,7 @@ fn write_leaves(
Err(ParquetError::NYI(
format!(
"Attempting to write an Arrow type {:?} to parquet that is not yet implemented",
- array.data_type()
+ data_type
)
))
}
@@ -593,10 +692,13 @@ mod tests {
use arrow::datatypes::ToByteSlice;
use arrow::datatypes::{DataType, Field, Schema, UInt32Type, UInt8Type};
+ use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
+ use arrow::util::pretty::pretty_format_batches;
use arrow::{array::*, buffer::Buffer};
use crate::arrow::{ArrowReader, ParquetFileArrowReader};
+ use crate::file::metadata::ParquetMetaData;
use crate::file::{
reader::{FileReader, SerializedFileReader},
statistics::Statistics,
@@ -1723,4 +1825,202 @@ mod tests {
one_column_roundtrip(array, true, Some(10));
}
+
+ fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
+ metadata.row_groups().iter().map(|x| x.num_rows()).collect()
+ }
+
+ #[test]
+ fn test_aggregates_records() {
+ let arrays = [
+ Int32Array::from((0..100).collect::<Vec<_>>()),
+ Int32Array::from((0..50).collect::<Vec<_>>()),
+ Int32Array::from((200..500).collect::<Vec<_>>()),
+ ];
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "int",
+ ArrowDataType::Int32,
+ false,
+ )]));
+
+ let file = tempfile::tempfile().unwrap();
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(200)
+ .build();
+
+ let mut writer =
+ ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props))
+ .unwrap();
+
+ for array in arrays {
+ let batch =
+ RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
+ writer.write(&batch).unwrap();
+ }
+
+ writer.close().unwrap();
+
+ let reader = SerializedFileReader::new(file).unwrap();
+ assert_eq!(&row_group_sizes(reader.metadata()), &[200, 200, 50]);
+
+ let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
+ let batches = arrow_reader
+ .get_record_reader(100)
+ .unwrap()
+ .collect::<ArrowResult<Vec<_>>>()
+ .unwrap();
+
+ assert_eq!(batches.len(), 5);
+ assert!(batches.iter().all(|x| x.num_columns() == 1));
+
+ let batch_sizes: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
+
+ assert_eq!(&batch_sizes, &[100, 100, 100, 100, 50]);
+
+ let values: Vec<_> = batches
+ .iter()
+ .flat_map(|x| {
+ x.column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap()
+ .values()
+ .iter()
+ .cloned()
+ })
+ .collect();
+
+ let expected_values: Vec<_> =
+ [0..100, 0..50, 200..500].into_iter().flatten().collect();
+ assert_eq!(&values, &expected_values)
+ }
+
+ #[test]
+ fn complex_aggregate() {
+ // Tests aggregating nested data
+ let field_a = Field::new("leaf_a", DataType::Int32, false);
+ let field_b = Field::new("leaf_b", DataType::Int32, true);
+ let struct_a = Field::new(
+ "struct_a",
+ DataType::Struct(vec![field_a.clone(), field_b.clone()]),
+ true,
+ );
+
+ let list_a = Field::new("list", DataType::List(Box::new(struct_a)), true);
+ let struct_b =
+ Field::new("struct_b", DataType::Struct(vec![list_a.clone()]), false);
+
+ let schema = Arc::new(Schema::new(vec![struct_b]));
+
+ // create nested data
+ let field_a_array = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
+ let field_b_array =
+ Int32Array::from_iter(vec![Some(1), None, Some(2), None, None, Some(6)]);
+
+ let struct_a_array = StructArray::from(vec![
+ (field_a.clone(), Arc::new(field_a_array) as ArrayRef),
+ (field_b.clone(), Arc::new(field_b_array) as ArrayRef),
+ ]);
+
+ let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
+ .len(5)
+ .add_buffer(Buffer::from_iter(vec![
+ 0_i32, 1_i32, 1_i32, 3_i32, 3_i32, 5_i32,
+ ]))
+ .null_bit_buffer(Buffer::from_iter(vec![true, false, true, false, true]))
+ .child_data(vec![struct_a_array.data().clone()])
+ .build()
+ .unwrap();
+
+ let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
+ let struct_b_array = StructArray::from(vec![(list_a.clone(), list_a_array)]);
+
+ let batch1 = RecordBatch::try_from_iter(vec![(
+ "struct_b",
+ Arc::new(struct_b_array) as ArrayRef,
+ )])
+ .unwrap();
+
+ let field_a_array = Int32Array::from(vec![6, 7, 8, 9, 10]);
+ let field_b_array = Int32Array::from_iter(vec![None, None, None, Some(1), None]);
+
+ let struct_a_array = StructArray::from(vec![
+ (field_a, Arc::new(field_a_array) as ArrayRef),
+ (field_b, Arc::new(field_b_array) as ArrayRef),
+ ]);
+
+ let list_data = ArrayDataBuilder::new(list_a.data_type().clone())
+ .len(2)
+ .add_buffer(Buffer::from_iter(vec![0_i32, 4_i32, 5_i32]))
+ .child_data(vec![struct_a_array.data().clone()])
+ .build()
+ .unwrap();
+
+ let list_a_array = Arc::new(ListArray::from(list_data)) as ArrayRef;
+ let struct_b_array = StructArray::from(vec![(list_a, list_a_array)]);
+
+ let batch2 = RecordBatch::try_from_iter(vec![(
+ "struct_b",
+ Arc::new(struct_b_array) as ArrayRef,
+ )])
+ .unwrap();
+
+ let batches = &[batch1, batch2];
+
+ // Verify data is as expected
+
+ let expected = r#"
+ +-------------------------------------------------------------------------------------------------------------------------------------+
+ | struct_b |
+ +-------------------------------------------------------------------------------------------------------------------------------------+
+ | {"list": [{"leaf_a": 1, "leaf_b": 1}]} |
+ | {"list": null} |
+ | {"list": [{"leaf_a": 2, "leaf_b": null}, {"leaf_a": 3, "leaf_b": 2}]} |
+ | {"list": null} |
+ | {"list": [{"leaf_a": 4, "leaf_b": null}, {"leaf_a": 5, "leaf_b": null}]} |
+ | {"list": [{"leaf_a": 6, "leaf_b": null}, {"leaf_a": 7, "leaf_b": null}, {"leaf_a": 8, "leaf_b": null}, {"leaf_a": 9, "leaf_b": 1}]} |
+ | {"list": [{"leaf_a": 10, "leaf_b": null}]} |
+ +-------------------------------------------------------------------------------------------------------------------------------------+
+ "#.trim().split('\n').map(|x| x.trim()).collect::<Vec<_>>().join("\n");
+
+ let actual = pretty_format_batches(batches).unwrap().to_string();
+ assert_eq!(actual, expected);
+
+ // Write data
+ let file = tempfile::tempfile().unwrap();
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(6)
+ .build();
+
+ let mut writer =
+ ArrowWriter::try_new(file.try_clone().unwrap(), schema, Some(props)).unwrap();
+
+ for batch in batches {
+ writer.write(batch).unwrap();
+ }
+ writer.close().unwrap();
+
+ // Read Data
+ let reader = SerializedFileReader::new(file).unwrap();
+
+ // Should have written the entire first batch and the first row of the second
+ // batch to the first row group, leaving a single row for the second row group
+ assert_eq!(&row_group_sizes(reader.metadata()), &[6, 1]);
+
+ let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
+ let batches = arrow_reader
+ .get_record_reader(2)
+ .unwrap()
+ .collect::<ArrowResult<Vec<_>>>()
+ .unwrap();
+
+ assert_eq!(batches.len(), 4);
+ let batch_counts: Vec<_> = batches.iter().map(|x| x.num_rows()).collect();
+ assert_eq!(&batch_counts, &[2, 2, 2, 1]);
+
+ let actual = pretty_format_batches(&batches).unwrap().to_string();
+ assert_eq!(actual, expected);
+ }
}
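The core of the new buffering is the dequeue-and-slice loop in `flush_row_group` above. Isolated as a standalone sketch (the `take_rows` helper is hypothetical, written here only to show the pattern on its own), it pops whole arrays off a per-column queue until `num_rows` is covered, slicing the final array and returning its tail to the queue:

use std::collections::VecDeque;

use arrow::array::ArrayRef;

// Hypothetical helper isolating the dequeue-and-slice pattern from flush_row_group
fn take_rows(buffer: &mut VecDeque<ArrayRef>, num_rows: usize) -> Vec<ArrayRef> {
    let mut remaining = num_rows;
    let mut arrays = Vec::with_capacity(buffer.len());
    while remaining != 0 {
        match buffer.pop_front() {
            // Array is larger than needed: split it, returning the tail to the queue
            Some(next) if next.len() > remaining => {
                buffer.push_front(next.slice(remaining, next.len() - remaining));
                arrays.push(next.slice(0, remaining));
                remaining = 0;
            }
            // Array fits entirely within this row group
            Some(next) => {
                remaining -= next.len();
                arrays.push(next);
            }
            // Queue exhausted (the caller's asserts rule this out mid-flush)
            None => break,
        }
    }
    arrays
}

Because `Array::slice` is zero-copy (it only adjusts an offset into shared buffers), this is what lets the writer "Write multiple arrays without concat", as one of the commit bullets puts it.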
diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/levels.rs
index ea1f212..20b0ff7 100644
--- a/parquet/src/arrow/levels.rs
+++ b/parquet/src/arrow/levels.rs
@@ -40,7 +40,7 @@
//!
//! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)
-use arrow::array::{make_array, ArrayRef, MapArray, StructArray};
+use arrow::array::{make_array, Array, ArrayRef, MapArray, StructArray};
use arrow::datatypes::{DataType, Field};
/// Keeps track of the level information per array that is needed to write an Arrow array to Parquet.
@@ -711,12 +711,11 @@ impl LevelInfo {
((0..=(len as i64)).collect(), array_mask)
}
DataType::List(_) | DataType::Map(_, _) => {
- let data = array.data();
- let offsets = unsafe { data.buffers()[0].typed_data::<i32>() };
+ let offsets = unsafe { array.data().buffers()[0].typed_data::<i32>() };
let offsets = offsets
.to_vec()
.into_iter()
- .skip(offset)
+ .skip(array.offset() + offset)
.take(len + 1)
.map(|v| v as i64)
.collect::<Vec<i64>>();
@@ -729,7 +728,7 @@ impl LevelInfo {
DataType::LargeList(_) => {
let offsets = unsafe { array.data().buffers()[0].typed_data::<i64>() }
.iter()
- .skip(offset)
+ .skip(array.offset() + offset)
.take(len + 1)
.copied()
.collect();
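The levels.rs change matters precisely because of that slicing: a sliced array shares its parent's buffers and records where it starts via `Array::offset()`, so code reading the raw offsets buffer directly must skip that many entries first. A minimal illustration, using a primitive array for brevity where the fixed code reads a list's offsets:

use arrow::array::{Array, Int32Array};

fn main() {
    let parent = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let child = parent.slice(2, 3); // zero-copy view over the same buffers

    // The slice starts two entries into the shared data; reading the raw
    // buffers without adding this offset would yield the parent's values,
    // which is what the `.skip(array.offset() + offset)` fix accounts for.
    assert_eq!(child.offset(), 2);
    assert_eq!(child.len(), 3);
}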
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index c48e4e7..b42516e 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -62,7 +62,7 @@ const DEFAULT_DICTIONARY_ENABLED: bool = true;
const DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT: usize = DEFAULT_PAGE_SIZE;
const DEFAULT_STATISTICS_ENABLED: bool = true;
const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
-const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 128 * 1024 * 1024;
+const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
const DEFAULT_CREATED_BY: &str = env!("PARQUET_CREATED_BY");
/// Parquet writer version.
@@ -129,7 +129,7 @@ impl WriterProperties {
self.write_batch_size
}
- /// Returns max size for a row group.
+ /// Returns maximum number of rows in a row group.
pub fn max_row_group_size(&self) -> usize {
self.max_row_group_size
}
@@ -288,7 +288,7 @@ impl WriterPropertiesBuilder {
self
}
- /// Sets max size for a row group.
+ /// Sets maximum number of rows in a row group.
pub fn set_max_row_group_size(mut self, value: usize) -> Self {
assert!(value > 0, "Cannot have a 0 max row group size");
self.max_row_group_size = value;
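Finally, the properties.rs change documents `max_row_group_size` as a maximum number of rows rather than a byte size, and lowers the default from 128 * 1024 * 1024 to 1024 * 1024 rows. A minimal sketch of overriding it, using an arbitrary 64 * 1024-row limit:

use parquet::file::properties::WriterProperties;

fn main() {
    // The value is a row count, not a byte size; the default is now 1024 * 1024 rows
    let props = WriterProperties::builder()
        .set_max_row_group_size(64 * 1024)
        .build();

    assert_eq!(props.max_row_group_size(), 64 * 1024);
}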