You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/20 12:49:12 UTC

[arrow-datafusion] branch master updated: Introduce RowLayout to represent rows for different purposes (#2261)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new ec3543b46 Introduce RowLayout to represent rows for different purposes (#2261)
ec3543b46 is described below

commit ec3543b46e505086ec0f7920e4d9a0b3d328f90c
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Wed Apr 20 20:49:07 2022 +0800

    Introduce RowLayout to represent rows for different purposes (#2261)
    
    * Introduce RowLayout to represent rows for different purposes
    
    * revert default
    
    * Apply suggestions from code review
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * more &schema
    
    * more tests and refactor
    
    * logs for flaky test
    
    * unwanted cargo change
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 datafusion/core/benches/jit.rs               |  35 +++-
 datafusion/core/src/row/jit/mod.rs           | 158 ++++++++++++++---
 datafusion/core/src/row/jit/reader.rs        |   9 +-
 datafusion/core/src/row/jit/writer.rs        |  12 +-
 datafusion/core/src/row/layout.rs            | 157 +++++++++++++++--
 datafusion/core/src/row/mod.rs               | 250 +++++++++++++++++++--------
 datafusion/core/src/row/reader.rs            |  71 ++++----
 datafusion/core/src/row/writer.rs            |  86 ++++-----
 datafusion/core/src/test/mod.rs              |   6 +-
 datafusion/core/tests/sql/explain_analyze.rs |  13 +-
 10 files changed, 579 insertions(+), 218 deletions(-)

diff --git a/datafusion/core/benches/jit.rs b/datafusion/core/benches/jit.rs
index 6abebc294..0c6de319d 100644
--- a/datafusion/core/benches/jit.rs
+++ b/datafusion/core/benches/jit.rs
@@ -23,7 +23,9 @@ extern crate datafusion;
 mod data_utils;
 use crate::criterion::Criterion;
 use crate::data_utils::{create_record_batches, create_schema};
-use datafusion::row::writer::{bench_write_batch, bench_write_batch_jit};
+use datafusion::row::jit::writer::bench_write_batch_jit;
+use datafusion::row::writer::bench_write_batch;
+use datafusion::row::RowType;
 use std::sync::Arc;
 
 fn criterion_benchmark(c: &mut Criterion) {
@@ -35,15 +37,38 @@ fn criterion_benchmark(c: &mut Criterion) {
     let batches =
         create_record_batches(schema.clone(), array_len, partitions_len, batch_size);
 
-    c.bench_function("row serializer", |b| {
+    c.bench_function("compact row serializer", |b| {
         b.iter(|| {
-            criterion::black_box(bench_write_batch(&batches, schema.clone()).unwrap())
+            criterion::black_box(
+                bench_write_batch(&batches, schema.clone(), RowType::Compact).unwrap(),
+            )
         })
     });
 
-    c.bench_function("row serializer jit", |b| {
+    c.bench_function("word aligned row serializer", |b| {
         b.iter(|| {
-            criterion::black_box(bench_write_batch_jit(&batches, schema.clone()).unwrap())
+            criterion::black_box(
+                bench_write_batch(&batches, schema.clone(), RowType::WordAligned)
+                    .unwrap(),
+            )
+        })
+    });
+
+    c.bench_function("compact row serializer jit", |b| {
+        b.iter(|| {
+            criterion::black_box(
+                bench_write_batch_jit(&batches, schema.clone(), RowType::Compact)
+                    .unwrap(),
+            )
+        })
+    });
+
+    c.bench_function("word aligned row serializer jit", |b| {
+        b.iter(|| {
+            criterion::black_box(
+                bench_write_batch_jit(&batches, schema.clone(), RowType::WordAligned)
+                    .unwrap(),
+            )
         })
     });
 }
diff --git a/datafusion/core/src/row/jit/mod.rs b/datafusion/core/src/row/jit/mod.rs
index fbb6efe3c..7ee76a9b4 100644
--- a/datafusion/core/src/row/jit/mod.rs
+++ b/datafusion/core/src/row/jit/mod.rs
@@ -17,8 +17,8 @@
 
 //! Just-In-Time(JIT) version for row reader and writers
 
-mod reader;
-mod writer;
+pub mod reader;
+pub mod writer;
 
 #[macro_export]
 /// register external functions to the assembler
@@ -46,6 +46,7 @@ mod tests {
     use crate::error::Result;
     use crate::row::jit::reader::read_as_batch_jit;
     use crate::row::jit::writer::write_batch_unchecked_jit;
+    use crate::row::layout::RowType::{Compact, WordAligned};
     use arrow::record_batch::RecordBatch;
     use arrow::{array::*, datatypes::*};
     use datafusion_jit::api::Assembler;
@@ -53,26 +54,26 @@ mod tests {
     use DataType::*;
 
     macro_rules! fn_test_single_type {
-        ($ARRAY: ident, $TYPE: expr, $VEC: expr) => {
+        ($ARRAY: ident, $TYPE: expr, $VEC: expr, $ROWTYPE: expr) => {
             paste::item! {
                 #[test]
                 #[allow(non_snake_case)]
-                fn [<test_single_ $TYPE _jit>]() -> Result<()> {
+                fn [<test_ $ROWTYPE _single_ $TYPE _jit>]() -> Result<()> {
                     let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, true)]));
                     let a = $ARRAY::from($VEC);
                     let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
                     let mut vector = vec![0; 1024];
                     let assembler = Assembler::default();
                     let row_offsets =
-                        { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? };
-                    let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
+                        { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler, $ROWTYPE)? };
+                    let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, $ROWTYPE)? };
                     assert_eq!(batch, output_batch);
                     Ok(())
                 }
 
                 #[test]
                 #[allow(non_snake_case)]
-                fn [<test_single_ $TYPE _jit_null_free>]() -> Result<()> {
+                fn [<test_ $ROWTYPE _single_ $TYPE _jit_null_free>]() -> Result<()> {
                     let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)]));
                     let v = $VEC.into_iter().filter(|o| o.is_some()).collect::<Vec<_>>();
                     let a = $ARRAY::from(v);
@@ -80,8 +81,8 @@ mod tests {
                     let mut vector = vec![0; 1024];
                     let assembler = Assembler::default();
                     let row_offsets =
-                        { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? };
-                    let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
+                        { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler, $ROWTYPE)? };
+                    let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, $ROWTYPE)? };
                     assert_eq!(batch, output_batch);
                     Ok(())
                 }
@@ -92,85 +93,190 @@ mod tests {
     fn_test_single_type!(
         BooleanArray,
         Boolean,
-        vec![Some(true), Some(false), None, Some(true), None]
+        vec![Some(true), Some(false), None, Some(true), None],
+        Compact
+    );
+
+    fn_test_single_type!(
+        BooleanArray,
+        Boolean,
+        vec![Some(true), Some(false), None, Some(true), None],
+        WordAligned
     );
 
     fn_test_single_type!(
         Int8Array,
         Int8,
-        vec![Some(5), Some(7), None, Some(0), Some(111)]
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        Compact
+    );
+
+    fn_test_single_type!(
+        Int8Array,
+        Int8,
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        WordAligned
+    );
+
+    fn_test_single_type!(
+        Int16Array,
+        Int16,
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        Compact
     );
 
     fn_test_single_type!(
         Int16Array,
         Int16,
-        vec![Some(5), Some(7), None, Some(0), Some(111)]
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        WordAligned
     );
 
     fn_test_single_type!(
         Int32Array,
         Int32,
-        vec![Some(5), Some(7), None, Some(0), Some(111)]
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        Compact
+    );
+
+    fn_test_single_type!(
+        Int32Array,
+        Int32,
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        WordAligned
+    );
+
+    fn_test_single_type!(
+        Int64Array,
+        Int64,
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        Compact
     );
 
     fn_test_single_type!(
         Int64Array,
         Int64,
-        vec![Some(5), Some(7), None, Some(0), Some(111)]
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        WordAligned
     );
 
     fn_test_single_type!(
         UInt8Array,
         UInt8,
-        vec![Some(5), Some(7), None, Some(0), Some(111)]
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        Compact
+    );
+
+    fn_test_single_type!(
+        UInt8Array,
+        UInt8,
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        WordAligned
     );
 
     fn_test_single_type!(
         UInt16Array,
         UInt16,
-        vec![Some(5), Some(7), None, Some(0), Some(111)]
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        Compact
+    );
+
+    fn_test_single_type!(
+        UInt16Array,
+        UInt16,
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        WordAligned
+    );
+
+    fn_test_single_type!(
+        UInt32Array,
+        UInt32,
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        Compact
     );
 
     fn_test_single_type!(
         UInt32Array,
         UInt32,
-        vec![Some(5), Some(7), None, Some(0), Some(111)]
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        WordAligned
+    );
+
+    fn_test_single_type!(
+        UInt64Array,
+        UInt64,
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        Compact
     );
 
     fn_test_single_type!(
         UInt64Array,
         UInt64,
-        vec![Some(5), Some(7), None, Some(0), Some(111)]
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        WordAligned
+    );
+
+    fn_test_single_type!(
+        Float32Array,
+        Float32,
+        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
+        Compact
     );
 
     fn_test_single_type!(
         Float32Array,
         Float32,
-        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
+        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
+        WordAligned
+    );
+
+    fn_test_single_type!(
+        Float64Array,
+        Float64,
+        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
+        Compact
     );
 
     fn_test_single_type!(
         Float64Array,
         Float64,
-        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
+        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
+        WordAligned
+    );
+
+    fn_test_single_type!(
+        Date32Array,
+        Date32,
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        Compact
     );
 
     fn_test_single_type!(
         Date32Array,
         Date32,
-        vec![Some(5), Some(7), None, Some(0), Some(111)]
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        WordAligned
+    );
+
+    fn_test_single_type!(
+        Date64Array,
+        Date64,
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        Compact
     );
 
     fn_test_single_type!(
         Date64Array,
         Date64,
-        vec![Some(5), Some(7), None, Some(0), Some(111)]
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        WordAligned
     );
 
     fn_test_single_type!(
         StringArray,
         Utf8,
-        vec![Some("hello"), Some("world"), None, Some(""), Some("")]
+        vec![Some("hello"), Some("world"), None, Some(""), Some("")],
+        Compact
     );
 
     #[test]
@@ -190,10 +296,11 @@ mod tests {
                 0,
                 schema.clone(),
                 &assembler,
+                Compact,
             )?
         };
         let output_batch =
-            { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
+            { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, Compact)? };
         assert_eq!(batch, output_batch);
         Ok(())
     }
@@ -214,10 +321,11 @@ mod tests {
                 0,
                 schema.clone(),
                 &assembler,
+                Compact,
             )?
         };
         let output_batch =
-            { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
+            { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, Compact)? };
         assert_eq!(batch, output_batch);
         Ok(())
     }
diff --git a/datafusion/core/src/row/jit/reader.rs b/datafusion/core/src/row/jit/reader.rs
index c10183667..80e10131f 100644
--- a/datafusion/core/src/row/jit/reader.rs
+++ b/datafusion/core/src/row/jit/reader.rs
@@ -20,6 +20,7 @@
 use crate::error::{DataFusionError, Result};
 use crate::reg_fn;
 use crate::row::jit::fn_name;
+use crate::row::layout::RowType;
 use crate::row::reader::RowReader;
 use crate::row::reader::*;
 use crate::row::MutableRecordBatch;
@@ -38,10 +39,11 @@ pub fn read_as_batch_jit(
     schema: Arc<Schema>,
     offsets: &[usize],
     assembler: &Assembler,
+    row_type: RowType,
 ) -> Result<RecordBatch> {
     let row_num = offsets.len();
     let mut output = MutableRecordBatch::new(row_num, schema.clone());
-    let mut row = RowReader::new(&schema);
+    let mut row = RowReader::new(&schema, row_type);
     register_read_functions(assembler)?;
     let gen_func = gen_read_row(&schema, assembler)?;
     let mut jit = assembler.create_jit();
@@ -102,10 +104,7 @@ fn register_read_functions(asm: &Assembler) -> Result<()> {
     Ok(())
 }
 
-fn gen_read_row(
-    schema: &Arc<Schema>,
-    assembler: &Assembler,
-) -> Result<GeneratedFunction> {
+fn gen_read_row(schema: &Schema, assembler: &Assembler) -> Result<GeneratedFunction> {
     use DataType::*;
     let mut builder = assembler
         .new_func_builder("read_row")
diff --git a/datafusion/core/src/row/jit/writer.rs b/datafusion/core/src/row/jit/writer.rs
index 5b077caf6..ae9ff1308 100644
--- a/datafusion/core/src/row/jit/writer.rs
+++ b/datafusion/core/src/row/jit/writer.rs
@@ -20,6 +20,7 @@
 use crate::error::Result;
 use crate::reg_fn;
 use crate::row::jit::fn_name;
+use crate::row::layout::RowType;
 use crate::row::schema_null_free;
 use crate::row::writer::RowWriter;
 use crate::row::writer::*;
@@ -43,8 +44,9 @@ pub fn write_batch_unchecked_jit(
     row_idx: usize,
     schema: Arc<Schema>,
     assembler: &Assembler,
+    row_type: RowType,
 ) -> Result<Vec<usize>> {
-    let mut writer = RowWriter::new(&schema);
+    let mut writer = RowWriter::new(&schema, row_type);
     let mut current_offset = offset;
     let mut offsets = vec![];
     register_write_functions(assembler)?;
@@ -74,9 +76,10 @@ pub fn write_batch_unchecked_jit(
 pub fn bench_write_batch_jit(
     batches: &[Vec<RecordBatch>],
     schema: Arc<Schema>,
+    row_type: RowType,
 ) -> Result<Vec<usize>> {
     let assembler = Assembler::default();
-    let mut writer = RowWriter::new(&schema);
+    let mut writer = RowWriter::new(&schema, row_type);
     let mut lengths = vec![];
     register_write_functions(&assembler)?;
     let gen_func = gen_write_row(&schema, &assembler)?;
@@ -126,10 +129,7 @@ fn register_write_functions(asm: &Assembler) -> Result<()> {
     Ok(())
 }
 
-fn gen_write_row(
-    schema: &Arc<Schema>,
-    assembler: &Assembler,
-) -> Result<GeneratedFunction> {
+fn gen_write_row(schema: &Schema, assembler: &Assembler) -> Result<GeneratedFunction> {
     let mut builder = assembler
         .new_func_builder("write_row")
         .param("row", PTR)
diff --git a/datafusion/core/src/row/layout.rs b/datafusion/core/src/row/layout.rs
index 71699d6eb..c14f9fa69 100644
--- a/datafusion/core/src/row/layout.rs
+++ b/datafusion/core/src/row/layout.rs
@@ -17,26 +17,94 @@
 
 //! Various row layout for different use case
 
-use crate::row::{schema_null_free, var_length};
+use crate::row::schema_null_free;
 use arrow::datatypes::{DataType, Schema};
 use arrow::util::bit_util::{ceil, round_upto_power_of_2};
-use std::sync::Arc;
 
 const UTF8_DEFAULT_SIZE: usize = 20;
 const BINARY_DEFAULT_SIZE: usize = 100;
 
+#[derive(Copy, Clone, Debug)]
+/// Type of a RowLayout
+pub enum RowType {
+    /// This type of layout will store each field with minimum bytes for space efficiency.
+    /// Its typical use case represents a sorting payload that accesses all row fields as a unit.
+    Compact,
+    /// This type of layout stores each field in one 8-byte word for CPU-friendly access.
+    /// It is mainly used to represent rows with frequently updated content,
+    /// for example, grouping state for hash aggregation.
+    WordAligned,
+    // RawComparable,
+}
+
+/// Reveals how the fields of a record are stored in the raw-bytes format
+#[derive(Debug)]
+pub(crate) struct RowLayout {
+    /// Type of the layout
+    #[allow(dead_code)]
+    row_type: RowType,
+    /// If a row is null free according to its schema
+    pub(crate) null_free: bool,
+    /// The number of bytes used to store null bits for each field.
+    pub(crate) null_width: usize,
+    /// Length in bytes for `values` part of the current tuple.
+    pub(crate) values_width: usize,
+    /// Total number of fields for each tuple.
+    pub(crate) field_count: usize,
+    /// Starting offset for each field in the raw bytes.
+    pub(crate) field_offsets: Vec<usize>,
+}
+
+impl RowLayout {
+    pub(crate) fn new(schema: &Schema, row_type: RowType) -> Self {
+        assert!(row_supported(schema, row_type));
+        let null_free = schema_null_free(schema);
+        let field_count = schema.fields().len();
+        let null_width = if null_free {
+            0
+        } else {
+            match row_type {
+                RowType::Compact => ceil(field_count, 8),
+                RowType::WordAligned => round_upto_power_of_2(ceil(field_count, 8), 8),
+            }
+        };
+        let (field_offsets, values_width) = match row_type {
+            RowType::Compact => compact_offsets(null_width, schema),
+            RowType::WordAligned => word_aligned_offsets(null_width, schema),
+        };
+        Self {
+            row_type,
+            null_free,
+            null_width,
+            values_width,
+            field_count,
+            field_offsets,
+        }
+    }
+
+    #[inline(always)]
+    pub(crate) fn fixed_part_width(&self) -> usize {
+        self.null_width + self.values_width
+    }
+}
+
 /// Get relative offsets for each field and total width for values
-pub fn get_offsets(null_width: usize, schema: &Arc<Schema>) -> (Vec<usize>, usize) {
+fn compact_offsets(null_width: usize, schema: &Schema) -> (Vec<usize>, usize) {
     let mut offsets = vec![];
     let mut offset = null_width;
     for f in schema.fields() {
         offsets.push(offset);
-        offset += type_width(f.data_type());
+        offset += compact_type_width(f.data_type());
     }
     (offsets, offset - null_width)
 }
 
-fn type_width(dt: &DataType) -> usize {
+fn var_length(dt: &DataType) -> bool {
+    use DataType::*;
+    matches!(dt, Utf8 | Binary)
+}
+
+fn compact_type_width(dt: &DataType) -> usize {
     use DataType::*;
     if var_length(dt) {
         return std::mem::size_of::<u64>();
@@ -50,13 +118,27 @@ fn type_width(dt: &DataType) -> usize {
     }
 }
 
+fn word_aligned_offsets(null_width: usize, schema: &Schema) -> (Vec<usize>, usize) {
+    let mut offsets = vec![];
+    let mut offset = null_width;
+    for f in schema.fields() {
+        offsets.push(offset);
+        assert!(!matches!(f.data_type(), DataType::Decimal(_, _)));
+        // All of the currently supported types fit into a single 8-byte word.
+        // When we decide to support the Decimal type in the future, its width will be
+        // two 8-byte words, and the width calculation below must be adapted.
+        offset += 8;
+    }
+    (offsets, offset - null_width)
+}
+
 /// Estimate row width based on schema
-pub fn estimate_row_width(schema: &Arc<Schema>) -> usize {
-    let null_free = schema_null_free(schema);
-    let field_count = schema.fields().len();
-    let mut width = if null_free { 0 } else { ceil(field_count, 8) };
+pub(crate) fn estimate_row_width(schema: &Schema, layout: &RowLayout) -> usize {
+    let mut width = layout.fixed_part_width();
+    if matches!(layout.row_type, RowType::WordAligned) {
+        return width;
+    }
     for f in schema.fields() {
-        width += type_width(f.data_type());
         match f.data_type() {
             DataType::Utf8 => width += UTF8_DEFAULT_SIZE,
             DataType::Binary => width += BINARY_DEFAULT_SIZE,
@@ -65,3 +147,58 @@ pub fn estimate_row_width(schema: &Arc<Schema>) -> usize {
     }
     round_upto_power_of_2(width, 8)
 }
+
+/// Tell if we can create raw-bytes based rows, since we currently
+/// have limited data type support in the row format
+fn row_supported(schema: &Schema, row_type: RowType) -> bool {
+    schema
+        .fields()
+        .iter()
+        .all(|f| supported_type(f.data_type(), row_type))
+}
+
+fn supported_type(dt: &DataType, row_type: RowType) -> bool {
+    use DataType::*;
+
+    match row_type {
+        RowType::Compact => {
+            matches!(
+                dt,
+                Boolean
+                    | UInt8
+                    | UInt16
+                    | UInt32
+                    | UInt64
+                    | Int8
+                    | Int16
+                    | Int32
+                    | Int64
+                    | Float32
+                    | Float64
+                    | Date32
+                    | Date64
+                    | Utf8
+                    | Binary
+            )
+        }
+        // only fixed length types are supported for fast in-place update.
+        RowType::WordAligned => {
+            matches!(
+                dt,
+                Boolean
+                    | UInt8
+                    | UInt16
+                    | UInt32
+                    | UInt64
+                    | Int8
+                    | Int16
+                    | Int32
+                    | Int64
+                    | Float32
+                    | Float64
+                    | Date32
+                    | Date64
+            )
+        }
+    }
+}
diff --git a/datafusion/core/src/row/mod.rs b/datafusion/core/src/row/mod.rs
index 1fbf01275..f8e9ff273 100644
--- a/datafusion/core/src/row/mod.rs
+++ b/datafusion/core/src/row/mod.rs
@@ -48,61 +48,21 @@
 //!
 
 use arrow::array::{make_builder, ArrayBuilder};
-use arrow::datatypes::{DataType, Schema};
+use arrow::datatypes::Schema;
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
+pub use layout::RowType;
 use std::sync::Arc;
 
 #[cfg(feature = "jit")]
-mod jit;
+pub mod jit;
 mod layout;
 pub mod reader;
 mod validity;
 pub mod writer;
 
-fn supported_type(dt: &DataType) -> bool {
-    use DataType::*;
-    matches!(
-        dt,
-        Boolean
-            | UInt8
-            | UInt16
-            | UInt32
-            | UInt64
-            | Int8
-            | Int16
-            | Int32
-            | Int64
-            | Float32
-            | Float64
-            | Date32
-            | Date64
-            | Utf8
-            | Binary
-    )
-}
-
-/// Tell if we can create raw-bytes based rows since we currently
-/// has limited data type supports in the row format
-pub fn row_supported(schema: &Arc<Schema>) -> bool {
-    schema
-        .fields()
-        .iter()
-        .all(|f| supported_type(f.data_type()))
-}
-
-fn var_length(dt: &DataType) -> bool {
-    use DataType::*;
-    matches!(dt, Utf8 | Binary)
-}
-
-/// Tell if the row is of fixed size
-pub fn fixed_size(schema: &Arc<Schema>) -> bool {
-    schema.fields().iter().all(|f| !var_length(f.data_type()))
-}
-
 /// Tell if schema contains no nullable field
-pub fn schema_null_free(schema: &Arc<Schema>) -> bool {
+pub(crate) fn schema_null_free(schema: &Schema) -> bool {
     schema.fields().iter().all(|f| !f.is_nullable())
 }
 
@@ -126,7 +86,7 @@ impl MutableRecordBatch {
     }
 }
 
-fn new_arrays(schema: &Arc<Schema>, batch_size: usize) -> Vec<Box<dyn ArrayBuilder>> {
+fn new_arrays(schema: &Schema, batch_size: usize) -> Vec<Box<dyn ArrayBuilder>> {
     schema
         .fields()
         .iter()
@@ -155,6 +115,7 @@ mod tests {
     use crate::physical_plan::file_format::FileScanConfig;
     use crate::physical_plan::{collect, ExecutionPlan};
     use crate::prelude::SessionContext;
+    use crate::row::layout::RowType::{Compact, WordAligned};
     use crate::row::reader::read_as_batch;
     use crate::row::writer::write_batch_unchecked;
     use arrow::record_batch::RecordBatch;
@@ -166,33 +127,33 @@ mod tests {
     use DataType::*;
 
     macro_rules! fn_test_single_type {
-        ($ARRAY: ident, $TYPE: expr, $VEC: expr) => {
+        ($ARRAY: ident, $TYPE: expr, $VEC: expr, $ROWTYPE: expr) => {
             paste::item! {
                 #[test]
                 #[allow(non_snake_case)]
-                fn [<test_single_ $TYPE>]() -> Result<()> {
+                fn [<test_ $ROWTYPE _single_ $TYPE>]() -> Result<()> {
                     let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, true)]));
                     let a = $ARRAY::from($VEC);
                     let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
                     let mut vector = vec![0; 1024];
                     let row_offsets =
-                        { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
-                    let output_batch = { read_as_batch(&vector, schema, &row_offsets)? };
+                        { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone(), $ROWTYPE) };
+                    let output_batch = { read_as_batch(&vector, schema, &row_offsets, $ROWTYPE)? };
                     assert_eq!(batch, output_batch);
                     Ok(())
                 }
 
                 #[test]
                 #[allow(non_snake_case)]
-                fn [<test_single_ $TYPE _null_free>]() -> Result<()> {
+                fn [<test_ $ROWTYPE _single_ $TYPE _null_free>]() -> Result<()> {
                     let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)]));
                     let v = $VEC.into_iter().filter(|o| o.is_some()).collect::<Vec<_>>();
                     let a = $ARRAY::from(v);
                     let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
                     let mut vector = vec![0; 1024];
                     let row_offsets =
-                        { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
-                    let output_batch = { read_as_batch(&vector, schema, &row_offsets)? };
+                        { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone(), $ROWTYPE) };
+                    let output_batch = { read_as_batch(&vector, schema, &row_offsets, $ROWTYPE)? };
                     assert_eq!(batch, output_batch);
                     Ok(())
                 }
@@ -203,87 +164,202 @@ mod tests {
     fn_test_single_type!(
         BooleanArray,
         Boolean,
-        vec![Some(true), Some(false), None, Some(true), None]
+        vec![Some(true), Some(false), None, Some(true), None],
+        Compact
+    );
+
+    fn_test_single_type!(
+        BooleanArray,
+        Boolean,
+        vec![Some(true), Some(false), None, Some(true), None],
+        WordAligned
     );
 
     fn_test_single_type!(
         Int8Array,
         Int8,
-        vec![Some(5), Some(7), None, Some(0), Some(111)]
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        Compact
+    );
+
+    fn_test_single_type!(
+        Int8Array,
+        Int8,
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        WordAligned
     );
 
     fn_test_single_type!(
         Int16Array,
         Int16,
-        vec![Some(5), Some(7), None, Some(0), Some(111)]
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        Compact
+    );
+
+    fn_test_single_type!(
+        Int16Array,
+        Int16,
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        WordAligned
+    );
+
+    fn_test_single_type!(
+        Int32Array,
+        Int32,
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        Compact
     );
 
     fn_test_single_type!(
         Int32Array,
         Int32,
-        vec![Some(5), Some(7), None, Some(0), Some(111)]
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        WordAligned
     );
 
     fn_test_single_type!(
         Int64Array,
         Int64,
-        vec![Some(5), Some(7), None, Some(0), Some(111)]
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        Compact
+    );
+
+    fn_test_single_type!(
+        Int64Array,
+        Int64,
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        WordAligned
     );
 
     fn_test_single_type!(
         UInt8Array,
         UInt8,
-        vec![Some(5), Some(7), None, Some(0), Some(111)]
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        Compact
+    );
+
+    fn_test_single_type!(
+        UInt8Array,
+        UInt8,
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        WordAligned
+    );
+
+    fn_test_single_type!(
+        UInt16Array,
+        UInt16,
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        Compact
     );
 
     fn_test_single_type!(
         UInt16Array,
         UInt16,
-        vec![Some(5), Some(7), None, Some(0), Some(111)]
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        WordAligned
     );
 
     fn_test_single_type!(
         UInt32Array,
         UInt32,
-        vec![Some(5), Some(7), None, Some(0), Some(111)]
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        Compact
+    );
+
+    fn_test_single_type!(
+        UInt32Array,
+        UInt32,
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        WordAligned
+    );
+
+    fn_test_single_type!(
+        UInt64Array,
+        UInt64,
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        Compact
     );
 
     fn_test_single_type!(
         UInt64Array,
         UInt64,
-        vec![Some(5), Some(7), None, Some(0), Some(111)]
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        WordAligned
     );
 
     fn_test_single_type!(
         Float32Array,
         Float32,
-        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
+        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
+        Compact
+    );
+
+    fn_test_single_type!(
+        Float32Array,
+        Float32,
+        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
+        WordAligned
     );
 
     fn_test_single_type!(
         Float64Array,
         Float64,
-        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
+        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
+        Compact
+    );
+
+    fn_test_single_type!(
+        Float64Array,
+        Float64,
+        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
+        WordAligned
+    );
+
+    fn_test_single_type!(
+        Date32Array,
+        Date32,
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        Compact
     );
 
     fn_test_single_type!(
         Date32Array,
         Date32,
-        vec![Some(5), Some(7), None, Some(0), Some(111)]
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        WordAligned
+    );
+
+    fn_test_single_type!(
+        Date64Array,
+        Date64,
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        Compact
     );
 
     fn_test_single_type!(
         Date64Array,
         Date64,
-        vec![Some(5), Some(7), None, Some(0), Some(111)]
+        vec![Some(5), Some(7), None, Some(0), Some(111)],
+        WordAligned
     );
 
     fn_test_single_type!(
         StringArray,
         Utf8,
-        vec![Some("hello"), Some("world"), None, Some(""), Some("")]
+        vec![Some("hello"), Some("world"), None, Some(""), Some("")],
+        Compact
     );
 
+    #[test]
+    #[should_panic(expected = "row_supported(schema, row_type)")]
+    fn test_unsupported_word_aligned_type() {
+        let a: ArrayRef = Arc::new(StringArray::from(vec!["hello", "world"]));
+        let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
+        let schema = batch.schema();
+        let mut vector = vec![0; 1024];
+        write_batch_unchecked(&mut vector, 0, &batch, 0, schema, WordAligned);
+    }
+
     #[test]
     fn test_single_binary() -> Result<()> {
         let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, true)]));
@@ -293,8 +369,8 @@ mod tests {
         let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
         let mut vector = vec![0; 8192];
         let row_offsets =
-            { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
-        let output_batch = { read_as_batch(&vector, schema, &row_offsets)? };
+            { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone(), Compact) };
+        let output_batch = { read_as_batch(&vector, schema, &row_offsets, Compact)? };
         assert_eq!(batch, output_batch);
         Ok(())
     }
@@ -307,8 +383,8 @@ mod tests {
         let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
         let mut vector = vec![0; 8192];
         let row_offsets =
-            { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
-        let output_batch = { read_as_batch(&vector, schema, &row_offsets)? };
+            { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone(), Compact) };
+        let output_batch = { read_as_batch(&vector, schema, &row_offsets, Compact)? };
         assert_eq!(batch, output_batch);
         Ok(())
     }
@@ -327,25 +403,47 @@ mod tests {
 
         let mut vector = vec![0; 20480];
         let row_offsets =
-            { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone()) };
-        let output_batch = { read_as_batch(&vector, schema, &row_offsets)? };
+            { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(), Compact) };
+        let output_batch = { read_as_batch(&vector, schema, &row_offsets, Compact)? };
+        assert_eq!(*batch, output_batch);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_with_parquet_word_aligned() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+        let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7]);
+        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+        let schema = exec.schema().clone();
+
+        let batches = collect(exec, task_ctx).await?;
+        assert_eq!(1, batches.len());
+        let batch = &batches[0];
+
+        let mut vector = vec![0; 20480];
+        let row_offsets = {
+            write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(), WordAligned)
+        };
+        let output_batch = { read_as_batch(&vector, schema, &row_offsets, WordAligned)? };
         assert_eq!(*batch, output_batch);
 
         Ok(())
     }
 
     #[test]
-    #[should_panic(expected = "supported(schema)")]
+    #[should_panic(expected = "row_supported(schema, row_type)")]
     fn test_unsupported_type_write() {
         let a: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8]));
         let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
         let schema = batch.schema();
         let mut vector = vec![0; 1024];
-        write_batch_unchecked(&mut vector, 0, &batch, 0, schema);
+        write_batch_unchecked(&mut vector, 0, &batch, 0, schema, Compact);
     }
 
     #[test]
-    #[should_panic(expected = "supported(schema)")]
+    #[should_panic(expected = "row_supported(schema, row_type)")]
     fn test_unsupported_type_read() {
         let schema = Arc::new(Schema::new(vec![Field::new(
             "a",
@@ -354,7 +452,7 @@ mod tests {
         )]));
         let vector = vec![0; 1024];
         let row_offsets = vec![0];
-        read_as_batch(&vector, schema, &row_offsets).unwrap();
+        read_as_batch(&vector, schema, &row_offsets, Compact).unwrap();
     }
 
     async fn get_exec(
diff --git a/datafusion/core/src/row/reader.rs b/datafusion/core/src/row/reader.rs
index 4d9fb3136..abaf57c14 100644
--- a/datafusion/core/src/row/reader.rs
+++ b/datafusion/core/src/row/reader.rs
@@ -18,13 +18,13 @@
 //! Accessing row from raw bytes
 
 use crate::error::{DataFusionError, Result};
-use crate::row::layout::get_offsets;
+use crate::row::layout::{RowLayout, RowType};
 use crate::row::validity::{all_valid, NullBitsFormatter};
-use crate::row::{row_supported, schema_null_free, MutableRecordBatch};
+use crate::row::MutableRecordBatch;
 use arrow::array::*;
 use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
-use arrow::util::bit_util::{ceil, get_bit_raw};
+use arrow::util::bit_util::get_bit_raw;
 use std::sync::Arc;
 
 /// Read `data` of raw-bytes rows starting at `offsets` out to a record batch
@@ -32,23 +32,24 @@ pub fn read_as_batch(
     data: &[u8],
     schema: Arc<Schema>,
     offsets: &[usize],
+    row_type: RowType,
 ) -> Result<RecordBatch> {
     let row_num = offsets.len();
     let mut output = MutableRecordBatch::new(row_num, schema.clone());
-    let mut row = RowReader::new(&schema);
+    let mut row = RowReader::new(&schema, row_type);
 
     for offset in offsets.iter().take(row_num) {
         row.point_to(*offset, data);
         read_row(&row, &mut output, &schema);
     }
 
-    output.output().map_err(DataFusionError::ArrowError)
+    Ok(output.output()?)
 }
 
 macro_rules! get_idx {
     ($NATIVE: ident, $SELF: ident, $IDX: ident, $WIDTH: literal) => {{
         $SELF.assert_index_valid($IDX);
-        let offset = $SELF.field_offsets[$IDX];
+        let offset = $SELF.field_offsets()[$IDX];
         let start = $SELF.base_offset + offset;
         let end = start + $WIDTH;
         $NATIVE::from_le_bytes($SELF.data[start..end].try_into().unwrap())
@@ -60,7 +61,7 @@ macro_rules! fn_get_idx {
         paste::item! {
             fn [<get_ $NATIVE>](&self, idx: usize) -> $NATIVE {
                 self.assert_index_valid(idx);
-                let offset = self.field_offsets[idx];
+                let offset = self.field_offsets()[idx];
                 let start = self.base_offset + offset;
                 let end = start + $WIDTH;
                 $NATIVE::from_le_bytes(self.data[start..end].try_into().unwrap())
@@ -85,32 +86,24 @@ macro_rules! fn_get_idx_opt {
 
 /// Read the tuple `data[base_offset..]` we are currently pointing to
 pub struct RowReader<'a> {
+    /// Layout on how to read each field
+    layout: RowLayout,
     /// Raw bytes slice where the tuple stores
     data: &'a [u8],
     /// Start position for the current tuple in the raw bytes slice.
     base_offset: usize,
-    /// Total number of fields for each tuple.
-    field_count: usize,
-    /// The number of bytes used to store null bits for each field.
-    null_width: usize,
-    /// Starting offset for each fields in the raw bytes.
-    /// For fixed length fields, it's where the actual data stores.
-    /// For variable length fields, it's a pack of (offset << 32 | length) if we use u64.
-    field_offsets: Vec<usize>,
-    /// If a row is null free according to its schema
-    null_free: bool,
 }
 
 impl<'a> std::fmt::Debug for RowReader<'a> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        if self.null_free {
+        if self.null_free() {
             write!(f, "null_free")
         } else {
             let null_bits = self.null_bits();
             write!(
                 f,
                 "{:?}",
-                NullBitsFormatter::new(null_bits, self.field_count)
+                NullBitsFormatter::new(null_bits, self.layout.field_count)
             )
         }
     }
@@ -118,19 +111,11 @@ impl<'a> std::fmt::Debug for RowReader<'a> {
 
 impl<'a> RowReader<'a> {
     /// new
-    pub fn new(schema: &Arc<Schema>) -> Self {
-        assert!(row_supported(schema));
-        let null_free = schema_null_free(schema);
-        let field_count = schema.fields().len();
-        let null_width = if null_free { 0 } else { ceil(field_count, 8) };
-        let (field_offsets, _) = get_offsets(null_width, schema);
+    pub fn new(schema: &Schema, row_type: RowType) -> Self {
         Self {
+            layout: RowLayout::new(schema, row_type),
             data: &[],
             base_offset: 0,
-            field_count,
-            null_width,
-            field_offsets,
-            null_free,
         }
     }
 
@@ -142,26 +127,36 @@ impl<'a> RowReader<'a> {
 
     #[inline]
     fn assert_index_valid(&self, idx: usize) {
-        assert!(idx < self.field_count);
+        assert!(idx < self.layout.field_count);
+    }
+
+    #[inline(always)]
+    fn field_offsets(&self) -> &[usize] {
+        &self.layout.field_offsets
+    }
+
+    #[inline(always)]
+    fn null_free(&self) -> bool {
+        self.layout.null_free
     }
 
     #[inline(always)]
     fn null_bits(&self) -> &[u8] {
-        if self.null_free {
+        if self.null_free() {
             &[]
         } else {
             let start = self.base_offset;
-            &self.data[start..start + self.null_width]
+            &self.data[start..start + self.layout.null_width]
         }
     }
 
     #[inline(always)]
     fn all_valid(&self) -> bool {
-        if self.null_free {
+        if self.null_free() {
             true
         } else {
             let null_bits = self.null_bits();
-            all_valid(null_bits, self.field_count)
+            all_valid(null_bits, self.layout.field_count)
         }
     }
 
@@ -171,14 +166,14 @@ impl<'a> RowReader<'a> {
 
     fn get_bool(&self, idx: usize) -> bool {
         self.assert_index_valid(idx);
-        let offset = self.field_offsets[idx];
+        let offset = self.field_offsets()[idx];
         let value = &self.data[self.base_offset + offset..];
         value[0] != 0
     }
 
     fn get_u8(&self, idx: usize) -> u8 {
         self.assert_index_valid(idx);
-        let offset = self.field_offsets[idx];
+        let offset = self.field_offsets()[idx];
         self.data[self.base_offset + offset]
     }
 
@@ -257,8 +252,8 @@ impl<'a> RowReader<'a> {
 }
 
 /// Read the row currently pointed by RowWriter to the output columnar batch buffer
-pub fn read_row(row: &RowReader, batch: &mut MutableRecordBatch, schema: &Arc<Schema>) {
-    if row.null_free || row.all_valid() {
+pub fn read_row(row: &RowReader, batch: &mut MutableRecordBatch, schema: &Schema) {
+    if row.all_valid() {
         for ((col_idx, to), field) in batch
             .arrays
             .iter_mut()
diff --git a/datafusion/core/src/row/writer.rs b/datafusion/core/src/row/writer.rs
index 9cb208d03..920eb9963 100644
--- a/datafusion/core/src/row/writer.rs
+++ b/datafusion/core/src/row/writer.rs
@@ -18,12 +18,11 @@
 //! Reusable row writer backed by Vec<u8> to stitch attributes together
 
 use crate::error::Result;
-use crate::row::layout::{estimate_row_width, get_offsets};
-use crate::row::{fixed_size, row_supported, schema_null_free};
+use crate::row::layout::{estimate_row_width, RowLayout, RowType};
 use arrow::array::*;
 use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
-use arrow::util::bit_util::{ceil, round_upto_power_of_2, set_bit_raw, unset_bit_raw};
+use arrow::util::bit_util::{round_upto_power_of_2, set_bit_raw, unset_bit_raw};
 use std::cmp::max;
 use std::sync::Arc;
 
@@ -37,8 +36,9 @@ pub fn write_batch_unchecked(
     batch: &RecordBatch,
     row_idx: usize,
     schema: Arc<Schema>,
+    row_type: RowType,
 ) -> Vec<usize> {
-    let mut writer = RowWriter::new(&schema);
+    let mut writer = RowWriter::new(&schema, row_type);
     let mut current_offset = offset;
     let mut offsets = vec![];
     let columns = batch.columns();
@@ -58,8 +58,9 @@ pub fn write_batch_unchecked(
 pub fn bench_write_batch(
     batches: &[Vec<RecordBatch>],
     schema: Arc<Schema>,
+    row_type: RowType,
 ) -> Result<Vec<usize>> {
-    let mut writer = RowWriter::new(&schema);
+    let mut writer = RowWriter::new(&schema, row_type);
     let mut lengths = vec![];
 
     for batch in batches.iter().flatten() {
@@ -77,7 +78,7 @@ pub fn bench_write_batch(
 macro_rules! set_idx {
     ($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{
         $SELF.assert_index_valid($IDX);
-        let offset = $SELF.field_offsets[$IDX];
+        let offset = $SELF.field_offsets()[$IDX];
         $SELF.data[offset..offset + $WIDTH].copy_from_slice(&$VALUE.to_le_bytes());
     }};
 }
@@ -87,7 +88,7 @@ macro_rules! fn_set_idx {
         paste::item! {
             fn [<set_ $NATIVE>](&mut self, idx: usize, value: $NATIVE) {
                 self.assert_index_valid(idx);
-                let offset = self.field_offsets[idx];
+                let offset = self.field_offsets()[idx];
                 self.data[offset..offset + $WIDTH].copy_from_slice(&value.to_le_bytes());
             }
         }
@@ -96,51 +97,30 @@ macro_rules! fn_set_idx {
 
 /// Reusable row writer backed by Vec<u8>
 pub struct RowWriter {
+    /// Layout on how to write each field
+    layout: RowLayout,
     /// buffer for the current tuple been written.
     data: Vec<u8>,
-    /// Total number of fields for each tuple.
-    field_count: usize,
     /// Length in bytes for the current tuple, 8-bytes word aligned.
     pub(crate) row_width: usize,
-    /// The number of bytes used to store null bits for each field.
-    null_width: usize,
-    /// Length in bytes for `values` part of the current tuple.
-    values_width: usize,
     /// Length in bytes for `variable length data` part of the current tuple.
     varlena_width: usize,
     /// Current offset for the next variable length field to write to.
     varlena_offset: usize,
-    /// Starting offset for each fields in the raw bytes.
-    /// For fixed length fields, it's where the actual data stores.
-    /// For variable length fields, it's a pack of (offset << 32 | length) if we use u64.
-    field_offsets: Vec<usize>,
-    /// If a row is null free according to its schema
-    null_free: bool,
 }
 
 impl RowWriter {
     /// new
-    pub fn new(schema: &Arc<Schema>) -> Self {
-        assert!(row_supported(schema));
-        let null_free = schema_null_free(schema);
-        let field_count = schema.fields().len();
-        let null_width = if null_free { 0 } else { ceil(field_count, 8) };
-        let (field_offsets, values_width) = get_offsets(null_width, schema);
-        let mut init_capacity = estimate_row_width(schema);
-        if !fixed_size(schema) {
-            // double the capacity to avoid repeated resize
-            init_capacity *= 2;
-        }
+    pub fn new(schema: &Schema, row_type: RowType) -> Self {
+        let layout = RowLayout::new(schema, row_type);
+        let init_capacity = estimate_row_width(schema, &layout);
+        let varlena_offset = layout.fixed_part_width();
         Self {
+            layout,
             data: vec![0; init_capacity],
-            field_count,
             row_width: 0,
-            null_width,
-            values_width,
             varlena_width: 0,
-            varlena_offset: null_width + values_width,
-            field_offsets,
-            null_free,
+            varlena_offset,
         }
     }
 
@@ -149,20 +129,30 @@ impl RowWriter {
         self.data.fill(0);
         self.row_width = 0;
         self.varlena_width = 0;
-        self.varlena_offset = self.null_width + self.values_width;
+        self.varlena_offset = self.layout.fixed_part_width();
     }
 
     #[inline]
     fn assert_index_valid(&self, idx: usize) {
-        assert!(idx < self.field_count);
+        assert!(idx < self.layout.field_count);
+    }
+
+    #[inline(always)]
+    fn field_offsets(&self) -> &[usize] {
+        &self.layout.field_offsets
+    }
+
+    #[inline(always)]
+    fn null_free(&self) -> bool {
+        self.layout.null_free
     }
 
     pub(crate) fn set_null_at(&mut self, idx: usize) {
         assert!(
-            !self.null_free,
+            !self.null_free(),
             "Unexpected call to set_null_at on null-free row writer"
         );
-        let null_bits = &mut self.data[0..self.null_width];
+        let null_bits = &mut self.data[0..self.layout.null_width];
         unsafe {
             unset_bit_raw(null_bits.as_mut_ptr(), idx);
         }
@@ -170,10 +160,10 @@ impl RowWriter {
 
     pub(crate) fn set_non_null_at(&mut self, idx: usize) {
         assert!(
-            !self.null_free,
+            !self.null_free(),
             "Unexpected call to set_non_null_at on null-free row writer"
         );
-        let null_bits = &mut self.data[0..self.null_width];
+        let null_bits = &mut self.data[0..self.layout.null_width];
         unsafe {
             set_bit_raw(null_bits.as_mut_ptr(), idx);
         }
@@ -181,13 +171,13 @@ impl RowWriter {
 
     fn set_bool(&mut self, idx: usize, value: bool) {
         self.assert_index_valid(idx);
-        let offset = self.field_offsets[idx];
+        let offset = self.field_offsets()[idx];
         self.data[offset] = if value { 1 } else { 0 };
     }
 
     fn set_u8(&mut self, idx: usize, value: u8) {
         self.assert_index_valid(idx);
-        let offset = self.field_offsets[idx];
+        let offset = self.field_offsets()[idx];
         self.data[offset] = value;
     }
 
@@ -202,7 +192,7 @@ impl RowWriter {
 
     fn set_i8(&mut self, idx: usize, value: i8) {
         self.assert_index_valid(idx);
-        let offset = self.field_offsets[idx];
+        let offset = self.field_offsets()[idx];
         self.data[offset] = value.to_le_bytes()[0];
     }
 
@@ -241,7 +231,7 @@ impl RowWriter {
     }
 
     fn current_width(&self) -> usize {
-        self.null_width + self.values_width + self.varlena_width
+        self.layout.fixed_part_width() + self.varlena_width
     }
 
     /// End each row at 8-byte word boundary.
@@ -263,11 +253,11 @@ impl RowWriter {
 pub fn write_row(
     row: &mut RowWriter,
     row_idx: usize,
-    schema: &Arc<Schema>,
+    schema: &Schema,
     columns: &[ArrayRef],
 ) -> usize {
     // Get the row from the batch denoted by row_idx
-    if row.null_free {
+    if row.null_free() {
         for ((i, f), col) in schema.fields().iter().enumerate().zip(columns.iter()) {
             write_field(i, row_idx, col, f.data_type(), row);
         }
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 4bdf6d666..692717828 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -216,7 +216,7 @@ pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send
 }
 
 /// Create vector batches
-pub fn create_vec_batches(schema: &Arc<Schema>, n: usize) -> Vec<RecordBatch> {
+pub fn create_vec_batches(schema: &Schema, n: usize) -> Vec<RecordBatch> {
     let batch = create_batch(schema);
     let mut vec = Vec::with_capacity(n);
     for _ in 0..n {
@@ -226,9 +226,9 @@ pub fn create_vec_batches(schema: &Arc<Schema>, n: usize) -> Vec<RecordBatch> {
 }
 
 /// Create batch
-fn create_batch(schema: &Arc<Schema>) -> RecordBatch {
+fn create_batch(schema: &Schema) -> RecordBatch {
     RecordBatch::try_new(
-        schema.clone(),
+        Arc::new(schema.clone()),
         vec![Arc::new(UInt32Array::from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]))],
     )
     .unwrap()
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index ef6ea52dd..a124311aa 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use super::*;
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
 
 #[tokio::test]
 async fn explain_analyze_baseline_metrics() {
@@ -136,8 +137,16 @@ async fn explain_analyze_baseline_metrics() {
             }
             let metrics = plan.metrics().unwrap().aggregate_by_partition();
 
-            assert!(metrics.output_rows().unwrap() > 0);
-            assert!(metrics.elapsed_compute().unwrap() > 0);
+            assert!(
+                metrics.output_rows().unwrap() > 0,
+                "plan: {}",
+                DisplayableExecutionPlan::with_metrics(plan).one_line()
+            );
+            assert!(
+                metrics.elapsed_compute().unwrap() > 0,
+                "plan: {}",
+                DisplayableExecutionPlan::with_metrics(plan).one_line()
+            );
 
             let mut saw_start = false;
             let mut saw_end = false;