You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/11/19 19:53:07 UTC
[arrow-datafusion] branch master updated: [fix] moved aggr_test_schema to test_utils (#1338)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 0facd4d [fix] moved aggr_test_schema to test_utils (#1338)
0facd4d is described below
commit 0facd4d483e8c289ee4e3a89487d0cd1ede1a110
Author: rdettai <rd...@gmail.com>
AuthorDate: Fri Nov 19 20:51:41 2021 +0100
[fix] moved aggr_test_schema to test_utils (#1338)
This avoids duplicating it in the benchmark package.
---
datafusion/src/execution/dataframe_impl.rs | 6 ++--
.../src/physical_plan/coalesce_partitions.rs | 3 +-
datafusion/src/physical_plan/empty.rs | 11 +++---
datafusion/src/physical_plan/file_format/csv.rs | 2 +-
datafusion/src/physical_plan/file_format/mod.rs | 5 +--
datafusion/src/physical_plan/filter.rs | 5 +--
datafusion/src/physical_plan/limit.rs | 4 +--
datafusion/src/physical_plan/projection.rs | 3 +-
datafusion/src/physical_plan/sort.rs | 3 +-
.../src/physical_plan/sort_preserving_merge.rs | 12 +++----
datafusion/src/physical_plan/union.rs | 4 +--
datafusion/src/physical_plan/values.rs | 4 +--
datafusion/src/physical_plan/windows/mod.rs | 5 +--
datafusion/src/test/mod.rs | 21 +-----------
datafusion/src/test_util.rs | 25 ++++++++++++--
datafusion/tests/common.rs | 40 ----------------------
datafusion/tests/path_partition.rs | 6 ++--
datafusion/tests/sql.rs | 5 ++-
18 files changed, 64 insertions(+), 100 deletions(-)
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index f565f5c..2887e29 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -257,11 +257,11 @@ mod tests {
use super::*;
use crate::execution::options::CsvReadOptions;
- use crate::logical_plan::*;
+ use crate::physical_plan::functions::ScalarFunctionImplementation;
use crate::physical_plan::functions::Volatility;
use crate::physical_plan::{window_functions, ColumnarValue};
use crate::{assert_batches_sorted_eq, execution::context::ExecutionContext};
- use crate::{physical_plan::functions::ScalarFunctionImplementation, test};
+ use crate::{logical_plan::*, test_util};
use arrow::datatypes::DataType;
#[tokio::test]
@@ -510,7 +510,7 @@ mod tests {
ctx: &mut ExecutionContext,
table_name: &str,
) -> Result<()> {
- let schema = test::aggr_test_schema();
+ let schema = test_util::aggr_test_schema();
let testdata = crate::test_util::arrow_test_data();
ctx.register_csv(
table_name,
diff --git a/datafusion/src/physical_plan/coalesce_partitions.rs b/datafusion/src/physical_plan/coalesce_partitions.rs
index 9c133de..089c6b4 100644
--- a/datafusion/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/src/physical_plan/coalesce_partitions.rs
@@ -211,10 +211,11 @@ mod tests {
use crate::physical_plan::{collect, common};
use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
use crate::test::{self, assert_is_pending};
+ use crate::test_util;
#[tokio::test]
async fn merge() -> Result<()> {
- let schema = test::aggr_test_schema();
+ let schema = test_util::aggr_test_schema();
let num_partitions = 4;
let (_, files) =
diff --git a/datafusion/src/physical_plan/empty.rs b/datafusion/src/physical_plan/empty.rs
index 430beaf..46b5002 100644
--- a/datafusion/src/physical_plan/empty.rs
+++ b/datafusion/src/physical_plan/empty.rs
@@ -148,12 +148,11 @@ impl ExecutionPlan for EmptyExec {
#[cfg(test)]
mod tests {
use super::*;
- use crate::physical_plan::common;
- use crate::test;
+ use crate::{physical_plan::common, test_util};
#[tokio::test]
async fn empty() -> Result<()> {
- let schema = test::aggr_test_schema();
+ let schema = test_util::aggr_test_schema();
let empty = EmptyExec::new(false, schema.clone());
assert_eq!(empty.schema(), schema);
@@ -168,7 +167,7 @@ mod tests {
#[test]
fn with_new_children() -> Result<()> {
- let schema = test::aggr_test_schema();
+ let schema = test_util::aggr_test_schema();
let empty = EmptyExec::new(false, schema);
let empty2 = empty.with_new_children(vec![])?;
@@ -184,7 +183,7 @@ mod tests {
#[tokio::test]
async fn invalid_execute() -> Result<()> {
- let schema = test::aggr_test_schema();
+ let schema = test_util::aggr_test_schema();
let empty = EmptyExec::new(false, schema);
// ask for the wrong partition
@@ -195,7 +194,7 @@ mod tests {
#[tokio::test]
async fn produce_one_row() -> Result<()> {
- let schema = test::aggr_test_schema();
+ let schema = test_util::aggr_test_schema();
let empty = EmptyExec::new(true, schema);
let iter = empty.execute(0).await?;
diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs
index 0057e9e..efea300 100644
--- a/datafusion/src/physical_plan/file_format/csv.rs
+++ b/datafusion/src/physical_plan/file_format/csv.rs
@@ -167,7 +167,7 @@ mod tests {
use crate::{
datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem},
scalar::ScalarValue,
- test::aggr_test_schema,
+ test_util::aggr_test_schema,
};
use futures::StreamExt;
diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs
index 3ad8ffe..17ec9f1 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -277,8 +277,9 @@ fn create_dict_array(
#[cfg(test)]
mod tests {
- use crate::test::{
- aggr_test_schema, build_table_i32, columns, object_store::TestObjectStore,
+ use crate::{
+ test::{build_table_i32, columns, object_store::TestObjectStore},
+ test_util::aggr_test_schema,
};
use super::*;
diff --git a/datafusion/src/physical_plan/filter.rs b/datafusion/src/physical_plan/filter.rs
index fe0f103..a32371a 100644
--- a/datafusion/src/physical_plan/filter.rs
+++ b/datafusion/src/physical_plan/filter.rs
@@ -227,13 +227,14 @@ mod tests {
use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::scalar::ScalarValue;
- use crate::test::{self};
+ use crate::test;
+ use crate::test_util;
use crate::{logical_plan::Operator, physical_plan::collect};
use std::iter::Iterator;
#[tokio::test]
async fn simple_predicate() -> Result<()> {
- let schema = test::aggr_test_schema();
+ let schema = test_util::aggr_test_schema();
let partitions = 4;
let (_, files) =
diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs
index f9c392a..ef492ec 100644
--- a/datafusion/src/physical_plan/limit.rs
+++ b/datafusion/src/physical_plan/limit.rs
@@ -388,11 +388,11 @@ mod tests {
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::common;
use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
- use crate::test;
+ use crate::{test, test_util};
#[tokio::test]
async fn limit() -> Result<()> {
- let schema = test::aggr_test_schema();
+ let schema = test_util::aggr_test_schema();
let num_partitions = 4;
let (_, files) =
diff --git a/datafusion/src/physical_plan/projection.rs b/datafusion/src/physical_plan/projection.rs
index eb335c2..eb0c4b8 100644
--- a/datafusion/src/physical_plan/projection.rs
+++ b/datafusion/src/physical_plan/projection.rs
@@ -264,11 +264,12 @@ mod tests {
use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
use crate::scalar::ScalarValue;
use crate::test::{self};
+ use crate::test_util;
use futures::future;
#[tokio::test]
async fn project_first_column() -> Result<()> {
- let schema = test::aggr_test_schema();
+ let schema = test_util::aggr_test_schema();
let partitions = 4;
let (_, files) =
diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs
index a606906..5eb29bb 100644
--- a/datafusion/src/physical_plan/sort.rs
+++ b/datafusion/src/physical_plan/sort.rs
@@ -321,13 +321,14 @@ mod tests {
use crate::test::assert_is_pending;
use crate::test::exec::assert_strong_count_converges_to_zero;
use crate::test::{self, exec::BlockingExec};
+ use crate::test_util;
use arrow::array::*;
use arrow::datatypes::*;
use futures::FutureExt;
#[tokio::test]
async fn test_sort() -> Result<()> {
- let schema = test::aggr_test_schema();
+ let schema = test_util::aggr_test_schema();
let partitions = 4;
let (_, files) =
test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs
index 62f4b94..3f4827b 100644
--- a/datafusion/src/physical_plan/sort_preserving_merge.rs
+++ b/datafusion/src/physical_plan/sort_preserving_merge.rs
@@ -664,7 +664,6 @@ mod tests {
use std::iter::FromIterator;
use crate::arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
- use crate::assert_batches_eq;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::expressions::col;
use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
@@ -672,6 +671,7 @@ mod tests {
use crate::physical_plan::sort::SortExec;
use crate::physical_plan::{collect, common};
use crate::test::{self, assert_is_pending};
+ use crate::{assert_batches_eq, test_util};
use super::*;
use arrow::datatypes::{DataType, Field, Schema};
@@ -930,7 +930,7 @@ mod tests {
#[tokio::test]
async fn test_partition_sort() {
- let schema = test::aggr_test_schema();
+ let schema = test_util::aggr_test_schema();
let partitions = 4;
let (_, files) =
test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap();
@@ -1013,7 +1013,7 @@ mod tests {
sort: Vec<PhysicalSortExpr>,
sizes: &[usize],
) -> Arc<dyn ExecutionPlan> {
- let schema = test::aggr_test_schema();
+ let schema = test_util::aggr_test_schema();
let partitions = 4;
let (_, files) =
test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap();
@@ -1041,7 +1041,7 @@ mod tests {
#[tokio::test]
async fn test_partition_sort_streaming_input() {
- let schema = test::aggr_test_schema();
+ let schema = test_util::aggr_test_schema();
let sort = vec![
// uint8
PhysicalSortExpr {
@@ -1080,7 +1080,7 @@ mod tests {
#[tokio::test]
async fn test_partition_sort_streaming_input_output() {
- let schema = test::aggr_test_schema();
+ let schema = test_util::aggr_test_schema();
let sort = vec![
// float64
@@ -1195,7 +1195,7 @@ mod tests {
#[tokio::test]
async fn test_async() {
- let schema = test::aggr_test_schema();
+ let schema = test_util::aggr_test_schema();
let sort = vec![PhysicalSortExpr {
expr: col("c12", &schema).unwrap(),
options: SortOptions::default(),
diff --git a/datafusion/src/physical_plan/union.rs b/datafusion/src/physical_plan/union.rs
index 418be63..79c5072 100644
--- a/datafusion/src/physical_plan/union.rs
+++ b/datafusion/src/physical_plan/union.rs
@@ -219,7 +219,7 @@ fn stats_union(mut left: Statistics, right: Statistics) -> Statistics {
mod tests {
use super::*;
use crate::datasource::object_store::{local::LocalFileSystem, ObjectStore};
- use crate::test;
+ use crate::{test, test_util};
use crate::{
physical_plan::{
@@ -232,7 +232,7 @@ mod tests {
#[tokio::test]
async fn test_union_partitions() -> Result<()> {
- let schema = test::aggr_test_schema();
+ let schema = test_util::aggr_test_schema();
let fs: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem {});
// Create csv's with different partitioning
diff --git a/datafusion/src/physical_plan/values.rs b/datafusion/src/physical_plan/values.rs
index de15d40..f4f8ccb 100644
--- a/datafusion/src/physical_plan/values.rs
+++ b/datafusion/src/physical_plan/values.rs
@@ -170,11 +170,11 @@ impl ExecutionPlan for ValuesExec {
#[cfg(test)]
mod tests {
use super::*;
- use crate::test;
+ use crate::test_util;
#[tokio::test]
async fn values_empty_case() -> Result<()> {
- let schema = test::aggr_test_schema();
+ let schema = test_util::aggr_test_schema();
let empty = ValuesExec::try_new(schema, vec![]);
assert!(!empty.is_ok());
Ok(())
diff --git a/datafusion/src/physical_plan/windows/mod.rs b/datafusion/src/physical_plan/windows/mod.rs
index 28bf402..8b182f9 100644
--- a/datafusion/src/physical_plan/windows/mod.rs
+++ b/datafusion/src/physical_plan/windows/mod.rs
@@ -181,14 +181,15 @@ mod tests {
use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig};
use crate::physical_plan::{collect, Statistics};
use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
- use crate::test::{self, aggr_test_schema, assert_is_pending};
+ use crate::test::{self, assert_is_pending};
+ use crate::test_util::{self, aggr_test_schema};
use arrow::array::*;
use arrow::datatypes::{DataType, Field, SchemaRef};
use arrow::record_batch::RecordBatch;
use futures::FutureExt;
fn create_test_schema(partitions: usize) -> Result<(Arc<CsvExec>, SchemaRef)> {
- let schema = test::aggr_test_schema();
+ let schema = test_util::aggr_test_schema();
let (_, files) =
test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
let csv = CsvExec::new(
diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs
index c13df55..16c1383 100644
--- a/datafusion/src/test/mod.rs
+++ b/datafusion/src/test/mod.rs
@@ -26,7 +26,7 @@ use array::{
TimestampNanosecondArray, TimestampSecondArray,
};
use arrow::array::{self, Int32Array};
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use futures::{Future, FutureExt};
use std::fs::File;
@@ -104,25 +104,6 @@ pub fn create_partitioned_csv(
Ok((tmp_dir.into_path().to_str().unwrap().to_string(), groups))
}
-/// Get the schema for the aggregate_test_* csv files
-pub fn aggr_test_schema() -> SchemaRef {
- Arc::new(Schema::new(vec![
- Field::new("c1", DataType::Utf8, false),
- Field::new("c2", DataType::UInt32, false),
- Field::new("c3", DataType::Int8, false),
- Field::new("c4", DataType::Int16, false),
- Field::new("c5", DataType::Int32, false),
- Field::new("c6", DataType::Int64, false),
- Field::new("c7", DataType::UInt8, false),
- Field::new("c8", DataType::UInt16, false),
- Field::new("c9", DataType::UInt32, false),
- Field::new("c10", DataType::UInt64, false),
- Field::new("c11", DataType::Float32, false),
- Field::new("c12", DataType::Float64, false),
- Field::new("c13", DataType::Utf8, false),
- ]))
-}
-
/// some tests share a common table with different names
pub fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
let schema = Schema::new(vec![
diff --git a/datafusion/src/test_util.rs b/datafusion/src/test_util.rs
index 03e0054..b87b756 100644
--- a/datafusion/src/test_util.rs
+++ b/datafusion/src/test_util.rs
@@ -17,7 +17,9 @@
//! Utility functions to make testing DataFusion based crates easier
-use std::{env, error::Error, path::PathBuf};
+use std::{env, error::Error, path::PathBuf, sync::Arc};
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
/// Compares formatted output of a record batch with an expected
/// vector of strings, with the result of pretty formatting record
@@ -155,7 +157,7 @@ pub fn arrow_test_data() -> String {
}
}
-/// Returns the parquest test data directory, which is by default
+/// Returns the parquet test data directory, which is by default
/// stored in a git submodule rooted at
/// `parquest-testing/data`.
///
@@ -225,6 +227,25 @@ fn get_data_dir(udf_env: &str, submodule_data: &str) -> Result<PathBuf, Box<dyn
}
}
+/// Get the schema for the aggregate_test_* csv files
+pub fn aggr_test_schema() -> SchemaRef {
+ Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Utf8, false),
+ Field::new("c2", DataType::UInt32, false),
+ Field::new("c3", DataType::Int8, false),
+ Field::new("c4", DataType::Int16, false),
+ Field::new("c5", DataType::Int32, false),
+ Field::new("c6", DataType::Int64, false),
+ Field::new("c7", DataType::UInt8, false),
+ Field::new("c8", DataType::UInt16, false),
+ Field::new("c9", DataType::UInt32, false),
+ Field::new("c10", DataType::UInt64, false),
+ Field::new("c11", DataType::Float32, false),
+ Field::new("c12", DataType::Float64, false),
+ Field::new("c13", DataType::Utf8, false),
+ ]))
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/datafusion/tests/common.rs b/datafusion/tests/common.rs
deleted file mode 100644
index 3490db5..0000000
--- a/datafusion/tests/common.rs
+++ /dev/null
@@ -1,40 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! methods that are common to multiple integration test setups
-
-use std::sync::Arc;
-
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-
-pub fn aggr_test_schema() -> SchemaRef {
- Arc::new(Schema::new(vec![
- Field::new("c1", DataType::Utf8, false),
- Field::new("c2", DataType::UInt32, false),
- Field::new("c3", DataType::Int8, false),
- Field::new("c4", DataType::Int16, false),
- Field::new("c5", DataType::Int32, false),
- Field::new("c6", DataType::Int64, false),
- Field::new("c7", DataType::UInt8, false),
- Field::new("c8", DataType::UInt16, false),
- Field::new("c9", DataType::UInt32, false),
- Field::new("c10", DataType::UInt64, false),
- Field::new("c11", DataType::Float32, false),
- Field::new("c12", DataType::Float64, false),
- Field::new("c13", DataType::Utf8, false),
- ]))
-}
diff --git a/datafusion/tests/path_partition.rs b/datafusion/tests/path_partition.rs
index 7895110..e68ef32 100644
--- a/datafusion/tests/path_partition.rs
+++ b/datafusion/tests/path_partition.rs
@@ -33,12 +33,10 @@ use datafusion::{
error::{DataFusionError, Result},
physical_plan::ColumnStatistics,
prelude::ExecutionContext,
- test_util::{arrow_test_data, parquet_test_data},
+ test_util::{self, arrow_test_data, parquet_test_data},
};
use futures::{stream, StreamExt};
-mod common;
-
#[tokio::test]
async fn csv_filter_with_file_col() -> Result<()> {
let mut ctx = ExecutionContext::new();
@@ -281,7 +279,7 @@ fn register_partitioned_aggregate_csv(
) {
let testdata = arrow_test_data();
let csv_file_path = format!("{}/csv/aggregate_test_100.csv", testdata);
- let file_schema = common::aggr_test_schema();
+ let file_schema = test_util::aggr_test_schema();
let object_store = MirroringObjectStore::new_arc(csv_file_path, store_paths);
let mut options = ListingOptions::new(Arc::new(CsvFormat::default()));
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index ec73494..8349669 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -43,6 +43,7 @@ use datafusion::physical_plan::metrics::MetricValue;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::ExecutionPlanVisitor;
use datafusion::prelude::*;
+use datafusion::test_util;
use datafusion::{datasource::MemTable, physical_plan::collect};
use datafusion::{
error::{DataFusionError, Result},
@@ -50,8 +51,6 @@ use datafusion::{
};
use datafusion::{execution::context::ExecutionContext, physical_plan::displayable};
-mod common;
-
#[tokio::test]
async fn nyc() -> Result<()> {
// schema for nyxtaxi csv files
@@ -3461,7 +3460,7 @@ async fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) {
async fn register_aggregate_csv(ctx: &mut ExecutionContext) -> Result<()> {
let testdata = datafusion::test_util::arrow_test_data();
- let schema = common::aggr_test_schema();
+ let schema = test_util::aggr_test_schema();
ctx.register_csv(
"aggregate_test_100",
&format!("{}/csv/aggregate_test_100.csv", testdata),