Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/02/08 11:11:03 UTC

[GitHub] [arrow-datafusion] yjshen opened a new pull request #1782: Introduce `Row` format backed by raw bytes

yjshen opened a new pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782


   # Which issue does this PR close?
   
   
   Closes #1708.
   
   # What changes are included in this PR?
   
   A row format backed by raw bytes:
   
   Each tuple consists of up to three parts: [null bit set] [values] [var length data]
   
   The null bit set tracks nulls with one bit per field and is padded to a
   whole number of bytes.
   
   In the region of the values, we store the fields in the order they are defined in the schema.
   - Fixed-length fields that are accessed sequentially are stored directly, e.g., 4 bytes for an int and 1 byte for a bool.
   - Fixed-length fields that are updated often are stored as one 8-byte word per field.
   - For fields of non-primitive or variable-length types, the actual content is appended to the end of the var length region, and the offset (relative to the row base) and the length are packed into an 8-byte word in the values region.
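   
   As a rough illustration of the layout above, the sketch below computes where each field lands in a row, covering only the directly stored layout and the 8-byte word for variable-length fields. `FieldKind`, `width` and `layout` are hypothetical names made up for this example; they are not the PR's API.
   
   ```rust
   /// Hypothetical stand-in for a few Arrow types, used only in this sketch.
   #[derive(Clone, Copy)]
   enum FieldKind {
       Bool,
       Int32,
       Int64,
       Utf8, // variable length: the values region holds an 8-byte (offset, length) word
   }
   
   /// Width of a field inside the values region, in bytes.
   fn width(kind: FieldKind) -> usize {
       match kind {
           FieldKind::Bool => 1,
           FieldKind::Int32 => 4,
           FieldKind::Int64 | FieldKind::Utf8 => 8,
       }
   }
   
   /// Returns (null bit set width, field offsets relative to the row base,
   /// offset at which the var length region starts).
   fn layout(fields: &[FieldKind]) -> (usize, Vec<usize>, usize) {
       // One bit per field, rounded up to whole bytes.
       let null_width = (fields.len() + 7) / 8;
       let mut offsets = Vec::with_capacity(fields.len());
       let mut offset = null_width;
       for &f in fields {
           offsets.push(offset);
           offset += width(f);
       }
       (null_width, offsets, offset)
   }
   ```
   
   For a schema of (int32, bool, utf8), this gives a 1-byte null bit set, field offsets 1, 5 and 6, and a var length region starting at byte 14.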
   
   # Are there any user-facing changes?
   
   No.
   
   
   # TODOs:
   
   - [ ] JIT the tuple field get/set based on the schema, to avoid branching for each field in each row.
   - [ ] Null-bit-free code path when the schema has no nullable fields.
   - [ ] Support more types.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#discussion_r802432870



##########
File path: datafusion/src/row/bitmap/mod.rs
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! General utilities for null bit section handling
+//!
+//! Note: this is a tailored version based on [arrow2 bitmap utils](https://github.com/jorgecarleitao/arrow2/tree/main/src/bitmap/utils)

Review comment:
       The bitmap is rewritten on top of `arrow/util/bit_util`, along with a much-simplified version of `fmt`.







[GitHub] [arrow-datafusion] tustvold commented on a change in pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#discussion_r801534654



##########
File path: datafusion/src/row/mod.rs
##########
@@ -0,0 +1,333 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An implementation of Row backed by raw bytes
+//!
+//! Each tuple consists of up to three parts: [null bit set] [values] [var length data]
+//!
+//! The null bit set is used for null tracking and is aligned to 1-byte. It stores
+//! one bit per field.
+//!
+//! In the region of the values, we store the fields in the order they are defined in the schema.
+//! - For fixed-length, sequential access fields, we store them directly.
+//!       E.g., 4 bytes for int and 1 byte for bool.
+//! - For fixed-length, update often fields, we store one 8-byte word per field.
+//! - For fields of non-primitive or variable-length types,
+//!       we append their actual content to the end of the var length region and
+//!       store their offset relative to row base and their length, packed into an 8-byte word.
+
+use arrow::datatypes::{DataType, Schema};
+use std::sync::Arc;
+
+mod bitmap;
+mod reader;
+mod writer;
+
+const UTF8_DEFAULT_SIZE: usize = 20;
+const BINARY_DEFAULT_SIZE: usize = 100;
+
+/// Get relative offsets for each field and total width for values
+fn get_offsets(null_width: usize, schema: &Arc<Schema>) -> (Vec<usize>, usize) {
+    let mut offsets = vec![];
+    let mut offset = null_width;
+    for f in schema.fields() {
+        offsets.push(offset);
+        offset += type_width(f.data_type());
+    }
+    (offsets, offset - null_width)
+}
+
+fn supported_type(dt: &DataType) -> bool {
+    use DataType::*;
+    matches!(
+        dt,
+        Boolean
+            | UInt8
+            | UInt16
+            | UInt32
+            | UInt64
+            | Int8
+            | Int16
+            | Int32
+            | Int64
+            | Float32
+            | Float64
+            | Date32
+            | Date64
+            | Utf8
+            | Binary
+    )
+}
+
+fn var_length(dt: &DataType) -> bool {
+    use DataType::*;
+    matches!(dt, Utf8 | Binary)
+}
+
+fn type_width(dt: &DataType) -> usize {
+    use DataType::*;
+    if var_length(dt) {
+        return 8;
+    }
+    match dt {
+        Boolean | UInt8 | Int8 => 1,
+        UInt16 | Int16 => 2,
+        UInt32 | Int32 | Float32 | Date32 => 4,
+        UInt64 | Int64 | Float64 | Date64 => 8,
+        _ => unreachable!(),
+    }
+}
+
+fn estimate_row_width(null_width: usize, schema: &Arc<Schema>) -> usize {
+    let mut width = null_width;
+    for f in schema.fields() {
+        width += type_width(f.data_type());
+        match f.data_type() {
+            DataType::Utf8 => width += UTF8_DEFAULT_SIZE,
+            DataType::Binary => width += BINARY_DEFAULT_SIZE,
+            _ => {}
+        }
+    }
+    (width.saturating_add(7) / 8) * 8
+}
+
+fn fixed_size(schema: &Arc<Schema>) -> bool {
+    schema.fields().iter().all(|f| !var_length(f.data_type()))
+}
+
+fn supported(schema: &Arc<Schema>) -> bool {
+    schema
+        .fields()
+        .iter()
+        .all(|f| supported_type(f.data_type()))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::datasource::file_format::parquet::ParquetFormat;
+    use crate::datasource::file_format::FileFormat;
+    use crate::datasource::object_store::local::{
+        local_object_reader, local_object_reader_stream, local_unpartitioned_file,
+        LocalFileSystem,
+    };
+    use crate::error::Result;
+    use crate::execution::runtime_env::RuntimeEnv;
+    use crate::physical_plan::file_format::FileScanConfig;
+    use crate::physical_plan::{collect, ExecutionPlan};
+    use crate::row::reader::read_as_batch;
+    use crate::row::writer::write_batch_unchecked;
+    use arrow::record_batch::RecordBatch;
+    use arrow::{array::*, datatypes::*};
+    use DataType::*;
+
+    macro_rules! fn_test_single_type {
+        ($ARRAY: ident, $TYPE: expr, $VEC: expr) => {
+            paste::item! {
+                #[test]
+                #[allow(non_snake_case)]
+                fn [<test_single_ $TYPE>]() -> Result<()> {
+                    let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)]));
+                    let a = $ARRAY::from($VEC);
+                    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
+                    let mut vector = vec![0; 1024];
+                    let row_offsets =
+                        { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
+                    let output_batch = { read_as_batch(&mut vector, schema, row_offsets)? };
+                    assert_eq!(batch, output_batch);
+                    Ok(())
+                }
+            }
+        };
+    }
+
+    fn_test_single_type!(
+        BooleanArray,
+        Boolean,
+        vec![Some(true), Some(false), None, Some(true), None]
+    );
+
+    fn_test_single_type!(
+        Int8Array,
+        Int8,
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
+    );
+
+    fn_test_single_type!(
+        Int16Array,
+        Int16,
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
+    );
+
+    fn_test_single_type!(
+        Int32Array,
+        Int32,
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
+    );
+
+    fn_test_single_type!(
+        Int64Array,
+        Int64,
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
+    );
+
+    fn_test_single_type!(
+        UInt8Array,
+        UInt8,
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
+    );
+
+    fn_test_single_type!(
+        UInt16Array,
+        UInt16,
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
+    );
+
+    fn_test_single_type!(
+        UInt32Array,
+        UInt32,
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
+    );
+
+    fn_test_single_type!(
+        UInt64Array,
+        UInt64,
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
+    );
+
+    fn_test_single_type!(
+        Float32Array,
+        Float32,
+        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
+    );
+
+    fn_test_single_type!(
+        Float64Array,
+        Float64,
+        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
+    );
+
+    fn_test_single_type!(
+        Date32Array,
+        Date32,
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
+    );
+
+    fn_test_single_type!(
+        Date64Array,
+        Date64,
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
+    );
+
+    fn_test_single_type!(
+        StringArray,
+        Utf8,
+        vec![Some("hello"), Some("world"), None, Some(""), Some("")]
+    );
+
+    #[test]
+    fn test_single_binary() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, false)]));
+        let values: Vec<Option<&[u8]>> =
+            vec![Some(b"one"), Some(b"two"), None, Some(b""), Some(b"three")];
+        let a = BinaryArray::from_opt_vec(values);
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
+        let mut vector = vec![0; 8192];
+        let row_offsets =
+            { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
+        let output_batch = { read_as_batch(&mut vector, schema, row_offsets)? };
+        assert_eq!(batch, output_batch);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_with_parquet() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
+        let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
+        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+        let schema = exec.schema().clone();
+
+        let batches = collect(exec, runtime).await?;
+        assert_eq!(1, batches.len());
+        let batch = &batches[0];
+
+        let mut vector = vec![0; 20480];
+        let row_offsets =
+            { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone()) };
+        let output_batch = { read_as_batch(&mut vector, schema, row_offsets)? };
+        assert_eq!(*batch, output_batch);
+
+        Ok(())
+    }
+
+    #[test]
+    #[should_panic(expected = "supported(schema)")]
+    fn test_unsupported_type_write() {
+        let a: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8]));

Review comment:
       FWIW it would be really cool to support this, as IOx uses it and it is just an `Int64Array` with a different logical type, but I can always add it later :grin: 

##########
File path: datafusion/src/row/bitmap/mod.rs
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! General utilities for null bit section handling
+//!
+//! Note: this is a tailored version based on [arrow2 bitmap utils](https://github.com/jorgecarleitao/arrow2/tree/main/src/bitmap/utils)
+
+mod fmt;
+
+pub use fmt::fmt;
+
+const BIT_MASK: [u8; 8] = [1, 2, 4, 8, 16, 32, 64, 128];
+const UNSET_BIT_MASK: [u8; 8] = [
+    255 - 1,
+    255 - 2,
+    255 - 4,
+    255 - 8,
+    255 - 16,
+    255 - 32,
+    255 - 64,
+    255 - 128,
+];
+const ALL_VALID_MASK: [u8; 8] = [1, 3, 7, 15, 31, 63, 127, 255];
+
+/// Returns whether bit at position `i` in `byte` is set or not
+#[inline]
+pub fn is_set(byte: u8, i: usize) -> bool {
+    (byte & BIT_MASK[i]) != 0
+}
+
+/// Sets bit at position `i` in `byte`
+#[inline]
+pub fn set(byte: u8, i: usize, value: bool) -> u8 {
+    if value {
+        byte | BIT_MASK[i]
+    } else {
+        byte & UNSET_BIT_MASK[i]
+    }
+}
+
+/// Sets bit at position `i` in `data`
+#[inline]
+pub fn set_bit(data: &mut [u8], i: usize, value: bool) {
+    data[i / 8] = set(data[i / 8], i % 8, value);
+}
+
+/// Returns whether bit at position `i` in `data` is set or not.
+///
+/// # Safety
+/// `i >= data.len() * 8` results in undefined behavior
+#[inline]
+pub unsafe fn get_bit_unchecked(data: &[u8], i: usize) -> bool {
+    (*data.as_ptr().add(i >> 3) & BIT_MASK[i & 7]) != 0
+}
+
+/// Returns the number of bytes required to hold `bits` bits.
+#[inline]
+pub fn bytes_for(bits: usize) -> usize {
+    bits.saturating_add(7) / 8

Review comment:
       I was curious if this optimized to the same thing as `ceil(value, 8)` and the answer is not quite
   
   ```
   add     rax, 7
   mov     rcx, -1
   cmovae  rcx, rax
   shr     rcx, 3
   ```
   
   vs
   
   ```
   mov     rcx, rax
   shr     rcx, 3
   and     eax, 7
   cmp     rax, 1
   sbb     rcx, -1
   ```
   
   The reason is that the saturating-add form is technically incorrect at the limit: for `bits` above `usize::MAX - 7` the addition saturates and the result is one byte too small.
   
   I sincerely doubt there is any actual performance difference, but I was sufficiently nerd sniped :laughing: 
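   
   To make the difference concrete, here is a minimal sketch comparing the two forms. `bytes_for_saturating` mirrors the body of `bytes_for` in this PR; `bytes_for_exact` is a hypothetical ceiling division written for this example in the same spirit as `ceil(value, 8)`, not a copy of the arrow helper.
   
   ```rust
   /// The form used in the PR: the add saturates instead of overflowing.
   fn bytes_for_saturating(bits: usize) -> usize {
       bits.saturating_add(7) / 8
   }
   
   /// Ceiling division that never adds to `bits`, so it cannot overflow or saturate.
   fn bytes_for_exact(bits: usize) -> usize {
       bits / 8 + usize::from(bits % 8 != 0)
   }
   ```
   
   The two agree for every realistic field count and only diverge for `bits` above `usize::MAX - 7`, where the saturating form returns `usize::MAX / 8` and the exact form returns one more.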
   

##########
File path: datafusion/src/row/reader.rs
##########
@@ -0,0 +1,441 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Accessing row from raw bytes

Review comment:
       I think a pretty picture showing how the data is encoded would be very helpful

##########
File path: datafusion/src/row/writer.rs
##########
@@ -0,0 +1,322 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Reusable row writer backed by Vec<u8> to stitch attributes together
+
+use crate::row::bitmap::{bytes_for, set_bit};
+use crate::row::{estimate_row_width, fixed_size, get_offsets, supported};
+use arrow::array::Array;
+use arrow::datatypes::{DataType, Schema};
+use arrow::record_batch::RecordBatch;
+use std::cmp::max;
+use std::sync::Arc;
+
+/// Append the rows of `batch`, starting from `row_idx`, to the `output` buffer, starting at `offset`
+/// # Panics
+///
+/// This function will panic if the output buffer doesn't have enough space to hold all the rows
+pub fn write_batch_unchecked(
+    output: &mut [u8],
+    offset: usize,
+    batch: &RecordBatch,
+    row_idx: usize,
+    schema: Arc<Schema>,
+) -> Vec<usize> {
+    let mut writer = RowWriter::new(&schema);
+    let mut current_offset = offset;
+    let mut offsets = vec![];
+    for cur_row in row_idx..batch.num_rows() {
+        offsets.push(current_offset);
+        let row_width = write_row(&mut writer, cur_row, batch, &schema);
+        output[current_offset..current_offset + row_width]
+            .copy_from_slice(writer.get_row());
+        current_offset += row_width;
+        writer.reset()
+    }
+    offsets
+}
+
+macro_rules! set_idx {
+    ($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{
+        $SELF.assert_index_valid($IDX);
+        let offset = $SELF.field_offsets[$IDX];
+        $SELF.data[offset..offset + $WIDTH].copy_from_slice(&$VALUE.to_le_bytes());
+    }};
+}
+
+macro_rules! fn_set_idx {
+    ($NATIVE: ident, $WIDTH: literal) => {
+        paste::item! {
+            fn [<set_ $NATIVE>](&mut self, idx: usize, value: $NATIVE) {
+                self.assert_index_valid(idx);
+                let offset = self.field_offsets[idx];
+                self.data[offset..offset + $WIDTH].copy_from_slice(&value.to_le_bytes());
+            }
+        }
+    };
+}
+
+/// Reusable row writer backed by Vec<u8>
+pub struct RowWriter {
+    data: Vec<u8>,
+    field_count: usize,
+    row_width: usize,
+    null_width: usize,
+    values_width: usize,
+    varlena_width: usize,
+    varlena_offset: usize,
+    field_offsets: Vec<usize>,
+}
+
+impl RowWriter {
+    /// Creates a new `RowWriter` for `schema`
+    pub fn new(schema: &Arc<Schema>) -> Self {
+        assert!(supported(schema));
+        let field_count = schema.fields().len();
+        let null_width = bytes_for(field_count);
+        let (field_offsets, values_width) = get_offsets(null_width, schema);
+        let mut init_capacity = estimate_row_width(null_width, schema);
+        if !fixed_size(schema) {
+            // double the capacity to avoid repeated resize
+            init_capacity *= 2;
+        }
+        Self {
+            data: vec![0; init_capacity],
+            field_count,
+            row_width: 0,
+            null_width,
+            values_width,
+            varlena_width: 0,
+            varlena_offset: null_width + values_width,

Review comment:
       This way of representing variable length data is pretty cool :+1: 
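   
   For readers following along, a small sketch of the packed 8-byte word may help. The bit split (offset in the high 32 bits, length in the low 32 bits) follows the decoding in `get_utf8` and `get_binary` in `reader.rs` of this PR; the function names `pack_varlena` and `unpack_varlena` are hypothetical and exist only for this example.
   
   ```rust
   /// Packs an offset (relative to the row base) and a length into one 8-byte word:
   /// offset in the high 32 bits, length in the low 32 bits.
   fn pack_varlena(offset: u32, len: u32) -> u64 {
       (u64::from(offset) << 32) | u64::from(len)
   }
   
   /// Splits the word back into (offset, length), the same way the reader does.
   fn unpack_varlena(word: u64) -> (usize, usize) {
       ((word >> 32) as usize, (word & 0xffff_ffff) as usize)
   }
   ```
   
   One consequence of this encoding is that both the offset within a row and the length of a single value are limited to what fits in 32 bits.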

##########
File path: datafusion/src/row/mod.rs
##########
@@ -0,0 +1,333 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An implementation of Row backed by raw bytes
+//!
+//! Each tuple consists of up to three parts: [null bit set] [values] [var length data]
+//!
+//! The null bit set is used for null tracking and is aligned to 1-byte. It stores
+//! one bit per field.
+//!
+//! In the region of the values, we store the fields in the order they are defined in the schema.
+//! - For fixed-length, sequential access fields, we store them directly.
+//!       E.g., 4 bytes for int and 1 byte for bool.
+//! - For fixed-length, update often fields, we store one 8-byte word per field.
+//! - For fields of non-primitive or variable-length types,
+//!       we append their actual content to the end of the var length region and
+//!       store their offset relative to row base and their length, packed into an 8-byte word.
+
+use arrow::datatypes::{DataType, Schema};
+use std::sync::Arc;
+
+mod bitmap;
+mod reader;
+mod writer;
+
+const UTF8_DEFAULT_SIZE: usize = 20;
+const BINARY_DEFAULT_SIZE: usize = 100;
+
+/// Get relative offsets for each field and total width for values
+fn get_offsets(null_width: usize, schema: &Arc<Schema>) -> (Vec<usize>, usize) {
+    let mut offsets = vec![];
+    let mut offset = null_width;
+    for f in schema.fields() {
+        offsets.push(offset);
+        offset += type_width(f.data_type());
+    }
+    (offsets, offset - null_width)
+}
+
+fn supported_type(dt: &DataType) -> bool {
+    use DataType::*;
+    matches!(
+        dt,
+        Boolean
+            | UInt8
+            | UInt16
+            | UInt32
+            | UInt64
+            | Int8
+            | Int16
+            | Int32
+            | Int64
+            | Float32
+            | Float64
+            | Date32
+            | Date64
+            | Utf8
+            | Binary
+    )
+}
+
+fn var_length(dt: &DataType) -> bool {
+    use DataType::*;
+    matches!(dt, Utf8 | Binary)
+}
+
+fn type_width(dt: &DataType) -> usize {
+    use DataType::*;
+    if var_length(dt) {
+        return 8;

Review comment:
       I think this should probably be `std::mem::size_of::<u64>()` or ideally derived from a `varlena` offset type alias 
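   
   Something along these lines is what I have in mind. This is only a sketch: the names `VarlenaWord` and `VARLENA_WIDTH` are made up here and are not existing items in the PR.
   
   ```rust
   /// Hypothetical alias for the packed (offset, length) word of a variable-length field.
   type VarlenaWord = u64;
   
   /// Width of that word in the values region, derived from the type rather than hard-coded.
   const VARLENA_WIDTH: usize = std::mem::size_of::<VarlenaWord>();
   ```
   
   `type_width` could then return `VARLENA_WIDTH` for variable-length types, so the literal `8` stays tied to the type that is actually read and written.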

##########
File path: datafusion/src/row/reader.rs
##########
@@ -0,0 +1,441 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Accessing row from raw bytes
+
+use crate::error::{DataFusionError, Result};
+use crate::row::bitmap::{all_valid, bytes_for, get_bit_unchecked};
+use crate::row::{get_offsets, supported};
+use arrow::array::{make_builder, ArrayBuilder};
+use arrow::datatypes::{DataType, Schema};
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+use std::sync::Arc;
+
+/// Read `data` of raw-bytes rows starting at `offsets` out to a record batch
+pub fn read_as_batch(
+    data: &mut [u8],
+    schema: Arc<Schema>,
+    offsets: Vec<usize>,
+) -> Result<RecordBatch> {
+    let row_num = offsets.len();
+    let mut output = MutableRecordBatch::new(row_num, schema.clone());
+    let mut row = RowReader::new(&schema, data);
+
+    for offset in offsets.iter().take(row_num) {
+        row.point_to(*offset);
+        read_row(&row, &mut output, &schema)?
+    }
+
+    output.output().map_err(DataFusionError::ArrowError)
+}
+
+macro_rules! get_idx {
+    ($NATIVE: ident, $SELF: ident, $IDX: ident, $WIDTH: literal) => {{
+        $SELF.assert_index_valid($IDX);
+        let offset = $SELF.field_offsets[$IDX];
+        let start = $SELF.base_offset + offset;
+        let end = start + $WIDTH;
+        $NATIVE::from_le_bytes($SELF.data[start..end].try_into().unwrap())
+    }};
+}
+
+macro_rules! fn_get_idx {
+    ($NATIVE: ident, $WIDTH: literal) => {
+        paste::item! {
+            fn [<get_ $NATIVE>](&self, idx: usize) -> $NATIVE {
+                self.assert_index_valid(idx);
+                let offset = self.field_offsets[idx];
+                let start = self.base_offset + offset;
+                let end = start + $WIDTH;
+                $NATIVE::from_le_bytes(self.data[start..end].try_into().unwrap())
+            }
+        }
+    };
+}
+
+macro_rules! fn_get_idx_opt {
+    ($NATIVE: ident) => {
+        paste::item! {
+            fn [<get_ $NATIVE _opt>](&self, idx: usize) -> Option<$NATIVE> {
+                if self.is_valid_at(idx) {
+                    Some(self.[<get_ $NATIVE>](idx))
+                } else {
+                    None
+                }
+            }
+        }
+    };
+}
+
+struct RowReader<'a> {
+    data: &'a [u8],
+    base_offset: usize,
+    field_count: usize,
+    null_width: usize,
+    field_offsets: Vec<usize>,
+}
+
+impl<'a> std::fmt::Debug for RowReader<'a> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let null_bits = self.null_bits();
+        super::bitmap::fmt(null_bits, 0, self.null_width, f)
+    }
+}
+
+impl<'a> RowReader<'a> {
+    fn new(schema: &Arc<Schema>, data: &'a [u8]) -> Self {
+        assert!(supported(schema));
+        let field_count = schema.fields().len();
+        let null_width = bytes_for(field_count);
+        let (field_offsets, _) = get_offsets(null_width, schema);
+        Self {
+            data,
+            base_offset: 0,
+            field_count,
+            null_width,
+            field_offsets,
+        }
+    }
+
+    /// Update this row to point to position `offset` in `data`
+    fn point_to(&mut self, offset: usize) {
+        self.base_offset = offset;
+    }
+
+    #[inline]
+    fn assert_index_valid(&self, idx: usize) {
+        assert!(idx < self.field_count);
+    }
+
+    #[inline(always)]
+    fn null_bits(&self) -> &[u8] {
+        let start = self.base_offset;
+        &self.data[start..start + self.null_width]
+    }
+
+    #[inline(always)]
+    fn all_valid(&self) -> bool {
+        let null_bits = self.null_bits();
+        all_valid(null_bits, self.field_count)
+    }
+
+    fn is_valid_at(&self, idx: usize) -> bool {
+        unsafe { get_bit_unchecked(self.null_bits(), idx) }
+    }
+
+    fn get_bool(&self, idx: usize) -> bool {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        let value = &self.data[self.base_offset + offset..];
+        value[0] != 0
+    }
+
+    fn get_u8(&self, idx: usize) -> u8 {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        self.data[self.base_offset + offset]
+    }
+
+    fn_get_idx!(u16, 2);
+    fn_get_idx!(u32, 4);
+    fn_get_idx!(u64, 8);
+    fn_get_idx!(i8, 1);
+    fn_get_idx!(i16, 2);
+    fn_get_idx!(i32, 4);
+    fn_get_idx!(i64, 8);
+    fn_get_idx!(f32, 4);
+    fn_get_idx!(f64, 8);
+
+    fn get_date32(&self, idx: usize) -> i32 {
+        get_idx!(i32, self, idx, 4)
+    }
+
+    fn get_date64(&self, idx: usize) -> i64 {
+        get_idx!(i64, self, idx, 8)
+    }
+
+    fn get_utf8(&self, idx: usize) -> &str {
+        self.assert_index_valid(idx);
+        let offset_size = self.get_u64(idx);
+        let offset = (offset_size >> 32) as usize;
+        let len = (offset_size & 0xffff_ffff) as usize;
+        let varlena_offset = self.base_offset + offset;
+        let bytes = &self.data[varlena_offset..varlena_offset + len];
+        std::str::from_utf8(bytes).unwrap()
+    }
+
+    fn get_binary(&self, idx: usize) -> &[u8] {
+        self.assert_index_valid(idx);
+        let offset_size = self.get_u64(idx);
+        let offset = (offset_size >> 32) as usize;
+        let len = (offset_size & 0xffff_ffff) as usize;
+        let varlena_offset = self.base_offset + offset;
+        &self.data[varlena_offset..varlena_offset + len]
+    }
+
+    fn_get_idx_opt!(bool);
+    fn_get_idx_opt!(u8);
+    fn_get_idx_opt!(u16);
+    fn_get_idx_opt!(u32);
+    fn_get_idx_opt!(u64);
+    fn_get_idx_opt!(i8);
+    fn_get_idx_opt!(i16);
+    fn_get_idx_opt!(i32);
+    fn_get_idx_opt!(i64);
+    fn_get_idx_opt!(f32);
+    fn_get_idx_opt!(f64);
+
+    fn get_date32_opt(&self, idx: usize) -> Option<i32> {
+        if self.is_valid_at(idx) {
+            Some(self.get_date32(idx))
+        } else {
+            None
+        }
+    }
+
+    fn get_date64_opt(&self, idx: usize) -> Option<i64> {
+        if self.is_valid_at(idx) {
+            Some(self.get_date64(idx))
+        } else {
+            None
+        }
+    }
+
+    fn get_utf8_opt(&self, idx: usize) -> Option<&str> {
+        if self.is_valid_at(idx) {
+            Some(self.get_utf8(idx))
+        } else {
+            None
+        }
+    }
+}
+
+fn read_row(
+    row: &RowReader,
+    batch: &mut MutableRecordBatch,
+    schema: &Arc<Schema>,
+) -> Result<()> {
+    if row.all_valid() {
+        for ((col_idx, to), field) in batch
+            .arrays
+            .iter_mut()
+            .enumerate()
+            .zip(schema.fields().iter())
+        {
+            read_field_null_free(to, field.data_type(), col_idx, row)?
+        }
+    } else {
+        for ((col_idx, to), field) in batch
+            .arrays
+            .iter_mut()
+            .enumerate()
+            .zip(schema.fields().iter())
+        {
+            read_field(to, field.data_type(), col_idx, row)?
+        }
+    }
+    Ok(())
+}
+
+fn read_field(
+    to: &mut Box<dyn ArrayBuilder>,
+    dt: &DataType,
+    col_idx: usize,
+    row: &RowReader,
+) -> Result<()> {
+    use arrow::array::*;
+    use DataType::*;
+    match dt {
+        Boolean => {
+            let to = to.as_any_mut().downcast_mut::<BooleanBuilder>().unwrap();
+            to.append_option(row.get_bool_opt(col_idx))?;
+        }
+        UInt8 => {
+            let to = to.as_any_mut().downcast_mut::<UInt8Builder>().unwrap();
+            to.append_option(row.get_u8_opt(col_idx))?;
+        }
+        UInt16 => {
+            let to = to.as_any_mut().downcast_mut::<UInt16Builder>().unwrap();
+            to.append_option(row.get_u16_opt(col_idx))?;
+        }
+        UInt32 => {
+            let to = to.as_any_mut().downcast_mut::<UInt32Builder>().unwrap();
+            to.append_option(row.get_u32_opt(col_idx))?;
+        }
+        UInt64 => {
+            let to = to.as_any_mut().downcast_mut::<UInt64Builder>().unwrap();
+            to.append_option(row.get_u64_opt(col_idx))?;
+        }
+        Int8 => {
+            let to = to.as_any_mut().downcast_mut::<Int8Builder>().unwrap();
+            to.append_option(row.get_i8_opt(col_idx))?;
+        }
+        Int16 => {
+            let to = to.as_any_mut().downcast_mut::<Int16Builder>().unwrap();
+            to.append_option(row.get_i16_opt(col_idx))?;
+        }
+        Int32 => {
+            let to = to.as_any_mut().downcast_mut::<Int32Builder>().unwrap();
+            to.append_option(row.get_i32_opt(col_idx))?;
+        }
+        Int64 => {
+            let to = to.as_any_mut().downcast_mut::<Int64Builder>().unwrap();
+            to.append_option(row.get_i64_opt(col_idx))?;
+        }
+        Float32 => {
+            let to = to.as_any_mut().downcast_mut::<Float32Builder>().unwrap();
+            to.append_option(row.get_f32_opt(col_idx))?;
+        }
+        Float64 => {
+            let to = to.as_any_mut().downcast_mut::<Float64Builder>().unwrap();
+            to.append_option(row.get_f64_opt(col_idx))?;
+        }
+        Date32 => {
+            let to = to.as_any_mut().downcast_mut::<Date32Builder>().unwrap();
+            to.append_option(row.get_date32_opt(col_idx))?;
+        }
+        Date64 => {
+            let to = to.as_any_mut().downcast_mut::<Date64Builder>().unwrap();
+            to.append_option(row.get_date64_opt(col_idx))?;
+        }
+        Utf8 => {
+            let to = to.as_any_mut().downcast_mut::<StringBuilder>().unwrap();
+            to.append_option(row.get_utf8_opt(col_idx))?;
+        }
+        Binary => {
+            let to = to.as_any_mut().downcast_mut::<BinaryBuilder>().unwrap();
+            if row.is_valid_at(col_idx) {
+                to.append_value(row.get_binary(col_idx))?;
+            } else {
+                to.append_null()?;
+            }
+        }
+        _ => unimplemented!(),
+    }
+    Ok(())
+}
+
+fn read_field_null_free(
+    to: &mut Box<dyn ArrayBuilder>,
+    dt: &DataType,
+    col_idx: usize,
+    row: &RowReader,
+) -> Result<()> {
+    use arrow::array::*;
+    use DataType::*;
+    match dt {
+        Boolean => {
+            let to = to.as_any_mut().downcast_mut::<BooleanBuilder>().unwrap();
+            to.append_value(row.get_bool(col_idx))?;
+        }
+        UInt8 => {
+            let to = to.as_any_mut().downcast_mut::<UInt8Builder>().unwrap();
+            to.append_value(row.get_u8(col_idx))?;
+        }
+        UInt16 => {
+            let to = to.as_any_mut().downcast_mut::<UInt16Builder>().unwrap();
+            to.append_value(row.get_u16(col_idx))?;
+        }
+        UInt32 => {
+            let to = to.as_any_mut().downcast_mut::<UInt32Builder>().unwrap();
+            to.append_value(row.get_u32(col_idx))?;
+        }
+        UInt64 => {
+            let to = to.as_any_mut().downcast_mut::<UInt64Builder>().unwrap();
+            to.append_value(row.get_u64(col_idx))?;
+        }
+        Int8 => {
+            let to = to.as_any_mut().downcast_mut::<Int8Builder>().unwrap();
+            to.append_value(row.get_i8(col_idx))?;
+        }
+        Int16 => {
+            let to = to.as_any_mut().downcast_mut::<Int16Builder>().unwrap();
+            to.append_value(row.get_i16(col_idx))?;
+        }
+        Int32 => {
+            let to = to.as_any_mut().downcast_mut::<Int32Builder>().unwrap();
+            to.append_value(row.get_i32(col_idx))?;
+        }
+        Int64 => {
+            let to = to.as_any_mut().downcast_mut::<Int64Builder>().unwrap();
+            to.append_value(row.get_i64(col_idx))?;
+        }
+        Float32 => {
+            let to = to.as_any_mut().downcast_mut::<Float32Builder>().unwrap();
+            to.append_value(row.get_f32(col_idx))?;
+        }
+        Float64 => {
+            let to = to.as_any_mut().downcast_mut::<Float64Builder>().unwrap();
+            to.append_value(row.get_f64(col_idx))?;
+        }
+        Date32 => {
+            let to = to.as_any_mut().downcast_mut::<Date32Builder>().unwrap();
+            to.append_value(row.get_date32(col_idx))?;
+        }
+        Date64 => {
+            let to = to.as_any_mut().downcast_mut::<Date64Builder>().unwrap();
+            to.append_value(row.get_date64(col_idx))?;
+        }
+        Utf8 => {
+            let to = to.as_any_mut().downcast_mut::<StringBuilder>().unwrap();
+            to.append_value(row.get_utf8(col_idx))?;
+        }
+        Binary => {
+            let to = to.as_any_mut().downcast_mut::<BinaryBuilder>().unwrap();
+            to.append_value(row.get_binary(col_idx))?;
+        }
+        _ => unimplemented!(),
+    }
+    Ok(())
+}
+
+struct MutableRecordBatch {

Review comment:
       This feels like something we could upstream into arrow-rs :thinking: 

##########
File path: datafusion/src/row/writer.rs
##########
@@ -0,0 +1,322 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Reusable row writer backed by Vec<u8> to stitch attributes together
+
+use crate::row::bitmap::{bytes_for, set_bit};
+use crate::row::{estimate_row_width, fixed_size, get_offsets, supported};
+use arrow::array::Array;
+use arrow::datatypes::{DataType, Schema};
+use arrow::record_batch::RecordBatch;
+use std::cmp::max;
+use std::sync::Arc;
+
+/// Append batch from `row_idx` to `output` buffer start from `offset`
+/// # Panics
+///
+/// This function will panic if the output buffer doesn't have enough space to hold all the rows
+pub fn write_batch_unchecked(
+    output: &mut [u8],
+    offset: usize,
+    batch: &RecordBatch,
+    row_idx: usize,
+    schema: Arc<Schema>,
+) -> Vec<usize> {
+    let mut writer = RowWriter::new(&schema);
+    let mut current_offset = offset;
+    let mut offsets = vec![];
+    for cur_row in row_idx..batch.num_rows() {
+        offsets.push(current_offset);
+        let row_width = write_row(&mut writer, cur_row, batch, &schema);
+        output[current_offset..current_offset + row_width]
+            .copy_from_slice(writer.get_row());
+        current_offset += row_width;
+        writer.reset()
+    }
+    offsets
+}
+
+macro_rules! set_idx {
+    ($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{
+        $SELF.assert_index_valid($IDX);
+        let offset = $SELF.field_offsets[$IDX];
+        $SELF.data[offset..offset + $WIDTH].copy_from_slice(&$VALUE.to_le_bytes());
+    }};
+}
+
+macro_rules! fn_set_idx {
+    ($NATIVE: ident, $WIDTH: literal) => {
+        paste::item! {
+            fn [<set_ $NATIVE>](&mut self, idx: usize, value: $NATIVE) {
+                self.assert_index_valid(idx);
+                let offset = self.field_offsets[idx];
+                self.data[offset..offset + $WIDTH].copy_from_slice(&value.to_le_bytes());
+            }
+        }
+    };
+}
+
+/// Reusable row writer backed by Vec<u8>
+pub struct RowWriter {
+    data: Vec<u8>,
+    field_count: usize,
+    row_width: usize,
+    null_width: usize,
+    values_width: usize,
+    varlena_width: usize,
+    varlena_offset: usize,
+    field_offsets: Vec<usize>,

Review comment:
       Some doc strings on these fields would be nice :+1:
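
For illustration, here is a rough sketch of what per-field doc strings could look like. The field meanings are inferred from how the quoted code uses them and from the layout in the PR description (`[null bit set] [values] [var length data]`), so treat them as assumptions rather than the author's definitions. `RowWriterSketch` and its `new(field_widths)` constructor are hypothetical stand-ins so the snippet compiles without Arrow; the PR derives the offsets from the schema.

```rust
/// Reusable row writer backed by `Vec<u8>`.
///
/// Row layout: `[null bit set][values][var length data]`.
#[allow(dead_code)] // some fields are only documented here, not read
pub struct RowWriterSketch {
    /// Buffer holding the row that is currently being assembled.
    data: Vec<u8>,
    /// Number of fields in the schema.
    field_count: usize,
    /// Width in bytes of the finished row, padded to a multiple of 8.
    row_width: usize,
    /// Width in bytes of the null bit set (one bit per field).
    null_width: usize,
    /// Width in bytes of the fixed-width values region.
    values_width: usize,
    /// Bytes written to the var length region so far.
    varlena_width: usize,
    /// Offset from the row start at which the next var length value goes.
    varlena_offset: usize,
    /// Offset from the row start of each field's slot in the values region.
    field_offsets: Vec<usize>,
}

impl RowWriterSketch {
    /// Builds a writer from per-field slot widths (hypothetical constructor;
    /// the PR derives these from the Arrow schema instead).
    pub fn new(field_widths: &[usize]) -> Self {
        let field_count = field_widths.len();
        let null_width = (field_count + 7) / 8; // one bit per field, rounded up
        let mut field_offsets = Vec::with_capacity(field_count);
        let mut next = null_width;
        for width in field_widths {
            field_offsets.push(next);
            next += width;
        }
        let values_width = next - null_width;
        Self {
            data: vec![0; next],
            field_count,
            row_width: 0,
            null_width,
            values_width,
            varlena_width: 0,
            varlena_offset: next,
            field_offsets,
        }
    }

    /// Offsets of the field slots, relative to the row start.
    pub fn field_offsets(&self) -> &[usize] {
        &self.field_offsets
    }

    /// Current payload width rounded up to the next 8-byte boundary.
    pub fn padded_width(&self) -> usize {
        let payload = self.null_width + self.values_width + self.varlena_width;
        (payload + 7) / 8 * 8
    }
}
```

With slot widths of 4, 1 and 8 bytes, the null bit set takes one byte, the slots start at offsets 1, 5 and 6, and the 14-byte payload pads out to 16 bytes.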

##########
File path: datafusion/src/row/bitmap/fmt.rs
##########
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Write;
+
+use super::is_set;
+
+/// Formats `bytes` taking into account an offset and length of the form
+pub fn fmt(
+    bytes: &[u8],
+    offset: usize,
+    length: usize,
+    f: &mut std::fmt::Formatter<'_>,
+) -> std::fmt::Result {

Review comment:
       It might be an idea to make this a `struct BitFormatter<'a>` that implements `Display`, similar to the `A` wrapper in the test.
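
A minimal sketch of that shape, assuming `offset` and `length` are both measured in bits. Only the name `BitFormatter` comes from the comment above; the constructor and the rendering are hypothetical. The rendering here is deliberately simplified (one character per bit, least-significant bit first) and does not reproduce the `0b..._` output of the `fmt` function in the PR.

```rust
use std::fmt;

/// Hypothetical wrapper that makes a bit range printable via `Display`.
pub struct BitFormatter<'a> {
    bytes: &'a [u8],
    /// Index of the first bit to print, in bits.
    offset: usize,
    /// Number of bits to print.
    length: usize,
}

impl<'a> BitFormatter<'a> {
    pub fn new(bytes: &'a [u8], offset: usize, length: usize) -> Self {
        Self { bytes, offset, length }
    }
}

impl<'a> fmt::Display for BitFormatter<'a> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("[")?;
        // Simplified rendering: walk the bit range, least-significant bit first.
        // Indexing panics if the range runs past the end of `bytes`.
        for i in self.offset..self.offset + self.length {
            let bit = (self.bytes[i / 8] >> (i % 8)) & 1;
            f.write_str(if bit == 1 { "1" } else { "0" })?;
        }
        f.write_str("]")
    }
}
```

Callers could then write `format!("{}", BitFormatter::new(bits, 0, n))` or call `.to_string()`, and the test-only wrapper would no longer be needed.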

##########
File path: datafusion/src/row/bitmap/fmt.rs
##########
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Write;
+
+use super::is_set;
+
+/// Formats `bytes` taking into account an offset and length of the form

Review comment:
       Possibly worth specifying that `offset` and `length` are in bits

##########
File path: datafusion/src/row/bitmap/fmt.rs
##########
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Write;
+
+use super::is_set;
+
+/// Formats `bytes` taking into account an offset and length of the form
+pub fn fmt(
+    bytes: &[u8],
+    offset: usize,
+    length: usize,
+    f: &mut std::fmt::Formatter<'_>,
+) -> std::fmt::Result {
+    assert!(offset < 8);

Review comment:
       You could compute `offset / 8` and advance `bytes` by that many bytes (and decrement `length`) instead of panicking
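
Roughly what I have in mind, as a sketch: `skip_whole_bytes` is a hypothetical helper, not something in the PR. It assumes `length` counts bits starting at `offset`, in which case only `bytes` and `offset` need adjusting; if `length` were instead measured from the start of `bytes`, it would also have to shrink by `8 * (offset / 8)`.

```rust
/// Hypothetical helper: drops the whole bytes covered by `offset` (in bits)
/// and returns the remaining bytes plus the leftover bit offset (`0..8`).
pub fn skip_whole_bytes(bytes: &[u8], offset: usize) -> (&[u8], usize) {
    let whole = offset / 8;
    // `get` avoids a panic when the offset points past the end of the buffer.
    let rest = bytes.get(whole..).unwrap_or(&[]);
    (rest, offset % 8)
}
```

`fmt` could call this first and continue with the returned pair, which would make the `assert!(offset < 8)` hold by construction.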

##########
File path: datafusion/src/row/bitmap/fmt.rs
##########
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Write;
+
+use super::is_set;
+
+/// Formats `bytes` taking into account an offset and length of the form
+pub fn fmt(
+    bytes: &[u8],
+    offset: usize,
+    length: usize,
+    f: &mut std::fmt::Formatter<'_>,
+) -> std::fmt::Result {
+    assert!(offset < 8);
+
+    f.write_char('[')?;
+    let mut remaining = length;
+    if remaining == 0 {
+        f.write_char(']')?;
+        return Ok(());
+    }
+
+    let first = bytes[0];
+    let bytes = &bytes[1..];
+    let empty_before = 8usize.saturating_sub(remaining + offset);
+    f.write_str("0b")?;
+    for _ in 0..empty_before {
+        f.write_char('_')?;
+    }
+    let until = std::cmp::min(8, offset + remaining);
+    for i in offset..until {
+        if is_set(first, offset + until - 1 - i) {
+            f.write_char('1')?;
+        } else {
+            f.write_char('0')?;
+        }
+    }
+    for _ in 0..offset {
+        f.write_char('_')?;
+    }
+    remaining -= until - offset;
+
+    if remaining == 0 {
+        f.write_char(']')?;
+        return Ok(());
+    }
+
+    let number_of_bytes = remaining / 8;
+    for byte in &bytes[..number_of_bytes] {
+        f.write_str(", ")?;
+        f.write_fmt(format_args!("{:#010b}", byte))?;
+    }
+    remaining -= number_of_bytes * 8;
+    if remaining == 0 {
+        f.write_char(']')?;
+        return Ok(());
+    }
+
+    let last = bytes[std::cmp::min((length + offset + 7) / 8, bytes.len() - 1)];
+    let remaining = (length + offset) % 8;
+    f.write_str(", ")?;
+    f.write_str("0b")?;
+    for _ in 0..(8 - remaining) {
+        f.write_char('_')?;
+    }
+    for i in 0..remaining {
+        if is_set(last, remaining - 1 - i) {
+            f.write_char('1')?;
+        } else {
+            f.write_char('0')?;
+        }
+    }
+    f.write_char(']')
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    struct A<'a>(&'a [u8], usize, usize);
+    impl<'a> std::fmt::Debug for A<'a> {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            fmt(self.0, self.1, self.2, f)
+        }
+    }
+
+    #[test]
+    fn test_debug() -> std::fmt::Result {
+        assert_eq!(format!("{:?}", A(&[1], 0, 0)), "[]");

Review comment:
       Putting some of this into a doctest might make it easier to see what the function does
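
As a sketch of the idea, a doctest on a small helper might look like the following. `bits_to_string` and the `my_crate` path are placeholders, and the output format is a simplified one (least-significant bit first), not the format produced by `fmt` in this PR.

```rust
/// Renders `length` bits of `bytes`, starting at bit `offset`, as a string
/// of `0`/`1` characters (least-significant bit first).
///
/// Both `offset` and `length` are measured in bits.
///
/// # Examples
///
/// ```
/// // `my_crate` is a placeholder for wherever the function ends up living.
/// use my_crate::bits_to_string;
///
/// assert_eq!(bits_to_string(&[0b0000_0101], 0, 3), "101");
/// assert_eq!(bits_to_string(&[0b0000_0101], 1, 2), "01");
/// assert_eq!(bits_to_string(&[1], 0, 0), "");
/// ```
pub fn bits_to_string(bytes: &[u8], offset: usize, length: usize) -> String {
    (offset..offset + length)
        .map(|i| if (bytes[i / 8] >> (i % 8)) & 1 == 1 { '1' } else { '0' })
        .collect()
}
```

The examples then show up in the rendered docs and run as part of `cargo test`, so they stay in sync with the implementation.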

##########
File path: datafusion/src/row/reader.rs
##########
@@ -0,0 +1,441 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Accessing row from raw bytes
+
+use crate::error::{DataFusionError, Result};
+use crate::row::bitmap::{all_valid, bytes_for, get_bit_unchecked};
+use crate::row::{get_offsets, supported};
+use arrow::array::{make_builder, ArrayBuilder};
+use arrow::datatypes::{DataType, Schema};
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+use std::sync::Arc;
+
+/// Read `data` of raw-bytes rows starting at `offsets` out to a record batch
+pub fn read_as_batch(
+    data: &mut [u8],
+    schema: Arc<Schema>,
+    offsets: Vec<usize>,
+) -> Result<RecordBatch> {
+    let row_num = offsets.len();
+    let mut output = MutableRecordBatch::new(row_num, schema.clone());
+    let mut row = RowReader::new(&schema, data);
+
+    for offset in offsets.iter().take(row_num) {
+        row.point_to(*offset);
+        read_row(&row, &mut output, &schema)?
+    }
+
+    output.output().map_err(DataFusionError::ArrowError)
+}
+
+macro_rules! get_idx {
+    ($NATIVE: ident, $SELF: ident, $IDX: ident, $WIDTH: literal) => {{
+        $SELF.assert_index_valid($IDX);
+        let offset = $SELF.field_offsets[$IDX];
+        let start = $SELF.base_offset + offset;
+        let end = start + $WIDTH;
+        $NATIVE::from_le_bytes($SELF.data[start..end].try_into().unwrap())
+    }};
+}
+
+macro_rules! fn_get_idx {
+    ($NATIVE: ident, $WIDTH: literal) => {
+        paste::item! {
+            fn [<get_ $NATIVE>](&self, idx: usize) -> $NATIVE {
+                self.assert_index_valid(idx);
+                let offset = self.field_offsets[idx];
+                let start = self.base_offset + offset;
+                let end = start + $WIDTH;
+                $NATIVE::from_le_bytes(self.data[start..end].try_into().unwrap())
+            }
+        }
+    };
+}
+
+macro_rules! fn_get_idx_opt {
+    ($NATIVE: ident) => {
+        paste::item! {
+            fn [<get_ $NATIVE _opt>](&self, idx: usize) -> Option<$NATIVE> {
+                if self.is_valid_at(idx) {
+                    Some(self.[<get_ $NATIVE>](idx))
+                } else {
+                    None
+                }
+            }
+        }
+    };
+}
+
+struct RowReader<'a> {
+    data: &'a [u8],
+    base_offset: usize,
+    field_count: usize,
+    null_width: usize,
+    field_offsets: Vec<usize>,
+}
+
+impl<'a> std::fmt::Debug for RowReader<'a> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let null_bits = self.null_bits();
+        super::bitmap::fmt(null_bits, 0, self.null_width, f)
+    }
+}
+
+impl<'a> RowReader<'a> {
+    fn new(schema: &Arc<Schema>, data: &'a [u8]) -> Self {
+        assert!(supported(schema));
+        let field_count = schema.fields().len();
+        let null_width = bytes_for(field_count);
+        let (field_offsets, _) = get_offsets(null_width, schema);
+        Self {
+            data,
+            base_offset: 0,
+            field_count,
+            null_width,
+            field_offsets,
+        }
+    }
+
+    /// Update this row to point to position `offset` in `base`
+    fn point_to(&mut self, offset: usize) {
+        self.base_offset = offset;
+    }
+
+    #[inline]
+    fn assert_index_valid(&self, idx: usize) {
+        assert!(idx < self.field_count);
+    }
+
+    #[inline(always)]
+    fn null_bits(&self) -> &[u8] {
+        let start = self.base_offset;
+        &self.data[start..start + self.null_width]
+    }
+
+    #[inline(always)]
+    fn all_valid(&self) -> bool {
+        let null_bits = self.null_bits();
+        all_valid(null_bits, self.field_count)
+    }
+
+    fn is_valid_at(&self, idx: usize) -> bool {
+        unsafe { get_bit_unchecked(self.null_bits(), idx) }
+    }
+
+    fn get_bool(&self, idx: usize) -> bool {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        let value = &self.data[self.base_offset + offset..];
+        value[0] != 0
+    }
+
+    fn get_u8(&self, idx: usize) -> u8 {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        self.data[self.base_offset + offset]
+    }
+
+    fn_get_idx!(u16, 2);
+    fn_get_idx!(u32, 4);
+    fn_get_idx!(u64, 8);
+    fn_get_idx!(i8, 1);
+    fn_get_idx!(i16, 2);
+    fn_get_idx!(i32, 4);
+    fn_get_idx!(i64, 8);
+    fn_get_idx!(f32, 4);
+    fn_get_idx!(f64, 8);
+
+    fn get_date32(&self, idx: usize) -> i32 {
+        get_idx!(i32, self, idx, 4)
+    }
+
+    fn get_date64(&self, idx: usize) -> i64 {

Review comment:
       Perhaps these methods should assert that the field at `idx` actually has the corresponding type. Otherwise this effectively reinterprets memory as a different primitive, which, whilst technically safe, is borderline unsafe :grinning:
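
One possible shape for such a check, as a sketch only. `TypedRowReader`, `FieldType` and `assert_type` are hypothetical names; the real reader would presumably compare against the Arrow `DataType` of the schema field rather than a local enum.

```rust
/// Hypothetical stand-in for the schema's per-field data type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FieldType {
    Int64,
    Date64,
}

/// Hypothetical reader that remembers the type of each field.
pub struct TypedRowReader<'a> {
    data: &'a [u8],
    base_offset: usize,
    field_offsets: Vec<usize>,
    field_types: Vec<FieldType>,
}

impl<'a> TypedRowReader<'a> {
    pub fn new(
        data: &'a [u8],
        base_offset: usize,
        field_offsets: Vec<usize>,
        field_types: Vec<FieldType>,
    ) -> Self {
        Self { data, base_offset, field_offsets, field_types }
    }

    /// Panics unless `idx` is in bounds and the field has the expected type.
    fn assert_type(&self, idx: usize, expected: FieldType) {
        assert!(idx < self.field_types.len(), "field index {} out of bounds", idx);
        assert_eq!(self.field_types[idx], expected, "unexpected type for field {}", idx);
    }

    pub fn get_date64(&self, idx: usize) -> i64 {
        self.assert_type(idx, FieldType::Date64);
        let start = self.base_offset + self.field_offsets[idx];
        // Copy the 8-byte slot and decode it as a little-endian i64.
        let mut buf = [0u8; 8];
        buf.copy_from_slice(&self.data[start..start + 8]);
        i64::from_le_bytes(buf)
    }
}
```

If the extra check turns out to be too costly on the hot path, a `debug_assert_eq!` would keep it in debug builds and compile it out in release builds.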

##########
File path: datafusion/src/row/writer.rs
##########
@@ -0,0 +1,322 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Reusable row writer backed by Vec<u8> to stitch attributes together
+
+use crate::row::bitmap::{bytes_for, set_bit};
+use crate::row::{estimate_row_width, fixed_size, get_offsets, supported};
+use arrow::array::Array;
+use arrow::datatypes::{DataType, Schema};
+use arrow::record_batch::RecordBatch;
+use std::cmp::max;
+use std::sync::Arc;
+
+/// Append batch from `row_idx` to `output` buffer start from `offset`
+/// # Panics
+///
+/// This function will panic if the output buffer doesn't have enough space to hold all the rows
+pub fn write_batch_unchecked(
+    output: &mut [u8],
+    offset: usize,
+    batch: &RecordBatch,
+    row_idx: usize,
+    schema: Arc<Schema>,
+) -> Vec<usize> {
+    let mut writer = RowWriter::new(&schema);
+    let mut current_offset = offset;
+    let mut offsets = vec![];
+    for cur_row in row_idx..batch.num_rows() {
+        offsets.push(current_offset);
+        let row_width = write_row(&mut writer, cur_row, batch, &schema);
+        output[current_offset..current_offset + row_width]
+            .copy_from_slice(writer.get_row());
+        current_offset += row_width;
+        writer.reset()
+    }
+    offsets
+}
+
+macro_rules! set_idx {
+    ($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{
+        $SELF.assert_index_valid($IDX);
+        let offset = $SELF.field_offsets[$IDX];
+        $SELF.data[offset..offset + $WIDTH].copy_from_slice(&$VALUE.to_le_bytes());
+    }};
+}
+
+macro_rules! fn_set_idx {
+    ($NATIVE: ident, $WIDTH: literal) => {
+        paste::item! {
+            fn [<set_ $NATIVE>](&mut self, idx: usize, value: $NATIVE) {
+                self.assert_index_valid(idx);
+                let offset = self.field_offsets[idx];
+                self.data[offset..offset + $WIDTH].copy_from_slice(&value.to_le_bytes());
+            }
+        }
+    };
+}
+
+/// Reusable row writer backed by Vec<u8>
+pub struct RowWriter {
+    data: Vec<u8>,
+    field_count: usize,
+    row_width: usize,
+    null_width: usize,
+    values_width: usize,
+    varlena_width: usize,
+    varlena_offset: usize,
+    field_offsets: Vec<usize>,
+}
+
+impl RowWriter {
+    /// new
+    pub fn new(schema: &Arc<Schema>) -> Self {
+        assert!(supported(schema));
+        let field_count = schema.fields().len();
+        let null_width = bytes_for(field_count);
+        let (field_offsets, values_width) = get_offsets(null_width, schema);
+        let mut init_capacity = estimate_row_width(null_width, schema);
+        if !fixed_size(schema) {
+            // double the capacity to avoid repeated resize
+            init_capacity *= 2;
+        }
+        Self {
+            data: vec![0; init_capacity],
+            field_count,
+            row_width: 0,
+            null_width,
+            values_width,
+            varlena_width: 0,
+            varlena_offset: null_width + values_width,
+            field_offsets,
+        }
+    }
+
+    /// Reset the row writer state for new tuple
+    pub fn reset(&mut self) {
+        self.data.fill(0);
+        self.row_width = 0;
+        self.varlena_width = 0;
+        self.varlena_offset = self.null_width + self.values_width;
+    }
+
+    #[inline]
+    fn assert_index_valid(&self, idx: usize) {
+        assert!(idx < self.field_count);
+    }
+
+    fn set_null_at(&mut self, idx: usize) {
+        let null_bits = &mut self.data[0..self.null_width];
+        set_bit(null_bits, idx, false)
+    }
+
+    fn set_non_null_at(&mut self, idx: usize) {
+        let null_bits = &mut self.data[0..self.null_width];
+        set_bit(null_bits, idx, true)
+    }
+
+    fn set_bool(&mut self, idx: usize, value: bool) {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        self.data[offset] = if value { 1 } else { 0 };
+    }
+
+    fn set_u8(&mut self, idx: usize, value: u8) {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        self.data[offset] = value;
+    }
+
+    fn_set_idx!(u16, 2);
+    fn_set_idx!(u32, 4);
+    fn_set_idx!(u64, 8);
+    fn_set_idx!(i16, 2);
+    fn_set_idx!(i32, 4);
+    fn_set_idx!(i64, 8);
+    fn_set_idx!(f32, 4);
+    fn_set_idx!(f64, 8);
+
+    fn set_i8(&mut self, idx: usize, value: i8) {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        self.data[offset] = value.to_le_bytes()[0];
+    }
+
+    fn set_date32(&mut self, idx: usize, value: i32) {
+        set_idx!(4, self, idx, value)
+    }
+
+    fn set_date64(&mut self, idx: usize, value: i64) {
+        set_idx!(8, self, idx, value)
+    }
+
+    fn set_offset_size(&mut self, idx: usize, size: usize) {
+        let offset_and_size: u64 = (self.varlena_offset << 32 | size) as u64;
+        self.set_u64(idx, offset_and_size);
+    }
+
+    fn set_utf8(&mut self, idx: usize, value: &str) {
+        self.assert_index_valid(idx);
+        let bytes = value.as_bytes();
+        let size = bytes.len();
+        self.set_offset_size(idx, size);
+        let varlena_offset = self.varlena_offset;
+        self.data[varlena_offset..varlena_offset + size].copy_from_slice(bytes);
+        self.varlena_offset += size;
+        self.varlena_width += size;
+    }
+
+    fn set_binary(&mut self, idx: usize, value: &[u8]) {
+        self.assert_index_valid(idx);
+        let size = value.len();
+        self.set_offset_size(idx, size);
+        let varlena_offset = self.varlena_offset;
+        self.data[varlena_offset..varlena_offset + size].copy_from_slice(value);
+        self.varlena_offset += size;
+        self.varlena_width += size;
+    }
+
+    fn current_width(&self) -> usize {
+        self.null_width + self.values_width + self.varlena_width
+    }
+
+    /// End each row at 8-byte word boundary.
+    fn end_padding(&mut self) {
+        let payload_width = self.current_width();
+        self.row_width = (payload_width.saturating_add(7) / 8) * 8;
+        if self.data.capacity() < self.row_width {
+            self.data.resize(self.row_width, 0);
+        }
+    }
+
+    fn get_row(&self) -> &[u8] {
+        &self.data[0..self.row_width]
+    }
+}
+
+/// Stitch attributes of tuple in `batch` at `row_idx` and returns the tuple width
+fn write_row(
+    row: &mut RowWriter,
+    row_idx: usize,
+    batch: &RecordBatch,
+    schema: &Arc<Schema>,
+) -> usize {
+    // Get the row from the batch denoted by row_idx
+    for ((i, f), col) in schema
+        .fields()
+        .iter()
+        .enumerate()
+        .zip(batch.columns().iter())
+    {
+        if !col.is_null(row_idx) {
+            row.set_non_null_at(i);
+            write_field(i, row_idx, col, f.data_type(), row);
+        } else {
+            row.set_null_at(i);
+        }
+    }
+
+    row.end_padding();
+    row.row_width
+}
+
+fn write_field(
+    col_idx: usize,
+    row_idx: usize,
+    col: &Arc<dyn Array>,
+    dt: &DataType,
+    row: &mut RowWriter,
+) {
+    // TODO: JIT compile this
+    use arrow::array::*;
+    use DataType::*;
+    match dt {
+        Boolean => {
+            let c = col.as_any().downcast_ref::<BooleanArray>().unwrap();
+            row.set_bool(col_idx, c.value(row_idx));
+        }
+        UInt8 => {
+            let c = col.as_any().downcast_ref::<UInt8Array>().unwrap();
+            row.set_u8(col_idx, c.value(row_idx));
+        }
+        UInt16 => {
+            let c = col.as_any().downcast_ref::<UInt16Array>().unwrap();
+            row.set_u16(col_idx, c.value(row_idx));
+        }
+        UInt32 => {
+            let c = col.as_any().downcast_ref::<UInt32Array>().unwrap();
+            row.set_u32(col_idx, c.value(row_idx));
+        }
+        UInt64 => {
+            let c = col.as_any().downcast_ref::<UInt64Array>().unwrap();
+            row.set_u64(col_idx, c.value(row_idx));
+        }
+        Int8 => {
+            let c = col.as_any().downcast_ref::<Int8Array>().unwrap();
+            row.set_i8(col_idx, c.value(row_idx));
+        }
+        Int16 => {
+            let c = col.as_any().downcast_ref::<Int16Array>().unwrap();
+            row.set_i16(col_idx, c.value(row_idx));
+        }
+        Int32 => {
+            let c = col.as_any().downcast_ref::<Int32Array>().unwrap();
+            row.set_i32(col_idx, c.value(row_idx));
+        }
+        Int64 => {
+            let c = col.as_any().downcast_ref::<Int64Array>().unwrap();
+            row.set_i64(col_idx, c.value(row_idx));
+        }
+        Float32 => {
+            let c = col.as_any().downcast_ref::<Float32Array>().unwrap();
+            row.set_f32(col_idx, c.value(row_idx));
+        }
+        Float64 => {
+            let c = col.as_any().downcast_ref::<Float64Array>().unwrap();
+            row.set_f64(col_idx, c.value(row_idx));
+        }
+        Date32 => {
+            let c = col.as_any().downcast_ref::<Date32Array>().unwrap();
+            row.set_date32(col_idx, c.value(row_idx));
+        }
+        Date64 => {
+            let c = col.as_any().downcast_ref::<Date64Array>().unwrap();
+            row.set_date64(col_idx, c.value(row_idx));
+        }
+        Utf8 => {
+            let c = col.as_any().downcast_ref::<StringArray>().unwrap();
+            let str = c.value(row_idx);

Review comment:
       ```suggestion
               let s = c.value(row_idx);
   ```

##########
File path: datafusion/src/row/writer.rs
##########
@@ -0,0 +1,322 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Reusable row writer backed by Vec<u8> to stitch attributes together
+
+use crate::row::bitmap::{bytes_for, set_bit};
+use crate::row::{estimate_row_width, fixed_size, get_offsets, supported};
+use arrow::array::Array;
+use arrow::datatypes::{DataType, Schema};
+use arrow::record_batch::RecordBatch;
+use std::cmp::max;
+use std::sync::Arc;
+
+/// Append batch from `row_idx` to `output` buffer start from `offset`
+/// # Panics
+///
+/// This function will panic if the output buffer doesn't have enough space to hold all the rows
+pub fn write_batch_unchecked(
+    output: &mut [u8],
+    offset: usize,
+    batch: &RecordBatch,
+    row_idx: usize,
+    schema: Arc<Schema>,
+) -> Vec<usize> {
+    let mut writer = RowWriter::new(&schema);
+    let mut current_offset = offset;
+    let mut offsets = vec![];
+    for cur_row in row_idx..batch.num_rows() {
+        offsets.push(current_offset);
+        let row_width = write_row(&mut writer, cur_row, batch, &schema);
+        output[current_offset..current_offset + row_width]
+            .copy_from_slice(writer.get_row());
+        current_offset += row_width;
+        writer.reset()
+    }
+    offsets
+}
+
+macro_rules! set_idx {
+    ($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{
+        $SELF.assert_index_valid($IDX);
+        let offset = $SELF.field_offsets[$IDX];
+        $SELF.data[offset..offset + $WIDTH].copy_from_slice(&$VALUE.to_le_bytes());
+    }};
+}
+
+macro_rules! fn_set_idx {
+    ($NATIVE: ident, $WIDTH: literal) => {
+        paste::item! {
+            fn [<set_ $NATIVE>](&mut self, idx: usize, value: $NATIVE) {
+                self.assert_index_valid(idx);
+                let offset = self.field_offsets[idx];
+                self.data[offset..offset + $WIDTH].copy_from_slice(&value.to_le_bytes());
+            }
+        }
+    };
+}
+
+/// Reusable row writer backed by Vec<u8>
+pub struct RowWriter {
+    data: Vec<u8>,
+    field_count: usize,
+    row_width: usize,
+    null_width: usize,
+    values_width: usize,
+    varlena_width: usize,
+    varlena_offset: usize,
+    field_offsets: Vec<usize>,
+}
+
+impl RowWriter {
+    /// new
+    pub fn new(schema: &Arc<Schema>) -> Self {
+        assert!(supported(schema));
+        let field_count = schema.fields().len();
+        let null_width = bytes_for(field_count);
+        let (field_offsets, values_width) = get_offsets(null_width, schema);
+        let mut init_capacity = estimate_row_width(null_width, schema);
+        if !fixed_size(schema) {
+            // double the capacity to avoid repeated resize
+            init_capacity *= 2;
+        }
+        Self {
+            data: vec![0; init_capacity],
+            field_count,
+            row_width: 0,
+            null_width,
+            values_width,
+            varlena_width: 0,
+            varlena_offset: null_width + values_width,
+            field_offsets,
+        }
+    }
+
+    /// Reset the row writer state for new tuple
+    pub fn reset(&mut self) {
+        self.data.fill(0);
+        self.row_width = 0;
+        self.varlena_width = 0;
+        self.varlena_offset = self.null_width + self.values_width;
+    }
+
+    #[inline]
+    fn assert_index_valid(&self, idx: usize) {
+        assert!(idx < self.field_count);
+    }
+
+    fn set_null_at(&mut self, idx: usize) {
+        let null_bits = &mut self.data[0..self.null_width];
+        set_bit(null_bits, idx, false)
+    }
+
+    fn set_non_null_at(&mut self, idx: usize) {
+        let null_bits = &mut self.data[0..self.null_width];
+        set_bit(null_bits, idx, true)
+    }
+
+    fn set_bool(&mut self, idx: usize, value: bool) {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        self.data[offset] = if value { 1 } else { 0 };
+    }
+
+    fn set_u8(&mut self, idx: usize, value: u8) {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        self.data[offset] = value;
+    }
+
+    fn_set_idx!(u16, 2);
+    fn_set_idx!(u32, 4);
+    fn_set_idx!(u64, 8);
+    fn_set_idx!(i16, 2);
+    fn_set_idx!(i32, 4);
+    fn_set_idx!(i64, 8);
+    fn_set_idx!(f32, 4);
+    fn_set_idx!(f64, 8);
+
+    fn set_i8(&mut self, idx: usize, value: i8) {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        self.data[offset] = value.to_le_bytes()[0];
+    }
+
+    fn set_date32(&mut self, idx: usize, value: i32) {
+        set_idx!(4, self, idx, value)
+    }
+
+    fn set_date64(&mut self, idx: usize, value: i64) {
+        set_idx!(8, self, idx, value)
+    }
+
+    fn set_offset_size(&mut self, idx: usize, size: usize) {
+        let offset_and_size: u64 = (self.varlena_offset << 32 | size) as u64;
+        self.set_u64(idx, offset_and_size);
+    }
+
+    fn set_utf8(&mut self, idx: usize, value: &str) {
+        self.assert_index_valid(idx);
+        let bytes = value.as_bytes();
+        let size = bytes.len();
+        self.set_offset_size(idx, size);
+        let varlena_offset = self.varlena_offset;
+        self.data[varlena_offset..varlena_offset + size].copy_from_slice(bytes);
+        self.varlena_offset += size;
+        self.varlena_width += size;
+    }
+
+    fn set_binary(&mut self, idx: usize, value: &[u8]) {
+        self.assert_index_valid(idx);
+        let size = value.len();
+        self.set_offset_size(idx, size);
+        let varlena_offset = self.varlena_offset;
+        self.data[varlena_offset..varlena_offset + size].copy_from_slice(value);
+        self.varlena_offset += size;
+        self.varlena_width += size;
+    }
+
+    fn current_width(&self) -> usize {
+        self.null_width + self.values_width + self.varlena_width
+    }
+
+    /// End each row at 8-byte word boundary.
+    fn end_padding(&mut self) {
+        let payload_width = self.current_width();
+        self.row_width = (payload_width.saturating_add(7) / 8) * 8;

Review comment:
       `ceil(payload_width, 64) / 8`
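
For reference, the rounding that `end_padding` performs in the diff above (pad the row to the next 8-byte boundary) can be sketched in isolation. This is only an illustration: `round_up_to_word` is a hypothetical helper name that does not appear in the PR, and it assumes `width + 7` does not overflow.

```rust
/// Round `width` (in bytes) up to the next multiple of 8.
/// `round_up_to_word` is a hypothetical name used only for this sketch.
fn round_up_to_word(width: usize) -> usize {
    // Adding 7 and clearing the low three bits gives the same result as
    // ((width + 7) / 8) * 8, provided the addition does not overflow.
    (width + 7) & !7
}
```

A 13-byte payload would therefore occupy 16 bytes, while an 8-byte payload stays at 8.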

##########
File path: datafusion/src/row/bitmap/fmt.rs
##########
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Write;
+
+use super::is_set;
+
+/// Formats `bytes` taking into account an offset and length of the form
+pub fn fmt(
+    bytes: &[u8],
+    offset: usize,
+    length: usize,
+    f: &mut std::fmt::Formatter<'_>,
+) -> std::fmt::Result {
+    assert!(offset < 8);
+
+    f.write_char('[')?;
+    let mut remaining = length;
+    if remaining == 0 {
+        f.write_char(']')?;
+        return Ok(());
+    }
+
+    let first = bytes[0];
+    let bytes = &bytes[1..];
+    let empty_before = 8usize.saturating_sub(remaining + offset);
+    f.write_str("0b")?;
+    for _ in 0..empty_before {
+        f.write_char('_')?;
+    }
+    let until = std::cmp::min(8, offset + remaining);
+    for i in offset..until {

Review comment:
       `(offset..until).rev()` might be easier to follow than `offset + until - 1 - i`
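
A minimal sketch of the reversed-range idea, assuming bit 0 is the least significant bit. The `is_set` below is a stand-in written for this example, not the helper imported in the diff, and `bits_msb_first` is a hypothetical name.

```rust
/// Stand-in for the `is_set` helper (an assumption for this sketch):
/// true when bit `i` of `byte` is set, with bit 0 the least significant.
fn is_set(byte: u8, i: usize) -> bool {
    (byte >> i) & 1 == 1
}

/// Render bits `offset..until` of `byte`, most significant first.
fn bits_msb_first(byte: u8, offset: usize, until: usize) -> String {
    assert!(offset <= until && until <= 8);
    // Walking the range backwards visits until-1, until-2, ..., offset,
    // which is the same order as `offset + until - 1 - i` for i in offset..until.
    (offset..until)
        .rev()
        .map(|i| if is_set(byte, i) { '1' } else { '0' })
        .collect()
}
```

With the reversed range, the loop body can test bit `i` directly instead of recomputing the index.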

##########
File path: datafusion/src/row/bitmap/fmt.rs
##########
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Write;
+
+use super::is_set;
+
+/// Formats `bytes` taking into account an offset and length of the form
+pub fn fmt(
+    bytes: &[u8],
+    offset: usize,
+    length: usize,
+    f: &mut std::fmt::Formatter<'_>,
+) -> std::fmt::Result {
+    assert!(offset < 8);
+
+    f.write_char('[')?;
+    let mut remaining = length;
+    if remaining == 0 {
+        f.write_char(']')?;
+        return Ok(());
+    }
+
+    let first = bytes[0];
+    let bytes = &bytes[1..];
+    let empty_before = 8usize.saturating_sub(remaining + offset);
+    f.write_str("0b")?;
+    for _ in 0..empty_before {
+        f.write_char('_')?;
+    }
+    let until = std::cmp::min(8, offset + remaining);
+    for i in offset..until {
+        if is_set(first, offset + until - 1 - i) {
+            f.write_char('1')?;
+        } else {
+            f.write_char('0')?;
+        }
+    }
+    for _ in 0..offset {
+        f.write_char('_')?;
+    }
+    remaining -= until - offset;
+
+    if remaining == 0 {
+        f.write_char(']')?;
+        return Ok(());
+    }
+
+    let number_of_bytes = remaining / 8;
+    for byte in &bytes[..number_of_bytes] {
+        f.write_str(", ")?;
+        f.write_fmt(format_args!("{:#010b}", byte))?;
+    }
+    remaining -= number_of_bytes * 8;
+    if remaining == 0 {
+        f.write_char(']')?;
+        return Ok(());
+    }
+
+    let last = bytes[std::cmp::min((length + offset + 7) / 8, bytes.len() - 1)];
+    let remaining = (length + offset) % 8;
+    f.write_str(", ")?;
+    f.write_str("0b")?;
+    for _ in 0..(8 - remaining) {
+        f.write_char('_')?;
+    }
+    for i in 0..remaining {

Review comment:
       `(0..remaining).rev()` maybe




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#discussion_r802429457



##########
File path: datafusion/src/row/mod.rs
##########
@@ -0,0 +1,333 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An implementation of Row backed by raw bytes
+//!
+//! Each tuple consists of up to three parts: [null bit set] [values] [var length data]
+//!
+//! The null bit set is used for null tracking and is aligned to 1-byte. It stores
+//! one bit per field.
+//!
+//! In the region of the values, we store the fields in the order they are defined in the schema.
+//! - For fixed-length, sequential access fields, we store them directly.
+//!       E.g., 4 bytes for int and 1 byte for bool.
+//! - For fixed-length, update often fields, we store one 8-byte word per field.
+//! - For fields of non-primitive or variable-length types,
+//!       we append their actual content to the end of the var length region and
+//!       store their offset relative to row base and their length, packed into an 8-byte word.
+
+use arrow::datatypes::{DataType, Schema};
+use std::sync::Arc;
+
+mod bitmap;
+mod reader;
+mod writer;
+
+const UTF8_DEFAULT_SIZE: usize = 20;
+const BINARY_DEFAULT_SIZE: usize = 100;
+
+/// Get relative offsets for each field and total width for values
+fn get_offsets(null_width: usize, schema: &Arc<Schema>) -> (Vec<usize>, usize) {
+    let mut offsets = vec![];
+    let mut offset = null_width;
+    for f in schema.fields() {
+        offsets.push(offset);
+        offset += type_width(f.data_type());
+    }
+    (offsets, offset - null_width)
+}
+
+fn supported_type(dt: &DataType) -> bool {
+    use DataType::*;
+    matches!(
+        dt,
+        Boolean
+            | UInt8
+            | UInt16
+            | UInt32
+            | UInt64
+            | Int8
+            | Int16
+            | Int32
+            | Int64
+            | Float32
+            | Float64
+            | Date32
+            | Date64
+            | Utf8
+            | Binary
+    )
+}
+
+fn var_length(dt: &DataType) -> bool {
+    use DataType::*;
+    matches!(dt, Utf8 | Binary)
+}
+
+fn type_width(dt: &DataType) -> usize {
+    use DataType::*;
+    if var_length(dt) {
+        return 8;

Review comment:
       Great idea! For now I use `size_of::<u64>()`. Later I can make the width a type parameter of `RowWriter` and `RowReader`, as is done for StringArray and LargeStringArray, to save memory.
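
To make the 8-byte word concrete, here is a sketch of how an offset and a length could share one `u64`, mirroring the `offset << 32 | size` expression in `set_offset_size`. The function names and the `u32` parameters are assumptions for illustration; the PR itself works with `usize`.

```rust
/// Pack a var-length field's offset (high 32 bits) and length (low 32 bits)
/// into a single 8-byte word. Hypothetical helper for this sketch.
fn pack_offset_size(offset: u32, size: u32) -> u64 {
    ((offset as u64) << 32) | size as u64
}

/// Recover `(offset, size)` from a packed word.
fn unpack_offset_size(word: u64) -> (u32, u32) {
    // The `as u32` cast keeps only the low 32 bits.
    ((word >> 32) as u32, word as u32)
}
```

Taking `u32` inputs makes the implicit limit visible: both the offset and the length must fit in 32 bits for the packing to be lossless.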







[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#discussion_r802432870



##########
File path: datafusion/src/row/bitmap/mod.rs
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! General utilities for null bit section handling
+//!
+//! Note: this is a tailored version based on [arrow2 bitmap utils](https://github.com/jorgecarleitao/arrow2/tree/main/src/bitmap/utils)

Review comment:
       The bitmap is rewritten on top of `arrow/util/bit_util`, along with a much simplified version of `fmt`.
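
For readers unfamiliar with the null bit set, the operations involved can be sketched in plain Rust. The names echo `bytes_for` and `set_bit` from the diff, but these bodies are assumptions written for this example (least-significant-bit-first numbering), not the PR's or arrow's code; `get_bit` is added only for the illustration.

```rust
/// Number of bytes needed to hold `bits` bits.
fn bytes_for(bits: usize) -> usize {
    (bits + 7) / 8
}

/// Set or clear bit `i`, counting from the least significant bit of byte 0.
fn set_bit(data: &mut [u8], i: usize, value: bool) {
    let mask = 1u8 << (i % 8);
    if value {
        data[i / 8] |= mask;
    } else {
        data[i / 8] &= !mask;
    }
}

/// Read bit `i` using the same numbering as `set_bit`.
fn get_bit(data: &[u8], i: usize) -> bool {
    (data[i / 8] & (1u8 << (i % 8))) != 0
}
```

A schema with ten fields would need two bytes of null bits under this scheme.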







[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#discussion_r803798784



##########
File path: datafusion/src/row/mod.rs
##########
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An implementation of Row backed by raw bytes
+//!
+//! Each tuple consists of up to three parts: [null bit set] [values] [var length data]
+//!
+//! The null bit set is used for null tracking and is aligned to 1-byte. It stores
+//! one bit per field.
+//!
+//! In the region of the values, we store the fields in the order they are defined in the schema.

Review comment:
       > finding where Row3 starts in the following picture needs to scan all columns of Row1 and Row2
   
   I think since we are doing in-memory processing, we can actually store each row's starting offset in a separate vector, just as this method does:
   
   ```rust
   pub fn write_batch_unchecked(
       output: &mut [u8],
       offset: usize,
       batch: &RecordBatch,
       row_idx: usize,
       schema: Arc<Schema>,
   ) -> Vec<usize> {
       let mut writer = RowWriter::new(&schema);
       let mut current_offset = offset;
       let mut offsets = vec![];
       for cur_row in row_idx..batch.num_rows() {
           offsets.push(current_offset);
           let row_width = write_row(&mut writer, cur_row, batch);
           output[current_offset..current_offset + row_width]
               .copy_from_slice(writer.get_row());
           current_offset += row_width;
           writer.reset()
       }
       offsets
   }
   ```
   I'm thinking of just keeping the offset vector we got while writing, and using it hereafter.
   
   > I think getting optimal performance will come from being able to vectorize many operations for which fixed sized tuples are compelling:
   
   I'm afraid it will be hard to vectorize operations on a row format, since the row width may easily exceed the SIMD lane width?
   
   > filling the variable length area from the back.
   
   Yes, I'm aware of the strategy. But we use rows mainly during execution, unlike DBMS systems that use this layout to keep tuples in long-term storage, so I think we can just store the offsets separately in a vector?
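
As a rough sketch of how such an offset vector might be used afterwards, the helper below slices one row out of a buffer of back-to-back rows. `row_at` and its `end` parameter (one past the last written byte) are hypothetical and not part of the PR.

```rust
/// Return the bytes of row `i` from a buffer of rows written back to back.
/// `offsets[i]` is the start of row `i`; `end` is one past the last row's
/// final byte. Hypothetical helper for illustration only.
fn row_at<'a>(buffer: &'a [u8], offsets: &[usize], end: usize, i: usize) -> &'a [u8] {
    let start = offsets[i];
    // A row ends where the next one starts; the last row ends at `end`.
    let stop = offsets.get(i + 1).copied().unwrap_or(end);
    &buffer[start..stop]
}
```

Finding the third row is then a single index lookup rather than a scan over the preceding rows.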







[GitHub] [arrow-datafusion] alamb commented on pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#issuecomment-1032537356


   Thanks @yjshen -- I look forward to reviewing this carefully later today 👍 





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#discussion_r803804906



##########
File path: datafusion/src/row/reader.rs
##########
@@ -0,0 +1,441 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Accessing row from raw bytes

Review comment:
       The diagram is excellent! I added a to-do item for it yesterday but haven't had a chance to finish it yet. Do you mind if I put this into the doc?







[GitHub] [arrow-datafusion] alamb commented on pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#issuecomment-1034262113


   > [Use directly] The current row implementation is mostly targeted at payload use cases, that we do not update or check by index after the first write, and only decompose to record batch at last. This is the case for sort payload, hash aggregate composed grouping key (we can directly compare raw bytes for equality), hash join key, and join payload.
   
   I am not sure about sorting payloads. It seems to me that copying the sort payload around could be quite inefficient.
   
   Consider a table like
   ```sql
   CREATE TABLE sales (
     value float,
     first_name varchar(2000),
     last_name varchar(2000),
     address varchar(2000)
   )
   ```
    
   And a query like
   
   ```sql
   SELECT * from sales order by value;
   ```
   
   In this case only `value` needs to be compared, and the payload may be substantial
   
   I thought the current state of the art was to do something like 
   1. `RecordBatch`es --> `Rows` (only for sort key)
   2. Sort the `Rows` using `memcmp`
   3. Use `take` kernel to form the output rows (copy payload columns)
   
   This copies the payload columns only once
   
   If you instead use the Row to hold the payload, you end up
   
   1. `RecordBatch`es --> `Row payload`
   2. Form something to compare with
   3. Sort
   4. Now convert back from Row to `RecordBatch`
   
   Which results in copying the payloads *twice* - and for large tables this is a substantial overhead. 
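
The key-only approach can be sketched in plain Rust without any arrow types. Everything here is an assumption for illustration: the keys are taken to be already encoded so that byte order matches sort order (for example big-endian unsigned integers), and `take` is a stand-in for arrow's `take` kernel, not the kernel itself.

```rust
/// Sort row indices by comparing encoded key bytes lexicographically,
/// which is what a `memcmp`-style comparison does.
fn sorted_indices(keys: &[Vec<u8>]) -> Vec<usize> {
    let mut idx: Vec<usize> = (0..keys.len()).collect();
    idx.sort_by(|&a, &b| keys[a].cmp(&keys[b]));
    idx
}

/// Gather one payload column in the sorted order, copying each value once.
/// Hypothetical stand-in for a `take` kernel.
fn take<T: Clone>(column: &[T], indices: &[usize]) -> Vec<T> {
    indices.iter().map(|&i| column[i].clone()).collect()
}
```

Only the small key buffers move during the sort; each payload column is touched once, when it is gathered into the output. Keys such as signed integers or floats would need an order-preserving encoding first, which this sketch does not cover.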
   
   
   However, I agree a format like this would be helpful for storing hash aggregate composed grouping keys, join keys (and maybe intermediate aggregates)
   
   I'll give this PR a good look tomorrow morning
   
   





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#discussion_r801835166



##########
File path: datafusion/src/row/writer.rs
##########
@@ -0,0 +1,322 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Reusable row writer backed by Vec<u8> to stitch attributes together
+
+use crate::row::bitmap::{bytes_for, set_bit};
+use crate::row::{estimate_row_width, fixed_size, get_offsets, supported};
+use arrow::array::Array;
+use arrow::datatypes::{DataType, Schema};
+use arrow::record_batch::RecordBatch;
+use std::cmp::max;
+use std::sync::Arc;
+
+/// Append batch from `row_idx` to `output` buffer start from `offset`
+/// # Panics
+///
+/// This function will panic if the output buffer doesn't have enough space to hold all the rows
+pub fn write_batch_unchecked(
+    output: &mut [u8],
+    offset: usize,
+    batch: &RecordBatch,
+    row_idx: usize,
+    schema: Arc<Schema>,
+) -> Vec<usize> {
+    let mut writer = RowWriter::new(&schema);
+    let mut current_offset = offset;
+    let mut offsets = vec![];
+    for cur_row in row_idx..batch.num_rows() {
+        offsets.push(current_offset);
+        let row_width = write_row(&mut writer, cur_row, batch, &schema);
+        output[current_offset..current_offset + row_width]
+            .copy_from_slice(writer.get_row());
+        current_offset += row_width;
+        writer.reset()
+    }
+    offsets
+}
+
+macro_rules! set_idx {
+    ($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{
+        $SELF.assert_index_valid($IDX);
+        let offset = $SELF.field_offsets[$IDX];
+        $SELF.data[offset..offset + $WIDTH].copy_from_slice(&$VALUE.to_le_bytes());
+    }};
+}
+
+macro_rules! fn_set_idx {
+    ($NATIVE: ident, $WIDTH: literal) => {
+        paste::item! {
+            fn [<set_ $NATIVE>](&mut self, idx: usize, value: $NATIVE) {
+                self.assert_index_valid(idx);
+                let offset = self.field_offsets[idx];
+                self.data[offset..offset + $WIDTH].copy_from_slice(&value.to_le_bytes());
+            }
+        }
+    };
+}
+
+/// Reusable row writer backed by Vec<u8>
+pub struct RowWriter {
+    data: Vec<u8>,
+    field_count: usize,
+    row_width: usize,
+    null_width: usize,
+    values_width: usize,
+    varlena_width: usize,
+    varlena_offset: usize,
+    field_offsets: Vec<usize>,
+}
+
+impl RowWriter {
+    /// new
+    pub fn new(schema: &Arc<Schema>) -> Self {
+        assert!(supported(schema));
+        let field_count = schema.fields().len();
+        let null_width = bytes_for(field_count);
+        let (field_offsets, values_width) = get_offsets(null_width, schema);
+        let mut init_capacity = estimate_row_width(null_width, schema);
+        if !fixed_size(schema) {
+            // double the capacity to avoid repeated resize
+            init_capacity *= 2;
+        }
+        Self {
+            data: vec![0; init_capacity],
+            field_count,
+            row_width: 0,
+            null_width,
+            values_width,
+            varlena_width: 0,
+            varlena_offset: null_width + values_width,
+            field_offsets,
+        }
+    }
+
+    /// Reset the row writer state for new tuple
+    pub fn reset(&mut self) {
+        self.data.fill(0);
+        self.row_width = 0;
+        self.varlena_width = 0;
+        self.varlena_offset = self.null_width + self.values_width;
+    }
+
+    #[inline]
+    fn assert_index_valid(&self, idx: usize) {
+        assert!(idx < self.field_count);
+    }
+
+    fn set_null_at(&mut self, idx: usize) {
+        let null_bits = &mut self.data[0..self.null_width];
+        set_bit(null_bits, idx, false)
+    }
+
+    fn set_non_null_at(&mut self, idx: usize) {
+        let null_bits = &mut self.data[0..self.null_width];
+        set_bit(null_bits, idx, true)
+    }
+
+    fn set_bool(&mut self, idx: usize, value: bool) {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        self.data[offset] = if value { 1 } else { 0 };
+    }
+
+    fn set_u8(&mut self, idx: usize, value: u8) {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        self.data[offset] = value;
+    }
+
+    fn_set_idx!(u16, 2);
+    fn_set_idx!(u32, 4);
+    fn_set_idx!(u64, 8);
+    fn_set_idx!(i16, 2);
+    fn_set_idx!(i32, 4);
+    fn_set_idx!(i64, 8);
+    fn_set_idx!(f32, 4);
+    fn_set_idx!(f64, 8);
+
+    fn set_i8(&mut self, idx: usize, value: i8) {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        self.data[offset] = value.to_le_bytes()[0];
+    }
+
+    fn set_date32(&mut self, idx: usize, value: i32) {
+        set_idx!(4, self, idx, value)
+    }
+
+    fn set_date64(&mut self, idx: usize, value: i64) {
+        set_idx!(8, self, idx, value)
+    }
+
+    fn set_offset_size(&mut self, idx: usize, size: usize) {
+        let offset_and_size: u64 = (self.varlena_offset << 32 | size) as u64;
+        self.set_u64(idx, offset_and_size);
+    }
+
+    fn set_utf8(&mut self, idx: usize, value: &str) {
+        self.assert_index_valid(idx);
+        let bytes = value.as_bytes();
+        let size = bytes.len();
+        self.set_offset_size(idx, size);
+        let varlena_offset = self.varlena_offset;
+        self.data[varlena_offset..varlena_offset + size].copy_from_slice(bytes);
+        self.varlena_offset += size;
+        self.varlena_width += size;
+    }
+
+    fn set_binary(&mut self, idx: usize, value: &[u8]) {
+        self.assert_index_valid(idx);
+        let size = value.len();
+        self.set_offset_size(idx, size);
+        let varlena_offset = self.varlena_offset;
+        self.data[varlena_offset..varlena_offset + size].copy_from_slice(value);
+        self.varlena_offset += size;
+        self.varlena_width += size;
+    }
+
+    fn current_width(&self) -> usize {
+        self.null_width + self.values_width + self.varlena_width
+    }
+
+    /// End each row at 8-byte word boundary.
+    fn end_padding(&mut self) {
+        let payload_width = self.current_width();
+        self.row_width = (payload_width.saturating_add(7) / 8) * 8;
+        if self.data.capacity() < self.row_width {
+            self.data.resize(self.row_width, 0);
+        }
+    }
+
+    fn get_row(&self) -> &[u8] {
+        &self.data[0..self.row_width]
+    }
+}
+
+/// Stitch attributes of tuple in `batch` at `row_idx` and returns the tuple width
+fn write_row(

Review comment:
       I think having a vectorized version of this would also be helpful so an entire record batch could be converted at once
   
   

##########
File path: datafusion/src/row/writer.rs
##########
@@ -0,0 +1,322 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Reusable row writer backed by Vec<u8> to stitch attributes together
+
+use crate::row::bitmap::{bytes_for, set_bit};
+use crate::row::{estimate_row_width, fixed_size, get_offsets, supported};
+use arrow::array::Array;
+use arrow::datatypes::{DataType, Schema};
+use arrow::record_batch::RecordBatch;
+use std::cmp::max;
+use std::sync::Arc;
+
+/// Append batch from `row_idx` to `output` buffer start from `offset`
+/// # Panics
+///
+/// This function will panic if the output buffer doesn't have enough space to hold all the rows
+pub fn write_batch_unchecked(
+    output: &mut [u8],
+    offset: usize,
+    batch: &RecordBatch,
+    row_idx: usize,
+    schema: Arc<Schema>,
+) -> Vec<usize> {
+    let mut writer = RowWriter::new(&schema);
+    let mut current_offset = offset;
+    let mut offsets = vec![];
+    for cur_row in row_idx..batch.num_rows() {
+        offsets.push(current_offset);
+        let row_width = write_row(&mut writer, cur_row, batch, &schema);
+        output[current_offset..current_offset + row_width]
+            .copy_from_slice(writer.get_row());
+        current_offset += row_width;
+        writer.reset()
+    }
+    offsets
+}
+
+macro_rules! set_idx {
+    ($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{
+        $SELF.assert_index_valid($IDX);
+        let offset = $SELF.field_offsets[$IDX];
+        $SELF.data[offset..offset + $WIDTH].copy_from_slice(&$VALUE.to_le_bytes());
+    }};
+}
+
+macro_rules! fn_set_idx {
+    ($NATIVE: ident, $WIDTH: literal) => {
+        paste::item! {
+            fn [<set_ $NATIVE>](&mut self, idx: usize, value: $NATIVE) {
+                self.assert_index_valid(idx);
+                let offset = self.field_offsets[idx];
+                self.data[offset..offset + $WIDTH].copy_from_slice(&value.to_le_bytes());
+            }
+        }
+    };
+}
+
+/// Reusable row writer backed by Vec<u8>
+pub struct RowWriter {
+    data: Vec<u8>,
+    field_count: usize,
+    row_width: usize,
+    null_width: usize,
+    values_width: usize,
+    varlena_width: usize,
+    varlena_offset: usize,
+    field_offsets: Vec<usize>,
+}
+
+impl RowWriter {
+    /// new
+    pub fn new(schema: &Arc<Schema>) -> Self {
+        assert!(supported(schema));
+        let field_count = schema.fields().len();
+        let null_width = bytes_for(field_count);
+        let (field_offsets, values_width) = get_offsets(null_width, schema);
+        let mut init_capacity = estimate_row_width(null_width, schema);
+        if !fixed_size(schema) {
+            // double the capacity to avoid repeated resize
+            init_capacity *= 2;
+        }
+        Self {
+            data: vec![0; init_capacity],
+            field_count,
+            row_width: 0,
+            null_width,
+            values_width,
+            varlena_width: 0,
+            varlena_offset: null_width + values_width,
+            field_offsets,
+        }
+    }
+
+    /// Reset the row writer state for new tuple
+    pub fn reset(&mut self) {
+        self.data.fill(0);
+        self.row_width = 0;
+        self.varlena_width = 0;
+        self.varlena_offset = self.null_width + self.values_width;
+    }
+
+    #[inline]
+    fn assert_index_valid(&self, idx: usize) {
+        assert!(idx < self.field_count);
+    }
+
+    fn set_null_at(&mut self, idx: usize) {
+        let null_bits = &mut self.data[0..self.null_width];
+        set_bit(null_bits, idx, false)
+    }
+
+    fn set_non_null_at(&mut self, idx: usize) {
+        let null_bits = &mut self.data[0..self.null_width];
+        set_bit(null_bits, idx, true)
+    }
+
+    fn set_bool(&mut self, idx: usize, value: bool) {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        self.data[offset] = if value { 1 } else { 0 };
+    }
+
+    fn set_u8(&mut self, idx: usize, value: u8) {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        self.data[offset] = value;
+    }
+
+    fn_set_idx!(u16, 2);
+    fn_set_idx!(u32, 4);
+    fn_set_idx!(u64, 8);
+    fn_set_idx!(i16, 2);
+    fn_set_idx!(i32, 4);
+    fn_set_idx!(i64, 8);
+    fn_set_idx!(f32, 4);
+    fn_set_idx!(f64, 8);
+
+    fn set_i8(&mut self, idx: usize, value: i8) {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        self.data[offset] = value.to_le_bytes()[0];
+    }
+
+    fn set_date32(&mut self, idx: usize, value: i32) {
+        set_idx!(4, self, idx, value)
+    }
+
+    fn set_date64(&mut self, idx: usize, value: i64) {
+        set_idx!(8, self, idx, value)
+    }
+
+    fn set_offset_size(&mut self, idx: usize, size: usize) {
+        let offset_and_size: u64 = (self.varlena_offset << 32 | size) as u64;
+        self.set_u64(idx, offset_and_size);
+    }
+
+    fn set_utf8(&mut self, idx: usize, value: &str) {
+        self.assert_index_valid(idx);
+        let bytes = value.as_bytes();
+        let size = bytes.len();
+        self.set_offset_size(idx, size);
+        let varlena_offset = self.varlena_offset;
+        self.data[varlena_offset..varlena_offset + size].copy_from_slice(bytes);
+        self.varlena_offset += size;
+        self.varlena_width += size;
+    }
+
+    fn set_binary(&mut self, idx: usize, value: &[u8]) {
+        self.assert_index_valid(idx);
+        let size = value.len();
+        self.set_offset_size(idx, size);
+        let varlena_offset = self.varlena_offset;
+        self.data[varlena_offset..varlena_offset + size].copy_from_slice(value);
+        self.varlena_offset += size;
+        self.varlena_width += size;
+    }
+
+    fn current_width(&self) -> usize {
+        self.null_width + self.values_width + self.varlena_width
+    }
+
+    /// End each row at 8-byte word boundary.
+    fn end_padding(&mut self) {
+        let payload_width = self.current_width();
+        self.row_width = (payload_width.saturating_add(7) / 8) * 8;
+        if self.data.capacity() < self.row_width {
+            self.data.resize(self.row_width, 0);
+        }
+    }
+
+    fn get_row(&self) -> &[u8] {
+        &self.data[0..self.row_width]
+    }
+}
+
+/// Stitch attributes of tuple in `batch` at `row_idx` and returns the tuple width
+fn write_row(
+    row: &mut RowWriter,
+    row_idx: usize,
+    batch: &RecordBatch,
+    schema: &Arc<Schema>,
+) -> usize {
+    // Get the row from the batch denoted by row_idx
+    for ((i, f), col) in schema

Review comment:
       Since the batch already carries the schema, we could probably avoid passing `schema` as an argument here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#issuecomment-1035106524


   Thanks @alamb for all the thoughts! 
   
   The PR currently relies on `arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "731e132489b99cd688f884642cf20de52aed24d0" }` because it needs access to an `ArrayBuilder` function. Do you think I should put this PR behind a non-default feature and just remove the `patch.crates-io` section?
   
   Besides, I'd like to incorporate your `bit_util` comments as well as the row diagram into the PR.





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#discussion_r803785009



##########
File path: datafusion/src/row/bitmap.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! General utilities for null bit section handling based on [arrow::util::bit_util]

Review comment:
       I agree. Let me just inline these wrappers over `bit_util`.







[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#discussion_r801524919



##########
File path: Cargo.toml
##########
@@ -31,3 +31,7 @@ members = [
 [profile.release]
 lto = true
 codegen-units = 1
+
+[patch.crates-io]
+arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "731e132489b99cd688f884642cf20de52aed24d0" }
+parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "731e132489b99cd688f884642cf20de52aed24d0" }

Review comment:
       Relies on https://github.com/apache/arrow-rs/commit/e375bba8bcd95f3b96c09a9227f5f196b19dcabd, will remove this once we have arrow 9.0.1 released.







[GitHub] [arrow-datafusion] yjshen commented on pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#issuecomment-1032495801


   Currently, the row format has not been hooked up to the rest of the codebase. I'm not sure whether it should be its own PR or be accompanied by a use case, such as in `SortExec`.





[GitHub] [arrow-datafusion] alamb merged pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
alamb merged pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782


   





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#discussion_r804066485



##########
File path: datafusion/src/row/reader.rs
##########
@@ -0,0 +1,441 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Accessing row from raw bytes

Review comment:
       > The diagram is excellent! I put a to-do item yesterday but haven't got a chance to finish it yet. Do you mind I put this into the doc?
   
   I don't mind at all -- that is why I made it :)







[GitHub] [arrow-datafusion] yjshen commented on pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#issuecomment-1032500724


   cc @alamb @Dandandan @houqp 





[GitHub] [arrow-datafusion] yjshen commented on pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#issuecomment-1033592720


   Thanks @alamb for the write-up of row use cases. 
   
   The current row implementation mostly targets payload use cases, where we do not update or check fields by index after the first write and only decompose the rows into a record batch at the end. This is the case for the sort payload, the composed grouping key in hash aggregate (we can directly compare raw bytes for equality), the hash join key, and the join payload.
   
   We should also change it a little for aggregation state, by adhering to word-aligned initialization and updates (to be CPU friendly), much as you suggested:
   ``` 
   let state = RowWriter::new()
     .for_aggregate(aggregate_exprs)
     .build();
   ```
   
   For a composite sort key with no varlena, we should remove the null-bits part, pad null attributes as all zeros or all ones (according to the nulls-first or nulls-last sort option), and compare the raw bytes.
   
   For a composite sort key where variable-length attributes (varlena) exist and are not the last attribute, direct comparison of the raw bytes of the current row format doesn't work. We would need to store varlena in place and pad all sort keys to the longest width, so that we can compare them directly as raw bytes.
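   
   As a rough illustration of the raw-byte comparison idea for fixed-width keys, here is a minimal sketch. It is not code from this PR: `encode_i32` and `encode_key` are hypothetical names, and instead of dropping null tracking entirely it uses a common variant that spends one marker byte per field so that a null can never collide with a real value. Values are stored big-endian with the sign bit flipped, which makes byte-wise order match ascending numeric order.

   ```rust
   /// Encode a nullable `i32` so that byte-wise comparison of the output
   /// matches ascending logical order. Hypothetical helper, not part of the PR.
   fn encode_i32(out: &mut Vec<u8>, value: Option<i32>, nulls_first: bool) {
       match value {
           None => {
               // The null marker sorts before (0x00) or after (0xFF)
               // the non-null marker 0x01.
               out.push(if nulls_first { 0x00 } else { 0xFF });
               // Pad so that every encoded field has the same width.
               out.extend_from_slice(&[0u8; 4]);
           }
           Some(v) => {
               out.push(0x01);
               // Flipping the sign bit maps i32::MIN..=i32::MAX onto
               // 0..=u32::MAX; big-endian keeps the most significant byte first.
               out.extend_from_slice(&((v as u32) ^ 0x8000_0000).to_be_bytes());
           }
       }
   }

   /// Concatenate the encoded fields into one comparable key.
   fn encode_key(fields: &[Option<i32>], nulls_first: bool) -> Vec<u8> {
       let mut key = Vec::with_capacity(fields.len() * 5);
       for field in fields {
           encode_i32(&mut key, *field, nulls_first);
       }
       key
   }
   ```

   With this encoding, two keys can be ordered with a plain slice comparison (`key_a < key_b`), with no per-field dispatch at comparison time.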
   
   





[GitHub] [arrow-datafusion] yjshen edited a comment on pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen edited a comment on pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#issuecomment-1032495801


   Currently, the row format has not been hooked up to the rest of the codebase. I'm not sure whether it should be its own PR or be accompanied by a use case, such as in `SortExec`.





[GitHub] [arrow-datafusion] yjshen commented on pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#issuecomment-1033554588


   Thanks a lot for the detailed review  @tustvold 
   
   I made several changes, including the bitmap rewrite, size rounding, and some docs, and recorded the rest of the great suggestions as TODOs in the PR description so that they don't slip away silently.





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#discussion_r803798784



##########
File path: datafusion/src/row/mod.rs
##########
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An implementation of Row backed by raw bytes
+//!
+//! Each tuple consists of up to three parts: [null bit set] [values] [var length data]
+//!
+//! The null bit set is used for null tracking and is aligned to 1-byte. It stores
+//! one bit per field.
+//!
+//! In the region of the values, we store the fields in the order they are defined in the schema.

Review comment:
       > finding where Row3 starts in the following picture needs to scan all columns of Row1 and Row2
   
   Since we are doing in-memory processing, I think we can store each row's starting offset in a separate vector, just as this method does:
   
   ```rust
   pub fn write_batch_unchecked(
       output: &mut [u8],
       offset: usize,
       batch: &RecordBatch,
       row_idx: usize,
       schema: Arc<Schema>,
   ) -> Vec<usize> {
       let mut writer = RowWriter::new(&schema);
       let mut current_offset = offset;
       let mut offsets = vec![];
       for cur_row in row_idx..batch.num_rows() {
           offsets.push(current_offset);
           let row_width = write_row(&mut writer, cur_row, batch);
           output[current_offset..current_offset + row_width]
               .copy_from_slice(writer.get_row());
           current_offset += row_width;
           writer.reset()
       }
       offsets
   }
   ```
   I'm thinking of just keeping the offset vector we got while writing and using it from then on.
   
   > I think getting optimal performance will come from being able to vectorize many operations for which fixed sized tuples are compelling:
   
   I'm afraid it will be hard to vectorize operations on a row format, since the row width may easily exceed the SIMD lane width?
   
   > filling the variable length area from the back.
   
   Yes, I'm aware of the strategy. But we use rows mainly during execution, unlike DBMS systems that use this layout to keep tuples in long-term storage, so I think we can just store the offsets separately in a vector?
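   
   To make the separate-offsets idea concrete, here is a small self-contained sketch. It does not use the PR's `RowWriter`; `write_rows` and `row_at` are hypothetical helpers that take already-serialized rows, pad each one to an 8-byte boundary, and remember where it starts, so a later lookup does not need to scan the preceding rows.

   ```rust
   /// Append each row to `output`, pad it to an 8-byte boundary, and
   /// return the starting offset of every row. Hypothetical helper.
   fn write_rows(output: &mut Vec<u8>, rows: &[&[u8]]) -> Vec<usize> {
       let mut offsets = Vec::with_capacity(rows.len());
       for row in rows {
           offsets.push(output.len());
           output.extend_from_slice(row);
           // Round the row width up to the next multiple of 8.
           let padded = (row.len() + 7) / 8 * 8;
           output.resize(output.len() + (padded - row.len()), 0);
       }
       offsets
   }

   /// Locate row `i` in constant time using the offsets vector.
   /// The returned slice includes the row's trailing padding.
   fn row_at<'a>(buf: &'a [u8], offsets: &[usize], i: usize) -> &'a [u8] {
       let start = offsets[i];
       let end = offsets.get(i + 1).copied().unwrap_or(buf.len());
       &buf[start..end]
   }
   ```

   The cost is one `usize` per row held next to the buffer, which seems acceptable for rows that only live for the duration of an operator.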







[GitHub] [arrow-datafusion] xudong963 commented on pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
xudong963 commented on pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#issuecomment-1035629117


   Thanks @yjshen. I haven't had time to look at this wonderful ticket yet; I will enjoy it over the weekend.





[GitHub] [arrow-datafusion] tustvold commented on a change in pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#discussion_r801528991



##########
File path: datafusion/src/row/bitmap/mod.rs
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! General utilities for null bit section handling
+//!
+//! Note: this is a tailored version based on [arrow2 bitmap utils](https://github.com/jorgecarleitao/arrow2/tree/main/src/bitmap/utils)

Review comment:
       FWIW this appears to itself be a copy of https://docs.rs/arrow/latest/arrow/util/bit_util/index.html







[GitHub] [arrow-datafusion] tustvold commented on a change in pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#discussion_r801608112



##########
File path: datafusion/src/row/writer.rs
##########
@@ -0,0 +1,322 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Reusable row writer backed by Vec<u8> to stitch attributes together
+
+use crate::row::bitmap::{bytes_for, set_bit};
+use crate::row::{estimate_row_width, fixed_size, get_offsets, supported};
+use arrow::array::Array;
+use arrow::datatypes::{DataType, Schema};
+use arrow::record_batch::RecordBatch;
+use std::cmp::max;
+use std::sync::Arc;
+
+/// Append batch from `row_idx` to `output` buffer start from `offset`
+/// # Panics
+///
+/// This function will panic if the output buffer doesn't have enough space to hold all the rows
+pub fn write_batch_unchecked(
+    output: &mut [u8],
+    offset: usize,
+    batch: &RecordBatch,
+    row_idx: usize,
+    schema: Arc<Schema>,
+) -> Vec<usize> {
+    let mut writer = RowWriter::new(&schema);
+    let mut current_offset = offset;
+    let mut offsets = vec![];
+    for cur_row in row_idx..batch.num_rows() {
+        offsets.push(current_offset);
+        let row_width = write_row(&mut writer, cur_row, batch, &schema);
+        output[current_offset..current_offset + row_width]
+            .copy_from_slice(writer.get_row());
+        current_offset += row_width;
+        writer.reset()
+    }
+    offsets
+}
+
+macro_rules! set_idx {
+    ($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{
+        $SELF.assert_index_valid($IDX);
+        let offset = $SELF.field_offsets[$IDX];
+        $SELF.data[offset..offset + $WIDTH].copy_from_slice(&$VALUE.to_le_bytes());
+    }};
+}
+
+macro_rules! fn_set_idx {
+    ($NATIVE: ident, $WIDTH: literal) => {
+        paste::item! {
+            fn [<set_ $NATIVE>](&mut self, idx: usize, value: $NATIVE) {
+                self.assert_index_valid(idx);
+                let offset = self.field_offsets[idx];
+                self.data[offset..offset + $WIDTH].copy_from_slice(&value.to_le_bytes());
+            }
+        }
+    };
+}
+
+/// Reusable row writer backed by Vec<u8>
+pub struct RowWriter {
+    data: Vec<u8>,
+    field_count: usize,
+    row_width: usize,
+    null_width: usize,
+    values_width: usize,
+    varlena_width: usize,
+    varlena_offset: usize,
+    field_offsets: Vec<usize>,
+}
+
+impl RowWriter {
+    /// new
+    pub fn new(schema: &Arc<Schema>) -> Self {
+        assert!(supported(schema));
+        let field_count = schema.fields().len();
+        let null_width = bytes_for(field_count);
+        let (field_offsets, values_width) = get_offsets(null_width, schema);
+        let mut init_capacity = estimate_row_width(null_width, schema);
+        if !fixed_size(schema) {
+            // double the capacity to avoid repeated resize
+            init_capacity *= 2;
+        }
+        Self {
+            data: vec![0; init_capacity],
+            field_count,
+            row_width: 0,
+            null_width,
+            values_width,
+            varlena_width: 0,
+            varlena_offset: null_width + values_width,
+            field_offsets,
+        }
+    }
+
+    /// Reset the row writer state for new tuple
+    pub fn reset(&mut self) {
+        self.data.fill(0);
+        self.row_width = 0;
+        self.varlena_width = 0;
+        self.varlena_offset = self.null_width + self.values_width;
+    }
+
+    #[inline]
+    fn assert_index_valid(&self, idx: usize) {
+        assert!(idx < self.field_count);
+    }
+
+    fn set_null_at(&mut self, idx: usize) {
+        let null_bits = &mut self.data[0..self.null_width];
+        set_bit(null_bits, idx, false)
+    }
+
+    fn set_non_null_at(&mut self, idx: usize) {
+        let null_bits = &mut self.data[0..self.null_width];
+        set_bit(null_bits, idx, true)
+    }
+
+    fn set_bool(&mut self, idx: usize, value: bool) {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        self.data[offset] = if value { 1 } else { 0 };
+    }
+
+    fn set_u8(&mut self, idx: usize, value: u8) {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        self.data[offset] = value;
+    }
+
+    fn_set_idx!(u16, 2);
+    fn_set_idx!(u32, 4);
+    fn_set_idx!(u64, 8);
+    fn_set_idx!(i16, 2);
+    fn_set_idx!(i32, 4);
+    fn_set_idx!(i64, 8);
+    fn_set_idx!(f32, 4);
+    fn_set_idx!(f64, 8);
+
+    fn set_i8(&mut self, idx: usize, value: i8) {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        self.data[offset] = value.to_le_bytes()[0];
+    }
+
+    fn set_date32(&mut self, idx: usize, value: i32) {
+        set_idx!(4, self, idx, value)
+    }
+
+    fn set_date64(&mut self, idx: usize, value: i64) {
+        set_idx!(8, self, idx, value)
+    }
+
+    fn set_offset_size(&mut self, idx: usize, size: usize) {
+        let offset_and_size: u64 = (self.varlena_offset << 32 | size) as u64;

Review comment:
       This should probably panic if `size` or `varlena_offset` are too large. On a related note, perhaps they should be `u32`?
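   
   One possible shape for such a check, as a standalone sketch rather than a patch against `RowWriter`: `pack_offset_size` and `unpack_offset_size` are hypothetical free functions that convert both values to `u32` first and panic if either does not fit, so an oversized offset or size can no longer spill into the other half of the word.

   ```rust
   use std::convert::TryFrom;

   /// Pack a variable-length field's offset and size into one 8-byte word:
   /// offset in the upper 32 bits, size in the lower 32 bits.
   /// Panics if either value does not fit in a `u32`.
   fn pack_offset_size(offset: usize, size: usize) -> u64 {
       let offset = u32::try_from(offset).expect("varlena offset exceeds u32::MAX");
       let size = u32::try_from(size).expect("varlena size exceeds u32::MAX");
       (u64::from(offset) << 32) | u64::from(size)
   }

   /// Inverse of `pack_offset_size`: returns `(offset, size)`.
   fn unpack_offset_size(word: u64) -> (usize, usize) {
       ((word >> 32) as usize, (word & 0xFFFF_FFFF) as usize)
   }
   ```

   Storing the two fields as `u32` in the writer itself would move the conversion to the places where they are updated, which is the alternative raised above.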




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#discussion_r803685458



##########
File path: datafusion/src/row/bitmap.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! General utilities for null bit section handling based on [arrow::util::bit_util]

Review comment:
       What do you think about directly using bit util?
   
   https://github.com/apache/arrow-rs/blob/master/arrow/src/util/bit_util.rs
   
   I feel the proliferation of bit manipulation code is somewhat repetitive
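   
   For context, the helpers in question are small. The sketch below uses only the standard library and borrows the names `bytes_for` and `set_bit` from the imports in `writer.rs`; the bodies are an assumption about their behavior (least-significant-bit-first numbering, as Arrow validity bitmaps use), not a copy of the PR's code.
   
   ```rust
   /// Number of bytes needed to hold `bits` bits, i.e. ceil(bits / 8).
   fn bytes_for(bits: usize) -> usize {
       (bits + 7) / 8
   }
   
   /// Set (`value == true`) or clear bit `i`, numbering bits LSB-first per byte.
   fn set_bit(data: &mut [u8], i: usize, value: bool) {
       let mask = 1u8 << (i % 8);
       if value {
           data[i / 8] |= mask;
       } else {
           data[i / 8] &= !mask;
       }
   }
   
   /// Read bit `i` using the same numbering as `set_bit`.
   fn get_bit(data: &[u8], i: usize) -> bool {
       data[i / 8] & (1u8 << (i % 8)) != 0
   }
   ```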

##########
File path: datafusion/src/lib.rs
##########
@@ -223,6 +223,8 @@ pub use arrow;
 pub use parquet;
 
 pub(crate) mod field_util;
+#[allow(dead_code)]

Review comment:
       Rather than `dead_code`, how about we add a feature flag (e.g. `experimental`) that is only enabled when we are iterating on this code?
   
   That way we will not impose any compile-time overhead on other users of DataFusion who won't benefit from this code (yet). I'd like to avoid undoing the progress that is being made on #348.
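   
   A rough sketch of such a gate, assuming a Cargo feature declared in `Cargo.toml`. The feature name `row` follows the author's later reply; the module body and the helper `row_feature_enabled` are placeholders for illustration.
   
   ```rust
   // Assumed Cargo.toml entry (opt-in, off by default):
   //   [features]
   //   row = []
   
   // Compiled only when the crate is built with `--features row`,
   // so default builds pay no compile-time cost for it.
   #[cfg(feature = "row")]
   pub mod row {
       // Experimental row-format code would live here.
   }
   
   /// True when this build includes the experimental module.
   pub fn row_feature_enabled() -> bool {
       cfg!(feature = "row")
   }
   ```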
   
   
   
   

##########
File path: datafusion/src/row/mod.rs
##########
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An implementation of Row backed by raw bytes
+//!
+//! Each tuple consists of up to three parts: [null bit set] [values] [var length data]
+//!
+//! The null bit set is used for null tracking and is aligned to 1-byte. It stores
+//! one bit per field.
+//!
+//! In the region of the values, we store the fields in the order they are defined in the schema.

Review comment:
       One classic database technique to pack such data into a single block (rather than a separate variable length area) is to preallocate the page (e.g. 32K or something) and then write rows into the front of the page, but filling the variable length area from the *back*. 
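   
   A minimal sketch of that technique, under simplifying assumptions: each row has one fixed-width part and one variable-length value, and the page is small enough that offsets fit in 32 bits. The `Page` type and its methods are hypothetical.
   
   ```rust
   /// A preallocated page: fixed-width row data grows from the front,
   /// variable-length bytes grow from the back.
   struct Page {
       buf: Vec<u8>,
       front: usize, // next free byte for fixed-width data
       back: usize,  // start of the variable-length area
   }
   
   impl Page {
       fn new(size: usize) -> Self {
           Page { buf: vec![0; size], front: 0, back: size }
       }
   
       /// Append one row. Writes `fixed` plus an 8-byte (offset, len) word at the
       /// front and `var` at the back. Returns false if the row does not fit.
       fn push(&mut self, fixed: &[u8], var: &[u8]) -> bool {
           let need = fixed.len() + 8 + var.len();
           if need > self.back - self.front {
               return false;
           }
           self.back -= var.len();
           self.buf[self.back..self.back + var.len()].copy_from_slice(var);
           self.buf[self.front..self.front + fixed.len()].copy_from_slice(fixed);
           self.front += fixed.len();
           let word = ((self.back as u64) << 32) | var.len() as u64;
           self.buf[self.front..self.front + 8].copy_from_slice(&word.to_le_bytes());
           self.front += 8;
           true
       }
   
       /// Resolve the (offset, len) word stored at `word_pos` to its bytes.
       fn var_at(&self, word_pos: usize) -> &[u8] {
           let mut w = [0u8; 8];
           w.copy_from_slice(&self.buf[word_pos..word_pos + 8]);
           let word = u64::from_le_bytes(w);
           let (offset, len) = ((word >> 32) as usize, (word & 0xFFFF_FFFF) as usize);
           &self.buf[offset..offset + len]
       }
   }
   ```
   
   The page is full when the two regions meet, so no separate buffer or resize is needed for the variable-length data.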

##########
File path: datafusion/src/row/mod.rs
##########
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An implementation of Row backed by raw bytes
+//!
+//! Each tuple consists of up to three parts: [null bit set] [values] [var length data]
+//!
+//! The null bit set is used for null tracking and is aligned to 1-byte. It stores
+//! one bit per field.
+//!
+//! In the region of the values, we store the fields in the order they are defined in the schema.

Review comment:
       This describes a variable length tuple -- so if you pack a bunch of `Row`s together in memory it will not be possible to find some arbitrary location quickly
   
   For example, finding where `Row3` starts in the following picture requires scanning all columns of Row1 and Row2
   
   ```
   ┌─────────────────────────────────┐
   │              Row1               │
   ├──────────┬──────────────────────┤
   │   Row2   │         Row3         │
   ├──────────┴─┬────────────────────┤
   │    Row4    │        Row5        │
   └────────────┴────────────────────┘
   ```
   
   
   The benefit of this strategy is that tuple construction will be very fast and memory usage optimal
   
   There are other strategies that use fixed-width tuples, as discussed on https://github.com/apache/arrow-datafusion/issues/1708, that have benefits (though they are likely not as memory-optimal)
   
   I think getting optimal performance will come from being able to vectorize many operations for which fixed sized tuples are compelling:
   
   ```
   ┌───────────┬──┐               ┌─────────────┐
   │     Row1  │  ├──────┐        │             │
   ├───────────┼──┤      │ ┌──────┼▶            │
   │     Row2  │  │──────┼┐├──────┼─▶           │
   ├───────────┼──┤      │││      │             │
   │     Row3  │  │──────┼┼┤      │  variable   │
   ├───────────┼──┤      └┼┼─────▶│ length area │
   │     Row4  │  │───────┼┘      │             │
   ├───────────┼──┤       └──────▶│             │
   │     Row5  │  │───────────┐   │             │
   └───────────┴──┘           └───┼───────▶     │
                                  └─────────────┘
   ```
   
   Maybe I can find some time this weekend to play around with some ideas
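   
   As a rough illustration of the second picture, the sketch below keeps fixed-width tuples contiguous and puts all variable-length bytes in a shared area, so row `i` is found by multiplication instead of a scan. The `FixedWidthRows` type, its single trailing (offset, len) slot per row, and the `u32` widths are assumptions made for the example.
   
   ```rust
   use std::convert::TryFrom;
   
   /// Fixed-width tuples stored back to back, plus one shared variable-length area.
   struct FixedWidthRows {
       row_width: usize, // fixed part + 8 trailing bytes for (offset, len)
       fixed: Vec<u8>,   // row i occupies fixed[i * row_width..(i + 1) * row_width]
       vardata: Vec<u8>,
   }
   
   impl FixedWidthRows {
       fn new(row_width: usize) -> Self {
           FixedWidthRows { row_width: row_width, fixed: Vec::new(), vardata: Vec::new() }
       }
   
       /// Append a row whose fixed part is `row_width - 8` bytes long.
       fn push(&mut self, fixed_part: &[u8], var: &[u8]) {
           assert_eq!(fixed_part.len() + 8, self.row_width);
           let offset = u32::try_from(self.vardata.len()).expect("vardata area too large");
           let len = u32::try_from(var.len()).expect("value too large");
           self.fixed.extend_from_slice(fixed_part);
           self.fixed.extend_from_slice(&offset.to_le_bytes());
           self.fixed.extend_from_slice(&len.to_le_bytes());
           self.vardata.extend_from_slice(var);
       }
   
       /// O(1) access to the fixed-width tuple of row `i`.
       fn row(&self, i: usize) -> &[u8] {
           let start = i * self.row_width;
           &self.fixed[start..start + self.row_width]
       }
   
       /// Follow row `i`'s trailing (offset, len) slot into the shared area.
       fn var(&self, i: usize) -> &[u8] {
           let row = self.row(i);
           let n = row.len();
           let (mut off, mut len) = ([0u8; 4], [0u8; 4]);
           off.copy_from_slice(&row[n - 8..n - 4]);
           len.copy_from_slice(&row[n - 4..]);
           let (off, len) = (u32::from_le_bytes(off) as usize, u32::from_le_bytes(len) as usize);
           &self.vardata[off..off + len]
       }
   }
   ```
   
   With variable-width rows, the same random access needs either a scan or a side vector of row start offsets.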

##########
File path: datafusion/src/row/mod.rs
##########
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An implementation of Row backed by raw bytes
+//!
+//! Each tuple consists of up to three parts: [null bit set] [values] [var length data]
+//!
+//! The null bit set is used for null tracking and is aligned to 1-byte. It stores
+//! one bit per field.
+//!
+//! In the region of the values, we store the fields in the order they are defined in the schema.
+//! - For fixed-length, sequential access fields, we store them directly.
+//!       E.g., 4 bytes for int and 1 byte for bool.
+//! - For fixed-length, update often fields, we store one 8-byte word per field.

Review comment:
       I don't really see a notion of "update often" appearing in this code. Maybe it is future work

##########
File path: datafusion/src/row/reader.rs
##########
@@ -0,0 +1,441 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Accessing row from raw bytes

Review comment:
       Here is one for consideration:
   
   ```text
                                                                                                           
    Row Layout                                                                                             
                                                                                                           
   ┌────────────────┬──────────────────────────┬───────────────────────┐        ┌───────────────────────┐  
   │Validity Bitmask│    Fixed Width Field     │ Variable Width Field  │   ...  │     vardata area      │  
   │ (byte aligned) │   (native type width)    │(len + vardata offset) │        │   (variable length)   │  
   └────────────────┴──────────────────────────┴───────────────────────┘        └───────────────────────┘  
                                                                                                           
                                                                                                           
                                                                                                           
    For example, given the schema (Int8, Float32, Utf8, Utf8)                                              
                                                                                                           
    Encoding the tuple (1, NULL, "FooBar", "baz")                                                          
                                                                                                           
    Requires 31 bytes as shown
   ┌────────┬────────┬──────────────┬──────────────────────┬──────────────────────┬───────────────────────┐
   │0b000110│  0x01  │  0x00000000  │0x00000000  0x00000006│0x00000006  0x00000003│       FooBarbaz       │
   └────────┴────────┴──────────────┴──────────────────────┴──────────────────────┴───────────────────────┘
   0        1         2             6                      14                     22                     31
                                                                                                           
    Validity    Int8  Float32 Field       Utf8 Field 1         Utf8 Field 2          Variable length       
      Mask     Field    (4 bytes)           Offset: 0            Offset: 6                area             
    (1 byte)  (1 byte)                       Size: 6              Size: 3               (9 bytes)          
                                            (8 bytes)            (8 bytes)                                 
   ```
   
   Also attaching the monopic file in case anyone finds that useful: 
   [drawing.zip](https://github.com/apache/arrow-datafusion/files/8041744/drawing.zip)
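   
   To make the byte arithmetic in the example easy to check, here is a small sketch that derives field offsets from the widths labelled in the diagram (1-byte validity mask, then 1, 4, 8 and 8 bytes). The helper names `layout` and `row_width` are hypothetical.
   
   ```rust
   /// Given per-field widths, return each field's offset from the row start
   /// and the offset at which the variable-length area begins.
   fn layout(widths: &[usize]) -> (Vec<usize>, usize) {
       // One validity bit per field, rounded up to whole bytes.
       let null_width = (widths.len() + 7) / 8;
       let mut offsets = Vec::with_capacity(widths.len());
       let mut offset = null_width;
       for w in widths {
           offsets.push(offset);
           offset += *w;
       }
       (offsets, offset)
   }
   
   /// Total encoded width: fixed region plus the bytes of every variable-length value.
   fn row_width(widths: &[usize], var_values: &[&str]) -> usize {
       layout(widths).1 + var_values.iter().map(|s| s.len()).sum::<usize>()
   }
   ```
   
   For the schema (Int8, Float32, Utf8, Utf8), `layout(&[1, 4, 8, 8])` gives field offsets 1, 2, 6 and 14 with the variable-length area starting at 22; adding the 9 bytes of "FooBarbaz" gives 31.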
   







[GitHub] [arrow-datafusion] alamb commented on pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#issuecomment-1034988805


   @yjshen let me know if you want to make any changes to this PR; otherwise I'll merge it in and we can iterate from there.





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#discussion_r803800523



##########
File path: datafusion/src/row/mod.rs
##########
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An implementation of Row backed by raw bytes
+//!
+//! Each tuple consists of up to three parts: [null bit set] [values] [var length data]
+//!
+//! The null bit set is used for null tracking and is aligned to 1-byte. It stores
+//! one bit per field.
+//!
+//! In the region of the values, we store the fields in the order they are defined in the schema.
+//! - For fixed-length, sequential access fields, we store them directly.
+//!       E.g., 4 bytes for int and 1 byte for bool.
+//! - For fixed-length, update often fields, we store one 8-byte word per field.

Review comment:
       Yes, it's not implemented yet. It is intended for aggregation state, as discussed under `[Minor adapt]`.







[GitHub] [arrow-datafusion] yjshen edited a comment on pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen edited a comment on pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#issuecomment-1033592720


   Thanks @alamb for the write-up of row use cases. 
   
   **[Use directly]** The current row implementation mostly targets payload use cases, where we do not update or look up fields by index after the first write and only decompose the rows into a record batch at the end. This is the case for the sort payload, the composed grouping key in hash aggregate (we can directly compare raw bytes for equality), the hash join key, and the join payload.
   
   **[Minor adapt]** We should adapt it slightly to use word-aligned initialization and updates for aggregation state (to be CPU-friendly), much as you suggested:
   ``` 
   let state = RowWriter::new()
     .for_aggregate(aggregate_exprs)
     .build();
   ```
   
   **[Minor adapt]** For a composite sort key with no varlena, we should remove the null-bits part, pad the bytes of null attributes with all 0xFF or all 0x00 (according to the nulls-first or nulls-last sort option), and compare raw bytes.
   
   **[NOT FIT]** For a composite sort key where variable-length attributes (varlena) exist and are not the last attribute, direct raw-byte comparison on the current row format doesn't fit. We would need to store varlena in place and pad all sort keys to the longest width, so that we can compare them directly as raw bytes.
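   
   A hedged sketch of the in-place, padded encoding described in the last point. It assumes a key of one string followed by one `i32`, zero padding, and ascending order; the big-endian, sign-flipped integer encoding is an extra assumption needed for byte-wise ordering and is not stated above. All function names are hypothetical.
   
   ```rust
   /// Encode an i32 so that unsigned byte-wise comparison matches numeric order:
   /// flip the sign bit, then store big-endian.
   fn encode_i32(v: i32) -> [u8; 4] {
       ((v as u32) ^ 0x8000_0000).to_be_bytes()
   }
   
   /// Store a string in place, zero-padded to `width` bytes.
   /// Note: with zero padding, "ab" and "ab\0" encode identically.
   fn encode_str(s: &str, width: usize) -> Vec<u8> {
       assert!(s.len() <= width, "value longer than the padded width");
       let mut out = s.as_bytes().to_vec();
       out.resize(width, 0);
       out
   }
   
   /// Composite key (name, id): every key has the same length, so comparing
   /// the raw bytes lexicographically orders by name first, then by id.
   fn sort_key(name: &str, id: i32, width: usize) -> Vec<u8> {
       let mut key = encode_str(name, width);
       key.extend_from_slice(&encode_i32(id));
       key
   }
   ```
   
   Because `Vec<u8>` compares lexicographically, sorting such keys with the default ordering is a raw-bytes comparison.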
   
   





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#discussion_r803897041



##########
File path: datafusion/src/lib.rs
##########
@@ -223,6 +223,8 @@ pub use arrow;
 pub use parquet;
 
 pub(crate) mod field_util;
+#[allow(dead_code)]

Review comment:
       Added as feature `row`







[GitHub] [arrow-datafusion] tustvold commented on a change in pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#discussion_r801608112



##########
File path: datafusion/src/row/writer.rs
##########
@@ -0,0 +1,322 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Reusable row writer backed by Vec<u8> to stitch attributes together
+
+use crate::row::bitmap::{bytes_for, set_bit};
+use crate::row::{estimate_row_width, fixed_size, get_offsets, supported};
+use arrow::array::Array;
+use arrow::datatypes::{DataType, Schema};
+use arrow::record_batch::RecordBatch;
+use std::cmp::max;
+use std::sync::Arc;
+
+/// Append batch from `row_idx` to `output` buffer start from `offset`
+/// # Panics
+///
+/// This function will panic if the output buffer doesn't have enough space to hold all the rows
+pub fn write_batch_unchecked(
+    output: &mut [u8],
+    offset: usize,
+    batch: &RecordBatch,
+    row_idx: usize,
+    schema: Arc<Schema>,
+) -> Vec<usize> {
+    let mut writer = RowWriter::new(&schema);
+    let mut current_offset = offset;
+    let mut offsets = vec![];
+    for cur_row in row_idx..batch.num_rows() {
+        offsets.push(current_offset);
+        let row_width = write_row(&mut writer, cur_row, batch, &schema);
+        output[current_offset..current_offset + row_width]
+            .copy_from_slice(writer.get_row());
+        current_offset += row_width;
+        writer.reset()
+    }
+    offsets
+}
+
+macro_rules! set_idx {
+    ($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{
+        $SELF.assert_index_valid($IDX);
+        let offset = $SELF.field_offsets[$IDX];
+        $SELF.data[offset..offset + $WIDTH].copy_from_slice(&$VALUE.to_le_bytes());
+    }};
+}
+
+macro_rules! fn_set_idx {
+    ($NATIVE: ident, $WIDTH: literal) => {
+        paste::item! {
+            fn [<set_ $NATIVE>](&mut self, idx: usize, value: $NATIVE) {
+                self.assert_index_valid(idx);
+                let offset = self.field_offsets[idx];
+                self.data[offset..offset + $WIDTH].copy_from_slice(&value.to_le_bytes());
+            }
+        }
+    };
+}
+
+/// Reusable row writer backed by Vec<u8>
+pub struct RowWriter {
+    data: Vec<u8>,
+    field_count: usize,
+    row_width: usize,
+    null_width: usize,
+    values_width: usize,
+    varlena_width: usize,
+    varlena_offset: usize,
+    field_offsets: Vec<usize>,
+}
+
+impl RowWriter {
+    /// new
+    pub fn new(schema: &Arc<Schema>) -> Self {
+        assert!(supported(schema));
+        let field_count = schema.fields().len();
+        let null_width = bytes_for(field_count);
+        let (field_offsets, values_width) = get_offsets(null_width, schema);
+        let mut init_capacity = estimate_row_width(null_width, schema);
+        if !fixed_size(schema) {
+            // double the capacity to avoid repeated resize
+            init_capacity *= 2;
+        }
+        Self {
+            data: vec![0; init_capacity],
+            field_count,
+            row_width: 0,
+            null_width,
+            values_width,
+            varlena_width: 0,
+            varlena_offset: null_width + values_width,
+            field_offsets,
+        }
+    }
+
+    /// Reset the row writer state for new tuple
+    pub fn reset(&mut self) {
+        self.data.fill(0);
+        self.row_width = 0;
+        self.varlena_width = 0;
+        self.varlena_offset = self.null_width + self.values_width;
+    }
+
+    #[inline]
+    fn assert_index_valid(&self, idx: usize) {
+        assert!(idx < self.field_count);
+    }
+
+    fn set_null_at(&mut self, idx: usize) {
+        let null_bits = &mut self.data[0..self.null_width];
+        set_bit(null_bits, idx, false)
+    }
+
+    fn set_non_null_at(&mut self, idx: usize) {
+        let null_bits = &mut self.data[0..self.null_width];
+        set_bit(null_bits, idx, true)
+    }
+
+    fn set_bool(&mut self, idx: usize, value: bool) {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        self.data[offset] = if value { 1 } else { 0 };
+    }
+
+    fn set_u8(&mut self, idx: usize, value: u8) {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        self.data[offset] = value;
+    }
+
+    fn_set_idx!(u16, 2);
+    fn_set_idx!(u32, 4);
+    fn_set_idx!(u64, 8);
+    fn_set_idx!(i16, 2);
+    fn_set_idx!(i32, 4);
+    fn_set_idx!(i64, 8);
+    fn_set_idx!(f32, 4);
+    fn_set_idx!(f64, 8);
+
+    fn set_i8(&mut self, idx: usize, value: i8) {
+        self.assert_index_valid(idx);
+        let offset = self.field_offsets[idx];
+        self.data[offset] = value.to_le_bytes()[0];
+    }
+
+    fn set_date32(&mut self, idx: usize, value: i32) {
+        set_idx!(4, self, idx, value)
+    }
+
+    fn set_date64(&mut self, idx: usize, value: i64) {
+        set_idx!(8, self, idx, value)
+    }
+
+    fn set_offset_size(&mut self, idx: usize, size: usize) {
+        let offset_and_size: u64 = (self.varlena_offset << 32 | size) as u64;

Review comment:
       This should probably panic if `size` or `varlena_offset` are too large. On a related note, perhaps they should be `u32`?







[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#discussion_r802423209



##########
File path: datafusion/src/row/mod.rs
##########
@@ -0,0 +1,333 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An implementation of Row backed by raw bytes
+//!
+//! Each tuple consists of up to three parts: [null bit set] [values] [var length data]
+//!
+//! The null bit set is used for null tracking and is aligned to 1-byte. It stores
+//! one bit per field.
+//!
+//! In the region of the values, we store the fields in the order they are defined in the schema.
+//! - For fixed-length, sequential access fields, we store them directly.
+//!       E.g., 4 bytes for int and 1 byte for bool.
+//! - For fixed-length, update often fields, we store one 8-byte word per field.
+//! - For fields of non-primitive or variable-length types,
+//!       we append their actual content to the end of the var length region and
+//!       store their offset relative to row base and their length, packed into an 8-byte word.
+
+use arrow::datatypes::{DataType, Schema};
+use std::sync::Arc;
+
+mod bitmap;
+mod reader;
+mod writer;
+
+const UTF8_DEFAULT_SIZE: usize = 20;
+const BINARY_DEFAULT_SIZE: usize = 100;
+
+/// Get relative offsets for each field and total width for values
+fn get_offsets(null_width: usize, schema: &Arc<Schema>) -> (Vec<usize>, usize) {
+    let mut offsets = vec![];
+    let mut offset = null_width;
+    for f in schema.fields() {
+        offsets.push(offset);
+        offset += type_width(f.data_type());
+    }
+    (offsets, offset - null_width)
+}
+
+fn supported_type(dt: &DataType) -> bool {
+    use DataType::*;
+    matches!(
+        dt,
+        Boolean
+            | UInt8
+            | UInt16
+            | UInt32
+            | UInt64
+            | Int8
+            | Int16
+            | Int32
+            | Int64
+            | Float32
+            | Float64
+            | Date32
+            | Date64
+            | Utf8
+            | Binary
+    )
+}
+
+fn var_length(dt: &DataType) -> bool {
+    use DataType::*;
+    matches!(dt, Utf8 | Binary)
+}
+
+fn type_width(dt: &DataType) -> usize {
+    use DataType::*;
+    if var_length(dt) {
+        return 8;
+    }
+    match dt {
+        Boolean | UInt8 | Int8 => 1,
+        UInt16 | Int16 => 2,
+        UInt32 | Int32 | Float32 | Date32 => 4,
+        UInt64 | Int64 | Float64 | Date64 => 8,
+        _ => unreachable!(),
+    }
+}
+
+fn estimate_row_width(null_width: usize, schema: &Arc<Schema>) -> usize {
+    let mut width = null_width;
+    for f in schema.fields() {
+        width += type_width(f.data_type());
+        match f.data_type() {
+            DataType::Utf8 => width += UTF8_DEFAULT_SIZE,
+            DataType::Binary => width += BINARY_DEFAULT_SIZE,
+            _ => {}
+        }
+    }
+    (width.saturating_add(7) / 8) * 8
+}
+
+fn fixed_size(schema: &Arc<Schema>) -> bool {
+    schema.fields().iter().all(|f| !var_length(f.data_type()))
+}
+
+fn supported(schema: &Arc<Schema>) -> bool {
+    schema
+        .fields()
+        .iter()
+        .all(|f| supported_type(f.data_type()))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::datasource::file_format::parquet::ParquetFormat;
+    use crate::datasource::file_format::FileFormat;
+    use crate::datasource::object_store::local::{
+        local_object_reader, local_object_reader_stream, local_unpartitioned_file,
+        LocalFileSystem,
+    };
+    use crate::error::Result;
+    use crate::execution::runtime_env::RuntimeEnv;
+    use crate::physical_plan::file_format::FileScanConfig;
+    use crate::physical_plan::{collect, ExecutionPlan};
+    use crate::row::reader::read_as_batch;
+    use crate::row::writer::write_batch_unchecked;
+    use arrow::record_batch::RecordBatch;
+    use arrow::{array::*, datatypes::*};
+    use DataType::*;
+
+    macro_rules! fn_test_single_type {
+        ($ARRAY: ident, $TYPE: expr, $VEC: expr) => {
+            paste::item! {
+                #[test]
+                #[allow(non_snake_case)]
+                fn [<test_single_ $TYPE>]() -> Result<()> {
+                    let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)]));
+                    let a = $ARRAY::from($VEC);
+                    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
+                    let mut vector = vec![0; 1024];
+                    let row_offsets =
+                        { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
+                    let output_batch = { read_as_batch(&mut vector, schema, row_offsets)? };
+                    assert_eq!(batch, output_batch);
+                    Ok(())
+                }
+            }
+        };
+    }
+
+    fn_test_single_type!(
+        BooleanArray,
+        Boolean,
+        vec![Some(true), Some(false), None, Some(true), None]
+    );
+
+    fn_test_single_type!(
+        Int8Array,
+        Int8,
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
+    );
+
+    fn_test_single_type!(
+        Int16Array,
+        Int16,
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
+    );
+
+    fn_test_single_type!(
+        Int32Array,
+        Int32,
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
+    );
+
+    fn_test_single_type!(
+        Int64Array,
+        Int64,
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
+    );
+
+    fn_test_single_type!(
+        UInt8Array,
+        UInt8,
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
+    );
+
+    fn_test_single_type!(
+        UInt16Array,
+        UInt16,
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
+    );
+
+    fn_test_single_type!(
+        UInt32Array,
+        UInt32,
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
+    );
+
+    fn_test_single_type!(
+        UInt64Array,
+        UInt64,
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
+    );
+
+    fn_test_single_type!(
+        Float32Array,
+        Float32,
+        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
+    );
+
+    fn_test_single_type!(
+        Float64Array,
+        Float64,
+        vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
+    );
+
+    fn_test_single_type!(
+        Date32Array,
+        Date32,
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
+    );
+
+    fn_test_single_type!(
+        Date64Array,
+        Date64,
+        vec![Some(5), Some(7), None, Some(0), Some(111)]
+    );
+
+    fn_test_single_type!(
+        StringArray,
+        Utf8,
+        vec![Some("hello"), Some("world"), None, Some(""), Some("")]
+    );
+
+    #[test]
+    fn test_single_binary() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, false)]));
+        let values: Vec<Option<&[u8]>> =
+            vec![Some(b"one"), Some(b"two"), None, Some(b""), Some(b"three")];
+        let a = BinaryArray::from_opt_vec(values);
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
+        let mut vector = vec![0; 8192];
+        let row_offsets =
+            { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) };
+        let output_batch = { read_as_batch(&mut vector, schema, row_offsets)? };
+        assert_eq!(batch, output_batch);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_with_parquet() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
+        let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
+        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+        let schema = exec.schema().clone();
+
+        let batches = collect(exec, runtime).await?;
+        assert_eq!(1, batches.len());
+        let batch = &batches[0];
+
+        let mut vector = vec![0; 20480];
+        let row_offsets =
+            { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone()) };
+        let output_batch = { read_as_batch(&mut vector, schema, row_offsets)? };
+        assert_eq!(*batch, output_batch);
+
+        Ok(())
+    }
+
+    #[test]
+    #[should_panic(expected = "supported(schema)")]
+    fn test_unsupported_type_write() {
+        let a: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8]));

Review comment:
       I've added this as a TODO item and will address it later.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen edited a comment on pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen edited a comment on pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#issuecomment-1033592720


   Thanks @alamb for the write-up of row use cases. 
   
   The current row implementation mostly targets payload use cases, where we do not update or look up fields by index after the first write and only decompose the rows into a record batch at the end. This is the case for the sort payload, the composite grouping key in hash aggregation (we can compare raw bytes directly for equality), the hash join key, and the join payload.
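
As a minimal sketch of the grouping-key case, the snippet below encodes a two-column key as `[null bit set][values]` and uses the raw bytes directly as a hash map key. The helpers `encode_key` and `count_groups`, the little-endian layout, and the "bit set means valid" convention are assumptions made for illustration; they are not the API introduced by this PR.

```rust
use std::collections::HashMap;

/// Encode a two-column grouping key as [null bit set][values].
/// Hypothetical helper for illustration; not DataFusion's API.
fn encode_key(a: Option<i32>, b: Option<i64>) -> Vec<u8> {
    // 1 byte of null bits, then 4 bytes for the i32 and 8 bytes for the i64.
    let mut row = vec![0u8; 1 + 4 + 8];
    if let Some(v) = a {
        row[0] |= 1 << 0; // in this sketch, a set bit means "field is valid"
        row[1..5].copy_from_slice(&v.to_le_bytes());
    }
    if let Some(v) = b {
        row[0] |= 1 << 1;
        row[5..13].copy_from_slice(&v.to_le_bytes());
    }
    // Null fields keep an all-zero slot, so equal keys are byte-identical.
    row
}

/// Count rows per group by comparing and hashing the encoded bytes only.
fn count_groups(rows: &[(Option<i32>, Option<i64>)]) -> HashMap<Vec<u8>, usize> {
    let mut groups = HashMap::new();
    for &(a, b) in rows {
        *groups.entry(encode_key(a, b)).or_insert(0) += 1;
    }
    groups
}
```

Byte equality only works here because null slots are always zeroed; if stale bytes were left behind a null, two logically equal keys could differ.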
   
   We should also change it slightly so that aggregation state is initialized and updated in word-aligned slots (to be CPU friendly), much as you suggested:
   ``` 
   let state = RowWriter::new()
     .for_aggregate(aggregate_exprs)
     .build();
   ```
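
To make the word-aligned idea concrete, here is a rough sketch in which every state field occupies one fixed 8-byte slot, so an update is a read and write at `index * 8`. The type `WordAlignedState` and its methods are hypothetical and unrelated to the `RowWriter` builder above; the sketch also ignores null bits and only guarantees that slot offsets are multiples of 8 relative to the buffer start.

```rust
/// Width of every state slot: one 8-byte word per field (an assumption of this sketch).
const WORD: usize = 8;

/// Hypothetical aggregation state with one fixed-width slot per field.
struct WordAlignedState {
    buf: Vec<u8>,
}

impl WordAlignedState {
    /// Allocate zero-initialized slots for `num_fields` fields.
    fn new(num_fields: usize) -> Self {
        Self { buf: vec![0u8; num_fields * WORD] }
    }

    /// Read the i64 stored in slot `idx`.
    fn get_i64(&self, idx: usize) -> i64 {
        let start = idx * WORD;
        let mut word = [0u8; WORD];
        word.copy_from_slice(&self.buf[start..start + WORD]);
        i64::from_le_bytes(word)
    }

    /// Overwrite slot `idx` in place; no other slot moves.
    fn set_i64(&mut self, idx: usize, v: i64) {
        let start = idx * WORD;
        self.buf[start..start + WORD].copy_from_slice(&v.to_le_bytes());
    }

    /// Accumulate into slot `idx`, as a SUM or COUNT update would.
    fn add_i64(&mut self, idx: usize, delta: i64) {
        let cur = self.get_i64(idx);
        self.set_i64(idx, cur + delta);
    }
}
```

Because every slot has the same width, a field's position never depends on the fields before it, which is what makes repeated in-place updates cheap.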
   
   For a composite sort key with no varlena, we should remove the null-bits part, pad null attributes with all 0xFF or all 0x00 bytes (depending on the nulls-first or nulls-last sort option), and compare raw bytes.
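
One possible shape for such a key, sketched for nullable `i32` fields in ascending order: each value is written big-endian with its sign bit flipped so that byte order matches numeric order, and a null becomes all 0x00 (nulls first) or all 0xFF (nulls last). The function names and the sign-flip encoding are assumptions of this sketch, not something the PR implements.

```rust
/// Encode a nullable i32 as 4 bytes whose lexicographic order matches
/// ascending numeric order. Hypothetical helper for illustration.
fn encode_sort_i32(v: Option<i32>, nulls_first: bool) -> [u8; 4] {
    match v {
        // Flip the sign bit and write big-endian so negatives sort before positives.
        Some(x) => ((x as u32) ^ 0x8000_0000).to_be_bytes(),
        // Null padding: all zeros sorts lowest, all ones sorts highest.
        None if nulls_first => [0x00; 4],
        None => [0xFF; 4],
    }
}

/// Concatenate the encoded fields into one key with no null-bits prefix.
fn encode_sort_key(fields: &[Option<i32>], nulls_first: bool) -> Vec<u8> {
    let mut key = Vec::with_capacity(fields.len() * 4);
    for f in fields {
        key.extend_from_slice(&encode_sort_i32(*f, nulls_first));
    }
    key
}
```

Keys built this way can be ordered with a plain byte-slice comparison. Note that in this sketch a null is indistinguishable from the extreme value (`i32::MIN` with nulls first, `i32::MAX` with nulls last), so the two compare as equal.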
   
   For a composite sort key in which a variable-length attribute (varlena) exists and is not the last field, direct comparison of the raw bytes in the current row format does not work. We need to store the varlena in place and pad all sort keys to the longest width, after which we can compare them directly as raw bytes.
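
A rough sketch of the in-place padding idea, for a key made of a string followed by an `i32`: the string bytes are stored inline and padded with 0x00 up to the longest string in the batch, so the trailing integer starts at the same offset in every key. The function `encode_padded_keys`, the 0x00 pad byte, and the sign-flipped big-endian integer are assumptions chosen for illustration.

```rust
/// Build byte-comparable keys for (string, i32) rows by storing the string
/// in place and padding it to the widest string in `rows`.
/// Hypothetical helper for illustration; not part of this PR.
fn encode_padded_keys(rows: &[(&str, i32)]) -> Vec<Vec<u8>> {
    // All keys share one width, so the i32 always starts at the same offset.
    let width = rows.iter().map(|(s, _)| s.len()).max().unwrap_or(0);
    rows.iter()
        .map(|(s, n)| {
            let mut key = Vec::with_capacity(width + 4);
            key.extend_from_slice(s.as_bytes());
            key.resize(width, 0x00); // pad shorter strings with zero bytes
            // Sign-flipped big-endian keeps numeric order under byte comparison.
            key.extend_from_slice(&((*n as u32) ^ 0x8000_0000).to_be_bytes());
            key
        })
        .collect()
}
```

With an offset-and-length word in the values region instead, the comparison would see the offset bytes rather than the string contents, which is why the current layout cannot be compared directly. One caveat of zero padding is that strings differing only by trailing 0x00 bytes become indistinguishable.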
   
   





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#discussion_r803909394



##########
File path: datafusion/src/row/reader.rs
##########
@@ -0,0 +1,441 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Accessing row from raw bytes

Review comment:
       I've adapted the figure a little and put it in the module doc. Many thanks, @alamb!







[GitHub] [arrow-datafusion] yjshen edited a comment on pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen edited a comment on pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#issuecomment-1035106524


   Thanks @alamb for all the thoughts! 
   
   ~~The PR currently relies on `arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "731e132489b99cd688f884642cf20de52aed24d0" }` since the need to access ArrayBuilder function. Do you think I should  make this PR behind a non-default feature and just remove the `patch.crates-io` section?~~
   I've removed the `patch.crates-io` section and moved the whole PR behind the `row` feature. I've tested the `row` feature locally and it succeeds. To experiment with `row` before arrow 9.0.3 is released, please add these lines back:
   
   ```
   [patch.crates-io]
   arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "731e132489b99cd688f884642cf20de52aed24d0" }
   parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "731e132489b99cd688f884642cf20de52aed24d0" }
   ```
   
   ~~Besides, I'd like to incorporate your `bit_util` comments as well as the row diagram into the PR.~~ [done]
   
   I've listed several TODOs in the PR description and will address them as follow-ups.
   
   I think the PR is ready to be merged now.





[GitHub] [arrow-datafusion] yjshen edited a comment on pull request #1782: Introduce `Row` format backed by raw bytes

Posted by GitBox <gi...@apache.org>.
yjshen edited a comment on pull request #1782:
URL: https://github.com/apache/arrow-datafusion/pull/1782#issuecomment-1035106524


   Thanks @alamb for all the thoughts! 
   
   ~~The PR currently relies on `arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "731e132489b99cd688f884642cf20de52aed24d0" }` since the need to access ArrayBuilder function. Do you think I should  make this PR behind a non-default feature and just remove the `patch.crates-io` section?~~
   I've removed the `patch.crates-io` section and moved the whole PR behind the `row` feature. I've tested the `row` feature locally and it succeeds. To experiment with `row` before arrow 9.0.3 is released, please add these lines back:
   
   ```
   [patch.crates-io]
   arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "731e132489b99cd688f884642cf20de52aed24d0" }
   parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "731e132489b99cd688f884642cf20de52aed24d0" }
   ```
   
   ~~Besides, I'd like to incorporate your `bit_util` comments as well as the row diagram into the PR.~~
   
   I've listed several TODOs in the PR description and will address them in follow-up PRs.

