You are viewing a plain text version of this content. The canonical version is the original mailing-list archive page (the hyperlink was lost in this plain-text copy).
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/27 10:48:03 UTC
[arrow-datafusion] branch master updated: refactor: parallelize `parquet_exec` test case `single_file` (#4735)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 38a24c0fe refactor: parallelize `parquet_exec` test case `single_file` (#4735)
38a24c0fe is described below
commit 38a24c0fe0b8d998616ed7ae0021fd5ac5c20464
Author: Ruihang Xia <wa...@gmail.com>
AuthorDate: Tue Dec 27 18:47:58 2022 +0800
refactor: parallelize `parquet_exec` test case `single_file` (#4735)
* refactor: parallelize test case
Signed-off-by: Ruihang Xia <wa...@gmail.com>
* Apply suggestions from code review
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
* format code
Signed-off-by: Ruihang Xia <wa...@gmail.com>
* change row limit
Signed-off-by: Ruihang Xia <wa...@gmail.com>
Signed-off-by: Ruihang Xia <wa...@gmail.com>
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
datafusion/core/tests/parquet/filter_pushdown.rs | 129 ++++++++++++-----------
1 file changed, 66 insertions(+), 63 deletions(-)
diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs
index ac3744278..59350113c 100644
--- a/datafusion/core/tests/parquet/filter_pushdown.rs
+++ b/datafusion/core/tests/parquet/filter_pushdown.rs
@@ -26,6 +26,7 @@
//! select * from data limit 10;
//! ```
+use std::sync::Arc;
use std::time::Instant;
use arrow::compute::concat_batches;
@@ -42,6 +43,7 @@ use test_utils::AccessLogGenerator;
/// how many rows of generated data to write to our parquet file (arbitrary)
const NUM_ROWS: usize = 53819;
+const ROW_LIMIT: usize = 4096;
#[cfg(test)]
#[ctor::ctor]
@@ -51,13 +53,16 @@ fn init() {
}
#[cfg(not(target_family = "windows"))]
-#[tokio::test]
+// Use multi-threaded executor as this test consumes CPU
+#[tokio::test(flavor = "multi_thread")]
async fn single_file() {
// Only create the parquet file once as it is fairly large
let tempdir = TempDir::new().unwrap();
- let generator = AccessLogGenerator::new().with_row_limit(NUM_ROWS);
+ let generator = AccessLogGenerator::new()
+ .with_row_limit(NUM_ROWS)
+ .with_max_batch_size(ROW_LIMIT);
// default properties
let props = WriterProperties::builder().build();
@@ -65,31 +70,32 @@ async fn single_file() {
let start = Instant::now();
println!("Writing test data to {:?}", file);
- let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap();
+ let test_parquet_file =
+ Arc::new(TestParquetFile::try_new(file, props, generator).unwrap());
println!(
"Completed generating test data in {:?}",
Instant::now() - start
);
- TestCase::new(&test_parquet_file)
+ let mut set = tokio::task::JoinSet::new();
+
+ let case = TestCase::new(test_parquet_file.clone())
.with_name("selective")
// request_method = 'GET'
.with_filter(col("request_method").eq(lit("GET")))
.with_pushdown_expected(PushdownExpected::Some)
- .with_expected_rows(8886)
- .run()
- .await;
+ .with_expected_rows(8875);
+ set.spawn(async move { case.run().await });
- TestCase::new(&test_parquet_file)
+ let case = TestCase::new(test_parquet_file.clone())
.with_name("non_selective")
// request_method != 'GET'
.with_filter(col("request_method").not_eq(lit("GET")))
.with_pushdown_expected(PushdownExpected::Some)
- .with_expected_rows(44933)
- .run()
- .await;
+ .with_expected_rows(44944);
+ set.spawn(async move { case.run().await });
- TestCase::new(&test_parquet_file)
+ let case = TestCase::new(test_parquet_file.clone())
.with_name("basic_conjunction")
// request_method = 'POST' AND
// response_status = 503
@@ -101,49 +107,44 @@ async fn single_file() {
.unwrap(),
)
.with_pushdown_expected(PushdownExpected::Some)
- .with_expected_rows(1729)
- .run()
- .await;
+ .with_expected_rows(1731);
+ set.spawn(async move { case.run().await });
- TestCase::new(&test_parquet_file)
+ let case = TestCase::new(test_parquet_file.clone())
.with_name("everything")
// filter filters everything (no row has this status)
// response_status = 429
.with_filter(col("response_status").eq(lit(429_u16)))
.with_pushdown_expected(PushdownExpected::Some)
- .with_expected_rows(0)
- .run()
- .await;
+ .with_expected_rows(0);
+ set.spawn(async move { case.run().await });
- TestCase::new(&test_parquet_file)
+ let case = TestCase::new(test_parquet_file.clone())
.with_name("nothing")
// No rows are filtered out -- all are returned
// response_status > 0
.with_filter(col("response_status").gt(lit(0_u16)))
.with_pushdown_expected(PushdownExpected::None)
- .with_expected_rows(NUM_ROWS)
- .run()
- .await;
+ .with_expected_rows(NUM_ROWS);
+ set.spawn(async move { case.run().await });
- TestCase::new(&test_parquet_file)
+ let case = TestCase::new(test_parquet_file.clone())
.with_name("dict_selective")
// container = 'backend_container_0'
.with_filter(col("container").eq(lit("backend_container_0")))
.with_pushdown_expected(PushdownExpected::Some)
- .with_expected_rows(37856)
- .run()
- .await;
+ .with_expected_rows(15911);
+ set.spawn(async move { case.run().await });
- TestCase::new(&test_parquet_file)
+ let case = TestCase::new(test_parquet_file.clone())
.with_name("not eq")
// container != 'backend_container_0'
.with_filter(col("container").not_eq(lit("backend_container_0")))
.with_pushdown_expected(PushdownExpected::Some)
- .with_expected_rows(15963)
- .run()
- .await;
+ .with_expected_rows(37908);
+ set.spawn(async move { case.run().await });
- TestCase::new(&test_parquet_file)
+ let case = TestCase::new(test_parquet_file.clone())
.with_name("dict_conjunction")
// container == 'backend_container_0' AND
// pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
@@ -155,11 +156,10 @@ async fn single_file() {
.unwrap(),
)
.with_pushdown_expected(PushdownExpected::Some)
- .with_expected_rows(3052)
- .run()
- .await;
+ .with_expected_rows(3052);
+ set.spawn(async move { case.run().await });
- TestCase::new(&test_parquet_file)
+ let case = TestCase::new(test_parquet_file.clone())
.with_name("dict_very_selective")
// request_bytes > 2B AND
// container == 'backend_container_0' AND
@@ -173,11 +173,10 @@ async fn single_file() {
.unwrap(),
)
.with_pushdown_expected(PushdownExpected::Some)
- .with_expected_rows(88)
- .run()
- .await;
+ .with_expected_rows(88);
+ set.spawn(async move { case.run().await });
- TestCase::new(&test_parquet_file)
+ let case = TestCase::new(test_parquet_file.clone())
.with_name("dict_very_selective2")
// picks only 2 rows
// client_addr = '204.47.29.82' AND
@@ -192,11 +191,10 @@ async fn single_file() {
.unwrap(),
)
.with_pushdown_expected(PushdownExpected::Some)
- .with_expected_rows(88)
- .run()
- .await;
+ .with_expected_rows(88);
+ set.spawn(async move { case.run().await });
- TestCase::new(&test_parquet_file)
+ let case = TestCase::new(test_parquet_file.clone())
.with_name("dict_disjunction")
// container = 'backend_container_0' OR
// pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
@@ -207,12 +205,11 @@ async fn single_file() {
])
.unwrap(),
)
- .with_pushdown_expected(PushdownExpected::Some)
- .with_expected_rows(39982)
- .run()
- .await;
+ .with_pushdown_expected(PushdownExpected::None)
+ .with_expected_rows(16955);
+ set.spawn(async move { case.run().await });
- TestCase::new(&test_parquet_file)
+ let case = TestCase::new(test_parquet_file.clone())
.with_name("dict_disjunction3")
// request_method != 'GET' OR
// response_status = 400 OR
@@ -225,10 +222,14 @@ async fn single_file() {
])
.unwrap(),
)
- .with_pushdown_expected(PushdownExpected::None)
- .with_expected_rows(NUM_ROWS)
- .run()
- .await;
+ .with_pushdown_expected(PushdownExpected::Some)
+ .with_expected_rows(48919);
+ set.spawn(async move { case.run().await });
+
+ // Join all the cases.
+ while let Some(result) = set.join_next().await {
+ result.unwrap()
+ }
}
#[cfg(not(target_family = "windows"))]
@@ -247,7 +248,8 @@ async fn single_file_small_data_pages() {
let start = Instant::now();
println!("Writing test data to {:?}", file);
- let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap();
+ let test_parquet_file =
+ Arc::new(TestParquetFile::try_new(file, props, generator).unwrap());
println!(
"Completed generating test data in {:?}",
Instant::now() - start
@@ -267,7 +269,7 @@ async fn single_file_small_data_pages() {
// page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fktdcgtmzvoedpwhfevcvvrtaurzgex, max: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, num_nulls not defined] CRC:[none] SZ:7 VC:9216
// page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, max: iadnalqpdzthpifrvewossmpqibgtsuin, num_nulls not defined] CRC:[none] SZ:7 VC:7739
- TestCase::new(&test_parquet_file)
+ TestCase::new(test_parquet_file.clone())
.with_name("selective")
// predicate is chosen carefully to prune pages 0, 1, 2, 3, 4
// pod = 'iadnalqpdzthpifrvewossmpqibgtsuin'
@@ -286,7 +288,7 @@ async fn single_file_small_data_pages() {
// page 3: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.004269056, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
// page 4: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.007261184, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
// page 5: DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.005330944, num_nulls not defined] CRC:[none] SZ:12601 VC:7739
- TestCase::new(&test_parquet_file)
+ TestCase::new(test_parquet_file.clone())
.with_name("selective")
// predicate is chosen carefully to prune pages 1, 2, 4, and 5
// time > 1970-01-01T00:00:00.004300000
@@ -314,7 +316,7 @@ async fn single_file_small_data_pages() {
// offset compressed size first row index
// page-0 5581636 147517 0
// page-1 5729153 147517 9216
- TestCase::new(&test_parquet_file)
+ TestCase::new(test_parquet_file.clone())
.with_name("selective_on_decimal")
// predicate is chosen carefully to prune pages 1, 2, 3, 4, and 5
// decimal_price < 9200
@@ -345,8 +347,8 @@ enum PageIndexFilteringExpected {
}
/// parameters for running a test
-struct TestCase<'a> {
- test_parquet_file: &'a TestParquetFile,
+struct TestCase {
+ test_parquet_file: Arc<TestParquetFile>,
/// Human readable name to help debug failures
name: String,
/// The filter to apply
@@ -361,8 +363,8 @@ struct TestCase<'a> {
expected_rows: usize,
}
-impl<'a> TestCase<'a> {
- fn new(test_parquet_file: &'a TestParquetFile) -> Self {
+impl TestCase {
+ fn new(test_parquet_file: Arc<TestParquetFile>) -> Self {
Self {
test_parquet_file,
name: "<NOT SPECIFIED>".into(),
@@ -533,12 +535,13 @@ impl<'a> TestCase<'a> {
match pushdown_expected {
PushdownExpected::None => {
- assert_eq!(pushdown_rows_filtered, 0);
+ assert_eq!(pushdown_rows_filtered, 0, "{}", self.name);
}
PushdownExpected::Some => {
assert!(
pushdown_rows_filtered > 0,
- "Expected to filter rows via pushdown, but none were"
+ "{}: Expected to filter rows via pushdown, but none were",
+ self.name
);
}
};