You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@arrow.apache.org by al...@apache.org on 2021/06/02 19:33:50 UTC
[arrow-rs] branch master updated: Respect max rowgroup size in Arrow writer (#381)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 7ec3158 Respect max rowgroup size in Arrow writer (#381)
7ec3158 is described below
commit 7ec3158b09d24a63730f7aa63d147fd78e2102a5
Author: Wakahisa <ne...@gmail.com>
AuthorDate: Wed Jun 2 21:31:50 2021 +0200
Respect max rowgroup size in Arrow writer (#381)
* Respect max rowgroup size in Arrow writer
* simplify while loop
* address review feedback
---
parquet/src/arrow/arrow_writer.rs | 176 ++++++++++++++++++++++++++--------
parquet/src/arrow/levels.rs | 193 +++++++++++++++++++++++++++-----------
parquet/src/file/properties.rs | 1 +
3 files changed, 277 insertions(+), 93 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs
index 332b893..69ebce6 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer.rs
@@ -47,6 +47,8 @@ pub struct ArrowWriter<W: ParquetWriter> {
///
/// The schema is used to verify that each record batch written has the correct schema
arrow_schema: SchemaRef,
+ /// The length of arrays to write to each row group
+ max_row_group_size: usize,
}
impl<W: 'static + ParquetWriter> ArrowWriter<W> {
@@ -65,6 +67,8 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
let mut props = props.unwrap_or_else(|| WriterProperties::builder().build());
add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
+ let max_row_group_size = props.max_row_group_size();
+
let file_writer = SerializedFileWriter::new(
writer.try_clone()?,
schema.root_schema_ptr(),
@@ -74,12 +78,17 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
Ok(Self {
writer: file_writer,
arrow_schema,
+ max_row_group_size,
})
}
/// Write a RecordBatch to writer
///
- /// *NOTE:* The writer currently does not support all Arrow data types
+ /// The writer will slice the `batch` into `max_row_group_size`,
+ /// but if a batch has left-over rows less than the row group size,
+ /// the last row group will have fewer records.
+ /// This is currently a limitation because we close the row group
+ /// instead of keeping it open for the next batch.
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
// validate batch schema against writer's supplied schema
if self.arrow_schema != batch.schema() {
@@ -87,17 +96,31 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
"Record batch schema does not match writer schema".to_string(),
));
}
- // compute the definition and repetition levels of the batch
- let batch_level = LevelInfo::new_from_batch(batch);
- let mut row_group_writer = self.writer.next_row_group()?;
- for (array, field) in batch.columns().iter().zip(batch.schema().fields()) {
- let mut levels = batch_level.calculate_array_levels(array, field);
- // Reverse levels as we pop() them when writing arrays
- levels.reverse();
- write_leaves(&mut row_group_writer, array, &mut levels)?;
+ // Track the number of rows being written in the batch.
+ // We currently do not have a way of slicing nested arrays, thus we
+ // track this manually.
+ let num_rows = batch.num_rows();
+ let batches = (num_rows + self.max_row_group_size - 1) / self.max_row_group_size;
+ let min_batch = num_rows.min(self.max_row_group_size);
+ for batch_index in 0..batches {
+ // Determine the offset and length of arrays
+ let offset = batch_index * min_batch;
+ let length = (num_rows - offset).min(self.max_row_group_size);
+
+ // Compute the definition and repetition levels of the batch
+ let batch_level = LevelInfo::new(offset, length);
+ let mut row_group_writer = self.writer.next_row_group()?;
+ for (array, field) in batch.columns().iter().zip(batch.schema().fields()) {
+ let mut levels = batch_level.calculate_array_levels(array, field);
+ // Reverse levels as we pop() them when writing arrays
+ levels.reverse();
+ write_leaves(&mut row_group_writer, array, &mut levels)?;
+ }
+
+ self.writer.close_row_group(row_group_writer)?;
}
- self.writer.close_row_group(row_group_writer)
+ Ok(())
}
/// Close and finalize the underlying Parquet writer
@@ -209,16 +232,19 @@ fn write_leaf(
levels: LevelInfo,
) -> Result<i64> {
let indices = levels.filter_array_indices();
+ // Slice array according to computed offset and length
+ let column = column.slice(levels.offset, levels.length);
let written = match writer {
ColumnWriter::Int32ColumnWriter(ref mut typed) => {
let values = match column.data_type() {
ArrowDataType::Date64 => {
// If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
let array = if let ArrowDataType::Date64 = column.data_type() {
- let array = arrow::compute::cast(column, &ArrowDataType::Date32)?;
+ let array =
+ arrow::compute::cast(&column, &ArrowDataType::Date32)?;
arrow::compute::cast(&array, &ArrowDataType::Int32)?
} else {
- arrow::compute::cast(column, &ArrowDataType::Int32)?
+ arrow::compute::cast(&column, &ArrowDataType::Int32)?
};
let array = array
.as_any()
@@ -240,7 +266,7 @@ fn write_leaf(
get_numeric_array_slice::<Int32Type, _>(&array, &indices)
}
_ => {
- let array = arrow::compute::cast(column, &ArrowDataType::Int32)?;
+ let array = arrow::compute::cast(&column, &ArrowDataType::Int32)?;
let array = array
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
@@ -288,7 +314,7 @@ fn write_leaf(
get_numeric_array_slice::<Int64Type, _>(&array, &indices)
}
_ => {
- let array = arrow::compute::cast(column, &ArrowDataType::Int64)?;
+ let array = arrow::compute::cast(&column, &ArrowDataType::Int64)?;
let array = array
.as_any()
.downcast_ref::<arrow_array::Int64Array>()
@@ -866,7 +892,17 @@ mod tests {
)
.unwrap();
- roundtrip("test_arrow_writer_complex.parquet", batch);
+ roundtrip(
+ "test_arrow_writer_complex.parquet",
+ batch.clone(),
+ Some(SMALL_SIZE / 2),
+ );
+
+ roundtrip(
+ "test_arrow_writer_complex_small_batch.parquet",
+ batch,
+ Some(SMALL_SIZE / 3),
+ );
}
#[test]
@@ -904,7 +940,11 @@ mod tests {
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)])
.unwrap();
- roundtrip("test_arrow_writer_complex_mixed.parquet", batch);
+ roundtrip(
+ "test_arrow_writer_complex_mixed.parquet",
+ batch,
+ Some(SMALL_SIZE / 2),
+ );
}
#[test]
@@ -936,7 +976,11 @@ mod tests {
// build a racord batch
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
- roundtrip("test_arrow_writer_2_level_struct.parquet", batch);
+ roundtrip(
+ "test_arrow_writer_2_level_struct.parquet",
+ batch,
+ Some(SMALL_SIZE / 2),
+ );
}
#[test]
@@ -966,7 +1010,11 @@ mod tests {
// build a racord batch
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
- roundtrip("test_arrow_writer_2_level_struct_non_null.parquet", batch);
+ roundtrip(
+ "test_arrow_writer_2_level_struct_non_null.parquet",
+ batch,
+ Some(SMALL_SIZE / 2),
+ );
}
#[test]
@@ -998,18 +1046,30 @@ mod tests {
// build a racord batch
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
- roundtrip("test_arrow_writer_2_level_struct_mixed_null.parquet", batch);
+ roundtrip(
+ "test_arrow_writer_2_level_struct_mixed_null.parquet",
+ batch,
+ Some(SMALL_SIZE / 2),
+ );
}
- const SMALL_SIZE: usize = 4;
+ const SMALL_SIZE: usize = 7;
- fn roundtrip(filename: &str, expected_batch: RecordBatch) -> File {
+ fn roundtrip(
+ filename: &str,
+ expected_batch: RecordBatch,
+ max_row_group_size: Option<usize>,
+ ) -> File {
let file = get_temp_file(filename, &[]);
let mut writer = ArrowWriter::try_new(
file.try_clone().unwrap(),
expected_batch.schema(),
- None,
+ max_row_group_size.map(|size| {
+ WriterProperties::builder()
+ .set_max_row_group_size(size)
+ .build()
+ }),
)
.expect("Unable to write file");
writer.write(&expected_batch).unwrap();
@@ -1037,7 +1097,12 @@ mod tests {
file
}
- fn one_column_roundtrip(filename: &str, values: ArrayRef, nullable: bool) -> File {
+ fn one_column_roundtrip(
+ filename: &str,
+ values: ArrayRef,
+ nullable: bool,
+ max_row_group_size: Option<usize>,
+ ) -> File {
let schema = Schema::new(vec![Field::new(
"col",
values.data_type().clone(),
@@ -1046,7 +1111,7 @@ mod tests {
let expected_batch =
RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
- roundtrip(filename, expected_batch)
+ roundtrip(filename, expected_batch, max_row_group_size)
}
fn values_required<A, I>(iter: I, filename: &str)
@@ -1056,7 +1121,7 @@ mod tests {
{
let raw_values: Vec<_> = iter.into_iter().collect();
let values = Arc::new(A::from(raw_values));
- one_column_roundtrip(filename, values, false);
+ one_column_roundtrip(filename, values, false, Some(SMALL_SIZE / 2));
}
fn values_optional<A, I>(iter: I, filename: &str)
@@ -1070,7 +1135,7 @@ mod tests {
.map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
.collect();
let optional_values = Arc::new(A::from(optional_raw_values));
- one_column_roundtrip(filename, optional_values, true);
+ one_column_roundtrip(filename, optional_values, true, Some(SMALL_SIZE / 2));
}
fn required_and_optional<A, I>(iter: I, filename: &str)
@@ -1085,12 +1150,17 @@ mod tests {
#[test]
fn all_null_primitive_single_column() {
let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
- one_column_roundtrip("all_null_primitive_single_column", values, true);
+ one_column_roundtrip(
+ "all_null_primitive_single_column",
+ values,
+ true,
+ Some(SMALL_SIZE / 2),
+ );
}
#[test]
fn null_single_column() {
let values = Arc::new(NullArray::new(SMALL_SIZE));
- one_column_roundtrip("null_single_column", values, true);
+ one_column_roundtrip("null_single_column", values, true, Some(SMALL_SIZE / 2));
// null arrays are always nullable, a test with non-nullable nulls fails
}
@@ -1176,7 +1246,7 @@ mod tests {
let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
let values = Arc::new(TimestampSecondArray::from_vec(raw_values, None));
- one_column_roundtrip("timestamp_second_single_column", values, false);
+ one_column_roundtrip("timestamp_second_single_column", values, false, Some(3));
}
#[test]
@@ -1184,7 +1254,12 @@ mod tests {
let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
let values = Arc::new(TimestampMillisecondArray::from_vec(raw_values, None));
- one_column_roundtrip("timestamp_millisecond_single_column", values, false);
+ one_column_roundtrip(
+ "timestamp_millisecond_single_column",
+ values,
+ false,
+ Some(SMALL_SIZE / 2 + 1),
+ );
}
#[test]
@@ -1192,7 +1267,12 @@ mod tests {
let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
let values = Arc::new(TimestampMicrosecondArray::from_vec(raw_values, None));
- one_column_roundtrip("timestamp_microsecond_single_column", values, false);
+ one_column_roundtrip(
+ "timestamp_microsecond_single_column",
+ values,
+ false,
+ Some(SMALL_SIZE / 2 + 2),
+ );
}
#[test]
@@ -1200,7 +1280,12 @@ mod tests {
let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
let values = Arc::new(TimestampNanosecondArray::from_vec(raw_values, None));
- one_column_roundtrip("timestamp_nanosecond_single_column", values, false);
+ one_column_roundtrip(
+ "timestamp_nanosecond_single_column",
+ values,
+ false,
+ Some(SMALL_SIZE / 2),
+ );
}
#[test]
@@ -1336,7 +1421,12 @@ mod tests {
builder.append_value(b"1112").unwrap();
let array = Arc::new(builder.finish());
- one_column_roundtrip("fixed_size_binary_single_column", array, true);
+ one_column_roundtrip(
+ "fixed_size_binary_single_column",
+ array,
+ true,
+ Some(SMALL_SIZE / 2),
+ );
}
#[test]
@@ -1379,7 +1469,7 @@ mod tests {
let a = ListArray::from(a_list_data);
let values = Arc::new(a);
- one_column_roundtrip("list_single_column", values, true);
+ one_column_roundtrip("list_single_column", values, true, Some(SMALL_SIZE / 2));
}
#[test]
@@ -1404,7 +1494,12 @@ mod tests {
let a = LargeListArray::from(a_list_data);
let values = Arc::new(a);
- one_column_roundtrip("large_list_single_column", values, true);
+ one_column_roundtrip(
+ "large_list_single_column",
+ values,
+ true,
+ Some(SMALL_SIZE / 2),
+ );
}
#[test]
@@ -1414,7 +1509,7 @@ mod tests {
let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
let values = Arc::new(s);
- one_column_roundtrip("struct_single_column", values, false);
+ one_column_roundtrip("struct_single_column", values, false, Some(SMALL_SIZE / 2));
}
#[test]
@@ -1440,6 +1535,7 @@ mod tests {
roundtrip(
"test_arrow_writer_string_dictionary.parquet",
expected_batch,
+ Some(SMALL_SIZE / 2),
);
}
@@ -1470,6 +1566,7 @@ mod tests {
roundtrip(
"test_arrow_writer_primitive_dictionary.parquet",
expected_batch,
+ Some(SMALL_SIZE / 2),
);
}
@@ -1496,6 +1593,7 @@ mod tests {
roundtrip(
"test_arrow_writer_string_dictionary_unsigned_index.parquet",
expected_batch,
+ Some(SMALL_SIZE / 2),
);
}
@@ -1511,7 +1609,7 @@ mod tests {
u32::MAX - 1,
u32::MAX,
]));
- let file = one_column_roundtrip("u32_min_max_single_column", values, false);
+ let file = one_column_roundtrip("u32_min_max_single_column", values, false, None);
// check statistics are valid
let reader = SerializedFileReader::new(file).unwrap();
@@ -1542,7 +1640,7 @@ mod tests {
u64::MAX - 1,
u64::MAX,
]));
- let file = one_column_roundtrip("u64_min_max_single_column", values, false);
+ let file = one_column_roundtrip("u64_min_max_single_column", values, false, None);
// check statistics are valid
let reader = SerializedFileReader::new(file).unwrap();
@@ -1565,7 +1663,7 @@ mod tests {
fn statistics_null_counts_only_nulls() {
// check that null-count statistics for "only NULL"-columns are correct
let values = Arc::new(UInt64Array::from(vec![None, None]));
- let file = one_column_roundtrip("null_counts", values, true);
+ let file = one_column_roundtrip("null_counts", values, true, None);
// check statistics are valid
let reader = SerializedFileReader::new(file).unwrap();
diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/levels.rs
index be56726..2e95039 100644
--- a/parquet/src/arrow/levels.rs
+++ b/parquet/src/arrow/levels.rs
@@ -42,7 +42,6 @@
use arrow::array::{make_array, ArrayRef, StructArray};
use arrow::datatypes::{DataType, Field};
-use arrow::record_batch::RecordBatch;
/// Keeps track of the level information per array that is needed to write an Arrow array to Parquet.
///
@@ -67,6 +66,10 @@ pub(crate) struct LevelInfo {
pub max_definition: i16,
/// The type of array represented by this level info
pub level_type: LevelType,
+ /// The offset of the current level's array
+ pub offset: usize,
+ /// The length of the current level's array
+ pub length: usize,
}
/// LevelType defines the type of level, and whether it is nullable or not
@@ -91,22 +94,23 @@ impl LevelType {
}
impl LevelInfo {
- /// Create a new [LevelInfo] from a record batch.
+ /// Create a new [LevelInfo] by filling `length` slots, and setting an initial offset.
///
/// This is a convenience function to populate the starting point of the traversal.
- pub(crate) fn new_from_batch(batch: &RecordBatch) -> Self {
- let num_rows = batch.num_rows();
+ pub(crate) fn new(offset: usize, length: usize) -> Self {
Self {
// a batch has no definition level yet
- definition: vec![0; num_rows],
+ definition: vec![0; length],
// a batch has no repetition as it is not a list
repetition: None,
// a batch has sequential offsets, should be num_rows + 1
- array_offsets: (0..=(num_rows as i64)).collect(),
+ array_offsets: (0..=(length as i64)).collect(),
// all values at a batch-level are non-null
- array_mask: vec![true; num_rows],
+ array_mask: vec![true; length],
max_definition: 0,
level_type: LevelType::Root,
+ offset,
+ length,
}
}
@@ -127,16 +131,19 @@ impl LevelInfo {
array: &ArrayRef,
field: &Field,
) -> Vec<Self> {
- let (array_offsets, array_mask) = Self::get_array_offsets_and_masks(array);
+ let (array_offsets, array_mask) =
+ Self::get_array_offsets_and_masks(array, self.offset, self.length);
match array.data_type() {
DataType::Null => vec![Self {
definition: self.definition.clone(),
repetition: self.repetition.clone(),
- array_offsets: self.array_offsets.clone(),
+ array_offsets,
array_mask,
max_definition: self.max_definition.max(1),
// Null type is always nullable
level_type: LevelType::Primitive(true),
+ offset: self.offset,
+ length: self.length,
}],
DataType::Boolean
| DataType::Int8
@@ -171,6 +178,8 @@ impl LevelInfo {
)]
}
DataType::List(list_field) | DataType::LargeList(list_field) => {
+ let child_offset = array_offsets[0] as usize;
+ let child_len = *array_offsets.last().unwrap() as usize;
// Calculate the list level
let list_level = self.calculate_child_levels(
array_offsets,
@@ -182,8 +191,11 @@ impl LevelInfo {
let array_data = array.data();
let child_data = array_data.child_data().get(0).unwrap();
let child_array = make_array(child_data.clone());
- let (child_offsets, child_mask) =
- Self::get_array_offsets_and_masks(&child_array);
+ let (child_offsets, child_mask) = Self::get_array_offsets_and_masks(
+ &child_array,
+ child_offset,
+ child_len - child_offset,
+ );
match child_array.data_type() {
// TODO: The behaviour of a <list<null>> is untested
@@ -364,14 +376,15 @@ impl LevelInfo {
if parent_len == 0 {
// If the parent length is 0, there won't be a slot for the child
- let index = start + nulls_seen;
+ let index = start + nulls_seen - self.offset;
definition.push(self.definition[index]);
repetition.push(0);
merged_array_mask.push(self.array_mask[index]);
nulls_seen += 1;
} else {
(start..end).for_each(|parent_index| {
- let index = parent_index + nulls_seen;
+ let index = parent_index + nulls_seen - self.offset;
+ let parent_index = parent_index - self.offset;
// parent is either defined at this level, or earlier
let parent_def = self.definition[index];
@@ -418,6 +431,9 @@ impl LevelInfo {
debug_assert_eq!(definition.len(), merged_array_mask.len());
+ let offset = *array_offsets.first().unwrap() as usize;
+ let length = *array_offsets.last().unwrap() as usize - offset;
+
Self {
definition,
repetition: Some(repetition),
@@ -425,6 +441,8 @@ impl LevelInfo {
array_mask: merged_array_mask,
max_definition,
level_type,
+ offset: offset + self.offset,
+ length,
}
}
(LevelType::List(_), _) => {
@@ -442,7 +460,7 @@ impl LevelInfo {
let parent_len = end - start;
if parent_len == 0 {
- let index = start + nulls_seen;
+ let index = start + nulls_seen - self.offset;
definition.push(self.definition[index]);
repetition.push(reps[index]);
merged_array_mask.push(self.array_mask[index]);
@@ -450,8 +468,8 @@ impl LevelInfo {
} else {
// iterate through the array, adjusting child definitions for nulls
(start..end).for_each(|child_index| {
- let index = child_index + nulls_seen;
- let child_mask = array_mask[child_index];
+ let index = child_index + nulls_seen - self.offset;
+ let child_mask = array_mask[child_index - self.offset];
let parent_mask = self.array_mask[index];
let parent_def = self.definition[index];
@@ -470,6 +488,9 @@ impl LevelInfo {
debug_assert_eq!(definition.len(), merged_array_mask.len());
+ let offset = *array_offsets.first().unwrap() as usize;
+ let length = *array_offsets.last().unwrap() as usize - offset;
+
Self {
definition,
repetition: Some(repetition),
@@ -477,6 +498,8 @@ impl LevelInfo {
array_mask: merged_array_mask,
max_definition,
level_type,
+ offset: offset + self.offset,
+ length,
}
}
(_, LevelType::List(is_nullable)) => {
@@ -539,6 +562,9 @@ impl LevelInfo {
debug_assert_eq!(definition.len(), merged_array_mask.len());
+ let offset = *array_offsets.first().unwrap() as usize;
+ let length = *array_offsets.last().unwrap() as usize - offset;
+
Self {
definition,
repetition: Some(repetition),
@@ -546,6 +572,8 @@ impl LevelInfo {
array_mask: merged_array_mask,
max_definition,
level_type,
+ offset,
+ length,
}
}
(_, _) => {
@@ -583,6 +611,9 @@ impl LevelInfo {
array_mask: merged_array_mask,
max_definition,
level_type,
+ // Inherit parent offset and length
+ offset: self.offset,
+ length: self.length,
}
}
}
@@ -592,7 +623,11 @@ impl LevelInfo {
/// - Primitive, binary and struct arrays' offsets will be a sequence, masks obtained
/// from validity bitmap
/// - List array offsets will be the value offsets, masks are computed from offsets
- fn get_array_offsets_and_masks(array: &ArrayRef) -> (Vec<i64>, Vec<bool>) {
+ fn get_array_offsets_and_masks(
+ array: &ArrayRef,
+ offset: usize,
+ len: usize,
+ ) -> (Vec<i64>, Vec<bool>) {
match array.data_type() {
DataType::Null
| DataType::Boolean
@@ -622,10 +657,10 @@ impl LevelInfo {
| DataType::Dictionary(_, _)
| DataType::Decimal(_, _) => {
let array_mask = match array.data().null_buffer() {
- Some(buf) => get_bool_array_slice(buf, array.offset(), array.len()),
- None => vec![true; array.len()],
+ Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len),
+ None => vec![true; len],
};
- ((0..=(array.len() as i64)).collect(), array_mask)
+ ((0..=(len as i64)).collect(), array_mask)
}
DataType::List(_) => {
let data = array.data();
@@ -633,31 +668,37 @@ impl LevelInfo {
let offsets = offsets
.to_vec()
.into_iter()
+ .skip(offset)
+ .take(len + 1)
.map(|v| v as i64)
.collect::<Vec<i64>>();
let array_mask = match array.data().null_buffer() {
- Some(buf) => get_bool_array_slice(buf, array.offset(), array.len()),
- None => vec![true; array.len()],
+ Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len),
+ None => vec![true; len],
};
(offsets, array_mask)
}
DataType::LargeList(_) => {
- let offsets =
- unsafe { array.data().buffers()[0].typed_data::<i64>() }.to_vec();
+ let offsets = unsafe { array.data().buffers()[0].typed_data::<i64>() }
+ .iter()
+ .skip(offset)
+ .take(len + 1)
+ .copied()
+ .collect();
let array_mask = match array.data().null_buffer() {
- Some(buf) => get_bool_array_slice(buf, array.offset(), array.len()),
- None => vec![true; array.len()],
+ Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len),
+ None => vec![true; len],
};
(offsets, array_mask)
}
DataType::FixedSizeBinary(value_len) => {
let array_mask = match array.data().null_buffer() {
- Some(buf) => get_bool_array_slice(buf, array.offset(), array.len()),
- None => vec![true; array.len()],
+ Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len),
+ None => vec![true; len],
};
let value_len = *value_len as i64;
(
- (0..=(array.len() as i64)).map(|v| v * value_len).collect(),
+ (0..=(len as i64)).map(|v| v * value_len).collect(),
array_mask,
)
}
@@ -722,20 +763,14 @@ fn get_bool_array_slice(
#[cfg(test)]
mod tests {
- use std::sync::Arc;
+ use super::*;
- use arrow::{
- array::ListArray,
- array::{Array, ArrayData, Int32Array},
- buffer::Buffer,
- datatypes::Schema,
- };
- use arrow::{
- array::{Float32Array, Float64Array, Int16Array},
- datatypes::ToByteSlice,
- };
+ use std::sync::Arc;
- use super::*;
+ use arrow::array::*;
+ use arrow::buffer::Buffer;
+ use arrow::datatypes::{Schema, ToByteSlice};
+ use arrow::record_batch::RecordBatch;
#[test]
fn test_calculate_array_levels_twitter_example() {
@@ -748,6 +783,8 @@ mod tests {
array_mask: vec![true, true], // both lists defined
max_definition: 0,
level_type: LevelType::Root,
+ offset: 0,
+ length: 2,
};
// offset into array, each level1 has 2 values
let array_offsets = vec![0, 2, 4];
@@ -767,6 +804,8 @@ mod tests {
array_mask: vec![true, true, true, true],
max_definition: 1,
level_type: LevelType::List(false),
+ offset: 0,
+ length: 4,
};
// the separate asserts make it easier to see what's failing
assert_eq!(&levels.definition, &expected_levels.definition);
@@ -794,6 +833,8 @@ mod tests {
array_mask: vec![true; 10],
max_definition: 2,
level_type: LevelType::List(false),
+ offset: 0,
+ length: 10,
};
assert_eq!(&levels.definition, &expected_levels.definition);
assert_eq!(&levels.repetition, &expected_levels.repetition);
@@ -814,6 +855,8 @@ mod tests {
array_mask: vec![true; 10],
max_definition: 0,
level_type: LevelType::Root,
+ offset: 0,
+ length: 10,
};
let array_offsets: Vec<i64> = (0..=10).collect();
let array_mask = vec![true; 10];
@@ -830,6 +873,8 @@ mod tests {
array_mask,
max_definition: 1,
level_type: LevelType::Primitive(false),
+ offset: 0,
+ length: 10,
};
assert_eq!(&levels, &expected_levels);
}
@@ -844,6 +889,8 @@ mod tests {
array_mask: vec![true, true, true, true, true],
max_definition: 0,
level_type: LevelType::Root,
+ offset: 0,
+ length: 5,
};
let array_offsets: Vec<i64> = (0..=5).collect();
let array_mask = vec![true, false, true, true, false];
@@ -860,6 +907,8 @@ mod tests {
array_mask,
max_definition: 1,
level_type: LevelType::Primitive(true),
+ offset: 0,
+ length: 5,
};
assert_eq!(&levels, &expected_levels);
}
@@ -875,6 +924,8 @@ mod tests {
array_mask: vec![true, true, true, true, true],
max_definition: 0,
level_type: LevelType::Root,
+ offset: 0,
+ length: 5,
};
let array_offsets = vec![0, 2, 2, 4, 8, 11];
let array_mask = vec![true, false, true, true, true];
@@ -905,6 +956,8 @@ mod tests {
],
max_definition: 1,
level_type: LevelType::List(true),
+ offset: 0,
+ length: 11, // the child has 11 slots
};
assert_eq!(&levels.definition, &expected_levels.definition);
assert_eq!(&levels.repetition, &expected_levels.repetition);
@@ -936,6 +989,8 @@ mod tests {
array_mask: vec![false, true, false, true, true],
max_definition: 1,
level_type: LevelType::Struct(true),
+ offset: 0,
+ length: 5,
};
let array_offsets = vec![0, 2, 2, 4, 8, 11];
let array_mask = vec![true, false, true, true, true];
@@ -960,6 +1015,8 @@ mod tests {
],
max_definition: 2,
level_type: LevelType::List(true),
+ offset: 0,
+ length: 11,
};
assert_eq!(&levels.definition, &expected_levels.definition);
assert_eq!(&levels.repetition, &expected_levels.repetition);
@@ -1017,6 +1074,8 @@ mod tests {
],
max_definition: 4,
level_type: LevelType::List(true),
+ offset: 0,
+ length: 22,
};
assert_eq!(&levels.definition, &expected_levels.definition);
assert_eq!(&levels.repetition, &expected_levels.repetition);
@@ -1042,6 +1101,8 @@ mod tests {
array_mask: vec![true, true, true, true],
max_definition: 1,
level_type: LevelType::Struct(true),
+ offset: 0,
+ length: 4,
};
// 0: null ([], but mask is false, so it's not just an empty list)
// 1: [1, 2, 3]
@@ -1066,6 +1127,8 @@ mod tests {
array_mask: vec![false, true, true, true, true, true, true, true],
max_definition: 2,
level_type: LevelType::List(true),
+ offset: 0,
+ length: 8,
};
assert_eq!(&levels.definition, &expected_levels.definition);
assert_eq!(&levels.repetition, &expected_levels.repetition);
@@ -1113,6 +1176,8 @@ mod tests {
array_offsets,
max_definition: 4,
level_type: LevelType::List(true),
+ offset: 0,
+ length: 16,
};
assert_eq!(&levels.definition, &expected_levels.definition);
assert_eq!(&levels.repetition, &expected_levels.repetition);
@@ -1140,6 +1205,8 @@ mod tests {
array_mask: vec![true, true, true, true, false, true],
max_definition: 1,
level_type: LevelType::Struct(true),
+ offset: 0,
+ length: 6,
};
// b's offset and mask
let b_offsets: Vec<i64> = (0..=6).collect();
@@ -1152,6 +1219,8 @@ mod tests {
array_mask: vec![true, true, true, false, false, true],
max_definition: 2,
level_type: LevelType::Struct(true),
+ offset: 0,
+ length: 6,
};
let b_levels = a_levels.calculate_child_levels(
b_offsets.clone(),
@@ -1171,6 +1240,8 @@ mod tests {
array_mask: vec![true, false, true, false, false, true],
max_definition: 3,
level_type: LevelType::Struct(true),
+ offset: 0,
+ length: 6,
};
let c_levels =
b_levels.calculate_child_levels(c_offsets, c_mask, LevelType::Struct(true));
@@ -1203,15 +1274,17 @@ mod tests {
let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
let expected_batch_level = LevelInfo {
- definition: vec![0; 5],
+ definition: vec![0; 2],
repetition: None,
- array_offsets: (0..=5).collect(),
- array_mask: vec![true, true, true, true, true],
+ array_offsets: (0..=2).collect(),
+ array_mask: vec![true, true],
max_definition: 0,
level_type: LevelType::Root,
+ offset: 2,
+ length: 2,
};
- let batch_level = LevelInfo::new_from_batch(&batch);
+ let batch_level = LevelInfo::new(2, 2);
assert_eq!(&batch_level, &expected_batch_level);
// calculate the list's level
@@ -1229,14 +1302,14 @@ mod tests {
let list_level = levels.get(0).unwrap();
let expected_level = LevelInfo {
- definition: vec![3, 3, 3, 0, 3, 3, 3, 3, 3, 3, 3],
- repetition: Some(vec![0, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1]),
- array_offsets: vec![0, 1, 3, 3, 6, 10],
- array_mask: vec![
- true, true, true, false, true, true, true, true, true, true, true,
- ],
+ definition: vec![0, 3, 3, 3],
+ repetition: Some(vec![0, 0, 1, 1]),
+ array_offsets: vec![3, 3, 6],
+ array_mask: vec![false, true, true, true],
max_definition: 3,
level_type: LevelType::Primitive(true),
+ offset: 3,
+ length: 3,
};
assert_eq!(&list_level.definition, &expected_level.definition);
assert_eq!(&list_level.repetition, &expected_level.repetition);
@@ -1320,9 +1393,11 @@ mod tests {
array_mask: vec![true, true, true, true, true],
max_definition: 0,
level_type: LevelType::Root,
+ offset: 0,
+ length: 5,
};
- let batch_level = LevelInfo::new_from_batch(&batch);
+ let batch_level = LevelInfo::new(0, 5);
assert_eq!(&batch_level, &expected_batch_level);
// calculate the list's level
@@ -1347,6 +1422,8 @@ mod tests {
array_mask: vec![true, true, true, true, true],
max_definition: 1,
level_type: LevelType::Primitive(false),
+ offset: 0,
+ length: 5,
};
assert_eq!(list_level, &expected_level);
@@ -1360,6 +1437,8 @@ mod tests {
array_mask: vec![true, false, false, true, true],
max_definition: 1,
level_type: LevelType::Primitive(true),
+ offset: 0,
+ length: 5,
};
assert_eq!(list_level, &expected_level);
@@ -1373,6 +1452,8 @@ mod tests {
array_mask: vec![false, false, false, true, false],
max_definition: 2,
level_type: LevelType::Primitive(true),
+ offset: 0,
+ length: 5,
};
assert_eq!(list_level, &expected_level);
@@ -1386,6 +1467,8 @@ mod tests {
array_mask: vec![true, false, true, false, true],
max_definition: 3,
level_type: LevelType::Primitive(true),
+ offset: 0,
+ length: 5,
};
assert_eq!(list_level, &expected_level);
}
@@ -1399,6 +1482,8 @@ mod tests {
array_mask: vec![true, true, true, false, true, true, true],
max_definition: 3,
level_type: LevelType::Primitive(true),
+ offset: 0,
+ length: 6,
};
let expected = vec![0, 1, 2, 3, 4, 5];
@@ -1427,7 +1512,7 @@ mod tests {
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)])
.unwrap();
- let batch_level = LevelInfo::new_from_batch(&batch);
+ let batch_level = LevelInfo::new(0, batch.num_rows());
let struct_null_level =
batch_level.calculate_array_levels(batch.column(0), batch.schema().field(0));
@@ -1451,7 +1536,7 @@ mod tests {
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)])
.unwrap();
- let batch_level = LevelInfo::new_from_batch(&batch);
+ let batch_level = LevelInfo::new(0, batch.num_rows());
let struct_non_null_level =
batch_level.calculate_array_levels(batch.column(0), batch.schema().field(0));
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index b0b25f9..0d0cbef 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -290,6 +290,7 @@ impl WriterPropertiesBuilder {
/// Sets max size for a row group.
pub fn set_max_row_group_size(mut self, value: usize) -> Self {
+ assert!(value > 0, "Cannot have a 0 max row group size");
self.max_row_group_size = value;
self
}