You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/24 21:47:45 UTC

[arrow-datafusion] branch main updated: Reduce output when `sqllogictest` runs successfully, and run tests in parallel (#6393)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 32144501be Reduce output when `sqllogictest` runs successfully, and run tests in parallel (#6393)
32144501be is described below

commit 32144501be634253b254aca34660b4b3e81dbc1f
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed May 24 17:47:37 2023 -0400

    Reduce output when `sqllogictest` runs successfully, and run tests in parallel (#6393)
    
    * Reduce output in sqllogictest and run in parallel
    
    * Improve parallelism
    
    * Update instructions
    
    * Add comments about parallel execution
    
    * Use multi_thread on windows
    
    * Apply suggestions from code review
    
    Co-authored-by: Yevhenii Melnyk <me...@gmail.com>
    
    * fixup
    
    ---------
    
    Co-authored-by: Yevhenii Melnyk <me...@gmail.com>
---
 datafusion/core/tests/sqllogictests/README.md      |  50 ++++++-
 .../sqllogictests/src/engines/datafusion/mod.rs    |   6 +-
 datafusion/core/tests/sqllogictests/src/main.rs    | 156 +++++++++++++++------
 3 files changed, 162 insertions(+), 50 deletions(-)

diff --git a/datafusion/core/tests/sqllogictests/README.md b/datafusion/core/tests/sqllogictests/README.md
index 6591161dd3..ad76dc8348 100644
--- a/datafusion/core/tests/sqllogictests/README.md
+++ b/datafusion/core/tests/sqllogictests/README.md
@@ -43,6 +43,52 @@ cargo test -p datafusion --test sqllogictests -- information_schema
 cargo test -p datafusion --test sqllogictests -- ddl --complete
 ```
 
+```shell
+# Run ddl.slt, printing debug logging to stderr
+RUST_LOG=debug cargo test -p datafusion --test sqllogictests -- ddl
+```
+
+#### Cookbook: Adding Tests
+
+1. Add queries
+
+Add the setup and queries you want to run to a `.slt` file
+(`my_awesome_test.slt` in this example) using the following format:
+
+```text
+query
+CREATE TABLE foo AS VALUES (1);
+
+query
+SELECT * from foo;
+```
+
+2. Fill in expected answers with `--complete` mode
+
+Running the following command will update `my_awesome_test.slt` with the expected output:
+
+```shell
+cargo test -p datafusion --test sqllogictests -- my_awesome_test --complete
+```
+
+3. Verify the content
+
+In the case above, `my_awesome_test.slt` will look something like this:
+
+```text
+statement ok
+CREATE TABLE foo AS VALUES (1);
+
+query I
+SELECT * from foo;
+----
+1
+```
+
+Assuming it looks good, check it in!
+
+#### Reference
+
 #### Running tests: Validation Mode
 
 In this model, `sqllogictests` runs the statements and queries in a `.slt` file, comparing the expected output in the file to the output produced by that run.
@@ -103,12 +149,12 @@ cargo test -p datafusion --test sqllogictests -- ddl --complete
 
 #### sqllogictests
 
-> :warning: **Warning**:Datafusion's sqllogictest implementation and migration is still in progress. Definitions taken from https://www.sqlite.org/sqllogictest/doc/trunk/about.wiki
-
 sqllogictest is a program originally written for SQLite to verify the correctness of SQL queries against the SQLite engine. The program is engine-agnostic and can parse sqllogictest files (`.slt`), runs queries against an SQL engine and compare the output to the expected output.
 
 Tests in the `.slt` file are a sequence of query record generally starting with `CREATE` statements to populate tables and then further queries to test the populated data (arrow-datafusion exception).
 
+Each `.slt` file runs in its own isolated `SessionContext`, which makes the test setup explicit and allows files to run in parallel. Thus it is important to keep tests free of externally visible side effects (such as writing to a global location like `/tmp/`).
+
 Query records follow the format:
 
 ```sql
diff --git a/datafusion/core/tests/sqllogictests/src/engines/datafusion/mod.rs b/datafusion/core/tests/sqllogictests/src/engines/datafusion/mod.rs
index 4c64daf155..dd30ef494d 100644
--- a/datafusion/core/tests/sqllogictests/src/engines/datafusion/mod.rs
+++ b/datafusion/core/tests/sqllogictests/src/engines/datafusion/mod.rs
@@ -24,6 +24,7 @@ use self::error::{DFSqlLogicTestError, Result};
 use async_trait::async_trait;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::prelude::SessionContext;
+use log::info;
 use sqllogictest::DBOutput;
 
 mod error;
@@ -47,13 +48,12 @@ impl sqllogictest::AsyncDB for DataFusion {
     type ColumnType = DFColumnType;
 
     async fn run(&mut self, sql: &str) -> Result<DFOutput> {
-        println!(
+        info!(
             "[{}] Running query: \"{}\"",
             self.relative_path.display(),
             sql
         );
-        let result = run_query(&self.ctx, sql).await?;
-        Ok(result)
+        run_query(&self.ctx, sql).await
     }
 
     /// Engine name of current database.
diff --git a/datafusion/core/tests/sqllogictests/src/main.rs b/datafusion/core/tests/sqllogictests/src/main.rs
index e3e1fd384d..27502123aa 100644
--- a/datafusion/core/tests/sqllogictests/src/main.rs
+++ b/datafusion/core/tests/sqllogictests/src/main.rs
@@ -15,16 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::error::Error;
 use std::path::{Path, PathBuf};
 #[cfg(target_family = "windows")]
 use std::thread;
 
+use futures::stream::StreamExt;
 use log::info;
 use sqllogictest::strict_column_validator;
 use tempfile::TempDir;
 
 use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_common::{DataFusionError, Result};
 
 use crate::engines::datafusion::DataFusion;
 use crate::engines::postgres::Postgres;
@@ -42,7 +43,7 @@ pub fn main() {
     thread::Builder::new()
         .stack_size(2 * 1024 * 1024) // 2 MB
         .spawn(move || {
-            tokio::runtime::Builder::new_current_thread()
+            tokio::runtime::Builder::new_multi_thread()
                 .enable_all()
                 .build()
                 .unwrap()
@@ -56,33 +57,72 @@ pub fn main() {
 
 #[tokio::main]
 #[cfg(not(target_family = "windows"))]
-pub async fn main() -> Result<(), Box<dyn Error>> {
+pub async fn main() -> Result<()> {
     run_tests().await
 }
 
-async fn run_tests() -> Result<(), Box<dyn Error>> {
+async fn run_tests() -> Result<()> {
     // Enable logging (e.g. set RUST_LOG=debug to see debug logs)
     env_logger::init();
 
     let options = Options::new();
 
-    for (path, relative_path) in read_test_files(&options) {
-        if options.complete_mode {
-            run_complete_file(&path, relative_path).await?;
-        } else if options.postgres_runner {
-            run_test_file_with_postgres(&path, relative_path).await?;
-        } else {
-            run_test_file(&path, relative_path).await?;
+    // Run all tests in parallel, reporting failures at the end
+    //
+    // Doing so is safe because each slt file runs with its own
+    // `SessionContext` and should not have side effects (like
+    // modifying shared state like `/tmp/`)
+    let errors: Vec<_> = futures::stream::iter(read_test_files(&options))
+        .map(|test_file| {
+            tokio::task::spawn(async move {
+                println!("Running {:?}", test_file.relative_path);
+                if options.complete_mode {
+                    run_complete_file(test_file).await?;
+                } else if options.postgres_runner {
+                    run_test_file_with_postgres(test_file).await?;
+                } else {
+                    run_test_file(test_file).await?;
+                }
+                Ok(()) as Result<()>
+            })
+        })
+        // run up to num_cpus streams in parallel
+        .buffer_unordered(num_cpus::get())
+        .flat_map(|result| {
+            // Filter out any Ok() leaving only the DataFusionErrors
+            futures::stream::iter(match result {
+                // Tokio panic error
+                Err(e) => Some(DataFusionError::External(Box::new(e))),
+                Ok(thread_result) => match thread_result {
+                    // Test run error
+                    Err(e) => Some(e),
+                    // success
+                    Ok(_) => None,
+                },
+            })
+        })
+        .collect()
+        .await;
+
+    // report on any errors
+    if !errors.is_empty() {
+        for e in &errors {
+            println!("{e}");
         }
+        Err(DataFusionError::Execution(format!(
+            "{} failures",
+            errors.len()
+        )))
+    } else {
+        Ok(())
     }
-
-    Ok(())
 }
 
-async fn run_test_file(
-    path: &Path,
-    relative_path: PathBuf,
-) -> Result<(), Box<dyn Error>> {
+async fn run_test_file(test_file: TestFile) -> Result<()> {
+    let TestFile {
+        path,
+        relative_path,
+    } = test_file;
     info!("Running with DataFusion runner: {}", path.display());
     let Some(test_ctx) = context_for_test_file(&relative_path).await else {
         info!("Skipping: {}", path.display());
@@ -91,26 +131,35 @@ async fn run_test_file(
     let ctx = test_ctx.session_ctx().clone();
     let mut runner = sqllogictest::Runner::new(DataFusion::new(ctx, relative_path));
     runner.with_column_validator(strict_column_validator);
-    runner.run_file_async(path).await?;
-    Ok(())
+    runner
+        .run_file_async(path)
+        .await
+        .map_err(|e| DataFusionError::External(Box::new(e)))
 }
 
-async fn run_test_file_with_postgres(
-    path: &Path,
-    relative_path: PathBuf,
-) -> Result<(), Box<dyn Error>> {
+async fn run_test_file_with_postgres(test_file: TestFile) -> Result<()> {
+    let TestFile {
+        path,
+        relative_path,
+    } = test_file;
     info!("Running with Postgres runner: {}", path.display());
-    let postgres_client = Postgres::connect(relative_path).await?;
+    let postgres_client = Postgres::connect(relative_path)
+        .await
+        .map_err(|e| DataFusionError::External(Box::new(e)))?;
     let mut runner = sqllogictest::Runner::new(postgres_client);
     runner.with_column_validator(strict_column_validator);
-    runner.run_file_async(path).await?;
+    runner
+        .run_file_async(path)
+        .await
+        .map_err(|e| DataFusionError::External(Box::new(e)))?;
     Ok(())
 }
 
-async fn run_complete_file(
-    path: &Path,
-    relative_path: PathBuf,
-) -> Result<(), Box<dyn Error>> {
+async fn run_complete_file(test_file: TestFile) -> Result<()> {
+    let TestFile {
+        path,
+        relative_path,
+    } = test_file;
     use sqllogictest::default_validator;
 
     info!("Using complete mode to complete: {}", path.display());
@@ -120,7 +169,8 @@ async fn run_complete_file(
         return Ok(())
     };
     let ctx = test_ctx.session_ctx().clone();
-    let mut runner = sqllogictest::Runner::new(DataFusion::new(ctx, relative_path));
+    let mut runner =
+        sqllogictest::Runner::new(DataFusion::new(ctx, relative_path.clone()));
     let col_separator = " ";
     runner
         .update_test_file(
@@ -130,26 +180,42 @@ async fn run_complete_file(
             strict_column_validator,
         )
         .await
-        .map_err(|e| e.to_string())?;
+        // Can't use e directly because it isn't marked Send, so turn it into a string.
+        .map_err(|e| {
+            DataFusionError::Execution(format!("Error completing {relative_path:?}: {e}"))
+        })
+}
 
-    Ok(())
+/// Represents a parsed test file
+#[derive(Debug)]
+struct TestFile {
+    /// The absolute path to the file
+    pub path: PathBuf,
+    /// The relative path of the file (used for display)
+    pub relative_path: PathBuf,
+}
+
+impl TestFile {
+    fn new(path: PathBuf) -> Self {
+        let relative_path = PathBuf::from(
+            path.to_string_lossy()
+                .strip_prefix(TEST_DIRECTORY)
+                .unwrap_or(""),
+        );
+
+        Self {
+            path,
+            relative_path,
+        }
+    }
 }
 
-fn read_test_files<'a>(
-    options: &'a Options,
-) -> Box<dyn Iterator<Item = (PathBuf, PathBuf)> + 'a> {
+fn read_test_files<'a>(options: &'a Options) -> Box<dyn Iterator<Item = TestFile> + 'a> {
     Box::new(
         read_dir_recursive(TEST_DIRECTORY)
-            .map(|path| {
-                (
-                    path.clone(),
-                    PathBuf::from(
-                        path.to_string_lossy().strip_prefix(TEST_DIRECTORY).unwrap(),
-                    ),
-                )
-            })
-            .filter(|(_, relative_path)| options.check_test_file(relative_path))
-            .filter(|(path, _)| options.check_pg_compat_file(path.as_path())),
+            .map(TestFile::new)
+            .filter(|f| options.check_test_file(&f.relative_path))
+            .filter(|f| options.check_pg_compat_file(f.path.as_path())),
     )
 }