You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/28 20:56:50 UTC
[arrow-datafusion] branch master updated: Support for executing infinite files and boundedness-aware join reordering rule (#4694)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new cd4fd80dd Support for executing infinite files and boundedness-aware join reordering rule (#4694)
cd4fd80dd is described below
commit cd4fd80dd5553ffdac41e21c2b7bfe0296f30695
Author: Metehan Yıldırım <10...@users.noreply.github.com>
AuthorDate: Wed Dec 28 23:56:45 2022 +0300
Support for executing infinite files and boundedness-aware join reordering rule (#4694)
* Executing infinite files with necessary optimization and file configurations
* Future-proofing, more idiomatic expressions, better comments
* Testing incremental execution
* Simplify FIFO test code
* Revamping unboundedness decision on rules and executors
* Clippy resolutions
* Change in some rules
* Minor refactors and renamings
* Cargo fmt changes
* Resolving deprecation warnings
* Splitting fixer and checker rule, before merge conflict
* Clippy and comments
* Update Cargo.lock
* Improve/correct comments, simplify PipelineFixer code
Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
datafusion-cli/Cargo.lock | 149 ++---
datafusion/core/Cargo.toml | 4 +
datafusion/core/src/datasource/file_format/mod.rs | 1 +
datafusion/core/src/datasource/listing/table.rs | 154 ++++-
datafusion/core/src/execution/context.rs | 66 ++-
datafusion/core/src/execution/options.rs | 50 +-
.../core/src/physical_optimizer/enforcement.rs | 1 +
.../core/src/physical_optimizer/join_selection.rs | 64 +--
datafusion/core/src/physical_optimizer/mod.rs | 5 +
.../src/physical_optimizer/pipeline_checker.rs | 389 +++++++++++++
.../core/src/physical_optimizer/pipeline_fixer.rs | 623 +++++++++++++++++++++
.../core/src/physical_optimizer/repartition.rs | 1 +
.../core/src/physical_optimizer/test_utils.rs | 127 +++++
.../core/src/physical_plan/aggregates/mod.rs | 15 +-
datafusion/core/src/physical_plan/analyze.rs | 16 +-
.../core/src/physical_plan/coalesce_batches.rs | 7 +
.../core/src/physical_plan/coalesce_partitions.rs | 6 +
.../core/src/physical_plan/file_format/avro.rs | 7 +
.../core/src/physical_plan/file_format/csv.rs | 4 +
.../src/physical_plan/file_format/file_stream.rs | 1 +
.../core/src/physical_plan/file_format/json.rs | 7 +
.../core/src/physical_plan/file_format/mod.rs | 3 +
.../core/src/physical_plan/file_format/parquet.rs | 4 +
datafusion/core/src/physical_plan/filter.rs | 7 +
.../core/src/physical_plan/joins/cross_join.rs | 15 +
.../core/src/physical_plan/joins/hash_join.rs | 31 +
datafusion/core/src/physical_plan/mod.rs | 7 +
datafusion/core/src/physical_plan/projection.rs | 7 +
datafusion/core/src/physical_plan/repartition.rs | 7 +
datafusion/core/src/physical_plan/sorts/sort.rs | 15 +-
datafusion/core/src/physical_plan/union.rs | 7 +
.../src/physical_plan/windows/window_agg_exec.rs | 16 +-
datafusion/core/src/test/exec.rs | 70 +++
datafusion/core/src/test/mod.rs | 1 +
datafusion/core/tests/fifo.rs | 225 ++++++++
datafusion/core/tests/parquet/custom_reader.rs | 1 +
datafusion/core/tests/parquet/page_pruning.rs | 1 +
datafusion/core/tests/row.rs | 1 +
datafusion/proto/src/physical_plan/from_proto.rs | 1 +
datafusion/proto/src/physical_plan/mod.rs | 1 +
parquet-test-utils/src/lib.rs | 1 +
41 files changed, 1986 insertions(+), 132 deletions(-)
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index cb6c543ab..baddb4226 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -263,9 +263,9 @@ dependencies = [
[[package]]
name = "async-trait"
-version = "0.1.59"
+version = "0.1.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "31e6e93155431f3931513b243d371981bb2770112b370c82745a1d19d2f99364"
+checksum = "677d1d8ab452a3936018a687b20e6f7cf5363d713b732b8884001317b0e48aa3"
dependencies = [
"proc-macro2",
"quote",
@@ -278,7 +278,7 @@ version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
- "hermit-abi",
+ "hermit-abi 0.1.19",
"libc",
"winapi",
]
@@ -303,9 +303,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "blake2"
-version = "0.10.5"
+version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b12e5fd123190ce1c2e559308a94c9bacad77907d4c6005d9e58fe1a0689e55e"
+checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe"
dependencies = [
"digest",
]
@@ -407,9 +407,9 @@ dependencies = [
[[package]]
name = "cc"
-version = "1.0.77"
+version = "1.0.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e9f73505338f7d905b19d18738976aae232eb46b8efc15554ffc56deb5d9ebe4"
+checksum = "a20104e2335ce8a659d6dd92a51a767a0c062599c73b343fd152cb401e828c3d"
dependencies = [
"jobserver",
]
@@ -596,9 +596,9 @@ dependencies = [
[[package]]
name = "cxx"
-version = "1.0.83"
+version = "1.0.85"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bdf07d07d6531bfcdbe9b8b739b104610c6508dcc4d63b410585faf338241daf"
+checksum = "5add3fc1717409d029b20c5b6903fc0c0b02fa6741d820054f4a2efa5e5816fd"
dependencies = [
"cc",
"cxxbridge-flags",
@@ -608,9 +608,9 @@ dependencies = [
[[package]]
name = "cxx-build"
-version = "1.0.83"
+version = "1.0.85"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d2eb5b96ecdc99f72657332953d4d9c50135af1bac34277801cc3937906ebd39"
+checksum = "b4c87959ba14bc6fbc61df77c3fcfe180fc32b93538c4f1031dd802ccb5f2ff0"
dependencies = [
"cc",
"codespan-reporting",
@@ -623,15 +623,15 @@ dependencies = [
[[package]]
name = "cxxbridge-flags"
-version = "1.0.83"
+version = "1.0.85"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ac040a39517fd1674e0f32177648334b0f4074625b5588a64519804ba0553b12"
+checksum = "69a3e162fde4e594ed2b07d0f83c6c67b745e7f28ce58c6df5e6b6bef99dfb59"
[[package]]
name = "cxxbridge-macro"
-version = "1.0.83"
+version = "1.0.85"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1362b0ddcfc4eb0a1f57b68bd77dd99f0e826958a96abd0ae9bd092e114ffed6"
+checksum = "3e7e2adeb6a0d4a282e581096b06e1791532b7d576dcde5ccd9382acf55db8e6"
dependencies = [
"proc-macro2",
"quote",
@@ -1151,6 +1151,15 @@ dependencies = [
"libc",
]
+[[package]]
+name = "hermit-abi"
+version = "0.2.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7"
+dependencies = [
+ "libc",
+]
+
[[package]]
name = "http"
version = "0.2.8"
@@ -1159,7 +1168,7 @@ checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
dependencies = [
"bytes",
"fnv",
- "itoa 1.0.4",
+ "itoa 1.0.5",
]
[[package]]
@@ -1206,7 +1215,7 @@ dependencies = [
"http-body",
"httparse",
"httpdate",
- "itoa 1.0.4",
+ "itoa 1.0.5",
"pin-project-lite",
"socket2",
"tokio",
@@ -1299,9 +1308,9 @@ dependencies = [
[[package]]
name = "ipnet"
-version = "2.6.0"
+version = "2.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec947b7a4ce12e3b87e353abae7ce124d025b6c7d6c5aea5cc0bcf92e9510ded"
+checksum = "11b0d96e660696543b251e58030cf9787df56da39dab19ad60eae7353040917e"
[[package]]
name = "itertools"
@@ -1320,9 +1329,9 @@ checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
[[package]]
name = "itoa"
-version = "1.0.4"
+version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc"
+checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440"
[[package]]
name = "jobserver"
@@ -1414,9 +1423,9 @@ dependencies = [
[[package]]
name = "libc"
-version = "0.2.138"
+version = "0.2.139"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "db6d7e329c562c5dfab7a46a2afabc8b987ab9a4834c9d1ca04dc54c1546cef8"
+checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79"
[[package]]
name = "libm"
@@ -1436,18 +1445,18 @@ dependencies = [
[[package]]
name = "link-cplusplus"
-version = "1.0.7"
+version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9272ab7b96c9046fbc5bc56c06c117cb639fe2d509df0c421cad82d2915cf369"
+checksum = "ecd207c9c713c34f95a097a5b029ac2ce6010530c7b49d7fea24d977dede04f5"
dependencies = [
"cc",
]
[[package]]
name = "linux-raw-sys"
-version = "0.1.3"
+version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8f9f08d8963a6c613f4b1a78f4f4a4dbfadf8e6545b2d72861731e4858b8b47f"
+checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4"
[[package]]
name = "lock_api"
@@ -1669,11 +1678,11 @@ dependencies = [
[[package]]
name = "num_cpus"
-version = "1.14.0"
+version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f6058e64324c71e02bc2b150e4f3bc8286db6c83092132ffa3f6b1eab0f9def5"
+checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b"
dependencies = [
- "hermit-abi",
+ "hermit-abi 0.2.6",
"libc",
]
@@ -1785,9 +1794,9 @@ dependencies = [
[[package]]
name = "paste"
-version = "1.0.9"
+version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b1de2e551fb905ac83f73f7aedf2f0cb4a0da7e35efa24a202a936269f1f18e1"
+checksum = "d01a5bd0424d00070b0098dd17ebca6f961a959dead1dbcbbbc1d1cd8d3deeba"
[[package]]
name = "percent-encoding"
@@ -1845,15 +1854,15 @@ dependencies = [
[[package]]
name = "proc-macro-hack"
-version = "0.5.19"
+version = "0.5.20+deprecated"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
+checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068"
[[package]]
name = "proc-macro2"
-version = "1.0.47"
+version = "1.0.49"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725"
+checksum = "57a8eca9f9c4ffde41714334dee777596264c7825420f521abc92b5b5deb63a5"
dependencies = [
"unicode-ident",
]
@@ -1870,9 +1879,9 @@ dependencies = [
[[package]]
name = "quote"
-version = "1.0.21"
+version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179"
+checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b"
dependencies = [
"proc-macro2",
]
@@ -2026,9 +2035,9 @@ dependencies = [
[[package]]
name = "rustix"
-version = "0.36.5"
+version = "0.36.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a3807b5d10909833d3e9acd1eb5fb988f79376ff10fce42937de71a449c4c588"
+checksum = "4feacf7db682c6c329c4ede12649cd36ecab0f3be5b7d74e6a20304725db4549"
dependencies = [
"bitflags",
"errno",
@@ -2061,9 +2070,9 @@ dependencies = [
[[package]]
name = "rustversion"
-version = "1.0.9"
+version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "97477e48b4cf8603ad5f7aaf897467cf42ab4218a38ef76fb14c2d6773a6d6a8"
+checksum = "5583e89e108996506031660fe09baa5011b9dd0341b89029313006d1fb508d70"
[[package]]
name = "rustyline"
@@ -2090,9 +2099,9 @@ dependencies = [
[[package]]
name = "ryu"
-version = "1.0.11"
+version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09"
+checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde"
[[package]]
name = "same-file"
@@ -2111,9 +2120,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "scratch"
-version = "1.0.2"
+version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898"
+checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2"
[[package]]
name = "sct"
@@ -2127,24 +2136,24 @@ dependencies = [
[[package]]
name = "seq-macro"
-version = "0.3.1"
+version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0772c5c30e1a0d91f6834f8e545c69281c099dfa9a3ac58d96a9fd629c8d4898"
+checksum = "1685deded9b272198423bdbdb907d8519def2f26cf3699040e54e8c4fbd5c5ce"
[[package]]
name = "serde"
-version = "1.0.149"
+version = "1.0.152"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "256b9932320c590e707b94576e3cc1f7c9024d0ee6612dfbcf1cb106cbe8e055"
+checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
-version = "1.0.149"
+version = "1.0.152"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b4eae9b04cbffdfd550eb462ed33bc6a1b68c935127d008b27444d08380f94e4"
+checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e"
dependencies = [
"proc-macro2",
"quote",
@@ -2153,11 +2162,11 @@ dependencies = [
[[package]]
name = "serde_json"
-version = "1.0.89"
+version = "1.0.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "020ff22c755c2ed3f8cf162dbb41a7268d934702f3ed3631656ea597e08fc3db"
+checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883"
dependencies = [
- "itoa 1.0.4",
+ "itoa 1.0.5",
"ryu",
"serde",
]
@@ -2169,7 +2178,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
dependencies = [
"form_urlencoded",
- "itoa 1.0.4",
+ "itoa 1.0.5",
"ryu",
"serde",
]
@@ -2202,9 +2211,9 @@ checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
[[package]]
name = "snafu"
-version = "0.7.3"
+version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a152ba99b054b22972ee794cf04e5ef572da1229e33b65f3c57abbff0525a454"
+checksum = "cb0656e7e3ffb70f6c39b3c2a86332bb74aa3c679da781642590f3c1118c5045"
dependencies = [
"doc-comment",
"snafu-derive",
@@ -2212,9 +2221,9 @@ dependencies = [
[[package]]
name = "snafu-derive"
-version = "0.7.3"
+version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d5e79cdebbabaebb06a9bdbaedc7f159b410461f63611d4d0e3fb0fab8fed850"
+checksum = "475b3bbe5245c26f2d8a6f62d67c1f30eb9fffeccee721c45d162c3ebbdf81b2"
dependencies = [
"heck",
"proc-macro2",
@@ -2298,9 +2307,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]]
name = "syn"
-version = "1.0.105"
+version = "1.0.107"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "60b9b43d45702de4c839cb9b51d9f529c5dd26a4aff255b42b1ebc03e88ee908"
+checksum = "1f4064b5b16e03ae50984a5a8ed5d4f8803e6bc1fd170a3cda91a1be4b18e3f5"
dependencies = [
"proc-macro2",
"quote",
@@ -2338,18 +2347,18 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d"
[[package]]
name = "thiserror"
-version = "1.0.37"
+version = "1.0.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e"
+checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
-version = "1.0.37"
+version = "1.0.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb"
+checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f"
dependencies = [
"proc-macro2",
"quote",
@@ -2525,9 +2534,9 @@ checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"
[[package]]
name = "unicode-ident"
-version = "1.0.5"
+version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3"
+checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc"
[[package]]
name = "unicode-normalization"
@@ -2703,9 +2712,9 @@ dependencies = [
[[package]]
name = "webpki-roots"
-version = "0.22.5"
+version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "368bfe657969fb01238bb756d351dcade285e0f6fcbd36dcb23359a5169975be"
+checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87"
dependencies = [
"webpki",
]
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 18f590168..eaa01ca46 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -99,6 +99,7 @@ url = "2.2"
uuid = { version = "1.0", features = ["v4"] }
xz2 = { version = "0.1", optional = true }
+
[dev-dependencies]
arrow = { version = "29.0.0", features = ["prettyprint", "dyn_cmp_dict"] }
async-trait = "0.1.53"
@@ -114,6 +115,9 @@ sqlparser = "0.28"
test-utils = { path = "../../test-utils" }
thiserror = "1.0.37"
+[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
+nix = "0.26.1"
+
[[bench]]
harness = false
name = "aggregate_query_sql"
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index a6b8a7904..b73b3881a 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -138,6 +138,7 @@ pub(crate) mod test_util {
limit,
table_partition_cols: vec![],
output_ordering: None,
+ infinite_source: false,
},
&[],
)
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 26ee98ea4..8458cc4ae 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -229,6 +229,11 @@ pub struct ListingOptions {
///
/// See <https://github.com/apache/arrow-datafusion/issues/4177>
pub file_sort_order: Option<Vec<Expr>>,
+ /// Infinite source means that the input is not guaranteed to end.
+ /// Currently, CSV, JSON, and AVRO formats are supported.
+ /// In order to support infinite inputs, DataFusion may adjust query
+ /// plans (e.g. joins) to run the given query in full pipelining mode.
+ pub infinite_source: bool,
}
impl ListingOptions {
@@ -246,9 +251,28 @@ impl ListingOptions {
collect_stat: true,
target_partitions: 1,
file_sort_order: None,
+ infinite_source: false,
}
}
+ /// Set unbounded assumption on [`ListingOptions`] and returns self.
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use datafusion::datasource::{listing::ListingOptions, file_format::csv::CsvFormat};
+ /// use datafusion::prelude::SessionContext;
+ /// let ctx = SessionContext::new();
+ /// let listing_options = ListingOptions::new(Arc::new(
+ /// CsvFormat::default()
+ /// )).with_infinite_source(true);
+ ///
+ /// assert_eq!(listing_options.infinite_source, true);
+ /// ```
+ pub fn with_infinite_source(mut self, infinite_source: bool) -> Self {
+ self.infinite_source = infinite_source;
+ self
+ }
+
/// Set file extension on [`ListingOptions`] and returns self.
///
/// ```
@@ -419,6 +443,7 @@ pub struct ListingTable {
options: ListingOptions,
definition: Option<String>,
collected_statistics: StatisticsCache,
+ infinite_source: bool,
}
impl ListingTable {
@@ -447,6 +472,7 @@ impl ListingTable {
false,
));
}
+ let infinite_source = options.infinite_source;
let table = Self {
table_paths: config.table_paths,
@@ -455,6 +481,7 @@ impl ListingTable {
options,
definition: None,
collected_statistics: Default::default(),
+ infinite_source,
};
Ok(table)
@@ -578,6 +605,7 @@ impl TableProvider for ListingTable {
limit,
output_ordering: self.try_create_output_ordering()?,
table_partition_cols,
+ infinite_source: self.infinite_source,
},
filters,
)
@@ -678,8 +706,9 @@ impl ListingTable {
#[cfg(test)]
mod tests {
+ use super::*;
use crate::datasource::file_format::file_type::GetExt;
- use crate::prelude::SessionContext;
+ use crate::prelude::*;
use crate::{
datasource::file_format::{avro::AvroFormat, parquet::ParquetFormat},
logical_expr::{col, lit},
@@ -688,8 +717,37 @@ mod tests {
use arrow::datatypes::DataType;
use chrono::DateTime;
use datafusion_common::assert_contains;
+ use rstest::*;
+ use std::fs::File;
+ use tempfile::TempDir;
+
+ /// It creates dummy file and checks if it can create unbounded input executors.
+ async fn unbounded_table_helper(
+ file_type: FileType,
+ listing_option: ListingOptions,
+ infinite_data: bool,
+ ) -> Result<()> {
+ let ctx = SessionContext::new();
+ register_test_store(
+ &ctx,
+ &[(&format!("table/file{}", file_type.get_ext()), 100)],
+ );
- use super::*;
+ let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
+
+ let table_path = ListingTableUrl::parse("test:///table/").unwrap();
+ let config = ListingTableConfig::new(table_path)
+ .with_listing_options(listing_option)
+ .with_schema(Arc::new(schema));
+ // Create a table
+ let table = ListingTable::try_new(config)?;
+ // Create executor from table
+ let source_exec = table.scan(&ctx.state(), None, &[], None).await?;
+
+ assert_eq!(source_exec.unbounded_output(&[])?, infinite_data);
+
+ Ok(())
+ }
#[tokio::test]
async fn read_single_file() -> Result<()> {
@@ -770,7 +828,7 @@ mod tests {
let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
let schema = options.infer_schema(&state, &table_path).await.unwrap();
- use physical_plan::expressions::col as physical_col;
+ use crate::physical_plan::expressions::col as physical_col;
use std::ops::Add;
// (file_sort_order, expected_result)
@@ -904,6 +962,96 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn unbounded_csv_table_without_schema() -> Result<()> {
+ let tmp_dir = TempDir::new()?;
+ let file_path = tmp_dir.path().join("dummy.csv");
+ File::create(file_path)?;
+ let ctx = SessionContext::new();
+ let error = ctx
+ .register_csv(
+ "test",
+ tmp_dir.path().to_str().unwrap(),
+ CsvReadOptions::new().mark_infinite(true),
+ )
+ .await
+ .unwrap_err();
+ match error {
+ DataFusionError::Plan(_) => Ok(()),
+ val => Err(val),
+ }
+ }
+
+ #[tokio::test]
+ async fn unbounded_json_table_without_schema() -> Result<()> {
+ let tmp_dir = TempDir::new()?;
+ let file_path = tmp_dir.path().join("dummy.json");
+ File::create(file_path)?;
+ let ctx = SessionContext::new();
+ let error = ctx
+ .register_json(
+ "test",
+ tmp_dir.path().to_str().unwrap(),
+ NdJsonReadOptions::default().mark_infinite(true),
+ )
+ .await
+ .unwrap_err();
+ match error {
+ DataFusionError::Plan(_) => Ok(()),
+ val => Err(val),
+ }
+ }
+
+ #[tokio::test]
+ async fn unbounded_avro_table_without_schema() -> Result<()> {
+ let tmp_dir = TempDir::new()?;
+ let file_path = tmp_dir.path().join("dummy.avro");
+ File::create(file_path)?;
+ let ctx = SessionContext::new();
+ let error = ctx
+ .register_avro(
+ "test",
+ tmp_dir.path().to_str().unwrap(),
+ AvroReadOptions::default().mark_infinite(true),
+ )
+ .await
+ .unwrap_err();
+ match error {
+ DataFusionError::Plan(_) => Ok(()),
+ val => Err(val),
+ }
+ }
+
+ #[rstest]
+ #[tokio::test]
+ async fn unbounded_csv_table(
+ #[values(true, false)] infinite_data: bool,
+ ) -> Result<()> {
+ let config = CsvReadOptions::new().mark_infinite(infinite_data);
+ let listing_options = config.to_listing_options(1);
+ unbounded_table_helper(FileType::CSV, listing_options, infinite_data).await
+ }
+
+ #[rstest]
+ #[tokio::test]
+ async fn unbounded_json_table(
+ #[values(true, false)] infinite_data: bool,
+ ) -> Result<()> {
+ let config = NdJsonReadOptions::default().mark_infinite(infinite_data);
+ let listing_options = config.to_listing_options(1);
+ unbounded_table_helper(FileType::JSON, listing_options, infinite_data).await
+ }
+
+ #[rstest]
+ #[tokio::test]
+ async fn unbounded_avro_table(
+ #[values(true, false)] infinite_data: bool,
+ ) -> Result<()> {
+ let config = AvroReadOptions::default().mark_infinite(infinite_data);
+ let listing_options = config.to_listing_options(1);
+ unbounded_table_helper(FileType::AVRO, listing_options, infinite_data).await
+ }
+
#[tokio::test]
async fn test_assert_list_files_for_scan_grouping() -> Result<()> {
// more expected partitions than files
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index db6ab99fb..1fa6cc383 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -101,6 +101,8 @@ use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::memory_pool::MemoryPool;
use crate::physical_optimizer::global_sort_selection::GlobalSortSelection;
use crate::physical_optimizer::optimize_sorts::OptimizeSorts;
+use crate::physical_optimizer::pipeline_checker::PipelineChecker;
+use crate::physical_optimizer::pipeline_fixer::PipelineFixer;
use uuid::Uuid;
use super::options::{
@@ -630,13 +632,19 @@ impl SessionContext {
let listing_options = options.to_listing_options(target_partitions);
- let resolved_schema = match options.schema {
- Some(s) => s,
- None => {
+ let resolved_schema = match (options.schema, options.infinite) {
+ (Some(s), _) => Arc::new(s.to_owned()),
+ (None, false) => {
listing_options
.infer_schema(&self.state(), &table_path)
.await?
}
+ (None, true) => {
+ return Err(DataFusionError::Plan(
+ "Schema inference for infinite data sources is not supported."
+ .to_string(),
+ ))
+ }
};
let config = ListingTableConfig::new(table_path)
@@ -657,13 +665,19 @@ impl SessionContext {
let listing_options = options.to_listing_options(target_partitions);
- let resolved_schema = match options.schema {
- Some(s) => s,
- None => {
+ let resolved_schema = match (options.schema, options.infinite) {
+ (Some(s), _) => Arc::new(s.to_owned()),
+ (None, false) => {
listing_options
.infer_schema(&self.state(), &table_path)
.await?
}
+ (None, true) => {
+ return Err(DataFusionError::Plan(
+ "Schema inference for infinite data sources is not supported."
+ .to_string(),
+ ))
+ }
};
let config = ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
@@ -690,13 +704,19 @@ impl SessionContext {
let table_path = ListingTableUrl::parse(table_path)?;
let target_partitions = self.copied_config().target_partitions();
let listing_options = options.to_listing_options(target_partitions);
- let resolved_schema = match options.schema {
- Some(s) => Arc::new(s.to_owned()),
- None => {
+ let resolved_schema = match (options.schema, options.infinite) {
+ (Some(s), _) => Arc::new(s.to_owned()),
+ (None, false) => {
listing_options
.infer_schema(&self.state(), &table_path)
.await?
}
+ (None, true) => {
+ return Err(DataFusionError::Plan(
+ "Schema inference for infinite data sources is not supported."
+ .to_string(),
+ ))
+ }
};
let config = ListingTableConfig::new(table_path.clone())
.with_listing_options(listing_options)
@@ -767,9 +787,15 @@ impl SessionContext {
sql_definition: Option<String>,
) -> Result<()> {
let table_path = ListingTableUrl::parse(table_path)?;
- let resolved_schema = match provided_schema {
- None => options.infer_schema(&self.state(), &table_path).await?,
- Some(s) => s,
+ let resolved_schema = match (provided_schema, options.infinite_source) {
+ (Some(s), _) => s,
+ (None, false) => options.infer_schema(&self.state(), &table_path).await?,
+ (None, true) => {
+ return Err(DataFusionError::Plan(
+ "Schema inference for infinite data sources is not supported."
+ .to_string(),
+ ))
+ }
};
let config = ListingTableConfig::new(table_path)
.with_listing_options(options)
@@ -817,7 +843,7 @@ impl SessionContext {
name,
table_path,
listing_options,
- options.schema,
+ options.schema.map(|s| Arc::new(s.to_owned())),
None,
)
.await?;
@@ -854,7 +880,7 @@ impl SessionContext {
name,
table_path,
listing_options,
- options.schema,
+ options.schema.map(|s| Arc::new(s.to_owned())),
None,
)
.await?;
@@ -1562,6 +1588,12 @@ impl SessionState {
// and local sort to meet the distribution and ordering requirements.
// Therefore, it should be run before BasicEnforcement
physical_optimizers.push(Arc::new(JoinSelection::new()));
+ // If the query is processing infinite inputs, the PipelineFixer rule applies the
+ // necessary transformations to make the query runnable (if it is not already runnable).
+ // If the query can not be made runnable, the rule emits an error with a diagnostic message.
+ // Since the transformations it applies may alter output partitioning properties of operators
+ // (e.g. by swapping hash join sides), this rule runs before BasicEnforcement.
+ physical_optimizers.push(Arc::new(PipelineFixer::new()));
// It's for adding essential repartition and local sorting operator to satisfy the
// required distribution and local sort.
// Please make sure that the whole plan tree is determined.
@@ -1587,7 +1619,11 @@ impl SessionState {
.unwrap(),
)));
}
-
+ // The PipelineChecker rule will reject non-runnable query plans that use
+ // pipeline-breaking operators on infinite input(s). The rule generates a
+ // diagnostic error message when this happens. It makes no changes to the
+ // given query plan; i.e. it only acts as a final gatekeeping rule.
+ physical_optimizers.push(Arc::new(PipelineChecker::new()));
SessionState {
session_id,
optimizer: Optimizer::new(),
diff --git a/datafusion/core/src/execution/options.rs b/datafusion/core/src/execution/options.rs
index 229eaa313..6ecf89e00 100644
--- a/datafusion/core/src/execution/options.rs
+++ b/datafusion/core/src/execution/options.rs
@@ -19,7 +19,7 @@
use std::sync::Arc;
-use arrow::datatypes::{DataType, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Schema};
use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
@@ -63,6 +63,8 @@ pub struct CsvReadOptions<'a> {
pub table_partition_cols: Vec<(String, DataType)>,
/// File compression type
pub file_compression_type: FileCompressionType,
+ /// Flag indicating whether this file may be unbounded (as in a FIFO file).
+ pub infinite: bool,
}
impl<'a> Default for CsvReadOptions<'a> {
@@ -82,6 +84,7 @@ impl<'a> CsvReadOptions<'a> {
file_extension: DEFAULT_CSV_EXTENSION,
table_partition_cols: vec![],
file_compression_type: FileCompressionType::UNCOMPRESSED,
+ infinite: false,
}
}
@@ -91,6 +94,12 @@ impl<'a> CsvReadOptions<'a> {
self
}
+ /// Configure mark_infinite setting
+ pub fn mark_infinite(mut self, infinite: bool) -> Self {
+ self.infinite = infinite;
+ self
+ }
+
/// Specify delimiter to use for CSV read
pub fn delimiter(mut self, delimiter: u8) -> Self {
self.delimiter = delimiter;
@@ -153,6 +162,9 @@ impl<'a> CsvReadOptions<'a> {
.with_file_extension(self.file_extension)
.with_target_partitions(target_partitions)
.with_table_partition_cols(self.table_partition_cols.clone())
+ // TODO: Add file sort order into CsvReadOptions and introduce here.
+ .with_file_sort_order(None)
+ .with_infinite_source(self.infinite)
}
}
@@ -237,13 +249,15 @@ impl<'a> ParquetReadOptions<'a> {
#[derive(Clone)]
pub struct AvroReadOptions<'a> {
/// The data source schema.
- pub schema: Option<SchemaRef>,
+ pub schema: Option<&'a Schema>,
/// File extension; only files with this extension are selected for data input.
/// Defaults to `FileType::AVRO.get_ext().as_str()`.
pub file_extension: &'a str,
/// Partition Columns
pub table_partition_cols: Vec<(String, DataType)>,
+ /// Flag indicating whether this file may be unbounded (as in a FIFO file).
+ pub infinite: bool,
}
impl<'a> Default for AvroReadOptions<'a> {
@@ -252,6 +266,7 @@ impl<'a> Default for AvroReadOptions<'a> {
schema: None,
file_extension: DEFAULT_AVRO_EXTENSION,
table_partition_cols: vec![],
+ infinite: false,
}
}
}
@@ -274,6 +289,19 @@ impl<'a> AvroReadOptions<'a> {
.with_file_extension(self.file_extension)
.with_target_partitions(target_partitions)
.with_table_partition_cols(self.table_partition_cols.clone())
+ .with_infinite_source(self.infinite)
+ }
+
+ /// Configure mark_infinite setting
+ pub fn mark_infinite(mut self, infinite: bool) -> Self {
+ self.infinite = infinite;
+ self
+ }
+
+ /// Specify schema to use for AVRO read
+ pub fn schema(mut self, schema: &'a Schema) -> Self {
+ self.schema = Some(schema);
+ self
}
}
@@ -286,7 +314,7 @@ impl<'a> AvroReadOptions<'a> {
#[derive(Clone)]
pub struct NdJsonReadOptions<'a> {
/// The data source schema.
- pub schema: Option<SchemaRef>,
+ pub schema: Option<&'a Schema>,
/// Max number of rows to read from JSON files for schema inference if needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`.
pub schema_infer_max_records: usize,
/// File extension; only files with this extension are selected for data input.
@@ -296,6 +324,8 @@ pub struct NdJsonReadOptions<'a> {
pub table_partition_cols: Vec<(String, DataType)>,
/// File compression type
pub file_compression_type: FileCompressionType,
+ /// Flag indicating whether this file may be unbounded (as in a FIFO file).
+ pub infinite: bool,
}
impl<'a> Default for NdJsonReadOptions<'a> {
@@ -306,6 +336,7 @@ impl<'a> Default for NdJsonReadOptions<'a> {
file_extension: DEFAULT_JSON_EXTENSION,
table_partition_cols: vec![],
file_compression_type: FileCompressionType::UNCOMPRESSED,
+ infinite: false,
}
}
}
@@ -326,6 +357,12 @@ impl<'a> NdJsonReadOptions<'a> {
self
}
+ /// Configure mark_infinite setting
+ pub fn mark_infinite(mut self, infinite: bool) -> Self {
+ self.infinite = infinite;
+ self
+ }
+
/// Specify file_compression_type
pub fn file_compression_type(
mut self,
@@ -335,6 +372,12 @@ impl<'a> NdJsonReadOptions<'a> {
self
}
+ /// Specify schema to use for NdJson read
+ pub fn schema(mut self, schema: &'a Schema) -> Self {
+ self.schema = Some(schema);
+ self
+ }
+
/// Helper to convert these user facing options to `ListingTable` options
pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions {
let file_format = JsonFormat::default()
@@ -344,5 +387,6 @@ impl<'a> NdJsonReadOptions<'a> {
.with_file_extension(self.file_extension)
.with_target_partitions(target_partitions)
.with_table_partition_cols(self.table_partition_cols.clone())
+ .with_infinite_source(self.infinite)
}
}
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index 59e7a0190..f017d0e0a 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -1021,6 +1021,7 @@ mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering,
+ infinite_source: false,
},
None,
None,
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index 69e9e0f4d..c479e2c9a 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -89,21 +89,23 @@ fn supports_collect_by_size(
false
}
}
-
-fn supports_swap(join_type: JoinType) -> bool {
- match join_type {
+/// Predicate that checks whether the given join type supports input swapping.
+pub fn supports_swap(join_type: JoinType) -> bool {
+ matches!(
+ join_type,
JoinType::Inner
- | JoinType::Left
- | JoinType::Right
- | JoinType::Full
- | JoinType::LeftSemi
- | JoinType::RightSemi
- | JoinType::LeftAnti
- | JoinType::RightAnti => true,
- }
+ | JoinType::Left
+ | JoinType::Right
+ | JoinType::Full
+ | JoinType::LeftSemi
+ | JoinType::RightSemi
+ | JoinType::LeftAnti
+ | JoinType::RightAnti
+ )
}
-
-fn swap_join_type(join_type: JoinType) -> JoinType {
+/// This function returns the new join type we get after swapping the given
+/// join's inputs.
+pub fn swap_join_type(join_type: JoinType) -> JoinType {
match join_type {
JoinType::Inner => JoinType::Inner,
JoinType::Full => JoinType::Full,
@@ -116,12 +118,13 @@ fn swap_join_type(join_type: JoinType) -> JoinType {
}
}
-fn swap_hash_join(
+/// This function swaps the inputs of the given join operator.
+pub fn swap_hash_join(
hash_join: &HashJoinExec,
partition_mode: PartitionMode,
- left: &Arc<dyn ExecutionPlan>,
- right: &Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ let left = hash_join.left();
+ let right = hash_join.right();
let new_join = HashJoinExec::try_new(
Arc::clone(right),
Arc::clone(left),
@@ -153,12 +156,11 @@ fn swap_hash_join(
}
}
-/// When the order of the join is changed by the optimizer,
-/// the columns in the output should not be impacted.
-/// This helper creates the expressions that will allow to swap
-/// back the values from the original left as first columns and
-/// those on the right next
-fn swap_reverting_projection(
+/// When the order of the join is changed by the optimizer, the columns in
+/// the output should not be impacted. This function creates the expressions
+/// that will allow to swap back the values from the original left as the first
+/// columns and those on the right next.
+pub fn swap_reverting_projection(
left_schema: &Schema,
right_schema: &Schema,
) -> Vec<(Arc<dyn PhysicalExpr>, String)> {
@@ -241,8 +243,6 @@ impl PhysicalOptimizerRule for JoinSelection {
Ok(Some(swap_hash_join(
hash_join,
PartitionMode::Partitioned,
- left,
- right,
)?))
} else {
Ok(None)
@@ -320,12 +320,7 @@ fn try_collect_left(
if should_swap_join_order(&**left, &**right)
&& supports_swap(*hash_join.join_type())
{
- Ok(Some(swap_hash_join(
- hash_join,
- PartitionMode::CollectLeft,
- left,
- right,
- )?))
+ Ok(Some(swap_hash_join(hash_join, PartitionMode::CollectLeft)?))
} else {
Ok(Some(Arc::new(HashJoinExec::try_new(
Arc::clone(left),
@@ -349,12 +344,7 @@ fn try_collect_left(
)?))),
(false, true) => {
if supports_swap(*hash_join.join_type()) {
- Ok(Some(swap_hash_join(
- hash_join,
- PartitionMode::CollectLeft,
- left,
- right,
- )?))
+ Ok(Some(swap_hash_join(hash_join, PartitionMode::CollectLeft)?))
} else {
Ok(None)
}
@@ -368,7 +358,7 @@ fn partitioned_hash_join(hash_join: &HashJoinExec) -> Result<Arc<dyn ExecutionPl
let right = hash_join.right();
if should_swap_join_order(&**left, &**right) && supports_swap(*hash_join.join_type())
{
- swap_hash_join(hash_join, PartitionMode::Partitioned, left, right)
+ swap_hash_join(hash_join, PartitionMode::Partitioned)
} else {
Ok(Arc::new(HashJoinExec::try_new(
Arc::clone(left),
diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs
index 86ec6f846..fb07d54b9 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -25,8 +25,13 @@ pub mod global_sort_selection;
pub mod join_selection;
pub mod optimize_sorts;
pub mod optimizer;
+pub mod pipeline_checker;
pub mod pruning;
pub mod repartition;
mod utils;
+pub mod pipeline_fixer;
+#[cfg(test)]
+pub mod test_utils;
+
pub use optimizer::PhysicalOptimizerRule;
diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
new file mode 100644
index 000000000..c35ef29f2
--- /dev/null
+++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -0,0 +1,389 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! The [PipelineChecker] rule ensures that a given plan can accommodate its
+//! infinite sources, if there are any. It will reject non-runnable query plans
+//! that use pipeline-breaking operators on infinite input(s).
+//!
+use crate::config::ConfigOptions;
+use crate::error::Result;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
+use std::sync::Arc;
+
+/// The PipelineChecker rule rejects non-runnable query plans that use
+/// pipeline-breaking operators on infinite input(s).
+#[derive(Default)]
+pub struct PipelineChecker {}
+
+impl PipelineChecker {
+ #[allow(missing_docs)]
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl PhysicalOptimizerRule for PipelineChecker {
+ fn optimize(
+ &self,
+ plan: Arc<dyn ExecutionPlan>,
+ _config: &ConfigOptions,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let pipeline = PipelineStatePropagator::new(plan);
+ let state = pipeline.transform_up(&check_finiteness_requirements)?;
+ Ok(state.plan)
+ }
+
+ fn name(&self) -> &str {
+ "PipelineChecker"
+ }
+
+ fn schema_check(&self) -> bool {
+ true
+ }
+}
+
+/// [PipelineStatePropagator] propagates the [ExecutionPlan] pipelining information.
+#[derive(Clone, Debug)]
+pub struct PipelineStatePropagator {
+ pub(crate) plan: Arc<dyn ExecutionPlan>,
+ pub(crate) unbounded: bool,
+ pub(crate) children_unbounded: Vec<bool>,
+}
+
+impl PipelineStatePropagator {
+ /// Constructs a new, default pipelining state.
+ pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+ let length = plan.children().len();
+ PipelineStatePropagator {
+ plan,
+ unbounded: false,
+ children_unbounded: vec![false; length],
+ }
+ }
+}
+
+impl TreeNodeRewritable for PipelineStatePropagator {
+ fn map_children<F>(self, transform: F) -> Result<Self>
+ where
+ F: FnMut(Self) -> Result<Self>,
+ {
+ let children = self.plan.children();
+ if !children.is_empty() {
+ let new_children = children
+ .into_iter()
+ .map(|child| PipelineStatePropagator::new(child))
+ .map(transform)
+ .collect::<Result<Vec<_>>>()?;
+ let children_unbounded = new_children
+ .iter()
+ .map(|c| c.unbounded)
+ .collect::<Vec<bool>>();
+ let children_plans = new_children
+ .into_iter()
+ .map(|child| child.plan)
+ .collect::<Vec<_>>();
+ Ok(PipelineStatePropagator {
+ plan: with_new_children_if_necessary(self.plan, children_plans)?,
+ unbounded: self.unbounded,
+ children_unbounded,
+ })
+ } else {
+ Ok(self)
+ }
+ }
+}
+
+/// This function propagates finiteness information and rejects any plan with
+/// pipeline-breaking operators acting on infinite inputs.
+pub fn check_finiteness_requirements(
+ input: PipelineStatePropagator,
+) -> Result<Option<PipelineStatePropagator>> {
+ let plan = input.plan;
+ let children = input.children_unbounded;
+ plan.unbounded_output(&children).map(|value| {
+ Some(PipelineStatePropagator {
+ plan,
+ unbounded: value,
+ children_unbounded: children,
+ })
+ })
+}
+
+#[cfg(test)]
+mod sql_tests {
+ use super::*;
+ use crate::physical_optimizer::test_utils::{
+ BinaryTestCase, QueryCase, SourceType, UnaryTestCase,
+ };
+
+ #[tokio::test]
+ async fn test_hash_left_join_swap() -> Result<()> {
+ let test1 = BinaryTestCase {
+ source_types: (SourceType::Unbounded, SourceType::Bounded),
+ expect_fail: false,
+ };
+ let test2 = BinaryTestCase {
+ source_types: (SourceType::Unbounded, SourceType::Unbounded),
+ expect_fail: true,
+ };
+ let test3 = BinaryTestCase {
+ source_types: (SourceType::Bounded, SourceType::Unbounded),
+ expect_fail: true,
+ };
+ let test4 = BinaryTestCase {
+ source_types: (SourceType::Bounded, SourceType::Bounded),
+ expect_fail: false,
+ };
+ let case = QueryCase {
+ sql: "SELECT t2.c1 FROM left as t1 LEFT JOIN right as t2 ON t1.c1 = t2.c1"
+ .to_string(),
+ cases: vec![
+ Arc::new(test1),
+ Arc::new(test2),
+ Arc::new(test3),
+ Arc::new(test4),
+ ],
+ error_operator: "Join Error".to_string(),
+ };
+
+ case.run().await?;
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_hash_right_join_swap() -> Result<()> {
+ let test1 = BinaryTestCase {
+ source_types: (SourceType::Unbounded, SourceType::Bounded),
+ expect_fail: true,
+ };
+ let test2 = BinaryTestCase {
+ source_types: (SourceType::Unbounded, SourceType::Unbounded),
+ expect_fail: true,
+ };
+ let test3 = BinaryTestCase {
+ source_types: (SourceType::Bounded, SourceType::Unbounded),
+ expect_fail: false,
+ };
+ let test4 = BinaryTestCase {
+ source_types: (SourceType::Bounded, SourceType::Bounded),
+ expect_fail: false,
+ };
+ let case = QueryCase {
+ sql: "SELECT t2.c1 FROM left as t1 RIGHT JOIN right as t2 ON t1.c1 = t2.c1"
+ .to_string(),
+ cases: vec![
+ Arc::new(test1),
+ Arc::new(test2),
+ Arc::new(test3),
+ Arc::new(test4),
+ ],
+ error_operator: "Join Error".to_string(),
+ };
+
+ case.run().await?;
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_hash_inner_join_swap() -> Result<()> {
+ let test1 = BinaryTestCase {
+ source_types: (SourceType::Unbounded, SourceType::Bounded),
+ expect_fail: false,
+ };
+ let test2 = BinaryTestCase {
+ source_types: (SourceType::Unbounded, SourceType::Unbounded),
+ expect_fail: true,
+ };
+ let test3 = BinaryTestCase {
+ source_types: (SourceType::Bounded, SourceType::Unbounded),
+ expect_fail: false,
+ };
+ let test4 = BinaryTestCase {
+ source_types: (SourceType::Bounded, SourceType::Bounded),
+ expect_fail: false,
+ };
+ let case = QueryCase {
+ sql: "SELECT t2.c1 FROM left as t1 JOIN right as t2 ON t1.c1 = t2.c1"
+ .to_string(),
+ cases: vec![
+ Arc::new(test1),
+ Arc::new(test2),
+ Arc::new(test3),
+ Arc::new(test4),
+ ],
+ error_operator: "Join Error".to_string(),
+ };
+
+ case.run().await?;
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_hash_full_outer_join_swap() -> Result<()> {
+ let test1 = BinaryTestCase {
+ source_types: (SourceType::Unbounded, SourceType::Bounded),
+ expect_fail: true,
+ };
+ let test2 = BinaryTestCase {
+ source_types: (SourceType::Unbounded, SourceType::Unbounded),
+ expect_fail: true,
+ };
+ let test3 = BinaryTestCase {
+ source_types: (SourceType::Bounded, SourceType::Unbounded),
+ expect_fail: true,
+ };
+ let test4 = BinaryTestCase {
+ source_types: (SourceType::Bounded, SourceType::Bounded),
+ expect_fail: false,
+ };
+ let case = QueryCase {
+ sql: "SELECT t2.c1 FROM left as t1 FULL JOIN right as t2 ON t1.c1 = t2.c1"
+ .to_string(),
+ cases: vec![
+ Arc::new(test1),
+ Arc::new(test2),
+ Arc::new(test3),
+ Arc::new(test4),
+ ],
+ error_operator: "Join Error".to_string(),
+ };
+
+ case.run().await?;
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_aggregate() -> Result<()> {
+ let test1 = UnaryTestCase {
+ source_type: SourceType::Bounded,
+ expect_fail: false,
+ };
+ let test2 = UnaryTestCase {
+ source_type: SourceType::Unbounded,
+ expect_fail: true,
+ };
+ let case = QueryCase {
+ sql: "SELECT c1, MIN(c4) FROM test GROUP BY c1".to_string(),
+ cases: vec![Arc::new(test1), Arc::new(test2)],
+ error_operator: "Aggregate Error".to_string(),
+ };
+
+ case.run().await?;
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_window_agg_hash_partition() -> Result<()> {
+ let test1 = UnaryTestCase {
+ source_type: SourceType::Bounded,
+ expect_fail: false,
+ };
+ let test2 = UnaryTestCase {
+ source_type: SourceType::Unbounded,
+ expect_fail: true,
+ };
+ let case = QueryCase {
+ sql: "SELECT
+ c9,
+ SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1
+ FROM test
+ LIMIT 5".to_string(),
+ cases: vec![Arc::new(test1), Arc::new(test2)],
+ error_operator: "Window Error".to_string()
+ };
+
+ case.run().await?;
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_window_agg_single_partition() -> Result<()> {
+ let test1 = UnaryTestCase {
+ source_type: SourceType::Bounded,
+ expect_fail: false,
+ };
+ let test2 = UnaryTestCase {
+ source_type: SourceType::Unbounded,
+ expect_fail: true,
+ };
+ let case = QueryCase {
+ sql: "SELECT
+ c9,
+ SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1
+ FROM test".to_string(),
+ cases: vec![Arc::new(test1), Arc::new(test2)],
+ error_operator: "Window Error".to_string()
+ };
+ case.run().await?;
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_hash_cross_join() -> Result<()> {
+ let test1 = BinaryTestCase {
+ source_types: (SourceType::Unbounded, SourceType::Bounded),
+ expect_fail: true,
+ };
+ let test2 = BinaryTestCase {
+ source_types: (SourceType::Unbounded, SourceType::Unbounded),
+ expect_fail: true,
+ };
+ let test3 = BinaryTestCase {
+ source_types: (SourceType::Bounded, SourceType::Unbounded),
+ expect_fail: true,
+ };
+ let test4 = BinaryTestCase {
+ source_types: (SourceType::Bounded, SourceType::Bounded),
+ expect_fail: false,
+ };
+ let case = QueryCase {
+ sql: "SELECT t2.c1 FROM left as t1 CROSS JOIN right as t2".to_string(),
+ cases: vec![
+ Arc::new(test1),
+ Arc::new(test2),
+ Arc::new(test3),
+ Arc::new(test4),
+ ],
+ error_operator: "Cross Join Error".to_string(),
+ };
+
+ case.run().await?;
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_analyzer() -> Result<()> {
+ let test1 = UnaryTestCase {
+ source_type: SourceType::Bounded,
+ expect_fail: false,
+ };
+ let test2 = UnaryTestCase {
+ source_type: SourceType::Unbounded,
+ expect_fail: true,
+ };
+ let case = QueryCase {
+ sql: "EXPLAIN ANALYZE SELECT * FROM test".to_string(),
+ cases: vec![Arc::new(test1), Arc::new(test2)],
+ error_operator: "Analyze Error".to_string(),
+ };
+
+ case.run().await?;
+ Ok(())
+ }
+}
diff --git a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
new file mode 100644
index 000000000..92cb45d56
--- /dev/null
+++ b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
@@ -0,0 +1,623 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! The [PipelineFixer] rule tries to modify a given plan so that it can
+//! accommodate its infinite sources, if there are any. In other words,
+//! it tries to obtain a runnable query (with the given infinite sources)
+//! from a non-runnable query by transforming pipeline-breaking operations
+//! to pipeline-friendly ones. If this can not be done, the rule emits a
+//! diagnostic error message.
+//!
+use crate::config::ConfigOptions;
+use crate::error::Result;
+use crate::physical_optimizer::join_selection::swap_hash_join;
+use crate::physical_optimizer::pipeline_checker::{
+ check_finiteness_requirements, PipelineStatePropagator,
+};
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::joins::{HashJoinExec, PartitionMode};
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::ExecutionPlan;
+use datafusion_common::DataFusionError;
+use datafusion_expr::logical_plan::JoinType;
+use std::sync::Arc;
+
+/// The [PipelineFixer] rule tries to modify a given plan so that it can
+/// accommodate its infinite sources, if there are any. If this is not
+/// possible, the rule emits a diagnostic error message.
+#[derive(Default)]
+pub struct PipelineFixer {}
+
+impl PipelineFixer {
+ #[allow(missing_docs)]
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+type PipelineFixerSubrule =
+ dyn Fn(&PipelineStatePropagator) -> Option<Result<PipelineStatePropagator>>;
+impl PhysicalOptimizerRule for PipelineFixer {
+ fn optimize(
+ &self,
+ plan: Arc<dyn ExecutionPlan>,
+ _config: &ConfigOptions,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let pipeline = PipelineStatePropagator::new(plan);
+ let physical_optimizer_subrules: Vec<Box<PipelineFixerSubrule>> =
+ vec![Box::new(hash_join_swap_subrule)];
+ let state = pipeline.transform_up(&|p| {
+ apply_subrules_and_check_finiteness_requirements(
+ p,
+ &physical_optimizer_subrules,
+ )
+ })?;
+ Ok(state.plan)
+ }
+
+ fn name(&self) -> &str {
+ "PipelineFixer"
+ }
+
+ fn schema_check(&self) -> bool {
+ true
+ }
+}
+
+/// This subrule will swap build/probe sides of a hash join depending on whether its inputs
+/// may produce an infinite stream of records. The rule ensures that the left (build) side
+/// of the hash join always operates on an input stream that will produce a finite
+/// set of records. If the left side cannot be chosen to be "finite", the order stays the
+/// same as the original query.
+/// ```text
+/// For example, this rule makes the following transformation:
+///
+///
+///
+/// +--------------+ +--------------+
+/// | | unbounded | |
+/// Left | Infinite | true | Hash |\true
+/// | Data source |--------------| Repartition | \ +--------------+ +--------------+
+/// | | | | \ | | | |
+/// +--------------+ +--------------+ - | Hash Join |-------| Projection |
+/// - | | | |
+/// +--------------+ +--------------+ / +--------------+ +--------------+
+/// | | unbounded | | /
+/// Right | Finite | false | Hash |/false
+/// | Data Source |--------------| Repartition |
+/// | | | |
+/// +--------------+ +--------------+
+///
+///
+///
+/// +--------------+ +--------------+
+/// | | unbounded | |
+/// Left | Finite | false | Hash |\false
+/// | Data source |--------------| Repartition | \ +--------------+ +--------------+
+/// | | | | \ | | true | | true
+/// +--------------+ +--------------+ - | Hash Join |-------| Projection |-----
+/// - | | | |
+/// +--------------+ +--------------+ / +--------------+ +--------------+
+/// | | unbounded | | /
+/// Right | Infinite | true | Hash |/true
+/// | Data Source |--------------| Repartition |
+/// | | | |
+/// +--------------+ +--------------+
+///
+/// ```
+fn hash_join_swap_subrule(
+ input: &PipelineStatePropagator,
+) -> Option<Result<PipelineStatePropagator>> {
+ let plan = input.plan.clone();
+ let children = &input.children_unbounded;
+ if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
+ let (left_unbounded, right_unbounded) = (children[0], children[1]);
+ let new_plan = if left_unbounded && !right_unbounded {
+ if matches!(
+ *hash_join.join_type(),
+ JoinType::Inner
+ | JoinType::Left
+ | JoinType::LeftSemi
+ | JoinType::LeftAnti
+ ) {
+ swap(hash_join)
+ } else {
+ Ok(plan)
+ }
+ } else {
+ Ok(plan)
+ };
+ let new_state = new_plan.map(|plan| PipelineStatePropagator {
+ plan,
+ unbounded: left_unbounded || right_unbounded,
+ children_unbounded: vec![left_unbounded, right_unbounded],
+ });
+ Some(new_state)
+ } else {
+ None
+ }
+}
+
+/// This function swaps sides of a hash join to make it runnable even if one of its
+/// inputs is infinite. Note that this is not always possible; i.e. [JoinType::Full],
+/// [JoinType::Right], [JoinType::RightAnti] and [JoinType::RightSemi] can not run with
+/// an unbounded left side, even if we swap. Therefore, we do not consider them here.
+fn swap(hash_join: &HashJoinExec) -> Result<Arc<dyn ExecutionPlan>> {
+ let partition_mode = hash_join.partition_mode();
+ let join_type = hash_join.join_type();
+ match (*partition_mode, *join_type) {
+ (
+ _,
+ JoinType::Right | JoinType::RightSemi | JoinType::RightAnti | JoinType::Full,
+ ) => Err(DataFusionError::Internal(format!(
+ "{} join cannot be swapped for unbounded input.",
+ join_type
+ ))),
+ (PartitionMode::Partitioned, _) => {
+ swap_hash_join(hash_join, PartitionMode::Partitioned)
+ }
+ (PartitionMode::CollectLeft, _) => {
+ swap_hash_join(hash_join, PartitionMode::CollectLeft)
+ }
+ (PartitionMode::Auto, _) => Err(DataFusionError::Internal(
+ "Auto is not acceptable for unbounded input here.".to_string(),
+ )),
+ }
+}
+
+fn apply_subrules_and_check_finiteness_requirements(
+ mut input: PipelineStatePropagator,
+ physical_optimizer_subrules: &Vec<Box<PipelineFixerSubrule>>,
+) -> Result<Option<PipelineStatePropagator>> {
+ for sub_rule in physical_optimizer_subrules {
+ if let Some(value) = sub_rule(&input).transpose()? {
+ input = value;
+ }
+ }
+ check_finiteness_requirements(input)
+}
+
+#[cfg(test)]
+mod hash_join_tests {
+ use super::*;
+ use crate::physical_optimizer::join_selection::swap_join_type;
+ use crate::physical_optimizer::test_utils::SourceType;
+ use crate::physical_plan::expressions::Column;
+ use crate::physical_plan::projection::ProjectionExec;
+ use crate::{physical_plan::joins::PartitionMode, test::exec::UnboundedExec};
+ use arrow::datatypes::{DataType, Field, Schema};
+ use std::sync::Arc;
+
+ struct TestCase {
+ case: String,
+ initial_sources_unbounded: (SourceType, SourceType),
+ initial_join_type: JoinType,
+ initial_mode: PartitionMode,
+ expected_sources_unbounded: (SourceType, SourceType),
+ expected_join_type: JoinType,
+ expected_mode: PartitionMode,
+ expecting_swap: bool,
+ }
+
+ #[tokio::test]
+ async fn test_join_with_swap_full() -> Result<()> {
+ // NOTE: Currently, some initial conditions are not viable after join order selection.
+ // For example, full join always comes in partitioned mode. See the warning in
+ // function "swap". If this changes in the future, we should update these tests.
+ let cases = vec![
+ TestCase {
+ case: "Bounded - Unbounded 1".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+ initial_join_type: JoinType::Full,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+ expected_join_type: JoinType::Full,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ },
+ TestCase {
+ case: "Unbounded - Bounded 2".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded),
+ initial_join_type: JoinType::Full,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded),
+ expected_join_type: JoinType::Full,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ },
+ TestCase {
+ case: "Bounded - Bounded 3".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+ initial_join_type: JoinType::Full,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+ expected_join_type: JoinType::Full,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ },
+ TestCase {
+ case: "Unbounded - Unbounded 4".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded),
+ initial_join_type: JoinType::Full,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (
+ SourceType::Unbounded,
+ SourceType::Unbounded,
+ ),
+ expected_join_type: JoinType::Full,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ },
+ ];
+ for case in cases.into_iter() {
+ test_join_with_maybe_swap_unbounded_case(case).await?
+ }
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_cases_without_collect_left_check() -> Result<()> {
+ let mut cases = vec![];
+ let join_types = vec![JoinType::LeftSemi, JoinType::Inner];
+ for join_type in join_types {
+ cases.push(TestCase {
+ case: "Unbounded - Bounded / CollectLeft".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::CollectLeft,
+ expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+ expected_join_type: swap_join_type(join_type),
+ expected_mode: PartitionMode::CollectLeft,
+ expecting_swap: true,
+ });
+ cases.push(TestCase {
+ case: "Bounded - Unbounded / CollectLeft".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::CollectLeft,
+ expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::CollectLeft,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Unbounded - Unbounded / CollectLeft".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::CollectLeft,
+ expected_sources_unbounded: (
+ SourceType::Unbounded,
+ SourceType::Unbounded,
+ ),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::CollectLeft,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Bounded - Bounded / CollectLeft".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::CollectLeft,
+ expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::CollectLeft,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Unbounded - Bounded / Partitioned".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+ expected_join_type: swap_join_type(join_type),
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: true,
+ });
+ cases.push(TestCase {
+ case: "Bounded - Unbounded / Partitioned".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Bounded - Bounded / Partitioned".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Unbounded - Unbounded / Partitioned".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (
+ SourceType::Unbounded,
+ SourceType::Unbounded,
+ ),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ });
+ }
+
+ for case in cases.into_iter() {
+ test_join_with_maybe_swap_unbounded_case(case).await?
+ }
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_not_support_collect_left() -> Result<()> {
+ let mut cases = vec![];
+ // After [JoinSelection] optimization, these join types cannot run in CollectLeft mode except
+ // [JoinType::LeftSemi]
+ let the_ones_not_support_collect_left = vec![JoinType::Left, JoinType::LeftAnti];
+ for join_type in the_ones_not_support_collect_left {
+ cases.push(TestCase {
+ case: "Unbounded - Bounded".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+ expected_join_type: swap_join_type(join_type),
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: true,
+ });
+ cases.push(TestCase {
+ case: "Bounded - Unbounded".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Bounded - Bounded".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Unbounded - Unbounded".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (
+ SourceType::Unbounded,
+ SourceType::Unbounded,
+ ),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ });
+ }
+
+ for case in cases.into_iter() {
+ test_join_with_maybe_swap_unbounded_case(case).await?
+ }
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_not_supporting_swaps_possible_collect_left() -> Result<()> {
+ let mut cases = vec![];
+ let the_ones_not_support_collect_left =
+ vec![JoinType::Right, JoinType::RightAnti, JoinType::RightSemi];
+ for join_type in the_ones_not_support_collect_left {
+ // We expect that (SourceType::Unbounded, SourceType::Bounded) will not change,
+ // regardless of the statistics, since these join types do not support swapping.
+ cases.push(TestCase {
+ case: "Unbounded - Bounded / CollectLeft".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::CollectLeft,
+ expected_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::CollectLeft,
+ expecting_swap: false,
+ });
+ // We expect that (SourceType::Bounded, SourceType::Unbounded) will stay the same, regardless of the
+ // statistics.
+ cases.push(TestCase {
+ case: "Bounded - Unbounded / CollectLeft".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::CollectLeft,
+ expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::CollectLeft,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Unbounded - Unbounded / CollectLeft".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::CollectLeft,
+ expected_sources_unbounded: (
+ SourceType::Unbounded,
+ SourceType::Unbounded,
+ ),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::CollectLeft,
+ expecting_swap: false,
+ });
+ //
+ cases.push(TestCase {
+ case: "Bounded - Bounded / CollectLeft".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::CollectLeft,
+ expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::CollectLeft,
+ expecting_swap: false,
+ });
+ // For partitioned cases, only the unbounded/bounded status of the sources affects the order.
+ cases.push(TestCase {
+ case: "Unbounded - Bounded / Partitioned".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Bounded - Unbounded / Partitioned".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Bounded - Bounded / Partitioned".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Unbounded - Unbounded / Partitioned".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (
+ SourceType::Unbounded,
+ SourceType::Unbounded,
+ ),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ });
+ }
+
+ for case in cases.into_iter() {
+ test_join_with_maybe_swap_unbounded_case(case).await?
+ }
+ Ok(())
+ }
+ #[allow(clippy::vtable_address_comparisons)]
+ async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> Result<()> {
+ let left_unbounded = t.initial_sources_unbounded.0 == SourceType::Unbounded;
+ let right_unbounded = t.initial_sources_unbounded.1 == SourceType::Unbounded;
+ let left_exec = Arc::new(UnboundedExec::new(
+ left_unbounded,
+ Schema::new(vec![Field::new("a", DataType::Int32, false)]),
+ )) as Arc<dyn ExecutionPlan>;
+ let right_exec = Arc::new(UnboundedExec::new(
+ right_unbounded,
+ Schema::new(vec![Field::new("b", DataType::Int32, false)]),
+ )) as Arc<dyn ExecutionPlan>;
+
+ let join = HashJoinExec::try_new(
+ Arc::clone(&left_exec),
+ Arc::clone(&right_exec),
+ vec![(
+ Column::new_with_schema("a", &left_exec.schema())?,
+ Column::new_with_schema("b", &right_exec.schema())?,
+ )],
+ None,
+ &t.initial_join_type,
+ t.initial_mode,
+ &false,
+ )?;
+
+ let initial_hash_join_state = PipelineStatePropagator {
+ plan: Arc::new(join),
+ unbounded: false,
+ children_unbounded: vec![left_unbounded, right_unbounded],
+ };
+ let optimized_hash_join =
+ hash_join_swap_subrule(&initial_hash_join_state).unwrap()?;
+ let optimized_join_plan = optimized_hash_join.plan;
+
+ // If swap did happen
+ let projection_added = optimized_join_plan.as_any().is::<ProjectionExec>();
+ let plan = if projection_added {
+ let proj = optimized_join_plan
+ .as_any()
+ .downcast_ref::<ProjectionExec>()
+ .expect(
+ "A proj is required to swap columns back to their original order",
+ );
+ proj.input().clone()
+ } else {
+ optimized_join_plan
+ };
+
+ if let Some(HashJoinExec {
+ left,
+ right,
+ join_type,
+ mode,
+ ..
+ }) = plan.as_any().downcast_ref::<HashJoinExec>()
+ {
+ let left_changed = Arc::ptr_eq(left, &right_exec);
+ let right_changed = Arc::ptr_eq(right, &left_exec);
+ // If this is not equal, we have a bigger problem.
+ assert_eq!(left_changed, right_changed);
+ assert_eq!(
+ (
+ t.case.as_str(),
+ if left.unbounded_output(&[])? {
+ SourceType::Unbounded
+ } else {
+ SourceType::Bounded
+ },
+ if right.unbounded_output(&[])? {
+ SourceType::Unbounded
+ } else {
+ SourceType::Bounded
+ },
+ join_type,
+ mode,
+ left_changed && right_changed
+ ),
+ (
+ t.case.as_str(),
+ t.expected_sources_unbounded.0,
+ t.expected_sources_unbounded.1,
+ &t.expected_join_type,
+ &t.expected_mode,
+ t.expecting_swap
+ )
+ );
+ };
+ Ok(())
+ }
+}
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 66359ebf6..5bdbf59a6 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -269,6 +269,7 @@ mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering: None,
+ infinite_source: false,
},
None,
None,
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
new file mode 100644
index 000000000..ce6e252c7
--- /dev/null
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Collection of testing utility functions that are leveraged by the query optimizer rules
+
+use crate::error::Result;
+use crate::prelude::{CsvReadOptions, SessionContext};
+use async_trait::async_trait;
+use std::sync::Arc;
+
+async fn register_current_csv(
+ ctx: &SessionContext,
+ table_name: &str,
+ infinite: bool,
+) -> Result<()> {
+ let testdata = crate::test_util::arrow_test_data();
+ let schema = crate::test_util::aggr_test_schema();
+ ctx.register_csv(
+ table_name,
+ &format!("{}/csv/aggregate_test_100.csv", testdata),
+ CsvReadOptions::new()
+ .schema(&schema)
+ .mark_infinite(infinite),
+ )
+ .await?;
+ Ok(())
+}
+
+#[derive(Eq, PartialEq, Debug)]
+pub enum SourceType {
+ Unbounded,
+ Bounded,
+}
+
+#[async_trait]
+pub trait SqlTestCase {
+ async fn register_table(&self, ctx: &SessionContext) -> Result<()>;
+ fn expect_fail(&self) -> bool;
+}
+
+/// [UnaryTestCase] is designed for single input [ExecutionPlan]s.
+pub struct UnaryTestCase {
+ pub(crate) source_type: SourceType,
+ pub(crate) expect_fail: bool,
+}
+
+#[async_trait]
+impl SqlTestCase for UnaryTestCase {
+ async fn register_table(&self, ctx: &SessionContext) -> Result<()> {
+ let table_is_infinite = self.source_type == SourceType::Unbounded;
+ register_current_csv(ctx, "test", table_is_infinite).await?;
+ Ok(())
+ }
+
+ fn expect_fail(&self) -> bool {
+ self.expect_fail
+ }
+}
+/// [BinaryTestCase] is designed for binary input [ExecutionPlan]s.
+pub struct BinaryTestCase {
+ pub(crate) source_types: (SourceType, SourceType),
+ pub(crate) expect_fail: bool,
+}
+
+#[async_trait]
+impl SqlTestCase for BinaryTestCase {
+ async fn register_table(&self, ctx: &SessionContext) -> Result<()> {
+ let left_table_is_infinite = self.source_types.0 == SourceType::Unbounded;
+ let right_table_is_infinite = self.source_types.1 == SourceType::Unbounded;
+ register_current_csv(ctx, "left", left_table_is_infinite).await?;
+ register_current_csv(ctx, "right", right_table_is_infinite).await?;
+ Ok(())
+ }
+
+ fn expect_fail(&self) -> bool {
+ self.expect_fail
+ }
+}
+
+pub struct QueryCase {
+ pub(crate) sql: String,
+ pub(crate) cases: Vec<Arc<dyn SqlTestCase>>,
+ pub(crate) error_operator: String,
+}
+
+impl QueryCase {
+ /// Run the test cases
+ pub(crate) async fn run(&self) -> Result<()> {
+ for case in &self.cases {
+ let ctx = SessionContext::new();
+ case.register_table(&ctx).await?;
+ let error = if case.expect_fail() {
+ Some(&self.error_operator)
+ } else {
+ None
+ };
+ self.run_case(ctx, error).await?;
+ }
+ Ok(())
+ }
+ async fn run_case(&self, ctx: SessionContext, error: Option<&String>) -> Result<()> {
+ let dataframe = ctx.sql(self.sql.as_str()).await?;
+ let plan = dataframe.create_physical_plan().await;
+ if error.is_some() {
+ let plan_error = plan.unwrap_err();
+ let initial = error.unwrap().to_string();
+ assert!(plan_error.to_string().contains(initial.as_str()));
+ } else {
+ assert!(plan.is_ok())
+ }
+ Ok(())
+ }
+}
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 6d7c3c21b..f5591f2c3 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -30,7 +30,7 @@ use crate::physical_plan::{
use arrow::array::ArrayRef;
use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
-use datafusion_common::Result;
+use datafusion_common::{DataFusionError, Result};
use datafusion_expr::Accumulator;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{
@@ -367,6 +367,19 @@ impl ExecutionPlan for AggregateExec {
}
}
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// If the plan does not support pipelining, but its input(s) are
+ /// infinite, returns an error to indicate this.
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ if children[0] {
+ Err(DataFusionError::Plan(
+ "Aggregate Error: `GROUP BY` clause (including the more general GROUPING SET) is not supported for unbounded inputs.".to_string(),
+ ))
+ } else {
+ Ok(false)
+ }
+ }
+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}
diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs
index b0578bd48..7f1cfe57a 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -41,7 +41,7 @@ pub struct AnalyzeExec {
/// control how much extra to print
verbose: bool,
/// The input plan (the plan being analyzed)
- input: Arc<dyn ExecutionPlan>,
+ pub(crate) input: Arc<dyn ExecutionPlan>,
/// The output schema for RecordBatches of this exec node
schema: SchemaRef,
}
@@ -76,6 +76,20 @@ impl ExecutionPlan for AnalyzeExec {
vec![Distribution::SinglePartition]
}
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// If the plan does not support pipelining, but its input(s) are
+ /// infinite, returns an error to indicate this.
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ if children[0] {
+ Err(DataFusionError::Plan(
+ "Analyze Error: Analysis is not supported for unbounded inputs"
+ .to_string(),
+ ))
+ } else {
+ Ok(false)
+ }
+ }
+
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs
index afa2a85ab..582edc103 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -96,6 +96,13 @@ impl ExecutionPlan for CoalesceBatchesExec {
self.input.output_partitioning()
}
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// If the plan does not support pipelining, but its input(s) are
+ /// infinite, returns an error to indicate this.
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ Ok(children[0])
+ }
+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
// The coalesce batches operator does not make any changes to the sorting of its input
self.input.output_ordering()
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index 816a9c940..d765c275a 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -80,6 +80,12 @@ impl ExecutionPlan for CoalescePartitionsExec {
vec![self.input.clone()]
}
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// If the plan does not support pipelining, but its input(s) are
+ /// infinite, returns an error to indicate this.
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ Ok(children[0])
+ }
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index ab522dc94..91d27c3e3 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -72,6 +72,10 @@ impl ExecutionPlan for AvroExec {
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
}
+ fn unbounded_output(&self, _: &[bool]) -> Result<bool> {
+ Ok(self.base_config().infinite_source)
+ }
+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
get_output_ordering(&self.base_config)
}
@@ -259,6 +263,7 @@ mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering: None,
+ infinite_source: false,
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
let mut results = avro_exec
@@ -330,6 +335,7 @@ mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering: None,
+ infinite_source: false,
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
@@ -404,6 +410,7 @@ mod tests {
partition_type_wrap(DataType::Utf8),
)],
output_ordering: None,
+ infinite_source: false,
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index b759b7254..3afd844cf 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -109,6 +109,10 @@ impl ExecutionPlan for CsvExec {
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
}
+ fn unbounded_output(&self, _: &[bool]) -> Result<bool> {
+ Ok(self.base_config().infinite_source)
+ }
+
/// See comments on `impl ExecutionPlan for ParquetExec`: output order can't be
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
get_output_ordering(&self.base_config)
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index 0481ca64d..3d09377bf 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -351,6 +351,7 @@ mod tests {
limit,
table_partition_cols: vec![],
output_ordering: None,
+ infinite_source: false,
};
let file_stream =
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index a4be015c5..14fe0128c 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -87,6 +87,10 @@ impl ExecutionPlan for NdJsonExec {
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
}
+ fn unbounded_output(&self, _: &[bool]) -> Result<bool> {
+ Ok(self.base_config.infinite_source)
+ }
+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
get_output_ordering(&self.base_config)
}
@@ -382,6 +386,7 @@ mod tests {
limit: Some(3),
table_partition_cols: vec![],
output_ordering: None,
+ infinite_source: false,
},
file_compression_type.to_owned(),
);
@@ -456,6 +461,7 @@ mod tests {
limit: Some(3),
table_partition_cols: vec![],
output_ordering: None,
+ infinite_source: false,
},
file_compression_type.to_owned(),
);
@@ -500,6 +506,7 @@ mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering: None,
+ infinite_source: false,
},
file_compression_type.to_owned(),
);
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 6e9b6b8f8..de4b58c1e 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -104,6 +104,8 @@ pub struct FileScanConfig {
pub table_partition_cols: Vec<(String, DataType)>,
/// The order in which the data is sorted, if known.
pub output_ordering: Option<Vec<PhysicalSortExpr>>,
+ /// Indicates whether this plan may produce an infinite stream of records.
+ pub infinite_source: bool,
}
impl FileScanConfig {
@@ -809,6 +811,7 @@ mod tests {
statistics,
table_partition_cols,
output_ordering: None,
+ infinite_source: false,
}
}
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index a8d3002a9..0ad41d994 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -813,6 +813,7 @@ mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering: None,
+ infinite_source: false,
},
predicate,
None,
@@ -1350,6 +1351,7 @@ mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering: None,
+ infinite_source: false,
},
None,
None,
@@ -1439,6 +1441,7 @@ mod tests {
("day".to_owned(), partition_type_wrap(DataType::Utf8)),
],
output_ordering: None,
+ infinite_source: false,
},
None,
None,
@@ -1498,6 +1501,7 @@ mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering: None,
+ infinite_source: false,
},
None,
None,
diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs
index 1dc634b77..758e31a28 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -108,6 +108,13 @@ impl ExecutionPlan for FilterExec {
self.input.output_partitioning()
}
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// If the plan does not support pipelining, but its input(s) are
+ /// infinite, returns an error to indicate this.
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ Ok(children[0])
+ }
+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.input.output_ordering()
}
diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs
index 1b43b6096..82733cee7 100644
--- a/datafusion/core/src/physical_plan/joins/cross_join.rs
+++ b/datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -35,6 +35,7 @@ use crate::physical_plan::{
};
use crate::{error::Result, scalar::ScalarValue};
use async_trait::async_trait;
+use datafusion_common::DataFusionError;
use log::debug;
use std::time::Instant;
@@ -143,6 +144,20 @@ impl ExecutionPlan for CrossJoinExec {
vec![self.left.clone(), self.right.clone()]
}
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// If the plan does not support pipelining, but its input(s) are
+ /// infinite, returns an error to indicate this.
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ if children[0] || children[1] {
+ Err(DataFusionError::Plan(
+ "Cross Join Error: Cross join is not supported for the unbounded inputs."
+ .to_string(),
+ ))
+ } else {
+ Ok(false)
+ }
+ }
+
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index f15398018..abb5b5e29 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -309,6 +309,37 @@ impl ExecutionPlan for HashJoinExec {
}
}
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// If the plan does not support pipelining, but its input(s) are
+ /// infinite, returns an error to indicate this.
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ let (left, right) = (children[0], children[1]);
+ // The join breaks pipelining if the left side is unbounded, or if the right
+ // side is unbounded with JoinType::Left, Full, LeftAnti or LeftSemi.
+ let breaking = left
+ || (right
+ && matches!(
+ self.join_type,
+ JoinType::Left
+ | JoinType::Full
+ | JoinType::LeftAnti
+ | JoinType::LeftSemi
+ ));
+
+ if breaking {
+ Err(DataFusionError::Plan(format!(
+ "Join Error: The join with cannot be executed with unbounded inputs. {}",
+ if left && right {
+ "Currently, we do not support unbounded inputs on both sides."
+ } else {
+ "Please consider a different type of join or sources."
+ }
+ )))
+ } else {
+ Ok(left || right)
+ }
+ }
+
fn output_partitioning(&self) -> Partitioning {
let left_columns_len = self.left.schema().fields.len();
match self.mode {
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 07b4a6abb..3d6663fbd 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -110,6 +110,13 @@ pub trait ExecutionPlan: Debug + Send + Sync {
/// Specifies the output partitioning scheme of this plan
fn output_partitioning(&self) -> Partitioning;
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// If the plan does not support pipelining, but its input(s) are
+ /// infinite, returns an error to indicate this.
+ fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
+ Ok(false)
+ }
+
/// If the output of this operator within each partition is sorted,
/// returns `Some(keys)` with the description of how it was sorted.
///
diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index 3a2765e7f..bc8087c98 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -159,6 +159,13 @@ impl ExecutionPlan for ProjectionExec {
self.schema.clone()
}
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// If the plan does not support pipelining, but its input(s) are
+ /// infinite, returns an error to indicate this.
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ Ok(children[0])
+ }
+
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
}
diff --git a/datafusion/core/src/physical_plan/repartition.rs b/datafusion/core/src/physical_plan/repartition.rs
index 3dc0c6d33..967281c1c 100644
--- a/datafusion/core/src/physical_plan/repartition.rs
+++ b/datafusion/core/src/physical_plan/repartition.rs
@@ -284,6 +284,13 @@ impl ExecutionPlan for RepartitionExec {
)?))
}
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// If the plan does not support pipelining, but its input(s) are
+ /// infinite, returns an error to indicate this.
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ Ok(children[0])
+ }
+
fn output_partitioning(&self) -> Partitioning {
self.partitioning.clone()
}
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 85eca5450..89b7d414f 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -627,7 +627,7 @@ fn read_spill(sender: Sender<ArrowResult<RecordBatch>>, path: &Path) -> Result<(
#[derive(Debug)]
pub struct SortExec {
/// Input schema
- input: Arc<dyn ExecutionPlan>,
+ pub(crate) input: Arc<dyn ExecutionPlan>,
/// Sort expressions
expr: Vec<PhysicalSortExpr>,
/// Containing all metrics set created during sort
@@ -704,6 +704,19 @@ impl ExecutionPlan for SortExec {
}
}
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// If the plan does not support pipelining, but its input(s) are
+ /// infinite, returns an error to indicate this.
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ if children[0] {
+ Err(DataFusionError::Plan(
+ "Sort Error: Can not sort unbounded inputs.".to_string(),
+ ))
+ } else {
+ Ok(false)
+ }
+ }
+
fn required_input_distribution(&self) -> Vec<Distribution> {
if self.preserve_partitioning {
vec![Distribution::UnspecifiedDistribution]
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index af57c9ef9..3a92cd123 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -123,6 +123,13 @@ impl ExecutionPlan for UnionExec {
self.schema.clone()
}
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// If the plan does not support pipelining, but it its input(s) are
+ /// infinite, returns an error to indicate this.
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ Ok(children.iter().any(|x| *x))
+ }
+
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
self.inputs.clone()
}
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index d1ea0af69..23ec2d179 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -54,7 +54,7 @@ use std::task::{Context, Poll};
#[derive(Debug)]
pub struct WindowAggExec {
/// Input plan
- input: Arc<dyn ExecutionPlan>,
+ pub(crate) input: Arc<dyn ExecutionPlan>,
/// Window function expression
window_expr: Vec<Arc<dyn WindowExpr>>,
/// Schema after the window is run
@@ -206,6 +206,20 @@ impl ExecutionPlan for WindowAggExec {
}
}
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// If the plan does not support pipelining, but its input(s) are
+ /// infinite, returns an error to indicate this.
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ if children[0] {
+ Err(DataFusionError::Plan(
+ "Window Error: Windowing is not currently support for unbounded inputs."
+ .to_string(),
+ ))
+ } else {
+ Ok(false)
+ }
+ }
+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.output_ordering.as_deref()
}
diff --git a/datafusion/core/src/test/exec.rs b/datafusion/core/src/test/exec.rs
index e7ee746c9..f74b1eb31 100644
--- a/datafusion/core/src/test/exec.rs
+++ b/datafusion/core/src/test/exec.rs
@@ -509,6 +509,76 @@ impl ExecutionPlan for StatisticsExec {
}
}
+/// A mock execution plan that simply returns the provided data source characteristic
+#[derive(Debug, Clone)]
+pub struct UnboundedExec {
+ unbounded: bool,
+ schema: Arc<Schema>,
+}
+impl UnboundedExec {
+ pub fn new(unbounded: bool, schema: Schema) -> Self {
+ Self {
+ unbounded,
+ schema: Arc::new(schema),
+ }
+ }
+}
+impl ExecutionPlan for UnboundedExec {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ Arc::clone(&self.schema)
+ }
+
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(2)
+ }
+
+ fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
+ Ok(self.unbounded)
+ }
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ None
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(self)
+ }
+
+ fn execute(
+ &self,
+ _partition: usize,
+ _context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ unimplemented!("This plan only serves for testing statistics")
+ }
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "UnboundableExec: unbounded={}", self.unbounded,)
+ }
+ }
+ }
+
+ fn statistics(&self) -> Statistics {
+ Statistics::default()
+ }
+}
+
/// Execution plan that emits streams that block forever.
///
/// This is useful to test shutdown / cancelation behavior of certain execution plans.
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index efa3fece1..cf7594c96 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -182,6 +182,7 @@ pub fn partitioned_csv_config(
limit: None,
table_partition_cols: vec![],
output_ordering: None,
+ infinite_source: false,
})
}
diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs
new file mode 100644
index 000000000..677347594
--- /dev/null
+++ b/datafusion/core/tests/fifo.rs
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This test demonstrates the DataFusion FIFO capabilities.
+//!
+#[cfg(not(target_os = "windows"))]
+mod unix_test {
+ use arrow::datatypes::{DataType, Field, Schema};
+ use datafusion::{
+ prelude::{CsvReadOptions, SessionConfig, SessionContext},
+ test_util::{aggr_test_schema, arrow_test_data},
+ };
+ use datafusion_common::{DataFusionError, Result};
+ use futures::StreamExt;
+ use itertools::enumerate;
+ use nix::sys::stat;
+ use nix::unistd;
+ use rstest::*;
+ use std::fs::{File, OpenOptions};
+ use std::io::Write;
+ use std::path::Path;
+ use std::path::PathBuf;
+ use std::sync::mpsc;
+ use std::sync::mpsc::{Receiver, Sender};
+ use std::sync::{Arc, Mutex};
+ use std::thread;
+ use std::time::{Duration, Instant};
+ use tempfile::TempDir;
+ // ! For the sake of the test, do not alter the numbers. !
+ // Session batch size
+ const TEST_BATCH_SIZE: usize = 20;
+ // Number of lines written to FIFO
+ const TEST_DATA_SIZE: usize = 20_000;
+ // Number of lines that can be joined. Each joinable key produces 20 lines with
+ // the aggregate_test_100 dataset. We will use these joinable keys to verify
+ // incremental execution.
+ const TEST_JOIN_RATIO: f64 = 0.01;
+
+ fn create_fifo_file(tmp_dir: &TempDir, file_name: &str) -> Result<PathBuf> {
+ let file_path = tmp_dir.path().join(file_name);
+ // Simulate an infinite environment via a FIFO file
+ if let Err(e) = unistd::mkfifo(&file_path, stat::Mode::S_IRWXU) {
+ Err(DataFusionError::Execution(e.to_string()))
+ } else {
+ Ok(file_path)
+ }
+ }
+
+ fn write_to_fifo(
+ mut file: &File,
+ line: &str,
+ ref_time: Instant,
+ broken_pipe_timeout: Duration,
+ ) -> Result<usize> {
+ // We need to handle broken pipe error until the reader is ready. This
+ // is why we use a timeout to limit the wait duration for the reader.
+ // If the error is different than broken pipe, we fail immediately.
+ file.write(line.as_bytes()).or_else(|e| {
+ if e.raw_os_error().unwrap() == 32 {
+ let interval = Instant::now().duration_since(ref_time);
+ if interval < broken_pipe_timeout {
+ thread::sleep(Duration::from_millis(100));
+ return Ok(0);
+ }
+ }
+ Err(DataFusionError::Execution(e.to_string()))
+ })
+ }
+
+ async fn create_ctx(
+ fifo_path: &Path,
+ with_unbounded_execution: bool,
+ ) -> Result<SessionContext> {
+ let config = SessionConfig::new()
+ .with_batch_size(TEST_BATCH_SIZE)
+ .set_u64(
+ "datafusion.execution.coalesce_target_batch_size",
+ TEST_BATCH_SIZE as u64,
+ );
+ let ctx = SessionContext::with_config(config);
+ // Register left table
+ let left_schema = Arc::new(Schema::new(vec![
+ Field::new("a1", DataType::Utf8, false),
+ Field::new("a2", DataType::UInt32, false),
+ ]));
+ ctx.register_csv(
+ "left",
+ fifo_path.as_os_str().to_str().unwrap(),
+ CsvReadOptions::new()
+ .schema(left_schema.as_ref())
+ .has_header(false)
+ .mark_infinite(with_unbounded_execution),
+ )
+ .await?;
+ // Register right table
+ let schema = aggr_test_schema();
+ let test_data = arrow_test_data();
+ ctx.register_csv(
+ "right",
+ &format!("{}/csv/aggregate_test_100.csv", test_data),
+ CsvReadOptions::new().schema(schema.as_ref()),
+ )
+ .await?;
+ Ok(ctx)
+ }
+
+ #[derive(Debug, PartialEq)]
+ enum Operation {
+ Read,
+ Write,
+ }
+
+ /// Checks if there is an [Operation::Read] between [Operation::Write]s.
+ /// This indicates we did not wait for the file to finish before processing it.
+ fn interleave(result: &[Operation]) -> bool {
+ let first_read = result.iter().position(|op| op == &Operation::Read);
+ let last_write = result.iter().rev().position(|op| op == &Operation::Write);
+ match (first_read, last_write) {
+ (Some(first_read), Some(last_write)) => {
+ result.len() - 1 - last_write > first_read
+ }
+ (_, _) => false,
+ }
+ }
+
+ // This test provides a relatively realistic end-to-end scenario where
+ // we ensure that we swap join sides correctly to accommodate a FIFO source.
+ #[rstest]
+ #[timeout(std::time::Duration::from_secs(30))]
+ #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
+ async fn unbounded_file_with_swapped_join(
+ #[values(true, false)] unbounded_file: bool,
+ ) -> Result<()> {
+ // To make the unbounded case deterministic
+ let waiting = Arc::new(Mutex::new(unbounded_file));
+ let waiting_thread = waiting.clone();
+ // Create a channel
+ let (tx, rx): (Sender<Operation>, Receiver<Operation>) = mpsc::channel();
+ // Create a new temporary FIFO file
+ let tmp_dir = TempDir::new()?;
+ let fifo_path = create_fifo_file(&tmp_dir, "fisrt_fifo.csv")?;
+ // Prevent move
+ let fifo_path_thread = fifo_path.clone();
+ // Timeout for a long period of BrokenPipe error
+ let broken_pipe_timeout = Duration::from_secs(5);
+ // The sender endpoint can be copied
+ let thread_tx = tx.clone();
+ // Spawn a new thread to write to the FIFO file
+ let fifo_writer = thread::spawn(move || {
+ let first_file = OpenOptions::new()
+ .write(true)
+ .open(fifo_path_thread)
+ .unwrap();
+ // Reference time to use when deciding to fail the test
+ let execution_start = Instant::now();
+ // Execution can produce at least one RecordBatch after the first
+ // "joinable_lines_length" lines are read.
+ let joinable_lines_length =
+ (TEST_DATA_SIZE as f64 * TEST_JOIN_RATIO).round() as usize;
+ // The row including "a" is joinable with aggregate_test_100.c1
+ let joinable_iterator = (0..joinable_lines_length).map(|_| "a".to_string());
+ let second_joinable_iterator =
+ (0..joinable_lines_length).map(|_| "a".to_string());
+ // The row including "zzz" is not joinable with aggregate_test_100.c1
+ let non_joinable_iterator =
+ (0..(TEST_DATA_SIZE - joinable_lines_length)).map(|_| "zzz".to_string());
+ let string_array = joinable_iterator
+ .chain(non_joinable_iterator)
+ .chain(second_joinable_iterator);
+ for (cnt, string_col) in enumerate(string_array) {
+ // Wait for a read signal during unbounded execution.
+ // For unbounded execution:
+ // After joinable_lines_length lines are read from the FIFO, we MUST get an Operation::Read.
+ // For bounded execution:
+ // We never enter the while loop since waiting_thread is initialized as false.
+ while *waiting_thread.lock().unwrap() && joinable_lines_length < cnt {
+ thread::sleep(Duration::from_millis(200));
+ }
+ // Each thread queues a message in the channel
+ if cnt % TEST_BATCH_SIZE == 0 {
+ thread_tx.send(Operation::Write).unwrap();
+ }
+ let line = format!("{},{}\n", string_col, cnt).to_owned();
+ write_to_fifo(&first_file, &line, execution_start, broken_pipe_timeout)
+ .unwrap();
+ }
+ });
+ // Collects operations from both writer and executor.
+ let result_collector = thread::spawn(move || {
+ let mut results = vec![];
+ while let Ok(res) = rx.recv() {
+ results.push(res);
+ }
+ results
+ });
+ // Create an execution case with bounded or unbounded flag.
+ let ctx = create_ctx(&fifo_path, unbounded_file).await?;
+ // Execute the query
+ let df = ctx.sql("SELECT t1.a2, t2.c1, t2.c4, t2.c5 FROM left as t1 JOIN right as t2 ON t1.a1 = t2.c1").await?;
+ let mut stream = df.execute_stream().await?;
+ while (stream.next().await).is_some() {
+ *waiting.lock().unwrap() = false;
+ tx.send(Operation::Read).unwrap();
+ }
+ fifo_writer.join().unwrap();
+ drop(tx);
+ let result = result_collector.join().unwrap();
+ assert_eq!(interleave(&result), unbounded_file);
+ Ok(())
+ }
+}
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index d3cf06c34..f40258219 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -83,6 +83,7 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
limit: None,
table_partition_cols: vec![],
output_ordering: None,
+ infinite_source: false,
},
None,
None,
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index e62124a33..4d2830ad8 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -69,6 +69,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec {
limit: None,
table_partition_cols: vec![],
output_ordering: None,
+ infinite_source: false,
},
Some(filter),
None,
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 1f071febe..0652961b0 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -113,6 +113,7 @@ async fn get_exec(
limit,
table_partition_cols: vec![],
output_ordering: None,
+ infinite_source: false,
},
&[],
)
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 311ed46ff..01d8e31b3 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -377,6 +377,7 @@ pub fn parse_protobuf_file_scan_config(
limit: proto.limit.as_ref().map(|sl| sl.limit as usize),
table_partition_cols,
output_ordering,
+ infinite_source: false,
})
}
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index c122b5f43..bd3614df0 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -1477,6 +1477,7 @@ mod roundtrip_tests {
limit: None,
table_partition_cols: vec![],
output_ordering: None,
+ infinite_source: false,
};
let predicate = datafusion::prelude::col("col").eq(datafusion::prelude::lit("1"));
diff --git a/parquet-test-utils/src/lib.rs b/parquet-test-utils/src/lib.rs
index 5e8f15c0a..eb36b11df 100644
--- a/parquet-test-utils/src/lib.rs
+++ b/parquet-test-utils/src/lib.rs
@@ -144,6 +144,7 @@ impl TestParquetFile {
limit: None,
table_partition_cols: vec![],
output_ordering: None,
+ infinite_source: false,
};
let df_schema = self.schema.clone().to_dfschema_ref()?;