You are viewing a plain-text version of this content; the hyperlink to its canonical version was lost in the plain-text conversion.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/04/21 17:30:22 UTC
[arrow-datafusion] branch master updated: Make row its crate to make it accessible from physical-expr (#2283)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new cd8bfb46b Make row its crate to make it accessible from physical-expr (#2283)
cd8bfb46b is described below
commit cd8bfb46b2f20ac9d44d22e66883e41cecd683e1
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Fri Apr 22 01:30:16 2022 +0800
Make row its crate to make it accessible from physical-expr (#2283)
---
Cargo.toml | 1 +
datafusion/core/Cargo.toml | 7 +-
datafusion/core/src/lib.rs | 6 +-
datafusion/core/tests/row.rs | 108 +++++++++++++++++++++
datafusion/row/Cargo.toml | 45 +++++++++
datafusion/{core/src/row => row/src}/jit/mod.rs | 8 +-
datafusion/{core/src/row => row/src}/jit/reader.rs | 12 +--
datafusion/{core/src/row => row/src}/jit/writer.rs | 12 +--
datafusion/{core/src/row => row/src}/layout.rs | 3 +-
datafusion/{core/src/row/mod.rs => row/src/lib.rs} | 97 +-----------------
datafusion/{core/src/row => row/src}/reader.rs | 10 +-
datafusion/{core/src/row => row/src}/validity.rs | 0
datafusion/{core/src/row => row/src}/writer.rs | 4 +-
13 files changed, 192 insertions(+), 121 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 0c0142849..fefd5679a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,6 +23,7 @@ members = [
"datafusion/jit",
"datafusion/physical-expr",
"datafusion/proto",
+ "datafusion/row",
"datafusion-examples",
"benchmarks",
"ballista/rust/client",
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index ea4d80981..533b38b81 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -49,7 +49,7 @@ jit = ["datafusion-jit"]
pyarrow = ["pyo3", "arrow/pyarrow", "datafusion-common/pyarrow"]
regex_expressions = ["datafusion-physical-expr/regex_expressions"]
# Used to enable row format experiment
-row = []
+row = ["datafusion-row"]
simd = ["arrow/simd"]
unicode_expressions = ["datafusion-physical-expr/regex_expressions"]
@@ -64,6 +64,7 @@ datafusion-data-access = { path = "../../data-access", version = "1.0.0" }
datafusion-expr = { path = "../expr", version = "7.0.0" }
datafusion-jit = { path = "../jit", version = "7.0.0", optional = true }
datafusion-physical-expr = { path = "../physical-expr", version = "7.0.0" }
+datafusion-row = { path = "../row", version = "7.0.0", optional = true }
futures = "0.3"
hashbrown = { version = "0.12", features = ["raw"] }
lazy_static = { version = "^1.4.0" }
@@ -125,3 +126,7 @@ name = "parquet_query_sql"
harness = false
name = "jit"
required-features = ["row", "jit"]
+
+[[test]]
+name = "row"
+required-features = ["row"]
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index a2bc7e7ea..fd08d3a0a 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -231,12 +231,12 @@ pub use datafusion_data_access;
pub use datafusion_expr as logical_expr;
pub use datafusion_physical_expr as physical_expr;
+#[cfg(feature = "row")]
+pub use datafusion_row as row;
+
#[cfg(feature = "jit")]
pub use datafusion_jit as jit;
-#[cfg(feature = "row")]
-pub mod row;
-
pub mod from_slice;
#[cfg(test)]
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
new file mode 100644
index 000000000..bda107f13
--- /dev/null
+++ b/datafusion/core/tests/row.rs
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::file_format::FileFormat;
+use datafusion::datasource::listing::local_unpartitioned_file;
+use datafusion::error::Result;
+use datafusion::physical_plan::file_format::FileScanConfig;
+use datafusion::physical_plan::{collect, ExecutionPlan};
+use datafusion::prelude::SessionContext;
+use datafusion_data_access::object_store::local::LocalFileSystem;
+use datafusion_data_access::object_store::local::{
+ local_object_reader, local_object_reader_stream,
+};
+use datafusion_row::layout::RowType::{Compact, WordAligned};
+use datafusion_row::reader::read_as_batch;
+use datafusion_row::writer::write_batch_unchecked;
+use std::sync::Arc;
+
+#[tokio::test]
+async fn test_with_parquet() -> Result<()> {
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+ let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
+ let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+ let schema = exec.schema().clone();
+
+ let batches = collect(exec, task_ctx).await?;
+ assert_eq!(1, batches.len());
+ let batch = &batches[0];
+
+ let mut vector = vec![0; 20480];
+ let row_offsets =
+ { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(), Compact) };
+ let output_batch = { read_as_batch(&vector, schema, &row_offsets, Compact)? };
+ assert_eq!(*batch, output_batch);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_with_parquet_word_aligned() -> Result<()> {
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+ let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7]);
+ let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+ let schema = exec.schema().clone();
+
+ let batches = collect(exec, task_ctx).await?;
+ assert_eq!(1, batches.len());
+ let batch = &batches[0];
+
+ let mut vector = vec![0; 20480];
+ let row_offsets =
+ { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(), WordAligned) };
+ let output_batch = { read_as_batch(&vector, schema, &row_offsets, WordAligned)? };
+ assert_eq!(*batch, output_batch);
+
+ Ok(())
+}
+
+async fn get_exec(
+ file_name: &str,
+ projection: &Option<Vec<usize>>,
+ limit: Option<usize>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+ let testdata = datafusion::test_util::parquet_test_data();
+ let filename = format!("{}/{}", testdata, file_name);
+ let format = ParquetFormat::default();
+ let file_schema = format
+ .infer_schema(local_object_reader_stream(vec![filename.clone()]))
+ .await
+ .expect("Schema inference");
+ let statistics = format
+ .infer_stats(local_object_reader(filename.clone()), file_schema.clone())
+ .await
+ .expect("Stats inference");
+ let file_groups = vec![vec![local_unpartitioned_file(filename.clone())]];
+ let exec = format
+ .create_physical_plan(
+ FileScanConfig {
+ object_store: Arc::new(LocalFileSystem {}),
+ file_schema,
+ file_groups,
+ statistics,
+ projection: projection.clone(),
+ limit,
+ table_partition_cols: vec![],
+ },
+ &[],
+ )
+ .await?;
+ Ok(exec)
+}
diff --git a/datafusion/row/Cargo.toml b/datafusion/row/Cargo.toml
new file mode 100644
index 000000000..26b517300
--- /dev/null
+++ b/datafusion/row/Cargo.toml
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "datafusion-row"
+description = "Row backed by raw bytes for DataFusion query engine"
+version = "7.0.0"
+homepage = "https://github.com/apache/arrow-datafusion"
+repository = "https://github.com/apache/arrow-datafusion"
+readme = "../README.md"
+authors = ["Apache Arrow <de...@arrow.apache.org>"]
+license = "Apache-2.0"
+keywords = [ "arrow", "query", "sql" ]
+edition = "2021"
+rust-version = "1.59"
+
+[lib]
+name = "datafusion_row"
+path = "src/lib.rs"
+
+[features]
+# Used to enable JIT code generation
+jit = ["datafusion-jit"]
+
+[dependencies]
+arrow = { version = "12" }
+datafusion-common = { path = "../common", version = "7.0.0" }
+datafusion-expr = { path = "../expr", version = "7.0.0" }
+datafusion-jit = { path = "../jit", version = "7.0.0", optional = true }
+paste = "^1.0"
+rand = "0.8"
diff --git a/datafusion/core/src/row/jit/mod.rs b/datafusion/row/src/jit/mod.rs
similarity index 97%
rename from datafusion/core/src/row/jit/mod.rs
rename to datafusion/row/src/jit/mod.rs
index 7ee76a9b4..03b8eed18 100644
--- a/datafusion/core/src/row/jit/mod.rs
+++ b/datafusion/row/src/jit/mod.rs
@@ -43,12 +43,12 @@ fn fn_name<T>(f: T) -> &'static str {
#[cfg(test)]
mod tests {
- use crate::error::Result;
- use crate::row::jit::reader::read_as_batch_jit;
- use crate::row::jit::writer::write_batch_unchecked_jit;
- use crate::row::layout::RowType::{Compact, WordAligned};
+ use crate::jit::reader::read_as_batch_jit;
+ use crate::jit::writer::write_batch_unchecked_jit;
+ use crate::layout::RowType::{Compact, WordAligned};
use arrow::record_batch::RecordBatch;
use arrow::{array::*, datatypes::*};
+ use datafusion_common::Result;
use datafusion_jit::api::Assembler;
use std::sync::Arc;
use DataType::*;
diff --git a/datafusion/core/src/row/jit/reader.rs b/datafusion/row/src/jit/reader.rs
similarity index 97%
rename from datafusion/core/src/row/jit/reader.rs
rename to datafusion/row/src/jit/reader.rs
index 80e10131f..276271a20 100644
--- a/datafusion/core/src/row/jit/reader.rs
+++ b/datafusion/row/src/jit/reader.rs
@@ -17,16 +17,16 @@
//! Accessing row from raw bytes with JIT
-use crate::error::{DataFusionError, Result};
+use crate::jit::fn_name;
+use crate::layout::RowType;
+use crate::reader::RowReader;
+use crate::reader::*;
use crate::reg_fn;
-use crate::row::jit::fn_name;
-use crate::row::layout::RowType;
-use crate::row::reader::RowReader;
-use crate::row::reader::*;
-use crate::row::MutableRecordBatch;
+use crate::MutableRecordBatch;
use arrow::array::ArrayBuilder;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
+use datafusion_common::{DataFusionError, Result};
use datafusion_jit::api::Assembler;
use datafusion_jit::api::GeneratedFunction;
use datafusion_jit::ast::{I64, PTR};
diff --git a/datafusion/core/src/row/jit/writer.rs b/datafusion/row/src/jit/writer.rs
similarity index 97%
rename from datafusion/core/src/row/jit/writer.rs
rename to datafusion/row/src/jit/writer.rs
index ae9ff1308..3d8a0ba07 100644
--- a/datafusion/core/src/row/jit/writer.rs
+++ b/datafusion/row/src/jit/writer.rs
@@ -17,16 +17,16 @@
//! Reusable JIT version of row writer backed by Vec<u8> to stitch attributes together
-use crate::error::Result;
+use crate::jit::fn_name;
+use crate::layout::RowType;
use crate::reg_fn;
-use crate::row::jit::fn_name;
-use crate::row::layout::RowType;
-use crate::row::schema_null_free;
-use crate::row::writer::RowWriter;
-use crate::row::writer::*;
+use crate::schema_null_free;
+use crate::writer::RowWriter;
+use crate::writer::*;
use arrow::array::Array;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
use datafusion_jit::api::CodeBlock;
use datafusion_jit::api::{Assembler, GeneratedFunction};
use datafusion_jit::ast::Expr;
diff --git a/datafusion/core/src/row/layout.rs b/datafusion/row/src/layout.rs
similarity index 99%
rename from datafusion/core/src/row/layout.rs
rename to datafusion/row/src/layout.rs
index c14f9fa69..b017d1958 100644
--- a/datafusion/core/src/row/layout.rs
+++ b/datafusion/row/src/layout.rs
@@ -17,7 +17,7 @@
//! Various row layout for different use case
-use crate::row::schema_null_free;
+use crate::schema_null_free;
use arrow::datatypes::{DataType, Schema};
use arrow::util::bit_util::{ceil, round_upto_power_of_2};
@@ -41,7 +41,6 @@ pub enum RowType {
#[derive(Debug)]
pub(crate) struct RowLayout {
/// Type of the layout
- #[allow(dead_code)]
row_type: RowType,
/// If a row is null free according to its schema
pub(crate) null_free: bool,
diff --git a/datafusion/core/src/row/mod.rs b/datafusion/row/src/lib.rs
similarity index 79%
rename from datafusion/core/src/row/mod.rs
rename to datafusion/row/src/lib.rs
index f8e9ff273..54c112dd5 100644
--- a/datafusion/core/src/row/mod.rs
+++ b/datafusion/row/src/lib.rs
@@ -56,7 +56,7 @@ use std::sync::Arc;
#[cfg(feature = "jit")]
pub mod jit;
-mod layout;
+pub mod layout;
pub mod reader;
mod validity;
pub mod writer;
@@ -108,22 +108,12 @@ fn make_batch(
#[cfg(test)]
mod tests {
use super::*;
- use crate::datasource::file_format::parquet::ParquetFormat;
- use crate::datasource::file_format::FileFormat;
- use crate::datasource::listing::local_unpartitioned_file;
- use crate::error::Result;
- use crate::physical_plan::file_format::FileScanConfig;
- use crate::physical_plan::{collect, ExecutionPlan};
- use crate::prelude::SessionContext;
- use crate::row::layout::RowType::{Compact, WordAligned};
- use crate::row::reader::read_as_batch;
- use crate::row::writer::write_batch_unchecked;
+ use crate::layout::RowType::{Compact, WordAligned};
+ use crate::reader::read_as_batch;
+ use crate::writer::write_batch_unchecked;
use arrow::record_batch::RecordBatch;
use arrow::{array::*, datatypes::*};
- use datafusion_data_access::object_store::local::LocalFileSystem;
- use datafusion_data_access::object_store::local::{
- local_object_reader, local_object_reader_stream,
- };
+ use datafusion_common::Result;
use DataType::*;
macro_rules! fn_test_single_type {
@@ -389,49 +379,6 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn test_with_parquet() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
- let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
- let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
- let schema = exec.schema().clone();
-
- let batches = collect(exec, task_ctx).await?;
- assert_eq!(1, batches.len());
- let batch = &batches[0];
-
- let mut vector = vec![0; 20480];
- let row_offsets =
- { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(), Compact) };
- let output_batch = { read_as_batch(&vector, schema, &row_offsets, Compact)? };
- assert_eq!(*batch, output_batch);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn test_with_parquet_word_aligned() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
- let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7]);
- let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
- let schema = exec.schema().clone();
-
- let batches = collect(exec, task_ctx).await?;
- assert_eq!(1, batches.len());
- let batch = &batches[0];
-
- let mut vector = vec![0; 20480];
- let row_offsets = {
- write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(), WordAligned)
- };
- let output_batch = { read_as_batch(&vector, schema, &row_offsets, WordAligned)? };
- assert_eq!(*batch, output_batch);
-
- Ok(())
- }
-
#[test]
#[should_panic(expected = "row_supported(schema, row_type)")]
fn test_unsupported_type_write() {
@@ -454,38 +401,4 @@ mod tests {
let row_offsets = vec![0];
read_as_batch(&vector, schema, &row_offsets, Compact).unwrap();
}
-
- async fn get_exec(
- file_name: &str,
- projection: &Option<Vec<usize>>,
- limit: Option<usize>,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- let testdata = crate::test_util::parquet_test_data();
- let filename = format!("{}/{}", testdata, file_name);
- let format = ParquetFormat::default();
- let file_schema = format
- .infer_schema(local_object_reader_stream(vec![filename.clone()]))
- .await
- .expect("Schema inference");
- let statistics = format
- .infer_stats(local_object_reader(filename.clone()), file_schema.clone())
- .await
- .expect("Stats inference");
- let file_groups = vec![vec![local_unpartitioned_file(filename.clone())]];
- let exec = format
- .create_physical_plan(
- FileScanConfig {
- object_store: Arc::new(LocalFileSystem {}),
- file_schema,
- file_groups,
- statistics,
- projection: projection.clone(),
- limit,
- table_partition_cols: vec![],
- },
- &[],
- )
- .await?;
- Ok(exec)
- }
}
diff --git a/datafusion/core/src/row/reader.rs b/datafusion/row/src/reader.rs
similarity index 98%
rename from datafusion/core/src/row/reader.rs
rename to datafusion/row/src/reader.rs
index abaf57c14..e7ee004b0 100644
--- a/datafusion/core/src/row/reader.rs
+++ b/datafusion/row/src/reader.rs
@@ -17,14 +17,14 @@
//! Accessing row from raw bytes
-use crate::error::{DataFusionError, Result};
-use crate::row::layout::{RowLayout, RowType};
-use crate::row::validity::{all_valid, NullBitsFormatter};
-use crate::row::MutableRecordBatch;
+use crate::layout::{RowLayout, RowType};
+use crate::validity::{all_valid, NullBitsFormatter};
+use crate::MutableRecordBatch;
use arrow::array::*;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util::get_bit_raw;
+use datafusion_common::{DataFusionError, Result};
use std::sync::Arc;
/// Read `data` of raw-bytes rows starting at `offsets` out to a record batch
@@ -43,7 +43,7 @@ pub fn read_as_batch(
read_row(&row, &mut output, &schema);
}
- Ok(output.output()?)
+ output.output().map_err(DataFusionError::ArrowError)
}
macro_rules! get_idx {
diff --git a/datafusion/core/src/row/validity.rs b/datafusion/row/src/validity.rs
similarity index 100%
rename from datafusion/core/src/row/validity.rs
rename to datafusion/row/src/validity.rs
diff --git a/datafusion/core/src/row/writer.rs b/datafusion/row/src/writer.rs
similarity index 99%
rename from datafusion/core/src/row/writer.rs
rename to datafusion/row/src/writer.rs
index 920eb9963..6b9ffdc0e 100644
--- a/datafusion/core/src/row/writer.rs
+++ b/datafusion/row/src/writer.rs
@@ -17,12 +17,12 @@
//! Reusable row writer backed by Vec<u8> to stitch attributes together
-use crate::error::Result;
-use crate::row::layout::{estimate_row_width, RowLayout, RowType};
+use crate::layout::{estimate_row_width, RowLayout, RowType};
use arrow::array::*;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util::{round_upto_power_of_2, set_bit_raw, unset_bit_raw};
+use datafusion_common::Result;
use std::cmp::max;
use std::sync::Arc;