Posted to commits@arrow.apache.org by ag...@apache.org on 2022/04/21 17:30:22 UTC

[arrow-datafusion] branch master updated: Make row its own crate to make it accessible from physical-expr (#2283)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new cd8bfb46b Make row its own crate to make it accessible from physical-expr (#2283)
cd8bfb46b is described below

commit cd8bfb46b2f20ac9d44d22e66883e41cecd683e1
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Fri Apr 22 01:30:16 2022 +0800

    Make row its own crate to make it accessible from physical-expr (#2283)
---
 Cargo.toml                                         |   1 +
 datafusion/core/Cargo.toml                         |   7 +-
 datafusion/core/src/lib.rs                         |   6 +-
 datafusion/core/tests/row.rs                       | 108 +++++++++++++++++++++
 datafusion/row/Cargo.toml                          |  45 +++++++++
 datafusion/{core/src/row => row/src}/jit/mod.rs    |   8 +-
 datafusion/{core/src/row => row/src}/jit/reader.rs |  12 +--
 datafusion/{core/src/row => row/src}/jit/writer.rs |  12 +--
 datafusion/{core/src/row => row/src}/layout.rs     |   3 +-
 datafusion/{core/src/row/mod.rs => row/src/lib.rs} |  97 +-----------------
 datafusion/{core/src/row => row/src}/reader.rs     |  10 +-
 datafusion/{core/src/row => row/src}/validity.rs   |   0
 datafusion/{core/src/row => row/src}/writer.rs     |   4 +-
 13 files changed, 192 insertions(+), 121 deletions(-)
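
[Editor's note: the relocated row API keeps its signatures; below is a minimal
round-trip sketch against the new datafusion_row crate, adapted from the
integration test added in this commit. The single-column schema, buffer size,
and the roundtrip helper are illustrative, not part of the change.]

    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion_row::layout::RowType::Compact;
    use datafusion_row::reader::read_as_batch;
    use datafusion_row::writer::write_batch_unchecked;

    fn roundtrip() -> datafusion_common::Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )?;

        // Write the batch as raw-byte rows into a pre-allocated buffer,
        // then read the rows back out and compare.
        let mut buffer = vec![0u8; 1024];
        let offsets = write_batch_unchecked(&mut buffer, 0, &batch, 0, schema.clone(), Compact);
        let output = read_as_batch(&buffer, schema, &offsets, Compact)?;
        assert_eq!(batch, output);
        Ok(())
    }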

diff --git a/Cargo.toml b/Cargo.toml
index 0c0142849..fefd5679a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,6 +23,7 @@ members = [
     "datafusion/jit",
     "datafusion/physical-expr",
     "datafusion/proto",
+    "datafusion/row",
     "datafusion-examples",
     "benchmarks",
     "ballista/rust/client",
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index ea4d80981..533b38b81 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -49,7 +49,7 @@ jit = ["datafusion-jit"]
 pyarrow = ["pyo3", "arrow/pyarrow", "datafusion-common/pyarrow"]
 regex_expressions = ["datafusion-physical-expr/regex_expressions"]
 # Used to enable row format experiment
-row = []
+row = ["datafusion-row"]
 simd = ["arrow/simd"]
 unicode_expressions = ["datafusion-physical-expr/regex_expressions"]
 
@@ -64,6 +64,7 @@ datafusion-data-access = { path = "../../data-access", version = "1.0.0" }
 datafusion-expr = { path = "../expr", version = "7.0.0" }
 datafusion-jit = { path = "../jit", version = "7.0.0", optional = true }
 datafusion-physical-expr = { path = "../physical-expr", version = "7.0.0" }
+datafusion-row = { path = "../row", version = "7.0.0", optional = true }
 futures = "0.3"
 hashbrown = { version = "0.12", features = ["raw"] }
 lazy_static = { version = "^1.4.0" }
@@ -125,3 +126,7 @@ name = "parquet_query_sql"
 harness = false
 name = "jit"
 required-features = ["row", "jit"]
+
+[[test]]
+name = "row"
+required-features = ["row"]
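
[Editor's note: with the new [[test]] target gated on required-features =
["row"], the relocated integration test only builds when the feature is
enabled, e.g. (invocation illustrative) `cargo test --features row --test row`
run from datafusion/core.]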
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index a2bc7e7ea..fd08d3a0a 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -231,12 +231,12 @@ pub use datafusion_data_access;
 pub use datafusion_expr as logical_expr;
 pub use datafusion_physical_expr as physical_expr;
 
+#[cfg(feature = "row")]
+pub use datafusion_row as row;
+
 #[cfg(feature = "jit")]
 pub use datafusion_jit as jit;
 
-#[cfg(feature = "row")]
-pub mod row;
-
 pub mod from_slice;
 
 #[cfg(test)]
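
[Editor's note: because the new crate is re-exported under the old module path
(`pub use datafusion_row as row;`), downstream code that imported
`datafusion::row` should keep compiling unchanged when the `row` feature is
enabled. A hypothetical consumer:]

    // The old `datafusion::row::*` paths still resolve via the re-export above.
    use datafusion::row::reader::read_as_batch;
    use datafusion::row::writer::write_batch_unchecked;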
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
new file mode 100644
index 000000000..bda107f13
--- /dev/null
+++ b/datafusion/core/tests/row.rs
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::file_format::FileFormat;
+use datafusion::datasource::listing::local_unpartitioned_file;
+use datafusion::error::Result;
+use datafusion::physical_plan::file_format::FileScanConfig;
+use datafusion::physical_plan::{collect, ExecutionPlan};
+use datafusion::prelude::SessionContext;
+use datafusion_data_access::object_store::local::LocalFileSystem;
+use datafusion_data_access::object_store::local::{
+    local_object_reader, local_object_reader_stream,
+};
+use datafusion_row::layout::RowType::{Compact, WordAligned};
+use datafusion_row::reader::read_as_batch;
+use datafusion_row::writer::write_batch_unchecked;
+use std::sync::Arc;
+
+#[tokio::test]
+async fn test_with_parquet() -> Result<()> {
+    let session_ctx = SessionContext::new();
+    let task_ctx = session_ctx.task_ctx();
+    let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
+    let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+    let schema = exec.schema().clone();
+
+    let batches = collect(exec, task_ctx).await?;
+    assert_eq!(1, batches.len());
+    let batch = &batches[0];
+
+    let mut vector = vec![0; 20480];
+    let row_offsets =
+        { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(), Compact) };
+    let output_batch = { read_as_batch(&vector, schema, &row_offsets, Compact)? };
+    assert_eq!(*batch, output_batch);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_with_parquet_word_aligned() -> Result<()> {
+    let session_ctx = SessionContext::new();
+    let task_ctx = session_ctx.task_ctx();
+    let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7]);
+    let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+    let schema = exec.schema().clone();
+
+    let batches = collect(exec, task_ctx).await?;
+    assert_eq!(1, batches.len());
+    let batch = &batches[0];
+
+    let mut vector = vec![0; 20480];
+    let row_offsets =
+        { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(), WordAligned) };
+    let output_batch = { read_as_batch(&vector, schema, &row_offsets, WordAligned)? };
+    assert_eq!(*batch, output_batch);
+
+    Ok(())
+}
+
+async fn get_exec(
+    file_name: &str,
+    projection: &Option<Vec<usize>>,
+    limit: Option<usize>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    let testdata = datafusion::test_util::parquet_test_data();
+    let filename = format!("{}/{}", testdata, file_name);
+    let format = ParquetFormat::default();
+    let file_schema = format
+        .infer_schema(local_object_reader_stream(vec![filename.clone()]))
+        .await
+        .expect("Schema inference");
+    let statistics = format
+        .infer_stats(local_object_reader(filename.clone()), file_schema.clone())
+        .await
+        .expect("Stats inference");
+    let file_groups = vec![vec![local_unpartitioned_file(filename.clone())]];
+    let exec = format
+        .create_physical_plan(
+            FileScanConfig {
+                object_store: Arc::new(LocalFileSystem {}),
+                file_schema,
+                file_groups,
+                statistics,
+                projection: projection.clone(),
+                limit,
+                table_partition_cols: vec![],
+            },
+            &[],
+        )
+        .await?;
+    Ok(exec)
+}
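
[Editor's note: these two tests are verbatim moves of the Parquet round-trip
tests deleted from row/mod.rs further down. They become integration tests in
datafusion/core because they need SessionContext and the Parquet datasource,
which the standalone datafusion-row crate cannot depend on without creating a
dependency cycle.]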
diff --git a/datafusion/row/Cargo.toml b/datafusion/row/Cargo.toml
new file mode 100644
index 000000000..26b517300
--- /dev/null
+++ b/datafusion/row/Cargo.toml
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "datafusion-row"
+description = "Row backed by raw bytes for DataFusion query engine"
+version = "7.0.0"
+homepage = "https://github.com/apache/arrow-datafusion"
+repository = "https://github.com/apache/arrow-datafusion"
+readme = "../README.md"
+authors = ["Apache Arrow <de...@arrow.apache.org>"]
+license = "Apache-2.0"
+keywords = [ "arrow", "query", "sql" ]
+edition = "2021"
+rust-version = "1.59"
+
+[lib]
+name = "datafusion_row"
+path = "src/lib.rs"
+
+[features]
+# Used to enable JIT code generation
+jit = ["datafusion-jit"]
+
+[dependencies]
+arrow = { version = "12" }
+datafusion-common = { path = "../common", version = "7.0.0" }
+datafusion-expr = { path = "../expr", version = "7.0.0" }
+datafusion-jit = { path = "../jit", version = "7.0.0", optional = true }
+paste = "^1.0"
+rand = "0.8"
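
[Editor's note: the new crate keeps its JIT code paths behind its own optional
`jit` feature, forwarded to datafusion-jit. Other workspace crates can now
depend on the row format directly, which is the stated point of the change:
per the commit title, physical-expr can take a plain
`datafusion-row = { path = "../row", version = "7.0.0" }` dependency instead
of going through core.]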
diff --git a/datafusion/core/src/row/jit/mod.rs b/datafusion/row/src/jit/mod.rs
similarity index 97%
rename from datafusion/core/src/row/jit/mod.rs
rename to datafusion/row/src/jit/mod.rs
index 7ee76a9b4..03b8eed18 100644
--- a/datafusion/core/src/row/jit/mod.rs
+++ b/datafusion/row/src/jit/mod.rs
@@ -43,12 +43,12 @@ fn fn_name<T>(f: T) -> &'static str {
 
 #[cfg(test)]
 mod tests {
-    use crate::error::Result;
-    use crate::row::jit::reader::read_as_batch_jit;
-    use crate::row::jit::writer::write_batch_unchecked_jit;
-    use crate::row::layout::RowType::{Compact, WordAligned};
+    use crate::jit::reader::read_as_batch_jit;
+    use crate::jit::writer::write_batch_unchecked_jit;
+    use crate::layout::RowType::{Compact, WordAligned};
     use arrow::record_batch::RecordBatch;
     use arrow::{array::*, datatypes::*};
+    use datafusion_common::Result;
     use datafusion_jit::api::Assembler;
     use std::sync::Arc;
     use DataType::*;
diff --git a/datafusion/core/src/row/jit/reader.rs b/datafusion/row/src/jit/reader.rs
similarity index 97%
rename from datafusion/core/src/row/jit/reader.rs
rename to datafusion/row/src/jit/reader.rs
index 80e10131f..276271a20 100644
--- a/datafusion/core/src/row/jit/reader.rs
+++ b/datafusion/row/src/jit/reader.rs
@@ -17,16 +17,16 @@
 
 //! Accessing row from raw bytes with JIT
 
-use crate::error::{DataFusionError, Result};
+use crate::jit::fn_name;
+use crate::layout::RowType;
+use crate::reader::RowReader;
+use crate::reader::*;
 use crate::reg_fn;
-use crate::row::jit::fn_name;
-use crate::row::layout::RowType;
-use crate::row::reader::RowReader;
-use crate::row::reader::*;
-use crate::row::MutableRecordBatch;
+use crate::MutableRecordBatch;
 use arrow::array::ArrayBuilder;
 use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
+use datafusion_common::{DataFusionError, Result};
 use datafusion_jit::api::Assembler;
 use datafusion_jit::api::GeneratedFunction;
 use datafusion_jit::ast::{I64, PTR};
diff --git a/datafusion/core/src/row/jit/writer.rs b/datafusion/row/src/jit/writer.rs
similarity index 97%
rename from datafusion/core/src/row/jit/writer.rs
rename to datafusion/row/src/jit/writer.rs
index ae9ff1308..3d8a0ba07 100644
--- a/datafusion/core/src/row/jit/writer.rs
+++ b/datafusion/row/src/jit/writer.rs
@@ -17,16 +17,16 @@
 
 //! Reusable JIT version of row writer backed by Vec<u8> to stitch attributes together
 
-use crate::error::Result;
+use crate::jit::fn_name;
+use crate::layout::RowType;
 use crate::reg_fn;
-use crate::row::jit::fn_name;
-use crate::row::layout::RowType;
-use crate::row::schema_null_free;
-use crate::row::writer::RowWriter;
-use crate::row::writer::*;
+use crate::schema_null_free;
+use crate::writer::RowWriter;
+use crate::writer::*;
 use arrow::array::Array;
 use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
 use datafusion_jit::api::CodeBlock;
 use datafusion_jit::api::{Assembler, GeneratedFunction};
 use datafusion_jit::ast::Expr;
diff --git a/datafusion/core/src/row/layout.rs b/datafusion/row/src/layout.rs
similarity index 99%
rename from datafusion/core/src/row/layout.rs
rename to datafusion/row/src/layout.rs
index c14f9fa69..b017d1958 100644
--- a/datafusion/core/src/row/layout.rs
+++ b/datafusion/row/src/layout.rs
@@ -17,7 +17,7 @@
 
 //! Various row layout for different use case
 
-use crate::row::schema_null_free;
+use crate::schema_null_free;
 use arrow::datatypes::{DataType, Schema};
 use arrow::util::bit_util::{ceil, round_upto_power_of_2};
 
@@ -41,7 +41,6 @@ pub enum RowType {
 #[derive(Debug)]
 pub(crate) struct RowLayout {
     /// Type of the layout
-    #[allow(dead_code)]
     row_type: RowType,
     /// If a row is null free according to its schema
     pub(crate) null_free: bool,
diff --git a/datafusion/core/src/row/mod.rs b/datafusion/row/src/lib.rs
similarity index 79%
rename from datafusion/core/src/row/mod.rs
rename to datafusion/row/src/lib.rs
index f8e9ff273..54c112dd5 100644
--- a/datafusion/core/src/row/mod.rs
+++ b/datafusion/row/src/lib.rs
@@ -56,7 +56,7 @@ use std::sync::Arc;
 
 #[cfg(feature = "jit")]
 pub mod jit;
-mod layout;
+pub mod layout;
 pub mod reader;
 mod validity;
 pub mod writer;
@@ -108,22 +108,12 @@ fn make_batch(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::datasource::file_format::parquet::ParquetFormat;
-    use crate::datasource::file_format::FileFormat;
-    use crate::datasource::listing::local_unpartitioned_file;
-    use crate::error::Result;
-    use crate::physical_plan::file_format::FileScanConfig;
-    use crate::physical_plan::{collect, ExecutionPlan};
-    use crate::prelude::SessionContext;
-    use crate::row::layout::RowType::{Compact, WordAligned};
-    use crate::row::reader::read_as_batch;
-    use crate::row::writer::write_batch_unchecked;
+    use crate::layout::RowType::{Compact, WordAligned};
+    use crate::reader::read_as_batch;
+    use crate::writer::write_batch_unchecked;
     use arrow::record_batch::RecordBatch;
     use arrow::{array::*, datatypes::*};
-    use datafusion_data_access::object_store::local::LocalFileSystem;
-    use datafusion_data_access::object_store::local::{
-        local_object_reader, local_object_reader_stream,
-    };
+    use datafusion_common::Result;
     use DataType::*;
 
     macro_rules! fn_test_single_type {
@@ -389,49 +379,6 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_with_parquet() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
-        let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
-        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
-        let schema = exec.schema().clone();
-
-        let batches = collect(exec, task_ctx).await?;
-        assert_eq!(1, batches.len());
-        let batch = &batches[0];
-
-        let mut vector = vec![0; 20480];
-        let row_offsets =
-            { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(), Compact) };
-        let output_batch = { read_as_batch(&vector, schema, &row_offsets, Compact)? };
-        assert_eq!(*batch, output_batch);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_with_parquet_word_aligned() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
-        let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7]);
-        let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
-        let schema = exec.schema().clone();
-
-        let batches = collect(exec, task_ctx).await?;
-        assert_eq!(1, batches.len());
-        let batch = &batches[0];
-
-        let mut vector = vec![0; 20480];
-        let row_offsets = {
-            write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(), WordAligned)
-        };
-        let output_batch = { read_as_batch(&vector, schema, &row_offsets, WordAligned)? };
-        assert_eq!(*batch, output_batch);
-
-        Ok(())
-    }
-
     #[test]
     #[should_panic(expected = "row_supported(schema, row_type)")]
     fn test_unsupported_type_write() {
@@ -454,38 +401,4 @@ mod tests {
         let row_offsets = vec![0];
         read_as_batch(&vector, schema, &row_offsets, Compact).unwrap();
     }
-
-    async fn get_exec(
-        file_name: &str,
-        projection: &Option<Vec<usize>>,
-        limit: Option<usize>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let testdata = crate::test_util::parquet_test_data();
-        let filename = format!("{}/{}", testdata, file_name);
-        let format = ParquetFormat::default();
-        let file_schema = format
-            .infer_schema(local_object_reader_stream(vec![filename.clone()]))
-            .await
-            .expect("Schema inference");
-        let statistics = format
-            .infer_stats(local_object_reader(filename.clone()), file_schema.clone())
-            .await
-            .expect("Stats inference");
-        let file_groups = vec![vec![local_unpartitioned_file(filename.clone())]];
-        let exec = format
-            .create_physical_plan(
-                FileScanConfig {
-                    object_store: Arc::new(LocalFileSystem {}),
-                    file_schema,
-                    file_groups,
-                    statistics,
-                    projection: projection.clone(),
-                    limit,
-                    table_partition_cols: vec![],
-                },
-                &[],
-            )
-            .await?;
-        Ok(exec)
-    }
 }
diff --git a/datafusion/core/src/row/reader.rs b/datafusion/row/src/reader.rs
similarity index 98%
rename from datafusion/core/src/row/reader.rs
rename to datafusion/row/src/reader.rs
index abaf57c14..e7ee004b0 100644
--- a/datafusion/core/src/row/reader.rs
+++ b/datafusion/row/src/reader.rs
@@ -17,14 +17,14 @@
 
 //! Accessing row from raw bytes
 
-use crate::error::{DataFusionError, Result};
-use crate::row::layout::{RowLayout, RowType};
-use crate::row::validity::{all_valid, NullBitsFormatter};
-use crate::row::MutableRecordBatch;
+use crate::layout::{RowLayout, RowType};
+use crate::validity::{all_valid, NullBitsFormatter};
+use crate::MutableRecordBatch;
 use arrow::array::*;
 use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
 use arrow::util::bit_util::get_bit_raw;
+use datafusion_common::{DataFusionError, Result};
 use std::sync::Arc;
 
 /// Read `data` of raw-bytes rows starting at `offsets` out to a record batch
@@ -43,7 +43,7 @@ pub fn read_as_batch(
         read_row(&row, &mut output, &schema);
     }
 
-    Ok(output.output()?)
+    output.output().map_err(DataFusionError::ArrowError)
 }
 
 macro_rules! get_idx {
diff --git a/datafusion/core/src/row/validity.rs b/datafusion/row/src/validity.rs
similarity index 100%
rename from datafusion/core/src/row/validity.rs
rename to datafusion/row/src/validity.rs
diff --git a/datafusion/core/src/row/writer.rs b/datafusion/row/src/writer.rs
similarity index 99%
rename from datafusion/core/src/row/writer.rs
rename to datafusion/row/src/writer.rs
index 920eb9963..6b9ffdc0e 100644
--- a/datafusion/core/src/row/writer.rs
+++ b/datafusion/row/src/writer.rs
@@ -17,12 +17,12 @@
 
 //! Reusable row writer backed by Vec<u8> to stitch attributes together
 
-use crate::error::Result;
-use crate::row::layout::{estimate_row_width, RowLayout, RowType};
+use crate::layout::{estimate_row_width, RowLayout, RowType};
 use arrow::array::*;
 use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
 use arrow::util::bit_util::{round_upto_power_of_2, set_bit_raw, unset_bit_raw};
+use datafusion_common::Result;
 use std::cmp::max;
 use std::sync::Arc;