You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/11/19 19:53:07 UTC

[arrow-datafusion] branch master updated: [fix] moved aggr_test_schema to test_utils (#1338)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 0facd4d  [fix] moved aggr_test_schema to test_utils (#1338)
0facd4d is described below

commit 0facd4d483e8c289ee4e3a89487d0cd1ede1a110
Author: rdettai <rd...@gmail.com>
AuthorDate: Fri Nov 19 20:51:41 2021 +0100

    [fix] moved aggr_test_schema to test_utils (#1338)
    
    This avoids duplicating it in the benchmark package.
---
 datafusion/src/execution/dataframe_impl.rs         |  6 ++--
 .../src/physical_plan/coalesce_partitions.rs       |  3 +-
 datafusion/src/physical_plan/empty.rs              | 11 +++---
 datafusion/src/physical_plan/file_format/csv.rs    |  2 +-
 datafusion/src/physical_plan/file_format/mod.rs    |  5 +--
 datafusion/src/physical_plan/filter.rs             |  5 +--
 datafusion/src/physical_plan/limit.rs              |  4 +--
 datafusion/src/physical_plan/projection.rs         |  3 +-
 datafusion/src/physical_plan/sort.rs               |  3 +-
 .../src/physical_plan/sort_preserving_merge.rs     | 12 +++----
 datafusion/src/physical_plan/union.rs              |  4 +--
 datafusion/src/physical_plan/values.rs             |  4 +--
 datafusion/src/physical_plan/windows/mod.rs        |  5 +--
 datafusion/src/test/mod.rs                         | 21 +-----------
 datafusion/src/test_util.rs                        | 25 ++++++++++++--
 datafusion/tests/common.rs                         | 40 ----------------------
 datafusion/tests/path_partition.rs                 |  6 ++--
 datafusion/tests/sql.rs                            |  5 ++-
 18 files changed, 64 insertions(+), 100 deletions(-)

diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index f565f5c..2887e29 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -257,11 +257,11 @@ mod tests {
 
     use super::*;
     use crate::execution::options::CsvReadOptions;
-    use crate::logical_plan::*;
+    use crate::physical_plan::functions::ScalarFunctionImplementation;
     use crate::physical_plan::functions::Volatility;
     use crate::physical_plan::{window_functions, ColumnarValue};
     use crate::{assert_batches_sorted_eq, execution::context::ExecutionContext};
-    use crate::{physical_plan::functions::ScalarFunctionImplementation, test};
+    use crate::{logical_plan::*, test_util};
     use arrow::datatypes::DataType;
 
     #[tokio::test]
@@ -510,7 +510,7 @@ mod tests {
         ctx: &mut ExecutionContext,
         table_name: &str,
     ) -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let testdata = crate::test_util::arrow_test_data();
         ctx.register_csv(
             table_name,
diff --git a/datafusion/src/physical_plan/coalesce_partitions.rs b/datafusion/src/physical_plan/coalesce_partitions.rs
index 9c133de..089c6b4 100644
--- a/datafusion/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/src/physical_plan/coalesce_partitions.rs
@@ -211,10 +211,11 @@ mod tests {
     use crate::physical_plan::{collect, common};
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{self, assert_is_pending};
+    use crate::test_util;
 
     #[tokio::test]
     async fn merge() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
 
         let num_partitions = 4;
         let (_, files) =
diff --git a/datafusion/src/physical_plan/empty.rs b/datafusion/src/physical_plan/empty.rs
index 430beaf..46b5002 100644
--- a/datafusion/src/physical_plan/empty.rs
+++ b/datafusion/src/physical_plan/empty.rs
@@ -148,12 +148,11 @@ impl ExecutionPlan for EmptyExec {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::physical_plan::common;
-    use crate::test;
+    use crate::{physical_plan::common, test_util};
 
     #[tokio::test]
     async fn empty() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
 
         let empty = EmptyExec::new(false, schema.clone());
         assert_eq!(empty.schema(), schema);
@@ -168,7 +167,7 @@ mod tests {
 
     #[test]
     fn with_new_children() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let empty = EmptyExec::new(false, schema);
 
         let empty2 = empty.with_new_children(vec![])?;
@@ -184,7 +183,7 @@ mod tests {
 
     #[tokio::test]
     async fn invalid_execute() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let empty = EmptyExec::new(false, schema);
 
         // ask for the wrong partition
@@ -195,7 +194,7 @@ mod tests {
 
     #[tokio::test]
     async fn produce_one_row() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let empty = EmptyExec::new(true, schema);
 
         let iter = empty.execute(0).await?;
diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs
index 0057e9e..efea300 100644
--- a/datafusion/src/physical_plan/file_format/csv.rs
+++ b/datafusion/src/physical_plan/file_format/csv.rs
@@ -167,7 +167,7 @@ mod tests {
     use crate::{
         datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem},
         scalar::ScalarValue,
-        test::aggr_test_schema,
+        test_util::aggr_test_schema,
     };
     use futures::StreamExt;
 
diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs
index 3ad8ffe..17ec9f1 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -277,8 +277,9 @@ fn create_dict_array(
 
 #[cfg(test)]
 mod tests {
-    use crate::test::{
-        aggr_test_schema, build_table_i32, columns, object_store::TestObjectStore,
+    use crate::{
+        test::{build_table_i32, columns, object_store::TestObjectStore},
+        test_util::aggr_test_schema,
     };
 
     use super::*;
diff --git a/datafusion/src/physical_plan/filter.rs b/datafusion/src/physical_plan/filter.rs
index fe0f103..a32371a 100644
--- a/datafusion/src/physical_plan/filter.rs
+++ b/datafusion/src/physical_plan/filter.rs
@@ -227,13 +227,14 @@ mod tests {
     use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
     use crate::physical_plan::ExecutionPlan;
     use crate::scalar::ScalarValue;
-    use crate::test::{self};
+    use crate::test;
+    use crate::test_util;
     use crate::{logical_plan::Operator, physical_plan::collect};
     use std::iter::Iterator;
 
     #[tokio::test]
     async fn simple_predicate() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
 
         let partitions = 4;
         let (_, files) =
diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs
index f9c392a..ef492ec 100644
--- a/datafusion/src/physical_plan/limit.rs
+++ b/datafusion/src/physical_plan/limit.rs
@@ -388,11 +388,11 @@ mod tests {
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::common;
     use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
-    use crate::test;
+    use crate::{test, test_util};
 
     #[tokio::test]
     async fn limit() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
 
         let num_partitions = 4;
         let (_, files) =
diff --git a/datafusion/src/physical_plan/projection.rs b/datafusion/src/physical_plan/projection.rs
index eb335c2..eb0c4b8 100644
--- a/datafusion/src/physical_plan/projection.rs
+++ b/datafusion/src/physical_plan/projection.rs
@@ -264,11 +264,12 @@ mod tests {
     use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
     use crate::scalar::ScalarValue;
     use crate::test::{self};
+    use crate::test_util;
     use futures::future;
 
     #[tokio::test]
     async fn project_first_column() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
 
         let partitions = 4;
         let (_, files) =
diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs
index a606906..5eb29bb 100644
--- a/datafusion/src/physical_plan/sort.rs
+++ b/datafusion/src/physical_plan/sort.rs
@@ -321,13 +321,14 @@ mod tests {
     use crate::test::assert_is_pending;
     use crate::test::exec::assert_strong_count_converges_to_zero;
     use crate::test::{self, exec::BlockingExec};
+    use crate::test_util;
     use arrow::array::*;
     use arrow::datatypes::*;
     use futures::FutureExt;
 
     #[tokio::test]
     async fn test_sort() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let partitions = 4;
         let (_, files) =
             test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs
index 62f4b94..3f4827b 100644
--- a/datafusion/src/physical_plan/sort_preserving_merge.rs
+++ b/datafusion/src/physical_plan/sort_preserving_merge.rs
@@ -664,7 +664,6 @@ mod tests {
     use std::iter::FromIterator;
 
     use crate::arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
-    use crate::assert_batches_eq;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::expressions::col;
     use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
@@ -672,6 +671,7 @@ mod tests {
     use crate::physical_plan::sort::SortExec;
     use crate::physical_plan::{collect, common};
     use crate::test::{self, assert_is_pending};
+    use crate::{assert_batches_eq, test_util};
 
     use super::*;
     use arrow::datatypes::{DataType, Field, Schema};
@@ -930,7 +930,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_partition_sort() {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let partitions = 4;
         let (_, files) =
             test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap();
@@ -1013,7 +1013,7 @@ mod tests {
         sort: Vec<PhysicalSortExpr>,
         sizes: &[usize],
     ) -> Arc<dyn ExecutionPlan> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let partitions = 4;
         let (_, files) =
             test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap();
@@ -1041,7 +1041,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_partition_sort_streaming_input() {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let sort = vec![
             // uint8
             PhysicalSortExpr {
@@ -1080,7 +1080,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_partition_sort_streaming_input_output() {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
 
         let sort = vec![
             // float64
@@ -1195,7 +1195,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_async() {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let sort = vec![PhysicalSortExpr {
             expr: col("c12", &schema).unwrap(),
             options: SortOptions::default(),
diff --git a/datafusion/src/physical_plan/union.rs b/datafusion/src/physical_plan/union.rs
index 418be63..79c5072 100644
--- a/datafusion/src/physical_plan/union.rs
+++ b/datafusion/src/physical_plan/union.rs
@@ -219,7 +219,7 @@ fn stats_union(mut left: Statistics, right: Statistics) -> Statistics {
 mod tests {
     use super::*;
     use crate::datasource::object_store::{local::LocalFileSystem, ObjectStore};
-    use crate::test;
+    use crate::{test, test_util};
 
     use crate::{
         physical_plan::{
@@ -232,7 +232,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_union_partitions() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let fs: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem {});
 
         // Create csv's with different partitioning
diff --git a/datafusion/src/physical_plan/values.rs b/datafusion/src/physical_plan/values.rs
index de15d40..f4f8ccb 100644
--- a/datafusion/src/physical_plan/values.rs
+++ b/datafusion/src/physical_plan/values.rs
@@ -170,11 +170,11 @@ impl ExecutionPlan for ValuesExec {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::test;
+    use crate::test_util;
 
     #[tokio::test]
     async fn values_empty_case() -> Result<()> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let empty = ValuesExec::try_new(schema, vec![]);
         assert!(!empty.is_ok());
         Ok(())
diff --git a/datafusion/src/physical_plan/windows/mod.rs b/datafusion/src/physical_plan/windows/mod.rs
index 28bf402..8b182f9 100644
--- a/datafusion/src/physical_plan/windows/mod.rs
+++ b/datafusion/src/physical_plan/windows/mod.rs
@@ -181,14 +181,15 @@ mod tests {
     use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
     use crate::physical_plan::{collect, Statistics};
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
-    use crate::test::{self, aggr_test_schema, assert_is_pending};
+    use crate::test::{self, assert_is_pending};
+    use crate::test_util::{self, aggr_test_schema};
     use arrow::array::*;
     use arrow::datatypes::{DataType, Field, SchemaRef};
     use arrow::record_batch::RecordBatch;
     use futures::FutureExt;
 
     fn create_test_schema(partitions: usize) -> Result<(Arc<CsvExec>, SchemaRef)> {
-        let schema = test::aggr_test_schema();
+        let schema = test_util::aggr_test_schema();
         let (_, files) =
             test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
         let csv = CsvExec::new(
diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs
index c13df55..16c1383 100644
--- a/datafusion/src/test/mod.rs
+++ b/datafusion/src/test/mod.rs
@@ -26,7 +26,7 @@ use array::{
     TimestampNanosecondArray, TimestampSecondArray,
 };
 use arrow::array::{self, Int32Array};
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Field, Schema};
 use arrow::record_batch::RecordBatch;
 use futures::{Future, FutureExt};
 use std::fs::File;
@@ -104,25 +104,6 @@ pub fn create_partitioned_csv(
     Ok((tmp_dir.into_path().to_str().unwrap().to_string(), groups))
 }
 
-/// Get the schema for the aggregate_test_* csv files
-pub fn aggr_test_schema() -> SchemaRef {
-    Arc::new(Schema::new(vec![
-        Field::new("c1", DataType::Utf8, false),
-        Field::new("c2", DataType::UInt32, false),
-        Field::new("c3", DataType::Int8, false),
-        Field::new("c4", DataType::Int16, false),
-        Field::new("c5", DataType::Int32, false),
-        Field::new("c6", DataType::Int64, false),
-        Field::new("c7", DataType::UInt8, false),
-        Field::new("c8", DataType::UInt16, false),
-        Field::new("c9", DataType::UInt32, false),
-        Field::new("c10", DataType::UInt64, false),
-        Field::new("c11", DataType::Float32, false),
-        Field::new("c12", DataType::Float64, false),
-        Field::new("c13", DataType::Utf8, false),
-    ]))
-}
-
 /// some tests share a common table with different names
 pub fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
     let schema = Schema::new(vec![
diff --git a/datafusion/src/test_util.rs b/datafusion/src/test_util.rs
index 03e0054..b87b756 100644
--- a/datafusion/src/test_util.rs
+++ b/datafusion/src/test_util.rs
@@ -17,7 +17,9 @@
 
 //! Utility functions to make testing DataFusion based crates easier
 
-use std::{env, error::Error, path::PathBuf};
+use std::{env, error::Error, path::PathBuf, sync::Arc};
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 
 /// Compares formatted output of a record batch with an expected
 /// vector of strings, with the result of pretty formatting record
@@ -155,7 +157,7 @@ pub fn arrow_test_data() -> String {
     }
 }
 
-/// Returns the parquest test data directory, which is by default
+/// Returns the parquet test data directory, which is by default
 /// stored in a git submodule rooted at
 /// `parquest-testing/data`.
 ///
@@ -225,6 +227,25 @@ fn get_data_dir(udf_env: &str, submodule_data: &str) -> Result<PathBuf, Box<dyn
     }
 }
 
+/// Get the schema for the aggregate_test_* csv files
+pub fn aggr_test_schema() -> SchemaRef {
+    Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Utf8, false),
+        Field::new("c2", DataType::UInt32, false),
+        Field::new("c3", DataType::Int8, false),
+        Field::new("c4", DataType::Int16, false),
+        Field::new("c5", DataType::Int32, false),
+        Field::new("c6", DataType::Int64, false),
+        Field::new("c7", DataType::UInt8, false),
+        Field::new("c8", DataType::UInt16, false),
+        Field::new("c9", DataType::UInt32, false),
+        Field::new("c10", DataType::UInt64, false),
+        Field::new("c11", DataType::Float32, false),
+        Field::new("c12", DataType::Float64, false),
+        Field::new("c13", DataType::Utf8, false),
+    ]))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/tests/common.rs b/datafusion/tests/common.rs
deleted file mode 100644
index 3490db5..0000000
--- a/datafusion/tests/common.rs
+++ /dev/null
@@ -1,40 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! methods that are common to multiple integration test setups
-
-use std::sync::Arc;
-
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-
-pub fn aggr_test_schema() -> SchemaRef {
-    Arc::new(Schema::new(vec![
-        Field::new("c1", DataType::Utf8, false),
-        Field::new("c2", DataType::UInt32, false),
-        Field::new("c3", DataType::Int8, false),
-        Field::new("c4", DataType::Int16, false),
-        Field::new("c5", DataType::Int32, false),
-        Field::new("c6", DataType::Int64, false),
-        Field::new("c7", DataType::UInt8, false),
-        Field::new("c8", DataType::UInt16, false),
-        Field::new("c9", DataType::UInt32, false),
-        Field::new("c10", DataType::UInt64, false),
-        Field::new("c11", DataType::Float32, false),
-        Field::new("c12", DataType::Float64, false),
-        Field::new("c13", DataType::Utf8, false),
-    ]))
-}
diff --git a/datafusion/tests/path_partition.rs b/datafusion/tests/path_partition.rs
index 7895110..e68ef32 100644
--- a/datafusion/tests/path_partition.rs
+++ b/datafusion/tests/path_partition.rs
@@ -33,12 +33,10 @@ use datafusion::{
     error::{DataFusionError, Result},
     physical_plan::ColumnStatistics,
     prelude::ExecutionContext,
-    test_util::{arrow_test_data, parquet_test_data},
+    test_util::{self, arrow_test_data, parquet_test_data},
 };
 use futures::{stream, StreamExt};
 
-mod common;
-
 #[tokio::test]
 async fn csv_filter_with_file_col() -> Result<()> {
     let mut ctx = ExecutionContext::new();
@@ -281,7 +279,7 @@ fn register_partitioned_aggregate_csv(
 ) {
     let testdata = arrow_test_data();
     let csv_file_path = format!("{}/csv/aggregate_test_100.csv", testdata);
-    let file_schema = common::aggr_test_schema();
+    let file_schema = test_util::aggr_test_schema();
     let object_store = MirroringObjectStore::new_arc(csv_file_path, store_paths);
 
     let mut options = ListingOptions::new(Arc::new(CsvFormat::default()));
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index ec73494..8349669 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -43,6 +43,7 @@ use datafusion::physical_plan::metrics::MetricValue;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::physical_plan::ExecutionPlanVisitor;
 use datafusion::prelude::*;
+use datafusion::test_util;
 use datafusion::{datasource::MemTable, physical_plan::collect};
 use datafusion::{
     error::{DataFusionError, Result},
@@ -50,8 +51,6 @@ use datafusion::{
 };
 use datafusion::{execution::context::ExecutionContext, physical_plan::displayable};
 
-mod common;
-
 #[tokio::test]
 async fn nyc() -> Result<()> {
     // schema for nyxtaxi csv files
@@ -3461,7 +3460,7 @@ async fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) {
 
 async fn register_aggregate_csv(ctx: &mut ExecutionContext) -> Result<()> {
     let testdata = datafusion::test_util::arrow_test_data();
-    let schema = common::aggr_test_schema();
+    let schema = test_util::aggr_test_schema();
     ctx.register_csv(
         "aggregate_test_100",
         &format!("{}/csv/aggregate_test_100.csv", testdata),