Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/27 10:48:03 UTC

[arrow-datafusion] branch master updated: refactor: parallelize `parquet_exec` test case `single_file` (#4735)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 38a24c0fe refactor: parallelize `parquet_exec` test case `single_file` (#4735)
38a24c0fe is described below

commit 38a24c0fe0b8d998616ed7ae0021fd5ac5c20464
Author: Ruihang Xia <wa...@gmail.com>
AuthorDate: Tue Dec 27 18:47:58 2022 +0800

    refactor: parallelize `parquet_exec` test case `single_file` (#4735)
    
    * refactor: parallelize test case
    
    Signed-off-by: Ruihang Xia <wa...@gmail.com>
    
    * Apply suggestions from code review
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * format code
    
    Signed-off-by: Ruihang Xia <wa...@gmail.com>
    
    * change row limit
    
    Signed-off-by: Ruihang Xia <wa...@gmail.com>
    
    Signed-off-by: Ruihang Xia <wa...@gmail.com>
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 datafusion/core/tests/parquet/filter_pushdown.rs | 129 ++++++++++++-----------
 1 file changed, 66 insertions(+), 63 deletions(-)
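
In brief, the diff below replaces the sequential `.run().await` chain with a fan-out over `tokio::task::JoinSet`: every `TestCase` is spawned as its own task against a shared `Arc<TestParquetFile>`, and the test drains the set at the end. A minimal sketch of that fan-out/join pattern, with a placeholder `SharedFile` type and `run_case` helper standing in for the real DataFusion test types:

    use std::sync::Arc;
    use tokio::task::JoinSet;

    // Placeholder for the Arc<TestParquetFile> shared by all cases in the diff.
    struct SharedFile;

    async fn run_case(_file: Arc<SharedFile>, name: &'static str) {
        // each case queries the shared file independently
        println!("finished case {name}");
    }

    #[tokio::main(flavor = "multi_thread")]
    async fn main() {
        let file = Arc::new(SharedFile);
        let mut set = JoinSet::new();

        for name in ["selective", "non_selective", "everything"] {
            // each spawned task owns its own Arc handle to the shared file
            set.spawn(run_case(file.clone(), name));
        }

        // drain the set, propagating any task panic via unwrap(),
        // exactly as the rewritten test does with `result.unwrap()`
        while let Some(result) = set.join_next().await {
            result.unwrap();
        }
    }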

diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs
index ac3744278..59350113c 100644
--- a/datafusion/core/tests/parquet/filter_pushdown.rs
+++ b/datafusion/core/tests/parquet/filter_pushdown.rs
@@ -26,6 +26,7 @@
 //! select * from data limit 10;
 //! ```
 
+use std::sync::Arc;
 use std::time::Instant;
 
 use arrow::compute::concat_batches;
@@ -42,6 +43,7 @@ use test_utils::AccessLogGenerator;
 
 /// how many rows of generated data to write to our parquet file (arbitrary)
 const NUM_ROWS: usize = 53819;
+const ROW_LIMIT: usize = 4096;
 
 #[cfg(test)]
 #[ctor::ctor]
@@ -51,13 +53,16 @@ fn init() {
 }
 
 #[cfg(not(target_family = "windows"))]
-#[tokio::test]
+// Use multi-threaded executor as this test consumes CPU
+#[tokio::test(flavor = "multi_thread")]
 async fn single_file() {
     // Only create the parquet file once as it is fairly large
 
     let tempdir = TempDir::new().unwrap();
 
-    let generator = AccessLogGenerator::new().with_row_limit(NUM_ROWS);
+    let generator = AccessLogGenerator::new()
+        .with_row_limit(NUM_ROWS)
+        .with_max_batch_size(ROW_LIMIT);
 
     // default properties
     let props = WriterProperties::builder().build();
@@ -65,31 +70,32 @@ async fn single_file() {
 
     let start = Instant::now();
     println!("Writing test data to {:?}", file);
-    let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap();
+    let test_parquet_file =
+        Arc::new(TestParquetFile::try_new(file, props, generator).unwrap());
     println!(
         "Completed generating test data in {:?}",
         Instant::now() - start
     );
 
-    TestCase::new(&test_parquet_file)
+    let mut set = tokio::task::JoinSet::new();
+
+    let case = TestCase::new(test_parquet_file.clone())
         .with_name("selective")
         // request_method = 'GET'
         .with_filter(col("request_method").eq(lit("GET")))
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(8886)
-        .run()
-        .await;
+        .with_expected_rows(8875);
+    set.spawn(async move { case.run().await });
 
-    TestCase::new(&test_parquet_file)
+    let case = TestCase::new(test_parquet_file.clone())
         .with_name("non_selective")
         // request_method != 'GET'
         .with_filter(col("request_method").not_eq(lit("GET")))
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(44933)
-        .run()
-        .await;
+        .with_expected_rows(44944);
+    set.spawn(async move { case.run().await });
 
-    TestCase::new(&test_parquet_file)
+    let case = TestCase::new(test_parquet_file.clone())
         .with_name("basic_conjunction")
         // request_method = 'POST' AND
         //   response_status = 503
@@ -101,49 +107,44 @@ async fn single_file() {
             .unwrap(),
         )
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(1729)
-        .run()
-        .await;
+        .with_expected_rows(1731);
+    set.spawn(async move { case.run().await });
 
-    TestCase::new(&test_parquet_file)
+    let case = TestCase::new(test_parquet_file.clone())
         .with_name("everything")
         // filter filters everything (no row has this status)
         // response_status = 429
         .with_filter(col("response_status").eq(lit(429_u16)))
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(0)
-        .run()
-        .await;
+        .with_expected_rows(0);
+    set.spawn(async move { case.run().await });
 
-    TestCase::new(&test_parquet_file)
+    let case = TestCase::new(test_parquet_file.clone())
         .with_name("nothing")
         // No rows are filtered out -- all are returned
         // response_status > 0
         .with_filter(col("response_status").gt(lit(0_u16)))
         .with_pushdown_expected(PushdownExpected::None)
-        .with_expected_rows(NUM_ROWS)
-        .run()
-        .await;
+        .with_expected_rows(NUM_ROWS);
+    set.spawn(async move { case.run().await });
 
-    TestCase::new(&test_parquet_file)
+    let case = TestCase::new(test_parquet_file.clone())
         .with_name("dict_selective")
         // container = 'backend_container_0'
         .with_filter(col("container").eq(lit("backend_container_0")))
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(37856)
-        .run()
-        .await;
+        .with_expected_rows(15911);
+    set.spawn(async move { case.run().await });
 
-    TestCase::new(&test_parquet_file)
+    let case = TestCase::new(test_parquet_file.clone())
         .with_name("not eq")
         // container != 'backend_container_0'
         .with_filter(col("container").not_eq(lit("backend_container_0")))
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(15963)
-        .run()
-        .await;
+        .with_expected_rows(37908);
+    set.spawn(async move { case.run().await });
 
-    TestCase::new(&test_parquet_file)
+    let case = TestCase::new(test_parquet_file.clone())
         .with_name("dict_conjunction")
         // container == 'backend_container_0' AND
         //   pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
@@ -155,11 +156,10 @@ async fn single_file() {
             .unwrap(),
         )
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(3052)
-        .run()
-        .await;
+        .with_expected_rows(3052);
+    set.spawn(async move { case.run().await });
 
-    TestCase::new(&test_parquet_file)
+    let case = TestCase::new(test_parquet_file.clone())
         .with_name("dict_very_selective")
         // request_bytes > 2B AND
         //   container == 'backend_container_0' AND
@@ -173,11 +173,10 @@ async fn single_file() {
             .unwrap(),
         )
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(88)
-        .run()
-        .await;
+        .with_expected_rows(88);
+    set.spawn(async move { case.run().await });
 
-    TestCase::new(&test_parquet_file)
+    let case = TestCase::new(test_parquet_file.clone())
         .with_name("dict_very_selective2")
         // picks only 2 rows
         // client_addr = '204.47.29.82' AND
@@ -192,11 +191,10 @@ async fn single_file() {
             .unwrap(),
         )
         .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(88)
-        .run()
-        .await;
+        .with_expected_rows(88);
+    set.spawn(async move { case.run().await });
 
-    TestCase::new(&test_parquet_file)
+    let case = TestCase::new(test_parquet_file.clone())
         .with_name("dict_disjunction")
         // container = 'backend_container_0' OR
         //   pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
@@ -207,12 +205,11 @@ async fn single_file() {
             ])
             .unwrap(),
         )
-        .with_pushdown_expected(PushdownExpected::Some)
-        .with_expected_rows(39982)
-        .run()
-        .await;
+        .with_pushdown_expected(PushdownExpected::None)
+        .with_expected_rows(16955);
+    set.spawn(async move { case.run().await });
 
-    TestCase::new(&test_parquet_file)
+    let case = TestCase::new(test_parquet_file.clone())
         .with_name("dict_disjunction3")
         // request_method != 'GET' OR
         //   response_status = 400 OR
@@ -225,10 +222,14 @@ async fn single_file() {
             ])
             .unwrap(),
         )
-        .with_pushdown_expected(PushdownExpected::None)
-        .with_expected_rows(NUM_ROWS)
-        .run()
-        .await;
+        .with_pushdown_expected(PushdownExpected::Some)
+        .with_expected_rows(48919);
+    set.spawn(async move { case.run().await });
+
+    // Join all the cases.
+    while let Some(result) = set.join_next().await {
+        result.unwrap()
+    }
 }
 
 #[cfg(not(target_family = "windows"))]
@@ -247,7 +248,8 @@ async fn single_file_small_data_pages() {
 
     let start = Instant::now();
     println!("Writing test data to {:?}", file);
-    let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap();
+    let test_parquet_file =
+        Arc::new(TestParquetFile::try_new(file, props, generator).unwrap());
     println!(
         "Completed generating test data in {:?}",
         Instant::now() - start
@@ -267,7 +269,7 @@ async fn single_file_small_data_pages() {
     // page 4:                                     DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fktdcgtmzvoedpwhfevcvvrtaurzgex, max: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, num_nulls not defined] CRC:[none] SZ:7 VC:9216
     // page 5:                                     DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, max: iadnalqpdzthpifrvewossmpqibgtsuin, num_nulls not defined] CRC:[none] SZ:7 VC:7739
 
-    TestCase::new(&test_parquet_file)
+    TestCase::new(test_parquet_file.clone())
         .with_name("selective")
         // predicate is chosen carefully to prune pages 0, 1, 2, 3, 4
         // pod = 'iadnalqpdzthpifrvewossmpqibgtsuin'
@@ -286,7 +288,7 @@ async fn single_file_small_data_pages() {
     // page 3:                                     DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.004269056, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
     // page 4:                                     DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.007261184, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
     // page 5:                                     DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.005330944, num_nulls not defined] CRC:[none] SZ:12601 VC:7739
-    TestCase::new(&test_parquet_file)
+    TestCase::new(test_parquet_file.clone())
         .with_name("selective")
         // predicate is chosen carefully to prune pages 1, 2, 4, and 5
         // time > 1970-01-01T00:00:00.004300000
@@ -314,7 +316,7 @@ async fn single_file_small_data_pages() {
     //                            offset   compressed size       first row index
     // page-0                   5581636            147517                     0
     // page-1                   5729153            147517                  9216
-    TestCase::new(&test_parquet_file)
+    TestCase::new(test_parquet_file.clone())
         .with_name("selective_on_decimal")
         // predicate is chosen carefully to prune pages 1, 2, 3, 4, and 5
         // decimal_price < 9200
@@ -345,8 +347,8 @@ enum PageIndexFilteringExpected {
 }
 
 /// parameters for running a test
-struct TestCase<'a> {
-    test_parquet_file: &'a TestParquetFile,
+struct TestCase {
+    test_parquet_file: Arc<TestParquetFile>,
     /// Human readable name to help debug failures
     name: String,
     /// The filter to apply
@@ -361,8 +363,8 @@ struct TestCase<'a> {
     expected_rows: usize,
 }
 
-impl<'a> TestCase<'a> {
-    fn new(test_parquet_file: &'a TestParquetFile) -> Self {
+impl TestCase {
+    fn new(test_parquet_file: Arc<TestParquetFile>) -> Self {
         Self {
             test_parquet_file,
             name: "<NOT SPECIFIED>".into(),
@@ -533,12 +535,13 @@ impl<'a> TestCase<'a> {
 
         match pushdown_expected {
             PushdownExpected::None => {
-                assert_eq!(pushdown_rows_filtered, 0);
+                assert_eq!(pushdown_rows_filtered, 0, "{}", self.name);
             }
             PushdownExpected::Some => {
                 assert!(
                     pushdown_rows_filtered > 0,
-                    "Expected to filter rows via pushdown, but none were"
+                    "{}: Expected to filter rows via pushdown, but none were",
+                    self.name
                 );
             }
         };
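
One detail worth calling out in the final hunks: `TestCase<'a>` loses its lifetime parameter and owns an `Arc<TestParquetFile>` instead of borrowing `&'a TestParquetFile`. The parallelization forces this: `JoinSet::spawn` (like `tokio::spawn`) requires a `'static` future, so a case that borrows the file from the enclosing stack frame cannot be moved onto a task. A minimal illustration, using placeholder `File`/`Case` types rather than the real ones:

    use std::sync::Arc;

    struct File;

    // Owning an Arc (rather than a `&'a File`) makes the struct `'static`,
    // which is what tokio::spawn / JoinSet::spawn require of their futures.
    struct Case {
        file: Arc<File>,
    }

    impl Case {
        async fn run(self) {
            // the task owns its handle to the file outright
            let _ = Arc::strong_count(&self.file);
        }
    }

    #[tokio::main]
    async fn main() {
        let file = Arc::new(File);
        let case = Case { file: file.clone() };
        // With a borrowed `&File` field this spawn would fail to compile:
        // the future would not satisfy the `'static` bound.
        tokio::spawn(case.run()).await.unwrap();
    }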