You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/20 12:49:12 UTC
[arrow-datafusion] branch master updated: Introduce RowLayout to represent rows for different purposes (#2261)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new ec3543b46 Introduce RowLayout to represent rows for different purposes (#2261)
ec3543b46 is described below
commit ec3543b46e505086ec0f7920e4d9a0b3d328f90c
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Wed Apr 20 20:49:07 2022 +0800
Introduce RowLayout to represent rows for different purposes (#2261)
* Introduce RowLayout to represent rows for different purposes
* revert default
* Apply suggestions from code review
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
* more &schema
* more tests and refactor
* logs for flaky test
* unwanted cargo change
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
datafusion/core/benches/jit.rs | 35 +++-
datafusion/core/src/row/jit/mod.rs | 158 ++++++++++++++---
datafusion/core/src/row/jit/reader.rs | 9 +-
datafusion/core/src/row/jit/writer.rs | 12 +-
datafusion/core/src/row/layout.rs | 157 +++++++++++++++--
datafusion/core/src/row/mod.rs | 250 +++++++++++++++++++--------
datafusion/core/src/row/reader.rs | 71 ++++----
datafusion/core/src/row/writer.rs | 86 ++++-----
datafusion/core/src/test/mod.rs | 6 +-
datafusion/core/tests/sql/explain_analyze.rs | 13 +-
10 files changed, 579 insertions(+), 218 deletions(-)
diff --git a/datafusion/core/benches/jit.rs b/datafusion/core/benches/jit.rs
index 6abebc294..0c6de319d 100644
--- a/datafusion/core/benches/jit.rs
+++ b/datafusion/core/benches/jit.rs
@@ -23,7 +23,9 @@ extern crate datafusion;
mod data_utils;
use crate::criterion::Criterion;
use crate::data_utils::{create_record_batches, create_schema};
-use datafusion::row::writer::{bench_write_batch, bench_write_batch_jit};
+use datafusion::row::jit::writer::bench_write_batch_jit;
+use datafusion::row::writer::bench_write_batch;
+use datafusion::row::RowType;
use std::sync::Arc;
fn criterion_benchmark(c: &mut Criterion) {
@@ -35,15 +37,38 @@ fn criterion_benchmark(c: &mut Criterion) {
let batches =
create_record_batches(schema.clone(), array_len, partitions_len, batch_size);
- c.bench_function("row serializer", |b| {
+ c.bench_function("compact row serializer", |b| {
b.iter(|| {
- criterion::black_box(bench_write_batch(&batches, schema.clone()).unwrap())
+ criterion::black_box(
+ bench_write_batch(&batches, schema.clone(), RowType::Compact).unwrap(),
+ )
})
});
- c.bench_function("row serializer jit", |b| {
+ c.bench_function("word aligned row serializer", |b| {
b.iter(|| {
- criterion::black_box(bench_write_batch_jit(&batches, schema.clone()).unwrap())
+ criterion::black_box(
+ bench_write_batch(&batches, schema.clone(), RowType::WordAligned)
+ .unwrap(),
+ )
+ })
+ });
+
+ c.bench_function("compact row serializer jit", |b| {
+ b.iter(|| {
+ criterion::black_box(
+ bench_write_batch_jit(&batches, schema.clone(), RowType::Compact)
+ .unwrap(),
+ )
+ })
+ });
+
+ c.bench_function("word aligned row serializer jit", |b| {
+ b.iter(|| {
+ criterion::black_box(
+ bench_write_batch_jit(&batches, schema.clone(), RowType::WordAligned)
+ .unwrap(),
+ )
})
});
}
diff --git a/datafusion/core/src/row/jit/mod.rs b/datafusion/core/src/row/jit/mod.rs
index fbb6efe3c..7ee76a9b4 100644
--- a/datafusion/core/src/row/jit/mod.rs
+++ b/datafusion/core/src/row/jit/mod.rs
@@ -17,8 +17,8 @@
//! Just-In-Time(JIT) version for row reader and writers
-mod reader;
-mod writer;
+pub mod reader;
+pub mod writer;
#[macro_export]
/// register external functions to the assembler
@@ -46,6 +46,7 @@ mod tests {
use crate::error::Result;
use crate::row::jit::reader::read_as_batch_jit;
use crate::row::jit::writer::write_batch_unchecked_jit;
+ use crate::row::layout::RowType::{Compact, WordAligned};
use arrow::record_batch::RecordBatch;
use arrow::{array::*, datatypes::*};
use datafusion_jit::api::Assembler;
@@ -53,26 +54,26 @@ mod tests {
use DataType::*;
macro_rules! fn_test_single_type {
- ($ARRAY: ident, $TYPE: expr, $VEC: expr) => {
+ ($ARRAY: ident, $TYPE: expr, $VEC: expr, $ROWTYPE: expr) => {
paste::item! {
#[test]
#[allow(non_snake_case)]
- fn [<test_single_ $TYPE _jit>]() -> Result<()> {
+ fn [<test_ $ROWTYPE _single_ $TYPE _jit>]() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, true)]));
let a = $ARRAY::from($VEC);
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
let mut vector = vec![0; 1024];
let assembler = Assembler::default();
let row_offsets =
- { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? };
- let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
+ { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler, $ROWTYPE)? };
+ let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, $ROWTYPE)? };
assert_eq!(batch, output_batch);
Ok(())
}
#[test]
#[allow(non_snake_case)]
- fn [<test_single_ $TYPE _jit_null_free>]() -> Result<()> {
+ fn [<test_ $ROWTYPE _single_ $TYPE _jit_null_free>]() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)]));
let v = $VEC.into_iter().filter(|o| o.is_some()).collect::<Vec<_>>();
let a = $ARRAY::from(v);
@@ -80,8 +81,8 @@ mod tests {
let mut vector = vec![0; 1024];
let assembler = Assembler::default();
let row_offsets =
- { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? };
- let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
+ { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler, $ROWTYPE)? };
+ let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, $ROWTYPE)? };
assert_eq!(batch, output_batch);
Ok(())
}
@@ -92,85 +93,190 @@ mod tests {
fn_test_single_type!(
BooleanArray,
Boolean,
- vec![Some(true), Some(false), None, Some(true), None]
+ vec![Some(true), Some(false), None, Some(true), None],
+ Compact
+ );
+
+ fn_test_single_type!(
+ BooleanArray,
+ Boolean,
+ vec![Some(true), Some(false), None, Some(true), None],
+ WordAligned
);
fn_test_single_type!(
Int8Array,
Int8,
- vec![Some(5), Some(7), None, Some(0), Some(111)]
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ Compact
+ );
+
+ fn_test_single_type!(
+ Int8Array,
+ Int8,
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ WordAligned
+ );
+
+ fn_test_single_type!(
+ Int16Array,
+ Int16,
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ Compact
);
fn_test_single_type!(
Int16Array,
Int16,
- vec![Some(5), Some(7), None, Some(0), Some(111)]
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ WordAligned
);
fn_test_single_type!(
Int32Array,
Int32,
- vec![Some(5), Some(7), None, Some(0), Some(111)]
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ Compact
+ );
+
+ fn_test_single_type!(
+ Int32Array,
+ Int32,
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ WordAligned
+ );
+
+ fn_test_single_type!(
+ Int64Array,
+ Int64,
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ Compact
);
fn_test_single_type!(
Int64Array,
Int64,
- vec![Some(5), Some(7), None, Some(0), Some(111)]
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ WordAligned
);
fn_test_single_type!(
UInt8Array,
UInt8,
- vec![Some(5), Some(7), None, Some(0), Some(111)]
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ Compact
+ );
+
+ fn_test_single_type!(
+ UInt8Array,
+ UInt8,
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ WordAligned
);
fn_test_single_type!(
UInt16Array,
UInt16,
- vec![Some(5), Some(7), None, Some(0), Some(111)]
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ Compact
+ );
+
+ fn_test_single_type!(
+ UInt16Array,
+ UInt16,
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ WordAligned
+ );
+
+ fn_test_single_type!(
+ UInt32Array,
+ UInt32,
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ Compact
);
fn_test_single_type!(
UInt32Array,
UInt32,
- vec![Some(5), Some(7), None, Some(0), Some(111)]
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ WordAligned
+ );
+
+ fn_test_single_type!(
+ UInt64Array,
+ UInt64,
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ Compact
);
fn_test_single_type!(
UInt64Array,
UInt64,
- vec![Some(5), Some(7), None, Some(0), Some(111)]
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ WordAligned
+ );
+
+ fn_test_single_type!(
+ Float32Array,
+ Float32,
+ vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
+ Compact
);
fn_test_single_type!(
Float32Array,
Float32,
- vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
+ vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
+ WordAligned
+ );
+
+ fn_test_single_type!(
+ Float64Array,
+ Float64,
+ vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
+ Compact
);
fn_test_single_type!(
Float64Array,
Float64,
- vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
+ vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
+ WordAligned
+ );
+
+ fn_test_single_type!(
+ Date32Array,
+ Date32,
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ Compact
);
fn_test_single_type!(
Date32Array,
Date32,
- vec![Some(5), Some(7), None, Some(0), Some(111)]
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ WordAligned
+ );
+
+ fn_test_single_type!(
+ Date64Array,
+ Date64,
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ Compact
);
fn_test_single_type!(
Date64Array,
Date64,
- vec![Some(5), Some(7), None, Some(0), Some(111)]
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ WordAligned
);
fn_test_single_type!(
StringArray,
Utf8,
- vec![Some("hello"), Some("world"), None, Some(""), Some("")]
+ vec![Some("hello"), Some("world"), None, Some(""), Some("")],
+ Compact
);
#[test]
@@ -190,10 +296,11 @@ mod tests {
0,
schema.clone(),
&assembler,
+ Compact,
)?
};
let output_batch =
- { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
+ { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, Compact)? };
assert_eq!(batch, output_batch);
Ok(())
}
@@ -214,10 +321,11 @@ mod tests {
0,
schema.clone(),
&assembler,
+ Compact,
)?
};
let output_batch =
- { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
+ { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, Compact)? };
assert_eq!(batch, output_batch);
Ok(())
}
diff --git a/datafusion/core/src/row/jit/reader.rs b/datafusion/core/src/row/jit/reader.rs
index c10183667..80e10131f 100644
--- a/datafusion/core/src/row/jit/reader.rs
+++ b/datafusion/core/src/row/jit/reader.rs
@@ -20,6 +20,7 @@
use crate::error::{DataFusionError, Result};
use crate::reg_fn;
use crate::row::jit::fn_name;
+use crate::row::layout::RowType;
use crate::row::reader::RowReader;
use crate::row::reader::*;
use crate::row::MutableRecordBatch;
@@ -38,10 +39,11 @@ pub fn read_as_batch_jit(
schema: Arc<Schema>,
offsets: &[usize],
assembler: &Assembler,
+ row_type: RowType,
) -> Result<RecordBatch> {
let row_num = offsets.len();
let mut output = MutableRecordBatch::new(row_num, schema.clone());
- let mut row = RowReader::new(&schema);
+ let mut row = RowReader::new(&schema, row_type);
register_read_functions(assembler)?;
let gen_func = gen_read_row(&schema, assembler)?;
let mut jit = assembler.create_jit();
@@ -102,10 +104,7 @@ fn register_read_functions(asm: &Assembler) -> Result<()> {
Ok(())
}
-fn gen_read_row(
- schema: &Arc<Schema>,
- assembler: &Assembler,
-) -> Result<GeneratedFunction> {
+fn gen_read_row(schema: &Schema, assembler: &Assembler) -> Result<GeneratedFunction> {
use DataType::*;
let mut builder = assembler
.new_func_builder("read_row")
diff --git a/datafusion/core/src/row/jit/writer.rs b/datafusion/core/src/row/jit/writer.rs
index 5b077caf6..ae9ff1308 100644
--- a/datafusion/core/src/row/jit/writer.rs
+++ b/datafusion/core/src/row/jit/writer.rs
@@ -20,6 +20,7 @@
use crate::error::Result;
use crate::reg_fn;
use crate::row::jit::fn_name;
+use crate::row::layout::RowType;
use crate::row::schema_null_free;
use crate::row::writer::RowWriter;
use crate::row::writer::*;
@@ -43,8 +44,9 @@ pub fn write_batch_unchecked_jit(
row_idx: usize,
schema: Arc<Schema>,
assembler: &Assembler,
+ row_type: RowType,
) -> Result<Vec<usize>> {
- let mut writer = RowWriter::new(&schema);
+ let mut writer = RowWriter::new(&schema, row_type);
let mut current_offset = offset;
let mut offsets = vec![];
register_write_functions(assembler)?;
@@ -74,9 +76,10 @@ pub fn write_batch_unchecked_jit(
pub fn bench_write_batch_jit(
batches: &[Vec<RecordBatch>],
schema: Arc<Schema>,
+ row_type: RowType,
) -> Result<Vec<usize>> {
let assembler = Assembler::default();
- let mut writer = RowWriter::new(&schema);
+ let mut writer = RowWriter::new(&schema, row_type);
let mut lengths = vec![];
register_write_functions(&assembler)?;
let gen_func = gen_write_row(&schema, &assembler)?;
@@ -126,10 +129,7 @@ fn register_write_functions(asm: &Assembler) -> Result<()> {
Ok(())
}
-fn gen_write_row(
- schema: &Arc<Schema>,
- assembler: &Assembler,
-) -> Result<GeneratedFunction> {
+fn gen_write_row(schema: &Schema, assembler: &Assembler) -> Result<GeneratedFunction> {
let mut builder = assembler
.new_func_builder("write_row")
.param("row", PTR)
diff --git a/datafusion/core/src/row/layout.rs b/datafusion/core/src/row/layout.rs
index 71699d6eb..c14f9fa69 100644
--- a/datafusion/core/src/row/layout.rs
+++ b/datafusion/core/src/row/layout.rs
@@ -17,26 +17,94 @@
//! Various row layout for different use case
-use crate::row::{schema_null_free, var_length};
+use crate::row::schema_null_free;
use arrow::datatypes::{DataType, Schema};
use arrow::util::bit_util::{ceil, round_upto_power_of_2};
-use std::sync::Arc;
const UTF8_DEFAULT_SIZE: usize = 20;
const BINARY_DEFAULT_SIZE: usize = 100;
+#[derive(Copy, Clone, Debug)]
+/// Type of a RowLayout
+pub enum RowType {
+ /// This type of layout will store each field with the minimum number of bytes for space efficiency.
+ /// Its typical use case represents a sorting payload that accesses all row fields as a unit.
+ Compact,
+ /// This type of layout will store one 8-byte word per field for CPU-friendliness.
+ /// It is mainly used to represent the rows with frequently updated content,
+ /// for example, grouping state for hash aggregation.
+ WordAligned,
+ // RawComparable,
+}
+
+/// Reveals how the fields of a record are stored in the raw-bytes format
+#[derive(Debug)]
+pub(crate) struct RowLayout {
+ /// Type of the layout
+ #[allow(dead_code)]
+ row_type: RowType,
+ /// If a row is null free according to its schema
+ pub(crate) null_free: bool,
+ /// The number of bytes used to store null bits for each field.
+ pub(crate) null_width: usize,
+ /// Length in bytes for `values` part of the current tuple.
+ pub(crate) values_width: usize,
+ /// Total number of fields for each tuple.
+ pub(crate) field_count: usize,
+ /// Starting offset for each fields in the raw bytes.
+ pub(crate) field_offsets: Vec<usize>,
+}
+
+impl RowLayout {
+ pub(crate) fn new(schema: &Schema, row_type: RowType) -> Self {
+ assert!(row_supported(schema, row_type));
+ let null_free = schema_null_free(schema);
+ let field_count = schema.fields().len();
+ let null_width = if null_free {
+ 0
+ } else {
+ match row_type {
+ RowType::Compact => ceil(field_count, 8),
+ RowType::WordAligned => round_upto_power_of_2(ceil(field_count, 8), 8),
+ }
+ };
+ let (field_offsets, values_width) = match row_type {
+ RowType::Compact => compact_offsets(null_width, schema),
+ RowType::WordAligned => word_aligned_offsets(null_width, schema),
+ };
+ Self {
+ row_type,
+ null_free,
+ null_width,
+ values_width,
+ field_count,
+ field_offsets,
+ }
+ }
+
+ #[inline(always)]
+ pub(crate) fn fixed_part_width(&self) -> usize {
+ self.null_width + self.values_width
+ }
+}
+
/// Get relative offsets for each field and total width for values
-pub fn get_offsets(null_width: usize, schema: &Arc<Schema>) -> (Vec<usize>, usize) {
+fn compact_offsets(null_width: usize, schema: &Schema) -> (Vec<usize>, usize) {
let mut offsets = vec![];
let mut offset = null_width;
for f in schema.fields() {
offsets.push(offset);
- offset += type_width(f.data_type());
+ offset += compact_type_width(f.data_type());
}
(offsets, offset - null_width)
}
-fn type_width(dt: &DataType) -> usize {
+fn var_length(dt: &DataType) -> bool {
+ use DataType::*;
+ matches!(dt, Utf8 | Binary)
+}
+
+fn compact_type_width(dt: &DataType) -> usize {
use DataType::*;
if var_length(dt) {
return std::mem::size_of::<u64>();
@@ -50,13 +118,27 @@ fn type_width(dt: &DataType) -> usize {
}
}
+fn word_aligned_offsets(null_width: usize, schema: &Schema) -> (Vec<usize>, usize) {
+ let mut offsets = vec![];
+ let mut offset = null_width;
+ for f in schema.fields() {
+ offsets.push(offset);
+ assert!(!matches!(f.data_type(), DataType::Decimal(_, _)));
+ // All of the currently supported types fit into a single 8-byte word.
+ // When we decide to support the Decimal type in the future, its width will be
+ // two 8-byte words and the width calculation below must be adapted.
+ offset += 8;
+ }
+ (offsets, offset - null_width)
+}
+
/// Estimate row width based on schema
-pub fn estimate_row_width(schema: &Arc<Schema>) -> usize {
- let null_free = schema_null_free(schema);
- let field_count = schema.fields().len();
- let mut width = if null_free { 0 } else { ceil(field_count, 8) };
+pub(crate) fn estimate_row_width(schema: &Schema, layout: &RowLayout) -> usize {
+ let mut width = layout.fixed_part_width();
+ if matches!(layout.row_type, RowType::WordAligned) {
+ return width;
+ }
for f in schema.fields() {
- width += type_width(f.data_type());
match f.data_type() {
DataType::Utf8 => width += UTF8_DEFAULT_SIZE,
DataType::Binary => width += BINARY_DEFAULT_SIZE,
@@ -65,3 +147,58 @@ pub fn estimate_row_width(schema: &Arc<Schema>) -> usize {
}
round_upto_power_of_2(width, 8)
}
+
+/// Tell if we can create raw-bytes based rows, since we currently
+/// have limited data type support in the row format
+fn row_supported(schema: &Schema, row_type: RowType) -> bool {
+ schema
+ .fields()
+ .iter()
+ .all(|f| supported_type(f.data_type(), row_type))
+}
+
+fn supported_type(dt: &DataType, row_type: RowType) -> bool {
+ use DataType::*;
+
+ match row_type {
+ RowType::Compact => {
+ matches!(
+ dt,
+ Boolean
+ | UInt8
+ | UInt16
+ | UInt32
+ | UInt64
+ | Int8
+ | Int16
+ | Int32
+ | Int64
+ | Float32
+ | Float64
+ | Date32
+ | Date64
+ | Utf8
+ | Binary
+ )
+ }
+ // only fixed length types are supported for fast in-place update.
+ RowType::WordAligned => {
+ matches!(
+ dt,
+ Boolean
+ | UInt8
+ | UInt16
+ | UInt32
+ | UInt64
+ | Int8
+ | Int16
+ | Int32
+ | Int64
+ | Float32
+ | Float64
+ | Date32
+ | Date64
+ )
+ }
+ }
+}
diff --git a/datafusion/core/src/row/mod.rs b/datafusion/core/src/row/mod.rs
index 1fbf01275..f8e9ff273 100644
--- a/datafusion/core/src/row/mod.rs
+++ b/datafusion/core/src/row/mod.rs
@@ -48,61 +48,21 @@
//!
use arrow::array::{make_builder, ArrayBuilder};
-use arrow::datatypes::{DataType, Schema};
+use arrow::datatypes::Schema;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
+pub use layout::RowType;
use std::sync::Arc;
#[cfg(feature = "jit")]
-mod jit;
+pub mod jit;
mod layout;
pub mod reader;
mod validity;
pub mod writer;
-fn supported_type(dt: &DataType) -> bool {
- use DataType::*;
- matches!(
- dt,
- Boolean
- | UInt8
- | UInt16
- | UInt32
- | UInt64
- | Int8
- | Int16
- | Int32
- | Int64
- | Float32
- | Float64
- | Date32
- | Date64
- | Utf8
- | Binary
- )
-}
-
-/// Tell if we can create raw-bytes based rows since we currently
-/// has limited data type supports in the row format
-pub fn row_supported(schema: &Arc<Schema>) -> bool {
- schema
- .fields()
- .iter()
- .all(|f| supported_type(f.data_type()))
-}
-
-fn var_length(dt: &DataType) -> bool {
- use DataType::*;
- matches!(dt, Utf8 | Binary)
-}
-
-/// Tell if the row is of fixed size
-pub fn fixed_size(schema: &Arc<Schema>) -> bool {
- schema.fields().iter().all(|f| !var_length(f.data_type()))
-}
-
/// Tell if schema contains no nullable field
-pub fn schema_null_free(schema: &Arc<Schema>) -> bool {
+pub(crate) fn schema_null_free(schema: &Schema) -> bool {
schema.fields().iter().all(|f| !f.is_nullable())
}
@@ -126,7 +86,7 @@ impl MutableRecordBatch {
}
}
-fn new_arrays(schema: &Arc<Schema>, batch_size: usize) -> Vec<Box<dyn ArrayBuilder>> {
+fn new_arrays(schema: &Schema, batch_size: usize) -> Vec<Box<dyn ArrayBuilder>> {
schema
.fields()
.iter()
@@ -155,6 +115,7 @@ mod tests {
use crate::physical_plan::file_format::FileScanConfig;
use crate::physical_plan::{collect, ExecutionPlan};
use crate::prelude::SessionContext;
+ use crate::row::layout::RowType::{Compact, WordAligned};
use crate::row::reader::read_as_batch;
use crate::row::writer::write_batch_unchecked;
use arrow::record_batch::RecordBatch;
@@ -166,33 +127,33 @@ mod tests {
use DataType::*;
macro_rules! fn_test_single_type {
- ($ARRAY: ident, $TYPE: expr, $VEC: expr) => {
+ ($ARRAY: ident, $TYPE: expr, $VEC: expr, $ROWTYPE: expr) => {
paste::item! {
#[test]
#[allow(non_snake_case)]
- fn [<test_single_ $TYPE>]() -> Result<()> {
+ fn [<test_ $ROWTYPE _single_ $TYPE>]() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, true)]));
let a = $ARRAY::from($VEC);
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
let mut vector = vec![0; 1024];
let row_offsets =
- { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
- let output_batch = { read_as_batch(&vector, schema, &row_offsets)? };
+ { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone(), $ROWTYPE) };
+ let output_batch = { read_as_batch(&vector, schema, &row_offsets, $ROWTYPE)? };
assert_eq!(batch, output_batch);
Ok(())
}
#[test]
#[allow(non_snake_case)]
- fn [<test_single_ $TYPE _null_free>]() -> Result<()> {
+ fn [<test_ $ROWTYPE _single_ $TYPE _null_free>]() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)]));
let v = $VEC.into_iter().filter(|o| o.is_some()).collect::<Vec<_>>();
let a = $ARRAY::from(v);
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
let mut vector = vec![0; 1024];
let row_offsets =
- { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
- let output_batch = { read_as_batch(&vector, schema, &row_offsets)? };
+ { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone(), $ROWTYPE) };
+ let output_batch = { read_as_batch(&vector, schema, &row_offsets, $ROWTYPE)? };
assert_eq!(batch, output_batch);
Ok(())
}
@@ -203,87 +164,202 @@ mod tests {
fn_test_single_type!(
BooleanArray,
Boolean,
- vec![Some(true), Some(false), None, Some(true), None]
+ vec![Some(true), Some(false), None, Some(true), None],
+ Compact
+ );
+
+ fn_test_single_type!(
+ BooleanArray,
+ Boolean,
+ vec![Some(true), Some(false), None, Some(true), None],
+ WordAligned
);
fn_test_single_type!(
Int8Array,
Int8,
- vec![Some(5), Some(7), None, Some(0), Some(111)]
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ Compact
+ );
+
+ fn_test_single_type!(
+ Int8Array,
+ Int8,
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ WordAligned
);
fn_test_single_type!(
Int16Array,
Int16,
- vec![Some(5), Some(7), None, Some(0), Some(111)]
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ Compact
+ );
+
+ fn_test_single_type!(
+ Int16Array,
+ Int16,
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ WordAligned
+ );
+
+ fn_test_single_type!(
+ Int32Array,
+ Int32,
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ Compact
);
fn_test_single_type!(
Int32Array,
Int32,
- vec![Some(5), Some(7), None, Some(0), Some(111)]
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ WordAligned
);
fn_test_single_type!(
Int64Array,
Int64,
- vec![Some(5), Some(7), None, Some(0), Some(111)]
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ Compact
+ );
+
+ fn_test_single_type!(
+ Int64Array,
+ Int64,
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ WordAligned
);
fn_test_single_type!(
UInt8Array,
UInt8,
- vec![Some(5), Some(7), None, Some(0), Some(111)]
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ Compact
+ );
+
+ fn_test_single_type!(
+ UInt8Array,
+ UInt8,
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ WordAligned
+ );
+
+ fn_test_single_type!(
+ UInt16Array,
+ UInt16,
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ Compact
);
fn_test_single_type!(
UInt16Array,
UInt16,
- vec![Some(5), Some(7), None, Some(0), Some(111)]
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ WordAligned
);
fn_test_single_type!(
UInt32Array,
UInt32,
- vec![Some(5), Some(7), None, Some(0), Some(111)]
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ Compact
+ );
+
+ fn_test_single_type!(
+ UInt32Array,
+ UInt32,
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ WordAligned
+ );
+
+ fn_test_single_type!(
+ UInt64Array,
+ UInt64,
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ Compact
);
fn_test_single_type!(
UInt64Array,
UInt64,
- vec![Some(5), Some(7), None, Some(0), Some(111)]
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ WordAligned
);
fn_test_single_type!(
Float32Array,
Float32,
- vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
+ vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
+ Compact
+ );
+
+ fn_test_single_type!(
+ Float32Array,
+ Float32,
+ vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
+ WordAligned
);
fn_test_single_type!(
Float64Array,
Float64,
- vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
+ vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
+ Compact
+ );
+
+ fn_test_single_type!(
+ Float64Array,
+ Float64,
+ vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
+ WordAligned
+ );
+
+ fn_test_single_type!(
+ Date32Array,
+ Date32,
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ Compact
);
fn_test_single_type!(
Date32Array,
Date32,
- vec![Some(5), Some(7), None, Some(0), Some(111)]
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ WordAligned
+ );
+
+ fn_test_single_type!(
+ Date64Array,
+ Date64,
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ Compact
);
fn_test_single_type!(
Date64Array,
Date64,
- vec![Some(5), Some(7), None, Some(0), Some(111)]
+ vec![Some(5), Some(7), None, Some(0), Some(111)],
+ WordAligned
);
fn_test_single_type!(
StringArray,
Utf8,
- vec![Some("hello"), Some("world"), None, Some(""), Some("")]
+ vec![Some("hello"), Some("world"), None, Some(""), Some("")],
+ Compact
);
+ #[test]
+ #[should_panic(expected = "row_supported(schema, row_type)")]
+ fn test_unsupported_word_aligned_type() {
+ let a: ArrayRef = Arc::new(StringArray::from(vec!["hello", "world"]));
+ let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
+ let schema = batch.schema();
+ let mut vector = vec![0; 1024];
+ write_batch_unchecked(&mut vector, 0, &batch, 0, schema, WordAligned);
+ }
+
#[test]
fn test_single_binary() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, true)]));
@@ -293,8 +369,8 @@ mod tests {
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
let mut vector = vec![0; 8192];
let row_offsets =
- { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
- let output_batch = { read_as_batch(&vector, schema, &row_offsets)? };
+ { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone(), Compact) };
+ let output_batch = { read_as_batch(&vector, schema, &row_offsets, Compact)? };
assert_eq!(batch, output_batch);
Ok(())
}
@@ -307,8 +383,8 @@ mod tests {
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
let mut vector = vec![0; 8192];
let row_offsets =
- { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
- let output_batch = { read_as_batch(&vector, schema, &row_offsets)? };
+ { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone(), Compact) };
+ let output_batch = { read_as_batch(&vector, schema, &row_offsets, Compact)? };
assert_eq!(batch, output_batch);
Ok(())
}
@@ -327,25 +403,47 @@ mod tests {
let mut vector = vec![0; 20480];
let row_offsets =
- { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone()) };
- let output_batch = { read_as_batch(&vector, schema, &row_offsets)? };
+ { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(), Compact) };
+ let output_batch = { read_as_batch(&vector, schema, &row_offsets, Compact)? };
+ assert_eq!(*batch, output_batch);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_with_parquet_word_aligned() -> Result<()> {
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+ let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7]);
+ let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+ let schema = exec.schema().clone();
+
+ let batches = collect(exec, task_ctx).await?;
+ assert_eq!(1, batches.len());
+ let batch = &batches[0];
+
+ let mut vector = vec![0; 20480];
+ let row_offsets = {
+ write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(), WordAligned)
+ };
+ let output_batch = { read_as_batch(&vector, schema, &row_offsets, WordAligned)? };
assert_eq!(*batch, output_batch);
Ok(())
}
#[test]
- #[should_panic(expected = "supported(schema)")]
+ #[should_panic(expected = "row_supported(schema, row_type)")]
fn test_unsupported_type_write() {
let a: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8]));
let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
let schema = batch.schema();
let mut vector = vec![0; 1024];
- write_batch_unchecked(&mut vector, 0, &batch, 0, schema);
+ write_batch_unchecked(&mut vector, 0, &batch, 0, schema, Compact);
}
#[test]
- #[should_panic(expected = "supported(schema)")]
+ #[should_panic(expected = "row_supported(schema, row_type)")]
fn test_unsupported_type_read() {
let schema = Arc::new(Schema::new(vec![Field::new(
"a",
@@ -354,7 +452,7 @@ mod tests {
)]));
let vector = vec![0; 1024];
let row_offsets = vec![0];
- read_as_batch(&vector, schema, &row_offsets).unwrap();
+ read_as_batch(&vector, schema, &row_offsets, Compact).unwrap();
}
async fn get_exec(
diff --git a/datafusion/core/src/row/reader.rs b/datafusion/core/src/row/reader.rs
index 4d9fb3136..abaf57c14 100644
--- a/datafusion/core/src/row/reader.rs
+++ b/datafusion/core/src/row/reader.rs
@@ -18,13 +18,13 @@
//! Accessing row from raw bytes
use crate::error::{DataFusionError, Result};
-use crate::row::layout::get_offsets;
+use crate::row::layout::{RowLayout, RowType};
use crate::row::validity::{all_valid, NullBitsFormatter};
-use crate::row::{row_supported, schema_null_free, MutableRecordBatch};
+use crate::row::MutableRecordBatch;
use arrow::array::*;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
-use arrow::util::bit_util::{ceil, get_bit_raw};
+use arrow::util::bit_util::get_bit_raw;
use std::sync::Arc;
/// Read `data` of raw-bytes rows starting at `offsets` out to a record batch
@@ -32,23 +32,24 @@ pub fn read_as_batch(
data: &[u8],
schema: Arc<Schema>,
offsets: &[usize],
+ row_type: RowType,
) -> Result<RecordBatch> {
let row_num = offsets.len();
let mut output = MutableRecordBatch::new(row_num, schema.clone());
- let mut row = RowReader::new(&schema);
+ let mut row = RowReader::new(&schema, row_type);
for offset in offsets.iter().take(row_num) {
row.point_to(*offset, data);
read_row(&row, &mut output, &schema);
}
- output.output().map_err(DataFusionError::ArrowError)
+ Ok(output.output()?)
}
macro_rules! get_idx {
($NATIVE: ident, $SELF: ident, $IDX: ident, $WIDTH: literal) => {{
$SELF.assert_index_valid($IDX);
- let offset = $SELF.field_offsets[$IDX];
+ let offset = $SELF.field_offsets()[$IDX];
let start = $SELF.base_offset + offset;
let end = start + $WIDTH;
$NATIVE::from_le_bytes($SELF.data[start..end].try_into().unwrap())
@@ -60,7 +61,7 @@ macro_rules! fn_get_idx {
paste::item! {
fn [<get_ $NATIVE>](&self, idx: usize) -> $NATIVE {
self.assert_index_valid(idx);
- let offset = self.field_offsets[idx];
+ let offset = self.field_offsets()[idx];
let start = self.base_offset + offset;
let end = start + $WIDTH;
$NATIVE::from_le_bytes(self.data[start..end].try_into().unwrap())
@@ -85,32 +86,24 @@ macro_rules! fn_get_idx_opt {
/// Read the tuple `data[base_offset..]` we are currently pointing to
pub struct RowReader<'a> {
+ /// Layout on how to read each field
+ layout: RowLayout,
/// Raw bytes slice where the tuple stores
data: &'a [u8],
/// Start position for the current tuple in the raw bytes slice.
base_offset: usize,
- /// Total number of fields for each tuple.
- field_count: usize,
- /// The number of bytes used to store null bits for each field.
- null_width: usize,
- /// Starting offset for each fields in the raw bytes.
- /// For fixed length fields, it's where the actual data stores.
- /// For variable length fields, it's a pack of (offset << 32 | length) if we use u64.
- field_offsets: Vec<usize>,
- /// If a row is null free according to its schema
- null_free: bool,
}
impl<'a> std::fmt::Debug for RowReader<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- if self.null_free {
+ if self.null_free() {
write!(f, "null_free")
} else {
let null_bits = self.null_bits();
write!(
f,
"{:?}",
- NullBitsFormatter::new(null_bits, self.field_count)
+ NullBitsFormatter::new(null_bits, self.layout.field_count)
)
}
}
@@ -118,19 +111,11 @@ impl<'a> std::fmt::Debug for RowReader<'a> {
impl<'a> RowReader<'a> {
/// new
- pub fn new(schema: &Arc<Schema>) -> Self {
- assert!(row_supported(schema));
- let null_free = schema_null_free(schema);
- let field_count = schema.fields().len();
- let null_width = if null_free { 0 } else { ceil(field_count, 8) };
- let (field_offsets, _) = get_offsets(null_width, schema);
+ pub fn new(schema: &Schema, row_type: RowType) -> Self {
Self {
+ layout: RowLayout::new(schema, row_type),
data: &[],
base_offset: 0,
- field_count,
- null_width,
- field_offsets,
- null_free,
}
}
@@ -142,26 +127,36 @@ impl<'a> RowReader<'a> {
#[inline]
fn assert_index_valid(&self, idx: usize) {
- assert!(idx < self.field_count);
+ assert!(idx < self.layout.field_count);
+ }
+
+ #[inline(always)]
+ fn field_offsets(&self) -> &[usize] {
+ &self.layout.field_offsets
+ }
+
+ #[inline(always)]
+ fn null_free(&self) -> bool {
+ self.layout.null_free
}
#[inline(always)]
fn null_bits(&self) -> &[u8] {
- if self.null_free {
+ if self.null_free() {
&[]
} else {
let start = self.base_offset;
- &self.data[start..start + self.null_width]
+ &self.data[start..start + self.layout.null_width]
}
}
#[inline(always)]
fn all_valid(&self) -> bool {
- if self.null_free {
+ if self.null_free() {
true
} else {
let null_bits = self.null_bits();
- all_valid(null_bits, self.field_count)
+ all_valid(null_bits, self.layout.field_count)
}
}
@@ -171,14 +166,14 @@ impl<'a> RowReader<'a> {
fn get_bool(&self, idx: usize) -> bool {
self.assert_index_valid(idx);
- let offset = self.field_offsets[idx];
+ let offset = self.field_offsets()[idx];
let value = &self.data[self.base_offset + offset..];
value[0] != 0
}
fn get_u8(&self, idx: usize) -> u8 {
self.assert_index_valid(idx);
- let offset = self.field_offsets[idx];
+ let offset = self.field_offsets()[idx];
self.data[self.base_offset + offset]
}
@@ -257,8 +252,8 @@ impl<'a> RowReader<'a> {
}
/// Read the row currently pointed by RowWriter to the output columnar batch buffer
-pub fn read_row(row: &RowReader, batch: &mut MutableRecordBatch, schema: &Arc<Schema>) {
- if row.null_free || row.all_valid() {
+pub fn read_row(row: &RowReader, batch: &mut MutableRecordBatch, schema: &Schema) {
+ if row.all_valid() {
for ((col_idx, to), field) in batch
.arrays
.iter_mut()
diff --git a/datafusion/core/src/row/writer.rs b/datafusion/core/src/row/writer.rs
index 9cb208d03..920eb9963 100644
--- a/datafusion/core/src/row/writer.rs
+++ b/datafusion/core/src/row/writer.rs
@@ -18,12 +18,11 @@
//! Reusable row writer backed by Vec<u8> to stitch attributes together
use crate::error::Result;
-use crate::row::layout::{estimate_row_width, get_offsets};
-use crate::row::{fixed_size, row_supported, schema_null_free};
+use crate::row::layout::{estimate_row_width, RowLayout, RowType};
use arrow::array::*;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
-use arrow::util::bit_util::{ceil, round_upto_power_of_2, set_bit_raw, unset_bit_raw};
+use arrow::util::bit_util::{round_upto_power_of_2, set_bit_raw, unset_bit_raw};
use std::cmp::max;
use std::sync::Arc;
@@ -37,8 +36,9 @@ pub fn write_batch_unchecked(
batch: &RecordBatch,
row_idx: usize,
schema: Arc<Schema>,
+ row_type: RowType,
) -> Vec<usize> {
- let mut writer = RowWriter::new(&schema);
+ let mut writer = RowWriter::new(&schema, row_type);
let mut current_offset = offset;
let mut offsets = vec![];
let columns = batch.columns();
@@ -58,8 +58,9 @@ pub fn write_batch_unchecked(
pub fn bench_write_batch(
batches: &[Vec<RecordBatch>],
schema: Arc<Schema>,
+ row_type: RowType,
) -> Result<Vec<usize>> {
- let mut writer = RowWriter::new(&schema);
+ let mut writer = RowWriter::new(&schema, row_type);
let mut lengths = vec![];
for batch in batches.iter().flatten() {
@@ -77,7 +78,7 @@ pub fn bench_write_batch(
macro_rules! set_idx {
($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{
$SELF.assert_index_valid($IDX);
- let offset = $SELF.field_offsets[$IDX];
+ let offset = $SELF.field_offsets()[$IDX];
$SELF.data[offset..offset + $WIDTH].copy_from_slice(&$VALUE.to_le_bytes());
}};
}
@@ -87,7 +88,7 @@ macro_rules! fn_set_idx {
paste::item! {
fn [<set_ $NATIVE>](&mut self, idx: usize, value: $NATIVE) {
self.assert_index_valid(idx);
- let offset = self.field_offsets[idx];
+ let offset = self.field_offsets()[idx];
self.data[offset..offset + $WIDTH].copy_from_slice(&value.to_le_bytes());
}
}
@@ -96,51 +97,30 @@ macro_rules! fn_set_idx {
/// Reusable row writer backed by Vec<u8>
pub struct RowWriter {
+ /// Layout on how to write each field
+ layout: RowLayout,
/// buffer for the current tuple been written.
data: Vec<u8>,
- /// Total number of fields for each tuple.
- field_count: usize,
/// Length in bytes for the current tuple, 8-bytes word aligned.
pub(crate) row_width: usize,
- /// The number of bytes used to store null bits for each field.
- null_width: usize,
- /// Length in bytes for `values` part of the current tuple.
- values_width: usize,
/// Length in bytes for `variable length data` part of the current tuple.
varlena_width: usize,
/// Current offset for the next variable length field to write to.
varlena_offset: usize,
- /// Starting offset for each fields in the raw bytes.
- /// For fixed length fields, it's where the actual data stores.
- /// For variable length fields, it's a pack of (offset << 32 | length) if we use u64.
- field_offsets: Vec<usize>,
- /// If a row is null free according to its schema
- null_free: bool,
}
impl RowWriter {
/// new
- pub fn new(schema: &Arc<Schema>) -> Self {
- assert!(row_supported(schema));
- let null_free = schema_null_free(schema);
- let field_count = schema.fields().len();
- let null_width = if null_free { 0 } else { ceil(field_count, 8) };
- let (field_offsets, values_width) = get_offsets(null_width, schema);
- let mut init_capacity = estimate_row_width(schema);
- if !fixed_size(schema) {
- // double the capacity to avoid repeated resize
- init_capacity *= 2;
- }
+ pub fn new(schema: &Schema, row_type: RowType) -> Self {
+ let layout = RowLayout::new(schema, row_type);
+ let init_capacity = estimate_row_width(schema, &layout);
+ let varlena_offset = layout.fixed_part_width();
Self {
+ layout,
data: vec![0; init_capacity],
- field_count,
row_width: 0,
- null_width,
- values_width,
varlena_width: 0,
- varlena_offset: null_width + values_width,
- field_offsets,
- null_free,
+ varlena_offset,
}
}
@@ -149,20 +129,30 @@ impl RowWriter {
self.data.fill(0);
self.row_width = 0;
self.varlena_width = 0;
- self.varlena_offset = self.null_width + self.values_width;
+ self.varlena_offset = self.layout.fixed_part_width();
}
#[inline]
fn assert_index_valid(&self, idx: usize) {
- assert!(idx < self.field_count);
+ assert!(idx < self.layout.field_count);
+ }
+
+ #[inline(always)]
+ fn field_offsets(&self) -> &[usize] {
+ &self.layout.field_offsets
+ }
+
+ #[inline(always)]
+ fn null_free(&self) -> bool {
+ self.layout.null_free
}
pub(crate) fn set_null_at(&mut self, idx: usize) {
assert!(
- !self.null_free,
+ !self.null_free(),
"Unexpected call to set_null_at on null-free row writer"
);
- let null_bits = &mut self.data[0..self.null_width];
+ let null_bits = &mut self.data[0..self.layout.null_width];
unsafe {
unset_bit_raw(null_bits.as_mut_ptr(), idx);
}
@@ -170,10 +160,10 @@ impl RowWriter {
pub(crate) fn set_non_null_at(&mut self, idx: usize) {
assert!(
- !self.null_free,
+ !self.null_free(),
"Unexpected call to set_non_null_at on null-free row writer"
);
- let null_bits = &mut self.data[0..self.null_width];
+ let null_bits = &mut self.data[0..self.layout.null_width];
unsafe {
set_bit_raw(null_bits.as_mut_ptr(), idx);
}
@@ -181,13 +171,13 @@ impl RowWriter {
fn set_bool(&mut self, idx: usize, value: bool) {
self.assert_index_valid(idx);
- let offset = self.field_offsets[idx];
+ let offset = self.field_offsets()[idx];
self.data[offset] = if value { 1 } else { 0 };
}
fn set_u8(&mut self, idx: usize, value: u8) {
self.assert_index_valid(idx);
- let offset = self.field_offsets[idx];
+ let offset = self.field_offsets()[idx];
self.data[offset] = value;
}
@@ -202,7 +192,7 @@ impl RowWriter {
fn set_i8(&mut self, idx: usize, value: i8) {
self.assert_index_valid(idx);
- let offset = self.field_offsets[idx];
+ let offset = self.field_offsets()[idx];
self.data[offset] = value.to_le_bytes()[0];
}
@@ -241,7 +231,7 @@ impl RowWriter {
}
fn current_width(&self) -> usize {
- self.null_width + self.values_width + self.varlena_width
+ self.layout.fixed_part_width() + self.varlena_width
}
/// End each row at 8-byte word boundary.
@@ -263,11 +253,11 @@ impl RowWriter {
pub fn write_row(
row: &mut RowWriter,
row_idx: usize,
- schema: &Arc<Schema>,
+ schema: &Schema,
columns: &[ArrayRef],
) -> usize {
// Get the row from the batch denoted by row_idx
- if row.null_free {
+ if row.null_free() {
for ((i, f), col) in schema.fields().iter().enumerate().zip(columns.iter()) {
write_field(i, row_idx, col, f.data_type(), row);
}
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 4bdf6d666..692717828 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -216,7 +216,7 @@ pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send
}
/// Create vector batches
-pub fn create_vec_batches(schema: &Arc<Schema>, n: usize) -> Vec<RecordBatch> {
+pub fn create_vec_batches(schema: &Schema, n: usize) -> Vec<RecordBatch> {
let batch = create_batch(schema);
let mut vec = Vec::with_capacity(n);
for _ in 0..n {
@@ -226,9 +226,9 @@ pub fn create_vec_batches(schema: &Arc<Schema>, n: usize) -> Vec<RecordBatch> {
}
/// Create batch
-fn create_batch(schema: &Arc<Schema>) -> RecordBatch {
+fn create_batch(schema: &Schema) -> RecordBatch {
RecordBatch::try_new(
- schema.clone(),
+ Arc::new(schema.clone()),
vec![Arc::new(UInt32Array::from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]))],
)
.unwrap()
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index ef6ea52dd..a124311aa 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -16,6 +16,7 @@
// under the License.
use super::*;
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
#[tokio::test]
async fn explain_analyze_baseline_metrics() {
@@ -136,8 +137,16 @@ async fn explain_analyze_baseline_metrics() {
}
let metrics = plan.metrics().unwrap().aggregate_by_partition();
- assert!(metrics.output_rows().unwrap() > 0);
- assert!(metrics.elapsed_compute().unwrap() > 0);
+ assert!(
+ metrics.output_rows().unwrap() > 0,
+ "plan: {}",
+ DisplayableExecutionPlan::with_metrics(plan).one_line()
+ );
+ assert!(
+ metrics.elapsed_compute().unwrap() > 0,
+ "plan: {}",
+ DisplayableExecutionPlan::with_metrics(plan).one_line()
+ );
let mut saw_start = false;
let mut saw_end = false;