You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "melgenek (via GitHub)" <gi...@apache.org> on 2023/05/23 21:26:08 UTC

[GitHub] [arrow-datafusion] melgenek commented on a diff in pull request #6393: Reduce output when `sqllogictest` runs successfully, and run tests in parallel

melgenek commented on code in PR #6393:
URL: https://github.com/apache/arrow-datafusion/pull/6393#discussion_r1202961565


##########
datafusion/core/tests/sqllogictests/src/main.rs:
##########
@@ -56,65 +57,116 @@ pub fn main() {
 
 #[tokio::main]
 #[cfg(not(target_family = "windows"))]
-pub async fn main() -> Result<(), Box<dyn Error>> {
+pub async fn main() -> Result<()> {
     run_tests().await
 }
 
-async fn run_tests() -> Result<(), Box<dyn Error>> {
+async fn run_tests() -> Result<()> {
     // Enable logging (e.g. set RUST_LOG=debug to see debug logs)
     env_logger::init();
 
     let options = Options::new();
 
-    for (path, relative_path) in read_test_files(&options) {
-        if options.complete_mode {
-            run_complete_file(&path, relative_path).await?;
-        } else if options.postgres_runner {
-            run_test_file_with_postgres(&path, relative_path).await?;
-        } else {
-            run_test_file(&path, relative_path).await?;
+    let test_files: Vec<_> = read_test_files(&options).collect();
+
+    // Run all tests in parallel, reporting failures at the end
+    let results: Vec<_> = futures::stream::iter(test_files)

Review Comment:
   ```suggestion
       // Run all tests in parallel, reporting failures at the end
       let results: Vec<_> = futures::stream::iter(read_test_files(&options))
   ```
   There's no need to collect in between.



##########
datafusion/core/tests/sqllogictests/src/main.rs:
##########
@@ -56,65 +57,116 @@ pub fn main() {
 

Review Comment:
   The `main` function above for Windows creates a single-threaded executor. I made it this way by mistake.
   It should be `tokio::runtime::Builder::new_multi_thread()`.



##########
datafusion/core/tests/sqllogictests/src/main.rs:
##########
@@ -56,65 +57,116 @@ pub fn main() {
 
 #[tokio::main]
 #[cfg(not(target_family = "windows"))]
-pub async fn main() -> Result<(), Box<dyn Error>> {
+pub async fn main() -> Result<()> {
     run_tests().await
 }
 
-async fn run_tests() -> Result<(), Box<dyn Error>> {
+async fn run_tests() -> Result<()> {
     // Enable logging (e.g. set RUST_LOG=debug to see debug logs)
     env_logger::init();
 
     let options = Options::new();
 
-    for (path, relative_path) in read_test_files(&options) {
-        if options.complete_mode {
-            run_complete_file(&path, relative_path).await?;
-        } else if options.postgres_runner {
-            run_test_file_with_postgres(&path, relative_path).await?;
-        } else {
-            run_test_file(&path, relative_path).await?;
+    let test_files: Vec<_> = read_test_files(&options).collect();
+
+    // Run all tests in parallel, reporting failures at the end
+    let results: Vec<_> = futures::stream::iter(test_files)
+        .map(|test_file| {
+            tokio::task::spawn(async move {
+                println!("Running {:?}", test_file.relative_path);
+                if options.complete_mode {
+                    run_complete_file(test_file).await?;
+                } else if options.postgres_runner {
+                    run_test_file_with_postgres(test_file).await?;
+                } else {
+                    run_test_file(test_file).await?;
+                }
+                Ok(()) as Result<()>
+            })
+        })
+        // run up to num_cpus streams in parallel
+        .buffer_unordered(num_cpus::get())
+        .collect()
+        .await;
+
+    // Collect and examine errors
+    let errors: Vec<_> = results
+        .into_iter()
+        .filter_map(|result| {
+            match result {
+                // Tokio panic error
+                Err(e) => Some(DataFusionError::External(Box::new(e))),
+                Ok(thread_result) => match thread_result {
+                    // Test run error
+                    Err(e) => Some(e),
+                    // success
+                    Ok(_) => None,
+                },
+            }
+        })
+        .collect();

Review Comment:
   ```suggestion
           .buffer_unordered(num_cpus::get())
           .flat_map(|result| {
               futures::stream::iter(match result {
                   // Tokio panic error
                   Err(e) => Some(DataFusionError::External(Box::new(e))),
                   Ok(thread_result) => match thread_result {
                       // Test run error
                       Err(e) => Some(e),
                       // success
                       Ok(_) => None,
                   },
               })
           })
           .collect()
           .await;
   ```
   It is possible to avoid the intermediate `collect` + `into_iter` when transforming the results: `flat_map` combined with `futures::stream::iter` flattens each result into a stream of 0 or 1 elements.
   Another way to achieve the same effect is to return `tokio_stream::empty()`/`tokio_stream::once()` from the branches of the `match` instead of `Some`/`None`, which removes the need for the `futures::stream::iter` wrapper.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org