Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/24 21:47:45 UTC
[arrow-datafusion] branch main updated: Reduce output when `sqllogictest` runs successfully, and run tests in parallel (#6393)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 32144501be Reduce output when `sqllogictest` runs successfully, and run tests in parallel (#6393)
32144501be is described below
commit 32144501be634253b254aca34660b4b3e81dbc1f
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed May 24 17:47:37 2023 -0400
Reduce output when `sqllogictest` runs successfully, and run tests in parallel (#6393)
* Reduce output in sqllogictest and run in parallel
* Improve parallelism
* Update instructions
* Add comments about parallel execution
* Use multi_thread on windows
* Apply suggestions from code review
Co-authored-by: Yevhenii Melnyk <me...@gmail.com>
* fixup
---------
Co-authored-by: Yevhenii Melnyk <me...@gmail.com>
---
datafusion/core/tests/sqllogictests/README.md | 50 ++++++-
.../sqllogictests/src/engines/datafusion/mod.rs | 6 +-
datafusion/core/tests/sqllogictests/src/main.rs | 156 +++++++++++++++------
3 files changed, 162 insertions(+), 50 deletions(-)
diff --git a/datafusion/core/tests/sqllogictests/README.md b/datafusion/core/tests/sqllogictests/README.md
index 6591161dd3..ad76dc8348 100644
--- a/datafusion/core/tests/sqllogictests/README.md
+++ b/datafusion/core/tests/sqllogictests/README.md
@@ -43,6 +43,52 @@ cargo test -p datafusion --test sqllogictests -- information_schema
cargo test -p datafusion --test sqllogictests -- ddl --complete
```
+```shell
+# Run ddl.slt, printing debug logging to stdout
+RUST_LOG=debug cargo test -p datafusion --test sqllogictests -- ddl
+```
+
+#### Cookbook: Adding Tests
+
+1. Add queries
+
+Add the setup and queries you want to run to a `.slt` file
+(`my_awesome_test.slt` in this example) using the following format:
+
+```text
+query
+CREATE TABLE foo AS VALUES (1);
+
+query
+SELECT * from foo;
+```
+
+2. Fill in expected answers with `--complete` mode
+
+Running the following command will update `my_awesome_test.slt` with the expected output:
+
+```shell
+cargo test -p datafusion --test sqllogictests -- my_awesome_test --complete
+```
+
+3. Verify the content
+
+In the case above, `my_awesome_test.slt` will look something like
+
+```
+statement ok
+CREATE TABLE foo AS VALUES (1);
+
+query I
+SELECT * from foo;
+----
+1
+```
+
+Assuming it looks good, check it in!
+
+#### Reference
+
#### Running tests: Validation Mode
In this model, `sqllogictests` runs the statements and queries in a `.slt` file, comparing the expected output in the file to the output produced by that run.
@@ -103,12 +149,12 @@ cargo test -p datafusion --test sqllogictests -- ddl --complete
#### sqllogictests
-> :warning: **Warning**:Datafusion's sqllogictest implementation and migration is still in progress. Definitions taken from https://www.sqlite.org/sqllogictest/doc/trunk/about.wiki
-
sqllogictest is a program originally written for SQLite to verify the correctness of SQL queries against the SQLite engine. The program is engine-agnostic and can parse sqllogictest files (`.slt`), runs queries against an SQL engine and compare the output to the expected output.
Tests in the `.slt` file are a sequence of query record generally starting with `CREATE` statements to populate tables and then further queries to test the populated data (arrow-datafusion exception).
+Each `.slt` file runs in its own, isolated `SessionContext`, to make the test setup explicit and so they can run in parallel. Thus it is important to keep the tests from having externally visible side effects (like writing to a global location such as `/tmp/`).
+
Query records follow the format:
```sql
diff --git a/datafusion/core/tests/sqllogictests/src/engines/datafusion/mod.rs b/datafusion/core/tests/sqllogictests/src/engines/datafusion/mod.rs
index 4c64daf155..dd30ef494d 100644
--- a/datafusion/core/tests/sqllogictests/src/engines/datafusion/mod.rs
+++ b/datafusion/core/tests/sqllogictests/src/engines/datafusion/mod.rs
@@ -24,6 +24,7 @@ use self::error::{DFSqlLogicTestError, Result};
use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::SessionContext;
+use log::info;
use sqllogictest::DBOutput;
mod error;
@@ -47,13 +48,12 @@ impl sqllogictest::AsyncDB for DataFusion {
type ColumnType = DFColumnType;
async fn run(&mut self, sql: &str) -> Result<DFOutput> {
- println!(
+ info!(
"[{}] Running query: \"{}\"",
self.relative_path.display(),
sql
);
- let result = run_query(&self.ctx, sql).await?;
- Ok(result)
+ run_query(&self.ctx, sql).await
}
/// Engine name of current database.
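The `run_complete_file` change in the main.rs diff that follows converts the error from `update_test_file` into a string because, per its comment, that error "isn't marked Send". Tokio requires the output of a future passed to `tokio::task::spawn` to be `Send`, and `std::thread::spawn` has the same requirement for a closure's return value. The block below is a std-only analogue, not the commit's code: `LocalError` and `complete_file` are hypothetical stand-ins (the `Rc` field is what makes the error non-`Send`), and a plain thread plays the role of the tokio task.

```rust
use std::fmt;
use std::rc::Rc;
use std::thread;

/// Hypothetical error type. The `Rc` field makes it `!Send`, like the
/// error described in the commit's comment.
#[derive(Debug)]
struct LocalError {
    file: Rc<String>,
}

impl fmt::Display for LocalError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "could not update {}", self.file)
    }
}

/// Hypothetical stand-in for a call whose error type is `!Send`.
fn complete_file(name: &str) -> Result<(), LocalError> {
    Err(LocalError {
        file: Rc::new(name.to_string()),
    })
}

/// Runs `complete_file` on another thread. Returning `LocalError` from the
/// closure would not compile, because `thread::spawn` requires the closure's
/// return type to be `Send`; formatting it into a `String` first does.
fn run_on_thread(name: &'static str) -> Result<(), String> {
    let handle = thread::spawn(move || {
        complete_file(name).map_err(|e| format!("Error completing {name:?}: {e}"))
    });
    // A panic in the worker is reported as an error string as well.
    handle
        .join()
        .unwrap_or_else(|_| Err("worker thread panicked".to_string()))
}

fn main() {
    match run_on_thread("ddl.slt") {
        Ok(()) => println!("completed"),
        Err(msg) => println!("{msg}"),
    }
}
```

Stringifying loses the original error's type, which is the trade-off the commit accepts in exchange for being able to move the result across the task boundary.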
diff --git a/datafusion/core/tests/sqllogictests/src/main.rs b/datafusion/core/tests/sqllogictests/src/main.rs
index e3e1fd384d..27502123aa 100644
--- a/datafusion/core/tests/sqllogictests/src/main.rs
+++ b/datafusion/core/tests/sqllogictests/src/main.rs
@@ -15,16 +15,17 @@
// specific language governing permissions and limitations
// under the License.
-use std::error::Error;
use std::path::{Path, PathBuf};
#[cfg(target_family = "windows")]
use std::thread;
+use futures::stream::StreamExt;
use log::info;
use sqllogictest::strict_column_validator;
use tempfile::TempDir;
use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_common::{DataFusionError, Result};
use crate::engines::datafusion::DataFusion;
use crate::engines::postgres::Postgres;
@@ -42,7 +43,7 @@ pub fn main() {
thread::Builder::new()
.stack_size(2 * 1024 * 1024) // 2 MB
.spawn(move || {
- tokio::runtime::Builder::new_current_thread()
+ tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
@@ -56,33 +57,72 @@ pub fn main() {
#[tokio::main]
#[cfg(not(target_family = "windows"))]
-pub async fn main() -> Result<(), Box<dyn Error>> {
+pub async fn main() -> Result<()> {
run_tests().await
}
-async fn run_tests() -> Result<(), Box<dyn Error>> {
+async fn run_tests() -> Result<()> {
// Enable logging (e.g. set RUST_LOG=debug to see debug logs)
env_logger::init();
let options = Options::new();
- for (path, relative_path) in read_test_files(&options) {
- if options.complete_mode {
- run_complete_file(&path, relative_path).await?;
- } else if options.postgres_runner {
- run_test_file_with_postgres(&path, relative_path).await?;
- } else {
- run_test_file(&path, relative_path).await?;
+ // Run all tests in parallel, reporting failures at the end
+ //
+ // Doing so is safe because each slt file runs with its own
+ // `SessionContext` and should not have side effects (like
+ // modifying shared state like `/tmp/`)
+ let errors: Vec<_> = futures::stream::iter(read_test_files(&options))
+ .map(|test_file| {
+ tokio::task::spawn(async move {
+ println!("Running {:?}", test_file.relative_path);
+ if options.complete_mode {
+ run_complete_file(test_file).await?;
+ } else if options.postgres_runner {
+ run_test_file_with_postgres(test_file).await?;
+ } else {
+ run_test_file(test_file).await?;
+ }
+ Ok(()) as Result<()>
+ })
+ })
+ // run up to num_cpus streams in parallel
+ .buffer_unordered(num_cpus::get())
+ .flat_map(|result| {
+ // Filter out any Ok() leaving only the DataFusionErrors
+ futures::stream::iter(match result {
+ // Tokio panic error
+ Err(e) => Some(DataFusionError::External(Box::new(e))),
+ Ok(thread_result) => match thread_result {
+ // Test run error
+ Err(e) => Some(e),
+ // success
+ Ok(_) => None,
+ },
+ })
+ })
+ .collect()
+ .await;
+
+ // report on any errors
+ if !errors.is_empty() {
+ for e in &errors {
+ println!("{e}");
}
+ Err(DataFusionError::Execution(format!(
+ "{} failures",
+ errors.len()
+ )))
+ } else {
+ Ok(())
}
-
- Ok(())
}
-async fn run_test_file(
- path: &Path,
- relative_path: PathBuf,
-) -> Result<(), Box<dyn Error>> {
+async fn run_test_file(test_file: TestFile) -> Result<()> {
+ let TestFile {
+ path,
+ relative_path,
+ } = test_file;
info!("Running with DataFusion runner: {}", path.display());
let Some(test_ctx) = context_for_test_file(&relative_path).await else {
info!("Skipping: {}", path.display());
@@ -91,26 +131,35 @@ async fn run_test_file(
let ctx = test_ctx.session_ctx().clone();
let mut runner = sqllogictest::Runner::new(DataFusion::new(ctx, relative_path));
runner.with_column_validator(strict_column_validator);
- runner.run_file_async(path).await?;
- Ok(())
+ runner
+ .run_file_async(path)
+ .await
+ .map_err(|e| DataFusionError::External(Box::new(e)))
}
-async fn run_test_file_with_postgres(
- path: &Path,
- relative_path: PathBuf,
-) -> Result<(), Box<dyn Error>> {
+async fn run_test_file_with_postgres(test_file: TestFile) -> Result<()> {
+ let TestFile {
+ path,
+ relative_path,
+ } = test_file;
info!("Running with Postgres runner: {}", path.display());
- let postgres_client = Postgres::connect(relative_path).await?;
+ let postgres_client = Postgres::connect(relative_path)
+ .await
+ .map_err(|e| DataFusionError::External(Box::new(e)))?;
let mut runner = sqllogictest::Runner::new(postgres_client);
runner.with_column_validator(strict_column_validator);
- runner.run_file_async(path).await?;
+ runner
+ .run_file_async(path)
+ .await
+ .map_err(|e| DataFusionError::External(Box::new(e)))?;
Ok(())
}
-async fn run_complete_file(
- path: &Path,
- relative_path: PathBuf,
-) -> Result<(), Box<dyn Error>> {
+async fn run_complete_file(test_file: TestFile) -> Result<()> {
+ let TestFile {
+ path,
+ relative_path,
+ } = test_file;
use sqllogictest::default_validator;
info!("Using complete mode to complete: {}", path.display());
@@ -120,7 +169,8 @@ async fn run_complete_file(
return Ok(())
};
let ctx = test_ctx.session_ctx().clone();
- let mut runner = sqllogictest::Runner::new(DataFusion::new(ctx, relative_path));
+ let mut runner =
+ sqllogictest::Runner::new(DataFusion::new(ctx, relative_path.clone()));
let col_separator = " ";
runner
.update_test_file(
@@ -130,26 +180,42 @@ async fn run_complete_file(
strict_column_validator,
)
.await
- .map_err(|e| e.to_string())?;
+ // Can't use e directly because it isn't marked Send, so turn it into a string.
+ .map_err(|e| {
+ DataFusionError::Execution(format!("Error completing {relative_path:?}: {e}"))
+ })
+}
- Ok(())
+/// Represents a parsed test file
+#[derive(Debug)]
+struct TestFile {
+ /// The absolute path to the file
+ pub path: PathBuf,
+ /// The relative path of the file (used for display)
+ pub relative_path: PathBuf,
+}
+
+impl TestFile {
+ fn new(path: PathBuf) -> Self {
+ let relative_path = PathBuf::from(
+ path.to_string_lossy()
+ .strip_prefix(TEST_DIRECTORY)
+ .unwrap_or(""),
+ );
+
+ Self {
+ path,
+ relative_path,
+ }
+ }
}
-fn read_test_files<'a>(
- options: &'a Options,
-) -> Box<dyn Iterator<Item = (PathBuf, PathBuf)> + 'a> {
+fn read_test_files<'a>(options: &'a Options) -> Box<dyn Iterator<Item = TestFile> + 'a> {
Box::new(
read_dir_recursive(TEST_DIRECTORY)
- .map(|path| {
- (
- path.clone(),
- PathBuf::from(
- path.to_string_lossy().strip_prefix(TEST_DIRECTORY).unwrap(),
- ),
- )
- })
- .filter(|(_, relative_path)| options.check_test_file(relative_path))
- .filter(|(path, _)| options.check_pg_compat_file(path.as_path())),
+ .map(TestFile::new)
+ .filter(|f| options.check_test_file(&f.relative_path))
+ .filter(|f| options.check_pg_compat_file(f.path.as_path())),
)
}
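The new `run_tests` spawns one tokio task per test file, caps concurrency with `buffer_unordered(num_cpus::get())`, and keeps only the failures so it can print all of them before returning a single "N failures" error. As a rough std-only illustration of that pattern (a swapped-in technique, not the commit's code), the sketch below uses scoped threads that claim file names from a shared atomic counter, with `std::thread::available_parallelism` standing in for `num_cpus::get()`. `run_one` and the file names are hypothetical; a file "fails" if its name contains `broken`. One difference to keep in mind: a panic in a worker here propagates out of `thread::scope`, whereas the tokio version reports a task panic (`JoinError`) as one more failure.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for running one `.slt` file: any file whose name
/// contains "broken" fails.
fn run_one(name: &str) -> Result<(), String> {
    if name.contains("broken") {
        Err(format!("{name}: query result mismatch"))
    } else {
        Ok(())
    }
}

/// Runs every file on at most `limit` worker threads and returns every
/// failure instead of stopping at the first one.
fn run_all(files: &[&str], limit: usize) -> Vec<String> {
    // Index of the next file that no worker has claimed yet.
    let next = AtomicUsize::new(0);
    let (tx, rx) = mpsc::channel();
    thread::scope(|s| {
        for _ in 0..limit.max(1) {
            let tx = tx.clone();
            let next = &next;
            s.spawn(move || loop {
                // Claim a file; stop once every file has been claimed.
                let i = next.fetch_add(1, Ordering::Relaxed);
                let Some(file) = files.get(i) else { break };
                if let Err(e) = run_one(file) {
                    tx.send(e).expect("receiver is still alive");
                }
            });
        }
    }); // all workers have been joined here
    // Drop the last sender so the receiver iterator terminates.
    drop(tx);
    rx.into_iter().collect()
}

fn main() {
    let files = ["ddl.slt", "broken_join.slt", "aggregate.slt", "broken_window.slt"];
    // Analogue of `num_cpus::get()`; fall back to one worker if unknown.
    let limit = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    let mut errors = run_all(&files, limit);
    errors.sort();
    // Report every failure, then summarize, mirroring `run_tests`.
    for e in &errors {
        println!("{e}");
    }
    let result: Result<(), String> = if errors.is_empty() {
        Ok(())
    } else {
        Err(format!("{} failures", errors.len()))
    };
    println!("{result:?}");
}
```

Workers finish in arbitrary order, so the failures are sorted before printing; `buffer_unordered` in the commit likewise yields results in completion order rather than file order.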