Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/28 20:56:50 UTC

[arrow-datafusion] branch master updated: Support for executing infinite files and boundedness-aware join reordering rule (#4694)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new cd4fd80dd Support for executing infinite files and boundedness-aware join reordering rule (#4694)
cd4fd80dd is described below

commit cd4fd80dd5553ffdac41e21c2b7bfe0296f30695
Author: Metehan Yıldırım <10...@users.noreply.github.com>
AuthorDate: Wed Dec 28 23:56:45 2022 +0300

    Support for executing infinite files and boundedness-aware join reordering rule (#4694)
    
    * Executing infinite files with necessary optimization and file configurations
    
    * Future-proofing, more idiomatic expressions, better comments
    
    * Testing incremental execution
    
    * Simplify FIFO test code
    
    * Revamping unboundedness decision on rules and executors
    
    * Clippy resolutions
    
    * Change in some rules
    
    * Minor refactors and renamings
    
    * Cargo fmt changes
    
    * Resolving deprecation warnings
    
    * Splitting fixer and checker rule, before merge conflict
    
    * Clippy and comments
    
    * Update Cargo.lock
    
    * Improve/correct comments, simplify PipelineFixer code
    
    Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
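    
    A minimal usage sketch of the new API (the table name, file path, and
    single-column schema below are hypothetical): an unbounded CSV source, such
    as a FIFO, is registered with an explicit schema and marked infinite, since
    schema inference is rejected for infinite sources.
    
        use datafusion::arrow::datatypes::{DataType, Field, Schema};
        use datafusion::error::Result;
        use datafusion::prelude::{CsvReadOptions, SessionContext};
        
        #[tokio::main]
        async fn main() -> Result<()> {
            let ctx = SessionContext::new();
            // Schema must be supplied up front: inference is not supported
            // for infinite sources.
            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
            ctx.register_csv(
                "unbounded_table",
                "/tmp/fifo_source.csv", // hypothetical path to a FIFO (named pipe)
                CsvReadOptions::new().schema(&schema).mark_infinite(true),
            )
            .await?;
            // The PipelineFixer / PipelineChecker rules adjust (or reject) the
            // physical plan so the query can run in full pipelining mode.
            let df = ctx.sql("SELECT a FROM unbounded_table").await?;
            let _stream = df.execute_stream().await?;
            Ok(())
        }
    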
---
 datafusion-cli/Cargo.lock                          | 149 ++---
 datafusion/core/Cargo.toml                         |   4 +
 datafusion/core/src/datasource/file_format/mod.rs  |   1 +
 datafusion/core/src/datasource/listing/table.rs    | 154 ++++-
 datafusion/core/src/execution/context.rs           |  66 ++-
 datafusion/core/src/execution/options.rs           |  50 +-
 .../core/src/physical_optimizer/enforcement.rs     |   1 +
 .../core/src/physical_optimizer/join_selection.rs  |  64 +--
 datafusion/core/src/physical_optimizer/mod.rs      |   5 +
 .../src/physical_optimizer/pipeline_checker.rs     | 389 +++++++++++++
 .../core/src/physical_optimizer/pipeline_fixer.rs  | 623 +++++++++++++++++++++
 .../core/src/physical_optimizer/repartition.rs     |   1 +
 .../core/src/physical_optimizer/test_utils.rs      | 127 +++++
 .../core/src/physical_plan/aggregates/mod.rs       |  15 +-
 datafusion/core/src/physical_plan/analyze.rs       |  16 +-
 .../core/src/physical_plan/coalesce_batches.rs     |   7 +
 .../core/src/physical_plan/coalesce_partitions.rs  |   6 +
 .../core/src/physical_plan/file_format/avro.rs     |   7 +
 .../core/src/physical_plan/file_format/csv.rs      |   4 +
 .../src/physical_plan/file_format/file_stream.rs   |   1 +
 .../core/src/physical_plan/file_format/json.rs     |   7 +
 .../core/src/physical_plan/file_format/mod.rs      |   3 +
 .../core/src/physical_plan/file_format/parquet.rs  |   4 +
 datafusion/core/src/physical_plan/filter.rs        |   7 +
 .../core/src/physical_plan/joins/cross_join.rs     |  15 +
 .../core/src/physical_plan/joins/hash_join.rs      |  31 +
 datafusion/core/src/physical_plan/mod.rs           |   7 +
 datafusion/core/src/physical_plan/projection.rs    |   7 +
 datafusion/core/src/physical_plan/repartition.rs   |   7 +
 datafusion/core/src/physical_plan/sorts/sort.rs    |  15 +-
 datafusion/core/src/physical_plan/union.rs         |   7 +
 .../src/physical_plan/windows/window_agg_exec.rs   |  16 +-
 datafusion/core/src/test/exec.rs                   |  70 +++
 datafusion/core/src/test/mod.rs                    |   1 +
 datafusion/core/tests/fifo.rs                      | 225 ++++++++
 datafusion/core/tests/parquet/custom_reader.rs     |   1 +
 datafusion/core/tests/parquet/page_pruning.rs      |   1 +
 datafusion/core/tests/row.rs                       |   1 +
 datafusion/proto/src/physical_plan/from_proto.rs   |   1 +
 datafusion/proto/src/physical_plan/mod.rs          |   1 +
 parquet-test-utils/src/lib.rs                      |   1 +
 41 files changed, 1986 insertions(+), 132 deletions(-)

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index cb6c543ab..baddb4226 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -263,9 +263,9 @@ dependencies = [
 
 [[package]]
 name = "async-trait"
-version = "0.1.59"
+version = "0.1.60"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "31e6e93155431f3931513b243d371981bb2770112b370c82745a1d19d2f99364"
+checksum = "677d1d8ab452a3936018a687b20e6f7cf5363d713b732b8884001317b0e48aa3"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -278,7 +278,7 @@ version = "0.2.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
 dependencies = [
- "hermit-abi",
+ "hermit-abi 0.1.19",
  "libc",
  "winapi",
 ]
@@ -303,9 +303,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
 
 [[package]]
 name = "blake2"
-version = "0.10.5"
+version = "0.10.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b12e5fd123190ce1c2e559308a94c9bacad77907d4c6005d9e58fe1a0689e55e"
+checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe"
 dependencies = [
  "digest",
 ]
@@ -407,9 +407,9 @@ dependencies = [
 
 [[package]]
 name = "cc"
-version = "1.0.77"
+version = "1.0.78"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e9f73505338f7d905b19d18738976aae232eb46b8efc15554ffc56deb5d9ebe4"
+checksum = "a20104e2335ce8a659d6dd92a51a767a0c062599c73b343fd152cb401e828c3d"
 dependencies = [
  "jobserver",
 ]
@@ -596,9 +596,9 @@ dependencies = [
 
 [[package]]
 name = "cxx"
-version = "1.0.83"
+version = "1.0.85"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bdf07d07d6531bfcdbe9b8b739b104610c6508dcc4d63b410585faf338241daf"
+checksum = "5add3fc1717409d029b20c5b6903fc0c0b02fa6741d820054f4a2efa5e5816fd"
 dependencies = [
  "cc",
  "cxxbridge-flags",
@@ -608,9 +608,9 @@ dependencies = [
 
 [[package]]
 name = "cxx-build"
-version = "1.0.83"
+version = "1.0.85"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d2eb5b96ecdc99f72657332953d4d9c50135af1bac34277801cc3937906ebd39"
+checksum = "b4c87959ba14bc6fbc61df77c3fcfe180fc32b93538c4f1031dd802ccb5f2ff0"
 dependencies = [
  "cc",
  "codespan-reporting",
@@ -623,15 +623,15 @@ dependencies = [
 
 [[package]]
 name = "cxxbridge-flags"
-version = "1.0.83"
+version = "1.0.85"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ac040a39517fd1674e0f32177648334b0f4074625b5588a64519804ba0553b12"
+checksum = "69a3e162fde4e594ed2b07d0f83c6c67b745e7f28ce58c6df5e6b6bef99dfb59"
 
 [[package]]
 name = "cxxbridge-macro"
-version = "1.0.83"
+version = "1.0.85"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1362b0ddcfc4eb0a1f57b68bd77dd99f0e826958a96abd0ae9bd092e114ffed6"
+checksum = "3e7e2adeb6a0d4a282e581096b06e1791532b7d576dcde5ccd9382acf55db8e6"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -1151,6 +1151,15 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "hermit-abi"
+version = "0.2.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "http"
 version = "0.2.8"
@@ -1159,7 +1168,7 @@ checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
 dependencies = [
  "bytes",
  "fnv",
- "itoa 1.0.4",
+ "itoa 1.0.5",
 ]
 
 [[package]]
@@ -1206,7 +1215,7 @@ dependencies = [
  "http-body",
  "httparse",
  "httpdate",
- "itoa 1.0.4",
+ "itoa 1.0.5",
  "pin-project-lite",
  "socket2",
  "tokio",
@@ -1299,9 +1308,9 @@ dependencies = [
 
 [[package]]
 name = "ipnet"
-version = "2.6.0"
+version = "2.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec947b7a4ce12e3b87e353abae7ce124d025b6c7d6c5aea5cc0bcf92e9510ded"
+checksum = "11b0d96e660696543b251e58030cf9787df56da39dab19ad60eae7353040917e"
 
 [[package]]
 name = "itertools"
@@ -1320,9 +1329,9 @@ checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
 
 [[package]]
 name = "itoa"
-version = "1.0.4"
+version = "1.0.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc"
+checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440"
 
 [[package]]
 name = "jobserver"
@@ -1414,9 +1423,9 @@ dependencies = [
 
 [[package]]
 name = "libc"
-version = "0.2.138"
+version = "0.2.139"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "db6d7e329c562c5dfab7a46a2afabc8b987ab9a4834c9d1ca04dc54c1546cef8"
+checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79"
 
 [[package]]
 name = "libm"
@@ -1436,18 +1445,18 @@ dependencies = [
 
 [[package]]
 name = "link-cplusplus"
-version = "1.0.7"
+version = "1.0.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9272ab7b96c9046fbc5bc56c06c117cb639fe2d509df0c421cad82d2915cf369"
+checksum = "ecd207c9c713c34f95a097a5b029ac2ce6010530c7b49d7fea24d977dede04f5"
 dependencies = [
  "cc",
 ]
 
 [[package]]
 name = "linux-raw-sys"
-version = "0.1.3"
+version = "0.1.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8f9f08d8963a6c613f4b1a78f4f4a4dbfadf8e6545b2d72861731e4858b8b47f"
+checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4"
 
 [[package]]
 name = "lock_api"
@@ -1669,11 +1678,11 @@ dependencies = [
 
 [[package]]
 name = "num_cpus"
-version = "1.14.0"
+version = "1.15.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f6058e64324c71e02bc2b150e4f3bc8286db6c83092132ffa3f6b1eab0f9def5"
+checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b"
 dependencies = [
- "hermit-abi",
+ "hermit-abi 0.2.6",
  "libc",
 ]
 
@@ -1785,9 +1794,9 @@ dependencies = [
 
 [[package]]
 name = "paste"
-version = "1.0.9"
+version = "1.0.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b1de2e551fb905ac83f73f7aedf2f0cb4a0da7e35efa24a202a936269f1f18e1"
+checksum = "d01a5bd0424d00070b0098dd17ebca6f961a959dead1dbcbbbc1d1cd8d3deeba"
 
 [[package]]
 name = "percent-encoding"
@@ -1845,15 +1854,15 @@ dependencies = [
 
 [[package]]
 name = "proc-macro-hack"
-version = "0.5.19"
+version = "0.5.20+deprecated"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
+checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068"
 
 [[package]]
 name = "proc-macro2"
-version = "1.0.47"
+version = "1.0.49"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725"
+checksum = "57a8eca9f9c4ffde41714334dee777596264c7825420f521abc92b5b5deb63a5"
 dependencies = [
  "unicode-ident",
 ]
@@ -1870,9 +1879,9 @@ dependencies = [
 
 [[package]]
 name = "quote"
-version = "1.0.21"
+version = "1.0.23"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179"
+checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b"
 dependencies = [
  "proc-macro2",
 ]
@@ -2026,9 +2035,9 @@ dependencies = [
 
 [[package]]
 name = "rustix"
-version = "0.36.5"
+version = "0.36.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a3807b5d10909833d3e9acd1eb5fb988f79376ff10fce42937de71a449c4c588"
+checksum = "4feacf7db682c6c329c4ede12649cd36ecab0f3be5b7d74e6a20304725db4549"
 dependencies = [
  "bitflags",
  "errno",
@@ -2061,9 +2070,9 @@ dependencies = [
 
 [[package]]
 name = "rustversion"
-version = "1.0.9"
+version = "1.0.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "97477e48b4cf8603ad5f7aaf897467cf42ab4218a38ef76fb14c2d6773a6d6a8"
+checksum = "5583e89e108996506031660fe09baa5011b9dd0341b89029313006d1fb508d70"
 
 [[package]]
 name = "rustyline"
@@ -2090,9 +2099,9 @@ dependencies = [
 
 [[package]]
 name = "ryu"
-version = "1.0.11"
+version = "1.0.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09"
+checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde"
 
 [[package]]
 name = "same-file"
@@ -2111,9 +2120,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
 
 [[package]]
 name = "scratch"
-version = "1.0.2"
+version = "1.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898"
+checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2"
 
 [[package]]
 name = "sct"
@@ -2127,24 +2136,24 @@ dependencies = [
 
 [[package]]
 name = "seq-macro"
-version = "0.3.1"
+version = "0.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0772c5c30e1a0d91f6834f8e545c69281c099dfa9a3ac58d96a9fd629c8d4898"
+checksum = "1685deded9b272198423bdbdb907d8519def2f26cf3699040e54e8c4fbd5c5ce"
 
 [[package]]
 name = "serde"
-version = "1.0.149"
+version = "1.0.152"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "256b9932320c590e707b94576e3cc1f7c9024d0ee6612dfbcf1cb106cbe8e055"
+checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb"
 dependencies = [
  "serde_derive",
 ]
 
 [[package]]
 name = "serde_derive"
-version = "1.0.149"
+version = "1.0.152"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b4eae9b04cbffdfd550eb462ed33bc6a1b68c935127d008b27444d08380f94e4"
+checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2153,11 +2162,11 @@ dependencies = [
 
 [[package]]
 name = "serde_json"
-version = "1.0.89"
+version = "1.0.91"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "020ff22c755c2ed3f8cf162dbb41a7268d934702f3ed3631656ea597e08fc3db"
+checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883"
 dependencies = [
- "itoa 1.0.4",
+ "itoa 1.0.5",
  "ryu",
  "serde",
 ]
@@ -2169,7 +2178,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
 dependencies = [
  "form_urlencoded",
- "itoa 1.0.4",
+ "itoa 1.0.5",
  "ryu",
  "serde",
 ]
@@ -2202,9 +2211,9 @@ checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
 
 [[package]]
 name = "snafu"
-version = "0.7.3"
+version = "0.7.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a152ba99b054b22972ee794cf04e5ef572da1229e33b65f3c57abbff0525a454"
+checksum = "cb0656e7e3ffb70f6c39b3c2a86332bb74aa3c679da781642590f3c1118c5045"
 dependencies = [
  "doc-comment",
  "snafu-derive",
@@ -2212,9 +2221,9 @@ dependencies = [
 
 [[package]]
 name = "snafu-derive"
-version = "0.7.3"
+version = "0.7.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d5e79cdebbabaebb06a9bdbaedc7f159b410461f63611d4d0e3fb0fab8fed850"
+checksum = "475b3bbe5245c26f2d8a6f62d67c1f30eb9fffeccee721c45d162c3ebbdf81b2"
 dependencies = [
  "heck",
  "proc-macro2",
@@ -2298,9 +2307,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
 
 [[package]]
 name = "syn"
-version = "1.0.105"
+version = "1.0.107"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "60b9b43d45702de4c839cb9b51d9f529c5dd26a4aff255b42b1ebc03e88ee908"
+checksum = "1f4064b5b16e03ae50984a5a8ed5d4f8803e6bc1fd170a3cda91a1be4b18e3f5"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2338,18 +2347,18 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d"
 
 [[package]]
 name = "thiserror"
-version = "1.0.37"
+version = "1.0.38"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e"
+checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0"
 dependencies = [
  "thiserror-impl",
 ]
 
 [[package]]
 name = "thiserror-impl"
-version = "1.0.37"
+version = "1.0.38"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb"
+checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2525,9 +2534,9 @@ checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"
 
 [[package]]
 name = "unicode-ident"
-version = "1.0.5"
+version = "1.0.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3"
+checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc"
 
 [[package]]
 name = "unicode-normalization"
@@ -2703,9 +2712,9 @@ dependencies = [
 
 [[package]]
 name = "webpki-roots"
-version = "0.22.5"
+version = "0.22.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "368bfe657969fb01238bb756d351dcade285e0f6fcbd36dcb23359a5169975be"
+checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87"
 dependencies = [
  "webpki",
 ]
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 18f590168..eaa01ca46 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -99,6 +99,7 @@ url = "2.2"
 uuid = { version = "1.0", features = ["v4"] }
 xz2 = { version = "0.1", optional = true }
 
+
 [dev-dependencies]
 arrow = { version = "29.0.0", features = ["prettyprint", "dyn_cmp_dict"] }
 async-trait = "0.1.53"
@@ -114,6 +115,9 @@ sqlparser = "0.28"
 test-utils = { path = "../../test-utils" }
 thiserror = "1.0.37"
 
+[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
+nix = "0.26.1"
+
 [[bench]]
 harness = false
 name = "aggregate_query_sql"
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index a6b8a7904..b73b3881a 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -138,6 +138,7 @@ pub(crate) mod test_util {
                     limit,
                     table_partition_cols: vec![],
                     output_ordering: None,
+                    infinite_source: false,
                 },
                 &[],
             )
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 26ee98ea4..8458cc4ae 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -229,6 +229,11 @@ pub struct ListingOptions {
     ///
     /// See <https://github.com/apache/arrow-datafusion/issues/4177>
     pub file_sort_order: Option<Vec<Expr>>,
+    /// Infinite source means that the input is not guaranteed to end.
+    /// Currently, CSV, JSON, and AVRO formats are supported.
+    /// In order to support infinite inputs, DataFusion may adjust query
+    /// plans (e.g. joins) to run the given query in full pipelining mode.
+    pub infinite_source: bool,
 }
 
 impl ListingOptions {
@@ -246,9 +251,28 @@ impl ListingOptions {
             collect_stat: true,
             target_partitions: 1,
             file_sort_order: None,
+            infinite_source: false,
         }
     }
 
+    /// Set unbounded assumption on [`ListingOptions`] and returns self.
+    ///
+    /// ```
+    /// use std::sync::Arc;
+    /// use datafusion::datasource::{listing::ListingOptions, file_format::csv::CsvFormat};
+    /// use datafusion::prelude::SessionContext;
+    /// let ctx = SessionContext::new();
+    /// let listing_options = ListingOptions::new(Arc::new(
+    ///     CsvFormat::default()
+    ///   )).with_infinite_source(true);
+    ///
+    /// assert_eq!(listing_options.infinite_source, true);
+    /// ```
+    pub fn with_infinite_source(mut self, infinite_source: bool) -> Self {
+        self.infinite_source = infinite_source;
+        self
+    }
+
     /// Set file extension on [`ListingOptions`] and returns self.
     ///
     /// ```
@@ -419,6 +443,7 @@ pub struct ListingTable {
     options: ListingOptions,
     definition: Option<String>,
     collected_statistics: StatisticsCache,
+    infinite_source: bool,
 }
 
 impl ListingTable {
@@ -447,6 +472,7 @@ impl ListingTable {
                 false,
             ));
         }
+        let infinite_source = options.infinite_source;
 
         let table = Self {
             table_paths: config.table_paths,
@@ -455,6 +481,7 @@ impl ListingTable {
             options,
             definition: None,
             collected_statistics: Default::default(),
+            infinite_source,
         };
 
         Ok(table)
@@ -578,6 +605,7 @@ impl TableProvider for ListingTable {
                     limit,
                     output_ordering: self.try_create_output_ordering()?,
                     table_partition_cols,
+                    infinite_source: self.infinite_source,
                 },
                 filters,
             )
@@ -678,8 +706,9 @@ impl ListingTable {
 
 #[cfg(test)]
 mod tests {
+    use super::*;
     use crate::datasource::file_format::file_type::GetExt;
-    use crate::prelude::SessionContext;
+    use crate::prelude::*;
     use crate::{
         datasource::file_format::{avro::AvroFormat, parquet::ParquetFormat},
         logical_expr::{col, lit},
@@ -688,8 +717,37 @@ mod tests {
     use arrow::datatypes::DataType;
     use chrono::DateTime;
     use datafusion_common::assert_contains;
+    use rstest::*;
+    use std::fs::File;
+    use tempfile::TempDir;
+
+    /// Creates a dummy file and checks whether unbounded input executors can be created.
+    async fn unbounded_table_helper(
+        file_type: FileType,
+        listing_option: ListingOptions,
+        infinite_data: bool,
+    ) -> Result<()> {
+        let ctx = SessionContext::new();
+        register_test_store(
+            &ctx,
+            &[(&format!("table/file{}", file_type.get_ext()), 100)],
+        );
 
-    use super::*;
+        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
+
+        let table_path = ListingTableUrl::parse("test:///table/").unwrap();
+        let config = ListingTableConfig::new(table_path)
+            .with_listing_options(listing_option)
+            .with_schema(Arc::new(schema));
+        // Create a table
+        let table = ListingTable::try_new(config)?;
+        // Create executor from table
+        let source_exec = table.scan(&ctx.state(), None, &[], None).await?;
+
+        assert_eq!(source_exec.unbounded_output(&[])?, infinite_data);
+
+        Ok(())
+    }
 
     #[tokio::test]
     async fn read_single_file() -> Result<()> {
@@ -770,7 +828,7 @@ mod tests {
         let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
         let schema = options.infer_schema(&state, &table_path).await.unwrap();
 
-        use physical_plan::expressions::col as physical_col;
+        use crate::physical_plan::expressions::col as physical_col;
         use std::ops::Add;
 
         // (file_sort_order, expected_result)
@@ -904,6 +962,96 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn unbounded_csv_table_without_schema() -> Result<()> {
+        let tmp_dir = TempDir::new()?;
+        let file_path = tmp_dir.path().join("dummy.csv");
+        File::create(file_path)?;
+        let ctx = SessionContext::new();
+        let error = ctx
+            .register_csv(
+                "test",
+                tmp_dir.path().to_str().unwrap(),
+                CsvReadOptions::new().mark_infinite(true),
+            )
+            .await
+            .unwrap_err();
+        match error {
+            DataFusionError::Plan(_) => Ok(()),
+            val => Err(val),
+        }
+    }
+
+    #[tokio::test]
+    async fn unbounded_json_table_without_schema() -> Result<()> {
+        let tmp_dir = TempDir::new()?;
+        let file_path = tmp_dir.path().join("dummy.json");
+        File::create(file_path)?;
+        let ctx = SessionContext::new();
+        let error = ctx
+            .register_json(
+                "test",
+                tmp_dir.path().to_str().unwrap(),
+                NdJsonReadOptions::default().mark_infinite(true),
+            )
+            .await
+            .unwrap_err();
+        match error {
+            DataFusionError::Plan(_) => Ok(()),
+            val => Err(val),
+        }
+    }
+
+    #[tokio::test]
+    async fn unbounded_avro_table_without_schema() -> Result<()> {
+        let tmp_dir = TempDir::new()?;
+        let file_path = tmp_dir.path().join("dummy.avro");
+        File::create(file_path)?;
+        let ctx = SessionContext::new();
+        let error = ctx
+            .register_avro(
+                "test",
+                tmp_dir.path().to_str().unwrap(),
+                AvroReadOptions::default().mark_infinite(true),
+            )
+            .await
+            .unwrap_err();
+        match error {
+            DataFusionError::Plan(_) => Ok(()),
+            val => Err(val),
+        }
+    }
+
+    #[rstest]
+    #[tokio::test]
+    async fn unbounded_csv_table(
+        #[values(true, false)] infinite_data: bool,
+    ) -> Result<()> {
+        let config = CsvReadOptions::new().mark_infinite(infinite_data);
+        let listing_options = config.to_listing_options(1);
+        unbounded_table_helper(FileType::CSV, listing_options, infinite_data).await
+    }
+
+    #[rstest]
+    #[tokio::test]
+    async fn unbounded_json_table(
+        #[values(true, false)] infinite_data: bool,
+    ) -> Result<()> {
+        let config = NdJsonReadOptions::default().mark_infinite(infinite_data);
+        let listing_options = config.to_listing_options(1);
+        unbounded_table_helper(FileType::JSON, listing_options, infinite_data).await
+    }
+
+    #[rstest]
+    #[tokio::test]
+    async fn unbounded_avro_table(
+        #[values(true, false)] infinite_data: bool,
+    ) -> Result<()> {
+        let config = AvroReadOptions::default().mark_infinite(infinite_data);
+        let listing_options = config.to_listing_options(1);
+        unbounded_table_helper(FileType::AVRO, listing_options, infinite_data).await
+    }
+
     #[tokio::test]
     async fn test_assert_list_files_for_scan_grouping() -> Result<()> {
         // more expected partitions than files
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index db6ab99fb..1fa6cc383 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -101,6 +101,8 @@ use crate::datasource::object_store::ObjectStoreUrl;
 use crate::execution::memory_pool::MemoryPool;
 use crate::physical_optimizer::global_sort_selection::GlobalSortSelection;
 use crate::physical_optimizer::optimize_sorts::OptimizeSorts;
+use crate::physical_optimizer::pipeline_checker::PipelineChecker;
+use crate::physical_optimizer::pipeline_fixer::PipelineFixer;
 use uuid::Uuid;
 
 use super::options::{
@@ -630,13 +632,19 @@ impl SessionContext {
 
         let listing_options = options.to_listing_options(target_partitions);
 
-        let resolved_schema = match options.schema {
-            Some(s) => s,
-            None => {
+        let resolved_schema = match (options.schema, options.infinite) {
+            (Some(s), _) => Arc::new(s.to_owned()),
+            (None, false) => {
                 listing_options
                     .infer_schema(&self.state(), &table_path)
                     .await?
             }
+            (None, true) => {
+                return Err(DataFusionError::Plan(
+                    "Schema inference for infinite data sources is not supported."
+                        .to_string(),
+                ))
+            }
         };
 
         let config = ListingTableConfig::new(table_path)
@@ -657,13 +665,19 @@ impl SessionContext {
 
         let listing_options = options.to_listing_options(target_partitions);
 
-        let resolved_schema = match options.schema {
-            Some(s) => s,
-            None => {
+        let resolved_schema = match (options.schema, options.infinite) {
+            (Some(s), _) => Arc::new(s.to_owned()),
+            (None, false) => {
                 listing_options
                     .infer_schema(&self.state(), &table_path)
                     .await?
             }
+            (None, true) => {
+                return Err(DataFusionError::Plan(
+                    "Schema inference for infinite data sources is not supported."
+                        .to_string(),
+                ))
+            }
         };
         let config = ListingTableConfig::new(table_path)
             .with_listing_options(listing_options)
@@ -690,13 +704,19 @@ impl SessionContext {
         let table_path = ListingTableUrl::parse(table_path)?;
         let target_partitions = self.copied_config().target_partitions();
         let listing_options = options.to_listing_options(target_partitions);
-        let resolved_schema = match options.schema {
-            Some(s) => Arc::new(s.to_owned()),
-            None => {
+        let resolved_schema = match (options.schema, options.infinite) {
+            (Some(s), _) => Arc::new(s.to_owned()),
+            (None, false) => {
                 listing_options
                     .infer_schema(&self.state(), &table_path)
                     .await?
             }
+            (None, true) => {
+                return Err(DataFusionError::Plan(
+                    "Schema inference for infinite data sources is not supported."
+                        .to_string(),
+                ))
+            }
         };
         let config = ListingTableConfig::new(table_path.clone())
             .with_listing_options(listing_options)
@@ -767,9 +787,15 @@ impl SessionContext {
         sql_definition: Option<String>,
     ) -> Result<()> {
         let table_path = ListingTableUrl::parse(table_path)?;
-        let resolved_schema = match provided_schema {
-            None => options.infer_schema(&self.state(), &table_path).await?,
-            Some(s) => s,
+        let resolved_schema = match (provided_schema, options.infinite_source) {
+            (Some(s), _) => s,
+            (None, false) => options.infer_schema(&self.state(), &table_path).await?,
+            (None, true) => {
+                return Err(DataFusionError::Plan(
+                    "Schema inference for infinite data sources is not supported."
+                        .to_string(),
+                ))
+            }
         };
         let config = ListingTableConfig::new(table_path)
             .with_listing_options(options)
@@ -817,7 +843,7 @@ impl SessionContext {
             name,
             table_path,
             listing_options,
-            options.schema,
+            options.schema.map(|s| Arc::new(s.to_owned())),
             None,
         )
         .await?;
@@ -854,7 +880,7 @@ impl SessionContext {
             name,
             table_path,
             listing_options,
-            options.schema,
+            options.schema.map(|s| Arc::new(s.to_owned())),
             None,
         )
         .await?;
@@ -1562,6 +1588,12 @@ impl SessionState {
         // and local sort to meet the distribution and ordering requirements.
         // Therefore, it should be run before BasicEnforcement
         physical_optimizers.push(Arc::new(JoinSelection::new()));
+        // If the query is processing infinite inputs, the PipelineFixer rule applies the
+        // necessary transformations to make the query runnable (if it is not already runnable).
+        // If the query cannot be made runnable, the rule emits an error with a diagnostic message.
+        // Since the transformations it applies may alter output partitioning properties of operators
+        // (e.g. by swapping hash join sides), this rule runs before BasicEnforcement.
+        physical_optimizers.push(Arc::new(PipelineFixer::new()));
         // It's for adding essential repartition and local sorting operator to satisfy the
         // required distribution and local sort.
         // Please make sure that the whole plan tree is determined.
@@ -1587,7 +1619,11 @@ impl SessionState {
                     .unwrap(),
             )));
         }
-
+        // The PipelineChecker rule will reject non-runnable query plans that use
+        // pipeline-breaking operators on infinite input(s). The rule generates a
+        // diagnostic error message when this happens. It makes no changes to the
+        // given query plan; i.e. it only acts as a final gatekeeping rule.
+        physical_optimizers.push(Arc::new(PipelineChecker::new()));
         SessionState {
             session_id,
             optimizer: Optimizer::new(),
diff --git a/datafusion/core/src/execution/options.rs b/datafusion/core/src/execution/options.rs
index 229eaa313..6ecf89e00 100644
--- a/datafusion/core/src/execution/options.rs
+++ b/datafusion/core/src/execution/options.rs
@@ -19,7 +19,7 @@
 
 use std::sync::Arc;
 
-use arrow::datatypes::{DataType, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Schema};
 
 use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
 use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
@@ -63,6 +63,8 @@ pub struct CsvReadOptions<'a> {
     pub table_partition_cols: Vec<(String, DataType)>,
     /// File compression type
     pub file_compression_type: FileCompressionType,
+    /// Flag indicating whether this file may be unbounded (as in a FIFO file).
+    pub infinite: bool,
 }
 
 impl<'a> Default for CsvReadOptions<'a> {
@@ -82,6 +84,7 @@ impl<'a> CsvReadOptions<'a> {
             file_extension: DEFAULT_CSV_EXTENSION,
             table_partition_cols: vec![],
             file_compression_type: FileCompressionType::UNCOMPRESSED,
+            infinite: false,
         }
     }
 
@@ -91,6 +94,12 @@ impl<'a> CsvReadOptions<'a> {
         self
     }
 
+    /// Configure mark_infinite setting
+    pub fn mark_infinite(mut self, infinite: bool) -> Self {
+        self.infinite = infinite;
+        self
+    }
+
     /// Specify delimiter to use for CSV read
     pub fn delimiter(mut self, delimiter: u8) -> Self {
         self.delimiter = delimiter;
@@ -153,6 +162,9 @@ impl<'a> CsvReadOptions<'a> {
             .with_file_extension(self.file_extension)
             .with_target_partitions(target_partitions)
             .with_table_partition_cols(self.table_partition_cols.clone())
+            // TODO: Add file sort order into CsvReadOptions and introduce here.
+            .with_file_sort_order(None)
+            .with_infinite_source(self.infinite)
     }
 }
 
@@ -237,13 +249,15 @@ impl<'a> ParquetReadOptions<'a> {
 #[derive(Clone)]
 pub struct AvroReadOptions<'a> {
     /// The data source schema.
-    pub schema: Option<SchemaRef>,
+    pub schema: Option<&'a Schema>,
 
     /// File extension; only files with this extension are selected for data input.
     /// Defaults to `FileType::AVRO.get_ext().as_str()`.
     pub file_extension: &'a str,
     /// Partition Columns
     pub table_partition_cols: Vec<(String, DataType)>,
+    /// Flag indicating whether this file may be unbounded (as in a FIFO file).
+    pub infinite: bool,
 }
 
 impl<'a> Default for AvroReadOptions<'a> {
@@ -252,6 +266,7 @@ impl<'a> Default for AvroReadOptions<'a> {
             schema: None,
             file_extension: DEFAULT_AVRO_EXTENSION,
             table_partition_cols: vec![],
+            infinite: false,
         }
     }
 }
@@ -274,6 +289,19 @@ impl<'a> AvroReadOptions<'a> {
             .with_file_extension(self.file_extension)
             .with_target_partitions(target_partitions)
             .with_table_partition_cols(self.table_partition_cols.clone())
+            .with_infinite_source(self.infinite)
+    }
+
+    /// Configure mark_infinite setting
+    pub fn mark_infinite(mut self, infinite: bool) -> Self {
+        self.infinite = infinite;
+        self
+    }
+
+    /// Specify schema to use for AVRO read
+    pub fn schema(mut self, schema: &'a Schema) -> Self {
+        self.schema = Some(schema);
+        self
     }
 }
 
@@ -286,7 +314,7 @@ impl<'a> AvroReadOptions<'a> {
 #[derive(Clone)]
 pub struct NdJsonReadOptions<'a> {
     /// The data source schema.
-    pub schema: Option<SchemaRef>,
+    pub schema: Option<&'a Schema>,
     /// Max number of rows to read from JSON files for schema inference if needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`.
     pub schema_infer_max_records: usize,
     /// File extension; only files with this extension are selected for data input.
@@ -296,6 +324,8 @@ pub struct NdJsonReadOptions<'a> {
     pub table_partition_cols: Vec<(String, DataType)>,
     /// File compression type
     pub file_compression_type: FileCompressionType,
+    /// Flag indicating whether this file may be unbounded (as in a FIFO file).
+    pub infinite: bool,
 }
 
 impl<'a> Default for NdJsonReadOptions<'a> {
@@ -306,6 +336,7 @@ impl<'a> Default for NdJsonReadOptions<'a> {
             file_extension: DEFAULT_JSON_EXTENSION,
             table_partition_cols: vec![],
             file_compression_type: FileCompressionType::UNCOMPRESSED,
+            infinite: false,
         }
     }
 }
@@ -326,6 +357,12 @@ impl<'a> NdJsonReadOptions<'a> {
         self
     }
 
+    /// Configure mark_infinite setting
+    pub fn mark_infinite(mut self, infinite: bool) -> Self {
+        self.infinite = infinite;
+        self
+    }
+
     /// Specify file_compression_type
     pub fn file_compression_type(
         mut self,
@@ -335,6 +372,12 @@ impl<'a> NdJsonReadOptions<'a> {
         self
     }
 
+    /// Specify schema to use for NdJson read
+    pub fn schema(mut self, schema: &'a Schema) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
     /// Helper to convert these user facing options to `ListingTable` options
     pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions {
         let file_format = JsonFormat::default()
@@ -344,5 +387,6 @@ impl<'a> NdJsonReadOptions<'a> {
             .with_file_extension(self.file_extension)
             .with_target_partitions(target_partitions)
             .with_table_partition_cols(self.table_partition_cols.clone())
+            .with_infinite_source(self.infinite)
     }
 }
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index 59e7a0190..f017d0e0a 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -1021,6 +1021,7 @@ mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 output_ordering,
+                infinite_source: false,
             },
             None,
             None,
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index 69e9e0f4d..c479e2c9a 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -89,21 +89,23 @@ fn supports_collect_by_size(
         false
     }
 }
-
-fn supports_swap(join_type: JoinType) -> bool {
-    match join_type {
+/// Predicate that checks whether the given join type supports input swapping.
+pub fn supports_swap(join_type: JoinType) -> bool {
+    matches!(
+        join_type,
         JoinType::Inner
-        | JoinType::Left
-        | JoinType::Right
-        | JoinType::Full
-        | JoinType::LeftSemi
-        | JoinType::RightSemi
-        | JoinType::LeftAnti
-        | JoinType::RightAnti => true,
-    }
+            | JoinType::Left
+            | JoinType::Right
+            | JoinType::Full
+            | JoinType::LeftSemi
+            | JoinType::RightSemi
+            | JoinType::LeftAnti
+            | JoinType::RightAnti
+    )
 }
-
-fn swap_join_type(join_type: JoinType) -> JoinType {
+/// This function returns the new join type we get after swapping the given
+/// join's inputs.
+pub fn swap_join_type(join_type: JoinType) -> JoinType {
     match join_type {
         JoinType::Inner => JoinType::Inner,
         JoinType::Full => JoinType::Full,
@@ -116,12 +118,13 @@ fn swap_join_type(join_type: JoinType) -> JoinType {
     }
 }
 
-fn swap_hash_join(
+/// This function swaps the inputs of the given join operator.
+pub fn swap_hash_join(
     hash_join: &HashJoinExec,
     partition_mode: PartitionMode,
-    left: &Arc<dyn ExecutionPlan>,
-    right: &Arc<dyn ExecutionPlan>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
+    let left = hash_join.left();
+    let right = hash_join.right();
     let new_join = HashJoinExec::try_new(
         Arc::clone(right),
         Arc::clone(left),
@@ -153,12 +156,11 @@ fn swap_hash_join(
     }
 }
 
-/// When the order of the join is changed by the optimizer,
-/// the columns in the output should not be impacted.
-/// This helper creates the expressions that will allow to swap
-/// back the values from the original left as first columns and
-/// those on the right next
-fn swap_reverting_projection(
+/// When the order of the join is changed by the optimizer, the columns in
+/// the output should not be impacted. This function creates the expressions
+/// that will allow to swap back the values from the original left as the first
+/// columns and those on the right next.
+pub fn swap_reverting_projection(
     left_schema: &Schema,
     right_schema: &Schema,
 ) -> Vec<(Arc<dyn PhysicalExpr>, String)> {
@@ -241,8 +243,6 @@ impl PhysicalOptimizerRule for JoinSelection {
                             Ok(Some(swap_hash_join(
                                 hash_join,
                                 PartitionMode::Partitioned,
-                                left,
-                                right,
                             )?))
                         } else {
                             Ok(None)
@@ -320,12 +320,7 @@ fn try_collect_left(
             if should_swap_join_order(&**left, &**right)
                 && supports_swap(*hash_join.join_type())
             {
-                Ok(Some(swap_hash_join(
-                    hash_join,
-                    PartitionMode::CollectLeft,
-                    left,
-                    right,
-                )?))
+                Ok(Some(swap_hash_join(hash_join, PartitionMode::CollectLeft)?))
             } else {
                 Ok(Some(Arc::new(HashJoinExec::try_new(
                     Arc::clone(left),
@@ -349,12 +344,7 @@ fn try_collect_left(
         )?))),
         (false, true) => {
             if supports_swap(*hash_join.join_type()) {
-                Ok(Some(swap_hash_join(
-                    hash_join,
-                    PartitionMode::CollectLeft,
-                    left,
-                    right,
-                )?))
+                Ok(Some(swap_hash_join(hash_join, PartitionMode::CollectLeft)?))
             } else {
                 Ok(None)
             }
@@ -368,7 +358,7 @@ fn partitioned_hash_join(hash_join: &HashJoinExec) -> Result<Arc<dyn ExecutionPl
     let right = hash_join.right();
     if should_swap_join_order(&**left, &**right) && supports_swap(*hash_join.join_type())
     {
-        swap_hash_join(hash_join, PartitionMode::Partitioned, left, right)
+        swap_hash_join(hash_join, PartitionMode::Partitioned)
     } else {
         Ok(Arc::new(HashJoinExec::try_new(
             Arc::clone(left),
diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs
index 86ec6f846..fb07d54b9 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -25,8 +25,13 @@ pub mod global_sort_selection;
 pub mod join_selection;
 pub mod optimize_sorts;
 pub mod optimizer;
+pub mod pipeline_checker;
 pub mod pruning;
 pub mod repartition;
 mod utils;
 
+pub mod pipeline_fixer;
+#[cfg(test)]
+pub mod test_utils;
+
 pub use optimizer::PhysicalOptimizerRule;
diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
new file mode 100644
index 000000000..c35ef29f2
--- /dev/null
+++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -0,0 +1,389 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! The [PipelineChecker] rule ensures that a given plan can accommodate its
+//! infinite sources, if there are any. It will reject non-runnable query plans
+//! that use pipeline-breaking operators on infinite input(s).
+//!
+use crate::config::ConfigOptions;
+use crate::error::Result;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
+use std::sync::Arc;
+
+/// The PipelineChecker rule rejects non-runnable query plans that use
+/// pipeline-breaking operators on infinite input(s).
+#[derive(Default)]
+pub struct PipelineChecker {}
+
+impl PipelineChecker {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for PipelineChecker {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let pipeline = PipelineStatePropagator::new(plan);
+        let state = pipeline.transform_up(&check_finiteness_requirements)?;
+        Ok(state.plan)
+    }
+
+    fn name(&self) -> &str {
+        "PipelineChecker"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+/// [PipelineStatePropagator] propagates the [ExecutionPlan] pipelining information.
+#[derive(Clone, Debug)]
+pub struct PipelineStatePropagator {
+    pub(crate) plan: Arc<dyn ExecutionPlan>,
+    pub(crate) unbounded: bool,
+    pub(crate) children_unbounded: Vec<bool>,
+}
+
+impl PipelineStatePropagator {
+    /// Constructs a new, default pipelining state.
+    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let length = plan.children().len();
+        PipelineStatePropagator {
+            plan,
+            unbounded: false,
+            children_unbounded: vec![false; length],
+        }
+    }
+}
+
+impl TreeNodeRewritable for PipelineStatePropagator {
+    fn map_children<F>(self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.plan.children();
+        if !children.is_empty() {
+            let new_children = children
+                .into_iter()
+                .map(|child| PipelineStatePropagator::new(child))
+                .map(transform)
+                .collect::<Result<Vec<_>>>()?;
+            let children_unbounded = new_children
+                .iter()
+                .map(|c| c.unbounded)
+                .collect::<Vec<bool>>();
+            let children_plans = new_children
+                .into_iter()
+                .map(|child| child.plan)
+                .collect::<Vec<_>>();
+            Ok(PipelineStatePropagator {
+                plan: with_new_children_if_necessary(self.plan, children_plans)?,
+                unbounded: self.unbounded,
+                children_unbounded,
+            })
+        } else {
+            Ok(self)
+        }
+    }
+}
+
+/// This function propagates finiteness information and rejects any plan with
+/// pipeline-breaking operators acting on infinite inputs.
+pub fn check_finiteness_requirements(
+    input: PipelineStatePropagator,
+) -> Result<Option<PipelineStatePropagator>> {
+    let plan = input.plan;
+    let children = input.children_unbounded;
+    plan.unbounded_output(&children).map(|value| {
+        Some(PipelineStatePropagator {
+            plan,
+            unbounded: value,
+            children_unbounded: children,
+        })
+    })
+}
+
+#[cfg(test)]
+mod sql_tests {
+    use super::*;
+    use crate::physical_optimizer::test_utils::{
+        BinaryTestCase, QueryCase, SourceType, UnaryTestCase,
+    };
+
+    #[tokio::test]
+    async fn test_hash_left_join_swap() -> Result<()> {
+        let test1 = BinaryTestCase {
+            source_types: (SourceType::Unbounded, SourceType::Bounded),
+            expect_fail: false,
+        };
+        let test2 = BinaryTestCase {
+            source_types: (SourceType::Unbounded, SourceType::Unbounded),
+            expect_fail: true,
+        };
+        let test3 = BinaryTestCase {
+            source_types: (SourceType::Bounded, SourceType::Unbounded),
+            expect_fail: true,
+        };
+        let test4 = BinaryTestCase {
+            source_types: (SourceType::Bounded, SourceType::Bounded),
+            expect_fail: false,
+        };
+        let case = QueryCase {
+            sql: "SELECT t2.c1 FROM left as t1 LEFT JOIN right as t2 ON t1.c1 = t2.c1"
+                .to_string(),
+            cases: vec![
+                Arc::new(test1),
+                Arc::new(test2),
+                Arc::new(test3),
+                Arc::new(test4),
+            ],
+            error_operator: "Join Error".to_string(),
+        };
+
+        case.run().await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_hash_right_join_swap() -> Result<()> {
+        let test1 = BinaryTestCase {
+            source_types: (SourceType::Unbounded, SourceType::Bounded),
+            expect_fail: true,
+        };
+        let test2 = BinaryTestCase {
+            source_types: (SourceType::Unbounded, SourceType::Unbounded),
+            expect_fail: true,
+        };
+        let test3 = BinaryTestCase {
+            source_types: (SourceType::Bounded, SourceType::Unbounded),
+            expect_fail: false,
+        };
+        let test4 = BinaryTestCase {
+            source_types: (SourceType::Bounded, SourceType::Bounded),
+            expect_fail: false,
+        };
+        let case = QueryCase {
+            sql: "SELECT t2.c1 FROM left as t1 RIGHT JOIN right as t2 ON t1.c1 = t2.c1"
+                .to_string(),
+            cases: vec![
+                Arc::new(test1),
+                Arc::new(test2),
+                Arc::new(test3),
+                Arc::new(test4),
+            ],
+            error_operator: "Join Error".to_string(),
+        };
+
+        case.run().await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_hash_inner_join_swap() -> Result<()> {
+        let test1 = BinaryTestCase {
+            source_types: (SourceType::Unbounded, SourceType::Bounded),
+            expect_fail: false,
+        };
+        let test2 = BinaryTestCase {
+            source_types: (SourceType::Unbounded, SourceType::Unbounded),
+            expect_fail: true,
+        };
+        let test3 = BinaryTestCase {
+            source_types: (SourceType::Bounded, SourceType::Unbounded),
+            expect_fail: false,
+        };
+        let test4 = BinaryTestCase {
+            source_types: (SourceType::Bounded, SourceType::Bounded),
+            expect_fail: false,
+        };
+        let case = QueryCase {
+            sql: "SELECT t2.c1 FROM left as t1 JOIN right as t2 ON t1.c1 = t2.c1"
+                .to_string(),
+            cases: vec![
+                Arc::new(test1),
+                Arc::new(test2),
+                Arc::new(test3),
+                Arc::new(test4),
+            ],
+            error_operator: "Join Error".to_string(),
+        };
+
+        case.run().await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_hash_full_outer_join_swap() -> Result<()> {
+        let test1 = BinaryTestCase {
+            source_types: (SourceType::Unbounded, SourceType::Bounded),
+            expect_fail: true,
+        };
+        let test2 = BinaryTestCase {
+            source_types: (SourceType::Unbounded, SourceType::Unbounded),
+            expect_fail: true,
+        };
+        let test3 = BinaryTestCase {
+            source_types: (SourceType::Bounded, SourceType::Unbounded),
+            expect_fail: true,
+        };
+        let test4 = BinaryTestCase {
+            source_types: (SourceType::Bounded, SourceType::Bounded),
+            expect_fail: false,
+        };
+        let case = QueryCase {
+            sql: "SELECT t2.c1 FROM left as t1 FULL JOIN right as t2 ON t1.c1 = t2.c1"
+                .to_string(),
+            cases: vec![
+                Arc::new(test1),
+                Arc::new(test2),
+                Arc::new(test3),
+                Arc::new(test4),
+            ],
+            error_operator: "Join Error".to_string(),
+        };
+
+        case.run().await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_aggregate() -> Result<()> {
+        let test1 = UnaryTestCase {
+            source_type: SourceType::Bounded,
+            expect_fail: false,
+        };
+        let test2 = UnaryTestCase {
+            source_type: SourceType::Unbounded,
+            expect_fail: true,
+        };
+        let case = QueryCase {
+            sql: "SELECT c1, MIN(c4) FROM test GROUP BY c1".to_string(),
+            cases: vec![Arc::new(test1), Arc::new(test2)],
+            error_operator: "Aggregate Error".to_string(),
+        };
+
+        case.run().await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_window_agg_hash_partition() -> Result<()> {
+        let test1 = UnaryTestCase {
+            source_type: SourceType::Bounded,
+            expect_fail: false,
+        };
+        let test2 = UnaryTestCase {
+            source_type: SourceType::Unbounded,
+            expect_fail: true,
+        };
+        let case = QueryCase {
+            sql: "SELECT
+                    c9,
+                    SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1
+                  FROM test
+                  LIMIT 5".to_string(),
+            cases: vec![Arc::new(test1), Arc::new(test2)],
+            error_operator: "Window Error".to_string()
+        };
+
+        case.run().await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_window_agg_single_partition() -> Result<()> {
+        let test1 = UnaryTestCase {
+            source_type: SourceType::Bounded,
+            expect_fail: false,
+        };
+        let test2 = UnaryTestCase {
+            source_type: SourceType::Unbounded,
+            expect_fail: true,
+        };
+        let case = QueryCase {
+            sql: "SELECT
+                        c9,
+                        SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1
+                  FROM test".to_string(),
+            cases: vec![Arc::new(test1), Arc::new(test2)],
+            error_operator: "Window Error".to_string()
+        };
+        case.run().await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_hash_cross_join() -> Result<()> {
+        let test1 = BinaryTestCase {
+            source_types: (SourceType::Unbounded, SourceType::Bounded),
+            expect_fail: true,
+        };
+        let test2 = BinaryTestCase {
+            source_types: (SourceType::Unbounded, SourceType::Unbounded),
+            expect_fail: true,
+        };
+        let test3 = BinaryTestCase {
+            source_types: (SourceType::Bounded, SourceType::Unbounded),
+            expect_fail: true,
+        };
+        let test4 = BinaryTestCase {
+            source_types: (SourceType::Bounded, SourceType::Bounded),
+            expect_fail: false,
+        };
+        let case = QueryCase {
+            sql: "SELECT t2.c1 FROM left as t1 CROSS JOIN right as t2".to_string(),
+            cases: vec![
+                Arc::new(test1),
+                Arc::new(test2),
+                Arc::new(test3),
+                Arc::new(test4),
+            ],
+            error_operator: "Cross Join Error".to_string(),
+        };
+
+        case.run().await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_analyzer() -> Result<()> {
+        let test1 = UnaryTestCase {
+            source_type: SourceType::Bounded,
+            expect_fail: false,
+        };
+        let test2 = UnaryTestCase {
+            source_type: SourceType::Unbounded,
+            expect_fail: true,
+        };
+        let case = QueryCase {
+            sql: "EXPLAIN ANALYZE SELECT * FROM test".to_string(),
+            cases: vec![Arc::new(test1), Arc::new(test2)],
+            error_operator: "Analyze Error".to_string(),
+        };
+
+        case.run().await?;
+        Ok(())
+    }
+}
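
For reference, each test above follows the same pattern: register a bounded and an
unbounded variant of the source, plan the query, and assert that planning fails with the
operator-specific error prefix. A hypothetical additional case for ORDER BY -- not part of
this patch, assuming the same harness and the "Sort Error" prefix emitted by
SortExec::unbounded_output later in this diff -- would look like:

```rust
// Hypothetical sketch (not in the patch): an ORDER BY case using the same harness.
#[tokio::test]
async fn test_sort() -> Result<()> {
    let test1 = UnaryTestCase {
        source_type: SourceType::Bounded,
        expect_fail: false,
    };
    let test2 = UnaryTestCase {
        source_type: SourceType::Unbounded,
        expect_fail: true,
    };
    let case = QueryCase {
        sql: "SELECT c1 FROM test ORDER BY c1".to_string(),
        cases: vec![Arc::new(test1), Arc::new(test2)],
        // Matches the prefix of the error returned by SortExec::unbounded_output.
        error_operator: "Sort Error".to_string(),
    };
    case.run().await?;
    Ok(())
}
```
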
diff --git a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
new file mode 100644
index 000000000..92cb45d56
--- /dev/null
+++ b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
@@ -0,0 +1,623 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! The [PipelineFixer] rule tries to modify a given plan so that it can
+//! accommodate its infinite sources, if there are any. In other words,
+//! it tries to obtain a runnable query (with the given infinite sources)
+//! from a non-runnable query by transforming pipeline-breaking operations
+//! to pipeline-friendly ones. If this cannot be done, the rule emits a
+//! diagnostic error message.
+//!
+use crate::config::ConfigOptions;
+use crate::error::Result;
+use crate::physical_optimizer::join_selection::swap_hash_join;
+use crate::physical_optimizer::pipeline_checker::{
+    check_finiteness_requirements, PipelineStatePropagator,
+};
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::joins::{HashJoinExec, PartitionMode};
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+use crate::physical_plan::ExecutionPlan;
+use datafusion_common::DataFusionError;
+use datafusion_expr::logical_plan::JoinType;
+use std::sync::Arc;
+
+/// The [PipelineFixer] rule tries to modify a given plan so that it can
+/// accommodate its infinite sources, if there are any. If this is not
+/// possible, the rule emits a diagnostic error message.
+#[derive(Default)]
+pub struct PipelineFixer {}
+
+impl PipelineFixer {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+type PipelineFixerSubrule =
+    dyn Fn(&PipelineStatePropagator) -> Option<Result<PipelineStatePropagator>>;
+impl PhysicalOptimizerRule for PipelineFixer {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let pipeline = PipelineStatePropagator::new(plan);
+        let physical_optimizer_subrules: Vec<Box<PipelineFixerSubrule>> =
+            vec![Box::new(hash_join_swap_subrule)];
+        let state = pipeline.transform_up(&|p| {
+            apply_subrules_and_check_finiteness_requirements(
+                p,
+                &physical_optimizer_subrules,
+            )
+        })?;
+        Ok(state.plan)
+    }
+
+    fn name(&self) -> &str {
+        "PipelineFixer"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+/// This subrule will swap build/probe sides of a hash join depending on whether its inputs
+/// may produce an infinite stream of records. The rule ensures that the left (build) side
+/// of the hash join always operates on an input stream that will produce a finite set of
+/// records. If the left side cannot be chosen to be "finite", the order stays the
+/// same as in the original query.
+/// ```text
+/// For example, this rule makes the following transformation:
+///
+///
+///
+///           +--------------+              +--------------+
+///           |              |  unbounded   |              |
+///    Left   | Infinite     |    true      | Hash         |\true
+///           | Data source  |--------------| Repartition  | \   +--------------+       +--------------+
+///           |              |              |              |  \  |              |       |              |
+///           +--------------+              +--------------+   - |  Hash Join   |-------| Projection   |
+///                                                            - |              |       |              |
+///           +--------------+              +--------------+  /  +--------------+       +--------------+
+///           |              |  unbounded   |              | /
+///    Right  | Finite       |    false     | Hash         |/false
+///           | Data Source  |--------------| Repartition  |
+///           |              |              |              |
+///           +--------------+              +--------------+
+///
+///
+///
+///           +--------------+              +--------------+
+///           |              |  unbounded   |              |
+///    Left   | Finite       |    false     | Hash         |\false
+///           | Data source  |--------------| Repartition  | \   +--------------+       +--------------+
+///           |              |              |              |  \  |              | true  |              | true
+///           +--------------+              +--------------+   - |  Hash Join   |-------| Projection   |-----
+///                                                            - |              |       |              |
+///           +--------------+              +--------------+  /  +--------------+       +--------------+
+///           |              |  unbounded   |              | /
+///    Right  | Infinite     |    true      | Hash         |/true
+///           | Data Source  |--------------| Repartition  |
+///           |              |              |              |
+///           +--------------+              +--------------+
+///
+/// ```
+fn hash_join_swap_subrule(
+    input: &PipelineStatePropagator,
+) -> Option<Result<PipelineStatePropagator>> {
+    let plan = input.plan.clone();
+    let children = &input.children_unbounded;
+    if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
+        let (left_unbounded, right_unbounded) = (children[0], children[1]);
+        let new_plan = if left_unbounded && !right_unbounded {
+            if matches!(
+                *hash_join.join_type(),
+                JoinType::Inner
+                    | JoinType::Left
+                    | JoinType::LeftSemi
+                    | JoinType::LeftAnti
+            ) {
+                swap(hash_join)
+            } else {
+                Ok(plan)
+            }
+        } else {
+            Ok(plan)
+        };
+        let new_state = new_plan.map(|plan| PipelineStatePropagator {
+            plan,
+            unbounded: left_unbounded || right_unbounded,
+            children_unbounded: vec![left_unbounded, right_unbounded],
+        });
+        Some(new_state)
+    } else {
+        None
+    }
+}
+
+/// This function swaps the sides of a hash join to make it runnable even if one of its
+/// inputs is infinite. Note that this is not always possible; i.e. [JoinType::Full],
+/// [JoinType::Right], [JoinType::RightAnti] and [JoinType::RightSemi] cannot run with
+/// an unbounded left side, even if we swap. Therefore, we do not consider them here.
+fn swap(hash_join: &HashJoinExec) -> Result<Arc<dyn ExecutionPlan>> {
+    let partition_mode = hash_join.partition_mode();
+    let join_type = hash_join.join_type();
+    match (*partition_mode, *join_type) {
+        (
+            _,
+            JoinType::Right | JoinType::RightSemi | JoinType::RightAnti | JoinType::Full,
+        ) => Err(DataFusionError::Internal(format!(
+            "{} join cannot be swapped for unbounded input.",
+            join_type
+        ))),
+        (PartitionMode::Partitioned, _) => {
+            swap_hash_join(hash_join, PartitionMode::Partitioned)
+        }
+        (PartitionMode::CollectLeft, _) => {
+            swap_hash_join(hash_join, PartitionMode::CollectLeft)
+        }
+        (PartitionMode::Auto, _) => Err(DataFusionError::Internal(
+            "Auto is not acceptable for unbounded input here.".to_string(),
+        )),
+    }
+}
+
+fn apply_subrules_and_check_finiteness_requirements(
+    mut input: PipelineStatePropagator,
+    physical_optimizer_subrules: &Vec<Box<PipelineFixerSubrule>>,
+) -> Result<Option<PipelineStatePropagator>> {
+    for sub_rule in physical_optimizer_subrules {
+        if let Some(value) = sub_rule(&input).transpose()? {
+            input = value;
+        }
+    }
+    check_finiteness_requirements(input)
+}
+
+#[cfg(test)]
+mod hash_join_tests {
+    use super::*;
+    use crate::physical_optimizer::join_selection::swap_join_type;
+    use crate::physical_optimizer::test_utils::SourceType;
+    use crate::physical_plan::expressions::Column;
+    use crate::physical_plan::projection::ProjectionExec;
+    use crate::{physical_plan::joins::PartitionMode, test::exec::UnboundedExec};
+    use arrow::datatypes::{DataType, Field, Schema};
+    use std::sync::Arc;
+
+    struct TestCase {
+        case: String,
+        initial_sources_unbounded: (SourceType, SourceType),
+        initial_join_type: JoinType,
+        initial_mode: PartitionMode,
+        expected_sources_unbounded: (SourceType, SourceType),
+        expected_join_type: JoinType,
+        expected_mode: PartitionMode,
+        expecting_swap: bool,
+    }
+
+    #[tokio::test]
+    async fn test_join_with_swap_full() -> Result<()> {
+        // NOTE: Currently, some initial conditions are not viable after join order selection.
+        //       For example, full join always comes in partitioned mode. See the warning in
+        //       function "swap". If this changes in the future, we should update these tests.
+        let cases = vec![
+            TestCase {
+                case: "Bounded - Unbounded 1".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+                initial_join_type: JoinType::Full,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+                expected_join_type: JoinType::Full,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            },
+            TestCase {
+                case: "Unbounded - Bounded 2".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded),
+                initial_join_type: JoinType::Full,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded),
+                expected_join_type: JoinType::Full,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            },
+            TestCase {
+                case: "Bounded - Bounded 3".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+                initial_join_type: JoinType::Full,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+                expected_join_type: JoinType::Full,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            },
+            TestCase {
+                case: "Unbounded - Unbounded 4".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded),
+                initial_join_type: JoinType::Full,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (
+                    SourceType::Unbounded,
+                    SourceType::Unbounded,
+                ),
+                expected_join_type: JoinType::Full,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            },
+        ];
+        for case in cases.into_iter() {
+            test_join_with_maybe_swap_unbounded_case(case).await?
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_cases_without_collect_left_check() -> Result<()> {
+        let mut cases = vec![];
+        let join_types = vec![JoinType::LeftSemi, JoinType::Inner];
+        for join_type in join_types {
+            cases.push(TestCase {
+                case: "Unbounded - Bounded / CollectLeft".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::CollectLeft,
+                expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+                expected_join_type: swap_join_type(join_type),
+                expected_mode: PartitionMode::CollectLeft,
+                expecting_swap: true,
+            });
+            cases.push(TestCase {
+                case: "Bounded - Unbounded / CollectLeft".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::CollectLeft,
+                expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::CollectLeft,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Unbounded - Unbounded / CollectLeft".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::CollectLeft,
+                expected_sources_unbounded: (
+                    SourceType::Unbounded,
+                    SourceType::Unbounded,
+                ),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::CollectLeft,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Bounded - Bounded / CollectLeft".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::CollectLeft,
+                expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::CollectLeft,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Unbounded - Bounded / Partitioned".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+                expected_join_type: swap_join_type(join_type),
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: true,
+            });
+            cases.push(TestCase {
+                case: "Bounded - Unbounded / Partitioned".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Bounded - Bounded / Partitioned".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Unbounded - Unbounded / Partitioned".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (
+                    SourceType::Unbounded,
+                    SourceType::Unbounded,
+                ),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            });
+        }
+
+        for case in cases.into_iter() {
+            test_join_with_maybe_swap_unbounded_case(case).await?
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_not_support_collect_left() -> Result<()> {
+        let mut cases = vec![];
+        // After the [JoinSelection] optimization, these join types cannot run in CollectLeft
+        // mode except [JoinType::LeftSemi].
+        let the_ones_not_support_collect_left = vec![JoinType::Left, JoinType::LeftAnti];
+        for join_type in the_ones_not_support_collect_left {
+            cases.push(TestCase {
+                case: "Unbounded - Bounded".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+                expected_join_type: swap_join_type(join_type),
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: true,
+            });
+            cases.push(TestCase {
+                case: "Bounded - Unbounded".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Bounded - Bounded".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Unbounded - Unbounded".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (
+                    SourceType::Unbounded,
+                    SourceType::Unbounded,
+                ),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            });
+        }
+
+        for case in cases.into_iter() {
+            test_join_with_maybe_swap_unbounded_case(case).await?
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_not_supporting_swaps_possible_collect_left() -> Result<()> {
+        let mut cases = vec![];
+        let the_ones_not_support_collect_left =
+            vec![JoinType::Right, JoinType::RightAnti, JoinType::RightSemi];
+        for join_type in the_ones_not_support_collect_left {
+            // We expect that (SourceType::Unbounded, SourceType::Bounded) will not change,
+            // regardless of the statistics, since these join types cannot be swapped.
+            cases.push(TestCase {
+                case: "Unbounded - Bounded / CollectLeft".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::CollectLeft,
+                expected_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::CollectLeft,
+                expecting_swap: false,
+            });
+            // We expect that (SourceType::Bounded, SourceType::Unbounded) will stay the same,
+            // regardless of the statistics.
+            cases.push(TestCase {
+                case: "Bounded - Unbounded / CollectLeft".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::CollectLeft,
+                expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::CollectLeft,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Unbounded - Unbounded / CollectLeft".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::CollectLeft,
+                expected_sources_unbounded: (
+                    SourceType::Unbounded,
+                    SourceType::Unbounded,
+                ),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::CollectLeft,
+                expecting_swap: false,
+            });
+            //
+            cases.push(TestCase {
+                case: "Bounded - Bounded / CollectLeft".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::CollectLeft,
+                expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::CollectLeft,
+                expecting_swap: false,
+            });
+            // If the cases are partitioned, only the unbounded/bounded check will affect the order.
+            cases.push(TestCase {
+                case: "Unbounded - Bounded / Partitioned".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Unbounded, SourceType::Bounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Bounded - Unbounded / Partitioned".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Bounded, SourceType::Unbounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Bounded - Bounded / Partitioned".to_string(),
+                initial_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (SourceType::Bounded, SourceType::Bounded),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            });
+            cases.push(TestCase {
+                case: "Unbounded - Unbounded / Partitioned".to_string(),
+                initial_sources_unbounded: (SourceType::Unbounded, SourceType::Unbounded),
+                initial_join_type: join_type,
+                initial_mode: PartitionMode::Partitioned,
+                expected_sources_unbounded: (
+                    SourceType::Unbounded,
+                    SourceType::Unbounded,
+                ),
+                expected_join_type: join_type,
+                expected_mode: PartitionMode::Partitioned,
+                expecting_swap: false,
+            });
+        }
+
+        for case in cases.into_iter() {
+            test_join_with_maybe_swap_unbounded_case(case).await?
+        }
+        Ok(())
+    }
+    #[allow(clippy::vtable_address_comparisons)]
+    async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> Result<()> {
+        let left_unbounded = t.initial_sources_unbounded.0 == SourceType::Unbounded;
+        let right_unbounded = t.initial_sources_unbounded.1 == SourceType::Unbounded;
+        let left_exec = Arc::new(UnboundedExec::new(
+            left_unbounded,
+            Schema::new(vec![Field::new("a", DataType::Int32, false)]),
+        )) as Arc<dyn ExecutionPlan>;
+        let right_exec = Arc::new(UnboundedExec::new(
+            right_unbounded,
+            Schema::new(vec![Field::new("b", DataType::Int32, false)]),
+        )) as Arc<dyn ExecutionPlan>;
+
+        let join = HashJoinExec::try_new(
+            Arc::clone(&left_exec),
+            Arc::clone(&right_exec),
+            vec![(
+                Column::new_with_schema("a", &left_exec.schema())?,
+                Column::new_with_schema("b", &right_exec.schema())?,
+            )],
+            None,
+            &t.initial_join_type,
+            t.initial_mode,
+            &false,
+        )?;
+
+        let initial_hash_join_state = PipelineStatePropagator {
+            plan: Arc::new(join),
+            unbounded: false,
+            children_unbounded: vec![left_unbounded, right_unbounded],
+        };
+        let optimized_hash_join =
+            hash_join_swap_subrule(&initial_hash_join_state).unwrap()?;
+        let optimized_join_plan = optimized_hash_join.plan;
+
+        // If swap did happen
+        let projection_added = optimized_join_plan.as_any().is::<ProjectionExec>();
+        let plan = if projection_added {
+            let proj = optimized_join_plan
+                .as_any()
+                .downcast_ref::<ProjectionExec>()
+                .expect(
+                    "A proj is required to swap columns back to their original order",
+                );
+            proj.input().clone()
+        } else {
+            optimized_join_plan
+        };
+
+        if let Some(HashJoinExec {
+            left,
+            right,
+            join_type,
+            mode,
+            ..
+        }) = plan.as_any().downcast_ref::<HashJoinExec>()
+        {
+            let left_changed = Arc::ptr_eq(left, &right_exec);
+            let right_changed = Arc::ptr_eq(right, &left_exec);
+            // If this is not equal, we have a bigger problem.
+            assert_eq!(left_changed, right_changed);
+            assert_eq!(
+                (
+                    t.case.as_str(),
+                    if left.unbounded_output(&[])? {
+                        SourceType::Unbounded
+                    } else {
+                        SourceType::Bounded
+                    },
+                    if right.unbounded_output(&[])? {
+                        SourceType::Unbounded
+                    } else {
+                        SourceType::Bounded
+                    },
+                    join_type,
+                    mode,
+                    left_changed && right_changed
+                ),
+                (
+                    t.case.as_str(),
+                    t.expected_sources_unbounded.0,
+                    t.expected_sources_unbounded.1,
+                    &t.expected_join_type,
+                    &t.expected_mode,
+                    t.expecting_swap
+                )
+            );
+        };
+        Ok(())
+    }
+}
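
For context, the new rule plugs into the standard PhysicalOptimizerRule interface. A
minimal sketch of invoking it directly on a physical plan -- not part of this patch, and
assuming the pipeline_fixer module is publicly exported from physical_optimizer like the
other rules:

```rust
// Sketch only: apply the PipelineFixer rule to an existing physical plan.
use std::sync::Arc;

use datafusion::config::ConfigOptions;
use datafusion::error::Result;
use datafusion::physical_optimizer::pipeline_fixer::PipelineFixer;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;

fn fix_pipeline(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    // Swaps hash join sides where that makes an unbounded input runnable;
    // otherwise the embedded finiteness check returns a diagnostic error.
    PipelineFixer::new().optimize(plan, &ConfigOptions::new())
}
```
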
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 66359ebf6..5bdbf59a6 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -269,6 +269,7 @@ mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 output_ordering: None,
+                infinite_source: false,
             },
             None,
             None,
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
new file mode 100644
index 000000000..ce6e252c7
--- /dev/null
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Collection of testing utility functions that are leveraged by the query optimizer rules
+
+use crate::error::Result;
+use crate::prelude::{CsvReadOptions, SessionContext};
+use async_trait::async_trait;
+use std::sync::Arc;
+
+async fn register_current_csv(
+    ctx: &SessionContext,
+    table_name: &str,
+    infinite: bool,
+) -> Result<()> {
+    let testdata = crate::test_util::arrow_test_data();
+    let schema = crate::test_util::aggr_test_schema();
+    ctx.register_csv(
+        table_name,
+        &format!("{}/csv/aggregate_test_100.csv", testdata),
+        CsvReadOptions::new()
+            .schema(&schema)
+            .mark_infinite(infinite),
+    )
+    .await?;
+    Ok(())
+}
+
+#[derive(Eq, PartialEq, Debug)]
+pub enum SourceType {
+    Unbounded,
+    Bounded,
+}
+
+#[async_trait]
+pub trait SqlTestCase {
+    async fn register_table(&self, ctx: &SessionContext) -> Result<()>;
+    fn expect_fail(&self) -> bool;
+}
+
+/// [UnaryTestCase] is designed for single input [ExecutionPlan]s.
+pub struct UnaryTestCase {
+    pub(crate) source_type: SourceType,
+    pub(crate) expect_fail: bool,
+}
+
+#[async_trait]
+impl SqlTestCase for UnaryTestCase {
+    async fn register_table(&self, ctx: &SessionContext) -> Result<()> {
+        let table_is_infinite = self.source_type == SourceType::Unbounded;
+        register_current_csv(ctx, "test", table_is_infinite).await?;
+        Ok(())
+    }
+
+    fn expect_fail(&self) -> bool {
+        self.expect_fail
+    }
+}
+/// [BinaryTestCase] is designed for binary input [ExecutionPlan]s.
+pub struct BinaryTestCase {
+    pub(crate) source_types: (SourceType, SourceType),
+    pub(crate) expect_fail: bool,
+}
+
+#[async_trait]
+impl SqlTestCase for BinaryTestCase {
+    async fn register_table(&self, ctx: &SessionContext) -> Result<()> {
+        let left_table_is_infinite = self.source_types.0 == SourceType::Unbounded;
+        let right_table_is_infinite = self.source_types.1 == SourceType::Unbounded;
+        register_current_csv(ctx, "left", left_table_is_infinite).await?;
+        register_current_csv(ctx, "right", right_table_is_infinite).await?;
+        Ok(())
+    }
+
+    fn expect_fail(&self) -> bool {
+        self.expect_fail
+    }
+}
+
+pub struct QueryCase {
+    pub(crate) sql: String,
+    pub(crate) cases: Vec<Arc<dyn SqlTestCase>>,
+    pub(crate) error_operator: String,
+}
+
+impl QueryCase {
+    /// Run the test cases
+    pub(crate) async fn run(&self) -> Result<()> {
+        for case in &self.cases {
+            let ctx = SessionContext::new();
+            case.register_table(&ctx).await?;
+            let error = if case.expect_fail() {
+                Some(&self.error_operator)
+            } else {
+                None
+            };
+            self.run_case(ctx, error).await?;
+        }
+        Ok(())
+    }
+    async fn run_case(&self, ctx: SessionContext, error: Option<&String>) -> Result<()> {
+        let dataframe = ctx.sql(self.sql.as_str()).await?;
+        let plan = dataframe.create_physical_plan().await;
+        if error.is_some() {
+            let plan_error = plan.unwrap_err();
+            let initial = error.unwrap().to_string();
+            assert!(plan_error.to_string().contains(initial.as_str()));
+        } else {
+            assert!(plan.is_ok())
+        }
+        Ok(())
+    }
+}
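
The register_current_csv helper above wraps the user-facing flow for declaring an
unbounded source. A minimal sketch of that flow -- not part of this patch; the table name
and FIFO path are illustrative -- using the mark_infinite option introduced in this
commit:

```rust
// Sketch only: register a FIFO-backed CSV file as an unbounded table.
use arrow::datatypes::Schema;
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

async fn register_unbounded_csv(
    ctx: &SessionContext,
    fifo_path: &str,
    schema: &Schema,
) -> Result<()> {
    ctx.register_csv(
        "unbounded_table", // illustrative table name
        fifo_path,
        // `mark_infinite(true)` tells the planner this source never ends.
        CsvReadOptions::new().schema(schema).mark_infinite(true),
    )
    .await
}
```
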
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 6d7c3c21b..f5591f2c3 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -30,7 +30,7 @@ use crate::physical_plan::{
 use arrow::array::ArrayRef;
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
-use datafusion_common::Result;
+use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Accumulator;
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::{
@@ -367,6 +367,19 @@ impl ExecutionPlan for AggregateExec {
         }
     }
 
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// If the plan does not support pipelining, but its input(s) are
+    /// infinite, returns an error to indicate this.
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        if children[0] {
+            Err(DataFusionError::Plan(
+                "Aggregate Error: `GROUP BY` clause (including the more general GROUPING SET) is not supported for unbounded inputs.".to_string(),
+            ))
+        } else {
+            Ok(false)
+        }
+    }
+
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
         None
     }
diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs
index b0578bd48..7f1cfe57a 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -41,7 +41,7 @@ pub struct AnalyzeExec {
     /// control how much extra to print
     verbose: bool,
     /// The input plan (the plan being analyzed)
-    input: Arc<dyn ExecutionPlan>,
+    pub(crate) input: Arc<dyn ExecutionPlan>,
     /// The output schema for RecordBatches of this exec node
     schema: SchemaRef,
 }
@@ -76,6 +76,20 @@ impl ExecutionPlan for AnalyzeExec {
         vec![Distribution::SinglePartition]
     }
 
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// If the plan does not support pipelining, but its input(s) are
+    /// infinite, returns an error to indicate this.
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        if children[0] {
+            Err(DataFusionError::Plan(
+                "Analyze Error: Analysis is not supported for unbounded inputs"
+                    .to_string(),
+            ))
+        } else {
+            Ok(false)
+        }
+    }
+
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
         Partitioning::UnknownPartitioning(1)
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs
index afa2a85ab..582edc103 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -96,6 +96,13 @@ impl ExecutionPlan for CoalesceBatchesExec {
         self.input.output_partitioning()
     }
 
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// If the plan does not support pipelining, but its input(s) are
+    /// infinite, returns an error to indicate this.
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        Ok(children[0])
+    }
+
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
         // The coalesce batches operator does not make any changes to the sorting of its input
         self.input.output_ordering()
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index 816a9c940..d765c275a 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -80,6 +80,12 @@ impl ExecutionPlan for CoalescePartitionsExec {
         vec![self.input.clone()]
     }
 
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// If the plan does not support pipelining, but its input(s) are
+    /// infinite, returns an error to indicate this.
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        Ok(children[0])
+    }
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
         Partitioning::UnknownPartitioning(1)
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index ab522dc94..91d27c3e3 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -72,6 +72,10 @@ impl ExecutionPlan for AvroExec {
         Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
     }
 
+    fn unbounded_output(&self, _: &[bool]) -> Result<bool> {
+        Ok(self.base_config().infinite_source)
+    }
+
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
         get_output_ordering(&self.base_config)
     }
@@ -259,6 +263,7 @@ mod tests {
             limit: None,
             table_partition_cols: vec![],
             output_ordering: None,
+            infinite_source: false,
         });
         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
         let mut results = avro_exec
@@ -330,6 +335,7 @@ mod tests {
             limit: None,
             table_partition_cols: vec![],
             output_ordering: None,
+            infinite_source: false,
         });
         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
 
@@ -404,6 +410,7 @@ mod tests {
                 partition_type_wrap(DataType::Utf8),
             )],
             output_ordering: None,
+            infinite_source: false,
         });
         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
 
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index b759b7254..3afd844cf 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -109,6 +109,10 @@ impl ExecutionPlan for CsvExec {
         Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
     }
 
+    fn unbounded_output(&self, _: &[bool]) -> Result<bool> {
+        Ok(self.base_config().infinite_source)
+    }
+
     /// See comments on `impl ExecutionPlan for ParquetExec`: output order can't be
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
         get_output_ordering(&self.base_config)
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index 0481ca64d..3d09377bf 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -351,6 +351,7 @@ mod tests {
             limit,
             table_partition_cols: vec![],
             output_ordering: None,
+            infinite_source: false,
         };
 
         let file_stream =
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index a4be015c5..14fe0128c 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -87,6 +87,10 @@ impl ExecutionPlan for NdJsonExec {
         Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
     }
 
+    fn unbounded_output(&self, _: &[bool]) -> Result<bool> {
+        Ok(self.base_config.infinite_source)
+    }
+
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
         get_output_ordering(&self.base_config)
     }
@@ -382,6 +386,7 @@ mod tests {
                 limit: Some(3),
                 table_partition_cols: vec![],
                 output_ordering: None,
+                infinite_source: false,
             },
             file_compression_type.to_owned(),
         );
@@ -456,6 +461,7 @@ mod tests {
                 limit: Some(3),
                 table_partition_cols: vec![],
                 output_ordering: None,
+                infinite_source: false,
             },
             file_compression_type.to_owned(),
         );
@@ -500,6 +506,7 @@ mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 output_ordering: None,
+                infinite_source: false,
             },
             file_compression_type.to_owned(),
         );
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 6e9b6b8f8..de4b58c1e 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -104,6 +104,8 @@ pub struct FileScanConfig {
     pub table_partition_cols: Vec<(String, DataType)>,
     /// The order in which the data is sorted, if known.
     pub output_ordering: Option<Vec<PhysicalSortExpr>>,
+    /// Indicates whether this plan may produce an infinite stream of records.
+    pub infinite_source: bool,
 }
 
 impl FileScanConfig {
@@ -809,6 +811,7 @@ mod tests {
             statistics,
             table_partition_cols,
             output_ordering: None,
+            infinite_source: false,
         }
     }
 
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index a8d3002a9..0ad41d994 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -813,6 +813,7 @@ mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 output_ordering: None,
+                infinite_source: false,
             },
             predicate,
             None,
@@ -1350,6 +1351,7 @@ mod tests {
                     limit: None,
                     table_partition_cols: vec![],
                     output_ordering: None,
+                    infinite_source: false,
                 },
                 None,
                 None,
@@ -1439,6 +1441,7 @@ mod tests {
                     ("day".to_owned(), partition_type_wrap(DataType::Utf8)),
                 ],
                 output_ordering: None,
+                infinite_source: false,
             },
             None,
             None,
@@ -1498,6 +1501,7 @@ mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 output_ordering: None,
+                infinite_source: false,
             },
             None,
             None,
diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs
index 1dc634b77..758e31a28 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -108,6 +108,13 @@ impl ExecutionPlan for FilterExec {
         self.input.output_partitioning()
     }
 
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// If the plan does not support pipelining, but its input(s) are
+    /// infinite, returns an error to indicate this.
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        Ok(children[0])
+    }
+
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
         self.input.output_ordering()
     }
diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs
index 1b43b6096..82733cee7 100644
--- a/datafusion/core/src/physical_plan/joins/cross_join.rs
+++ b/datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -35,6 +35,7 @@ use crate::physical_plan::{
 };
 use crate::{error::Result, scalar::ScalarValue};
 use async_trait::async_trait;
+use datafusion_common::DataFusionError;
 use log::debug;
 use std::time::Instant;
 
@@ -143,6 +144,20 @@ impl ExecutionPlan for CrossJoinExec {
         vec![self.left.clone(), self.right.clone()]
     }
 
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// If the plan does not support pipelining, but its input(s) are
+    /// infinite, returns an error to indicate this.
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        if children[0] || children[1] {
+            Err(DataFusionError::Plan(
+                "Cross Join Error: Cross join is not supported for the unbounded inputs."
+                    .to_string(),
+            ))
+        } else {
+            Ok(false)
+        }
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index f15398018..abb5b5e29 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -309,6 +309,37 @@ impl ExecutionPlan for HashJoinExec {
         }
     }
 
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// If the plan does not support pipelining, but its input(s) are
+    /// infinite, returns an error to indicate this.
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        let (left, right) = (children[0], children[1]);
+        // The pipeline breaks if the left (build) side is unbounded, or if the right
+        // side is unbounded with a Left, Full, LeftAnti or LeftSemi join type.
+        let breaking = left
+            || (right
+                && matches!(
+                    self.join_type,
+                    JoinType::Left
+                        | JoinType::Full
+                        | JoinType::LeftAnti
+                        | JoinType::LeftSemi
+                ));
+
+        if breaking {
+            Err(DataFusionError::Plan(format!(
+                "Join Error: The join with cannot be executed with unbounded inputs. {}",
+                if left && right {
+                    "Currently, we do not support unbounded inputs on both sides."
+                } else {
+                    "Please consider a different type of join or sources."
+                }
+            )))
+        } else {
+            Ok(left || right)
+        }
+    }
+
     fn output_partitioning(&self) -> Partitioning {
         let left_columns_len = self.left.schema().fields.len();
         match self.mode {
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 07b4a6abb..3d6663fbd 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -110,6 +110,13 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     /// Specifies the output partitioning scheme of this plan
     fn output_partitioning(&self) -> Partitioning;
 
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// If the plan does not support pipelining, but its input(s) are
+    /// infinite, returns an error to indicate this.
+    fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
+        Ok(false)
+    }
+
     /// If the output of this operator within each partition is sorted,
     /// returns `Some(keys)` with the description of how it was sorted.
     ///
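
The default implementation above means existing operators keep working unchanged; only
operators that originate data or break pipelining need to override unbounded_output. A
sketch of the recurring patterns, written as a standalone helper -- not part of this
patch; the function name is illustrative:

```rust
// Sketch only: the common shapes an `unbounded_output` override takes.
use datafusion_common::{DataFusionError, Result};

fn propagate_boundedness(
    children_unbounded: &[bool],
    breaks_pipeline: bool,
    operator_name: &str,
) -> Result<bool> {
    let any_unbounded = children_unbounded.iter().any(|u| *u);
    if any_unbounded && breaks_pipeline {
        // Pipeline-breaking operators (sort, hash aggregate, ...) must consume
        // their entire input before emitting output, so an infinite input is a
        // planning error.
        Err(DataFusionError::Plan(format!(
            "{} Error: operator does not support unbounded inputs.",
            operator_name
        )))
    } else {
        // Streaming operators (filter, projection, ...) and data sources simply
        // propagate or declare their own boundedness.
        Ok(any_unbounded)
    }
}
```
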
diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index 3a2765e7f..bc8087c98 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -159,6 +159,13 @@ impl ExecutionPlan for ProjectionExec {
         self.schema.clone()
     }
 
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// If the plan does not support pipelining, but its input(s) are
+    /// infinite, returns an error to indicate this.
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        Ok(children[0])
+    }
+
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
         vec![self.input.clone()]
     }
diff --git a/datafusion/core/src/physical_plan/repartition.rs b/datafusion/core/src/physical_plan/repartition.rs
index 3dc0c6d33..967281c1c 100644
--- a/datafusion/core/src/physical_plan/repartition.rs
+++ b/datafusion/core/src/physical_plan/repartition.rs
@@ -284,6 +284,13 @@ impl ExecutionPlan for RepartitionExec {
         )?))
     }
 
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// If the plan does not support pipelining, but its input(s) are
+    /// infinite, returns an error to indicate this.
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        Ok(children[0])
+    }
+
     fn output_partitioning(&self) -> Partitioning {
         self.partitioning.clone()
     }
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 85eca5450..89b7d414f 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -627,7 +627,7 @@ fn read_spill(sender: Sender<ArrowResult<RecordBatch>>, path: &Path) -> Result<(
 #[derive(Debug)]
 pub struct SortExec {
     /// Input schema
-    input: Arc<dyn ExecutionPlan>,
+    pub(crate) input: Arc<dyn ExecutionPlan>,
     /// Sort expressions
     expr: Vec<PhysicalSortExpr>,
     /// Containing all metrics set created during sort
@@ -704,6 +704,19 @@ impl ExecutionPlan for SortExec {
         }
     }
 
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// If the plan does not support pipelining, but its input(s) are
+    /// infinite, returns an error to indicate this.
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        if children[0] {
+            Err(DataFusionError::Plan(
+                "Sort Error: Can not sort unbounded inputs.".to_string(),
+            ))
+        } else {
+            Ok(false)
+        }
+    }
+
     fn required_input_distribution(&self) -> Vec<Distribution> {
         if self.preserve_partitioning {
             vec![Distribution::UnspecifiedDistribution]
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index af57c9ef9..3a92cd123 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -123,6 +123,13 @@ impl ExecutionPlan for UnionExec {
         self.schema.clone()
     }
 
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// If the plan does not support pipelining, but its input(s) are
+    /// infinite, returns an error to indicate this.
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        Ok(children.iter().any(|x| *x))
+    }
+
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
         self.inputs.clone()
     }
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index d1ea0af69..23ec2d179 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -54,7 +54,7 @@ use std::task::{Context, Poll};
 #[derive(Debug)]
 pub struct WindowAggExec {
     /// Input plan
-    input: Arc<dyn ExecutionPlan>,
+    pub(crate) input: Arc<dyn ExecutionPlan>,
     /// Window function expression
     window_expr: Vec<Arc<dyn WindowExpr>>,
     /// Schema after the window is run
@@ -206,6 +206,20 @@ impl ExecutionPlan for WindowAggExec {
         }
     }
 
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// If the plan does not support pipelining, but its input(s) are
+    /// infinite, returns an error to indicate this.
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        if children[0] {
+            Err(DataFusionError::Plan(
+                "Window Error: Windowing is not currently support for unbounded inputs."
+                    .to_string(),
+            ))
+        } else {
+            Ok(false)
+        }
+    }
+
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
         self.output_ordering.as_deref()
     }
diff --git a/datafusion/core/src/test/exec.rs b/datafusion/core/src/test/exec.rs
index e7ee746c9..f74b1eb31 100644
--- a/datafusion/core/src/test/exec.rs
+++ b/datafusion/core/src/test/exec.rs
@@ -509,6 +509,76 @@ impl ExecutionPlan for StatisticsExec {
     }
 }
 
+/// A mock execution plan that simply reports the provided data source boundedness characteristic
+#[derive(Debug, Clone)]
+pub struct UnboundedExec {
+    unbounded: bool,
+    schema: Arc<Schema>,
+}
+impl UnboundedExec {
+    pub fn new(unbounded: bool, schema: Schema) -> Self {
+        Self {
+            unbounded,
+            schema: Arc::new(schema),
+        }
+    }
+}
+impl ExecutionPlan for UnboundedExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(2)
+    }
+
+    fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
+        Ok(self.unbounded)
+    }
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        unimplemented!("This plan only serves for testing statistics")
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "UnboundableExec: unbounded={}", self.unbounded,)
+            }
+        }
+    }
+
+    fn statistics(&self) -> Statistics {
+        Statistics::default()
+    }
+}
+
 /// Execution plan that emits streams that block forever.
 ///
 /// This is useful to test shutdown / cancelation behavior of certain execution plans.
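
The UnboundedExec mock above lets tests fabricate plans whose leaves are flagged as infinite without touching a real data source. A hypothetical smoke test, assuming UnboundedExec is in scope from the crate's test utilities (datafusion/core/src/test/exec.rs); neither the function nor its name is part of this commit:

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::error::Result;
    use datafusion::physical_plan::ExecutionPlan;

    // Hypothetical: a leaf marked as unbounded should report an infinite
    // output stream through the new unbounded_output API.
    fn unbounded_leaf_reports_infinite_output() -> Result<()> {
        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
        let source = UnboundedExec::new(true, schema);
        // Leaves have no children, so an empty slice is passed in.
        assert!(source.unbounded_output(&[])?);
        Ok(())
    }

Scaffolding of this kind allows boundedness-aware rules such as PipelineFixer to be exercised without an actual infinite file.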
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index efa3fece1..cf7594c96 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -182,6 +182,7 @@ pub fn partitioned_csv_config(
         limit: None,
         table_partition_cols: vec![],
         output_ordering: None,
+        infinite_source: false,
     })
 }
 
diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs
new file mode 100644
index 000000000..677347594
--- /dev/null
+++ b/datafusion/core/tests/fifo.rs
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This test demonstrates DataFusion's FIFO capabilities.
+//!
+#[cfg(not(target_os = "windows"))]
+mod unix_test {
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion::{
+        prelude::{CsvReadOptions, SessionConfig, SessionContext},
+        test_util::{aggr_test_schema, arrow_test_data},
+    };
+    use datafusion_common::{DataFusionError, Result};
+    use futures::StreamExt;
+    use itertools::enumerate;
+    use nix::sys::stat;
+    use nix::unistd;
+    use rstest::*;
+    use std::fs::{File, OpenOptions};
+    use std::io::Write;
+    use std::path::Path;
+    use std::path::PathBuf;
+    use std::sync::mpsc;
+    use std::sync::mpsc::{Receiver, Sender};
+    use std::sync::{Arc, Mutex};
+    use std::thread;
+    use std::time::{Duration, Instant};
+    use tempfile::TempDir;
+    // ! For the sake of the test, do not alter these numbers. !
+    // Session batch size
+    const TEST_BATCH_SIZE: usize = 20;
+    // Number of lines written to FIFO
+    const TEST_DATA_SIZE: usize = 20_000;
+    // Ratio of lines that can be joined. Each joinable key produces 20 lines with
+    // the aggregate_test_100 dataset. We use these joinable keys to observe
+    // incremental execution.
+    const TEST_JOIN_RATIO: f64 = 0.01;
+
+    fn create_fifo_file(tmp_dir: &TempDir, file_name: &str) -> Result<PathBuf> {
+        let file_path = tmp_dir.path().join(file_name);
+        // Simulate an infinite environment via a FIFO file
+        if let Err(e) = unistd::mkfifo(&file_path, stat::Mode::S_IRWXU) {
+            Err(DataFusionError::Execution(e.to_string()))
+        } else {
+            Ok(file_path)
+        }
+    }
+
+    fn write_to_fifo(
+        mut file: &File,
+        line: &str,
+        ref_time: Instant,
+        broken_pipe_timeout: Duration,
+    ) -> Result<usize> {
+        // We need to handle the broken pipe error until the reader is ready. This
+        // is why we use a timeout to limit how long we wait for the reader.
+        // If the error is anything other than a broken pipe, we fail immediately.
+        file.write(line.as_bytes()).or_else(|e| {
+            if e.raw_os_error() == Some(32) {
+                let interval = Instant::now().duration_since(ref_time);
+                if interval < broken_pipe_timeout {
+                    thread::sleep(Duration::from_millis(100));
+                    return Ok(0);
+                }
+            }
+            Err(DataFusionError::Execution(e.to_string()))
+        })
+    }
+
+    async fn create_ctx(
+        fifo_path: &Path,
+        with_unbounded_execution: bool,
+    ) -> Result<SessionContext> {
+        let config = SessionConfig::new()
+            .with_batch_size(TEST_BATCH_SIZE)
+            .set_u64(
+                "datafusion.execution.coalesce_target_batch_size",
+                TEST_BATCH_SIZE as u64,
+            );
+        let ctx = SessionContext::with_config(config);
+        // Register left table
+        let left_schema = Arc::new(Schema::new(vec![
+            Field::new("a1", DataType::Utf8, false),
+            Field::new("a2", DataType::UInt32, false),
+        ]));
+        ctx.register_csv(
+            "left",
+            fifo_path.as_os_str().to_str().unwrap(),
+            CsvReadOptions::new()
+                .schema(left_schema.as_ref())
+                .has_header(false)
+                .mark_infinite(with_unbounded_execution),
+        )
+        .await?;
+        // Register right table
+        let schema = aggr_test_schema();
+        let test_data = arrow_test_data();
+        ctx.register_csv(
+            "right",
+            &format!("{}/csv/aggregate_test_100.csv", test_data),
+            CsvReadOptions::new().schema(schema.as_ref()),
+        )
+        .await?;
+        Ok(ctx)
+    }
+
+    #[derive(Debug, PartialEq)]
+    enum Operation {
+        Read,
+        Write,
+    }
+
+    /// Checks if there is an [Operation::Read] between [Operation::Write]s.
+    /// This indicates we did not wait for the file to finish before processing it.
+    fn interleave(result: &[Operation]) -> bool {
+        let first_read = result.iter().position(|op| op == &Operation::Read);
+        let last_write = result.iter().rev().position(|op| op == &Operation::Write);
+        match (first_read, last_write) {
+            (Some(first_read), Some(last_write)) => {
+                result.len() - 1 - last_write > first_read
+            }
+            (_, _) => false,
+        }
+    }
+
+    // This test provides a relatively realistic end-to-end scenario where
+    // we ensure that we swap join sides correctly to accommodate a FIFO source.
+    #[rstest]
+    #[timeout(std::time::Duration::from_secs(30))]
+    #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
+    async fn unbounded_file_with_swapped_join(
+        #[values(true, false)] unbounded_file: bool,
+    ) -> Result<()> {
+        // To make unbounded execution deterministic
+        let waiting = Arc::new(Mutex::new(unbounded_file));
+        let waiting_thread = waiting.clone();
+        // Create a channel
+        let (tx, rx): (Sender<Operation>, Receiver<Operation>) = mpsc::channel();
+        // Create a new temporary FIFO file
+        let tmp_dir = TempDir::new()?;
+        let fifo_path = create_fifo_file(&tmp_dir, "first_fifo.csv")?;
+        // Clone the FIFO path so it is not moved into the writer thread
+        let fifo_path_thread = fifo_path.clone();
+        // Timeout for a long period of BrokenPipe error
+        let broken_pipe_timeout = Duration::from_secs(5);
+        // The sender endpoint can be cloned
+        let thread_tx = tx.clone();
+        // Spawn a new thread to write to the FIFO file
+        let fifo_writer = thread::spawn(move || {
+            let first_file = OpenOptions::new()
+                .write(true)
+                .open(fifo_path_thread)
+                .unwrap();
+            // Reference time to use when deciding to fail the test
+            let execution_start = Instant::now();
+            // Execution can produce at least one RecordBatch after
+            // "joinable_lines_length" lines are read.
+            let joinable_lines_length =
+                (TEST_DATA_SIZE as f64 * TEST_JOIN_RATIO).round() as usize;
+            // The row including "a" is joinable with aggregate_test_100.c1
+            let joinable_iterator = (0..joinable_lines_length).map(|_| "a".to_string());
+            let second_joinable_iterator =
+                (0..joinable_lines_length).map(|_| "a".to_string());
+            // The row including "zzz" is not joinable with aggregate_test_100.c1
+            let non_joinable_iterator =
+                (0..(TEST_DATA_SIZE - joinable_lines_length)).map(|_| "zzz".to_string());
+            let string_array = joinable_iterator
+                .chain(non_joinable_iterator)
+                .chain(second_joinable_iterator);
+            for (cnt, string_col) in enumerate(string_array) {
+                // Wait for a read signal during unbounded execution.
+                // For unbounded execution:
+                //  After "joinable_lines_length" lines are written, we MUST get an Operation::Read.
+                // For bounded execution:
+                //  We never enter the while loop, since waiting_thread is initialized as false.
+                while *waiting_thread.lock().unwrap() && joinable_lines_length < cnt {
+                    thread::sleep(Duration::from_millis(200));
+                }
+                // Queue a Write message in the channel every TEST_BATCH_SIZE lines
+                if cnt % TEST_BATCH_SIZE == 0 {
+                    thread_tx.send(Operation::Write).unwrap();
+                }
+                let line = format!("{},{}\n", string_col, cnt);
+                write_to_fifo(&first_file, &line, execution_start, broken_pipe_timeout)
+                    .unwrap();
+            }
+        });
+        // Collects operations from both writer and executor.
+        let result_collector = thread::spawn(move || {
+            let mut results = vec![];
+            while let Ok(res) = rx.recv() {
+                results.push(res);
+            }
+            results
+        });
+        // Create the session context with the bounded or unbounded flag.
+        let ctx = create_ctx(&fifo_path, unbounded_file).await?;
+        // Execute the query
+        let df = ctx.sql("SELECT t1.a2, t2.c1, t2.c4, t2.c5 FROM left as t1 JOIN right as t2 ON t1.a1 = t2.c1").await?;
+        let mut stream = df.execute_stream().await?;
+        while (stream.next().await).is_some() {
+            *waiting.lock().unwrap() = false;
+            tx.send(Operation::Read).unwrap();
+        }
+        fifo_writer.join().unwrap();
+        drop(tx);
+        let result = result_collector.join().unwrap();
+        assert_eq!(interleave(&result), unbounded_file);
+        Ok(())
+    }
+}
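
To make the interleaving criterion concrete: the test above passes for the unbounded case only if at least one Operation::Read is recorded before the last Operation::Write, i.e. the query produced output while the FIFO was still being written. A hypothetical unit test (not part of this commit) that could live inside the unix_test module:

    #[test]
    fn interleave_examples() {
        use Operation::{Read, Write};
        // A Read arriving before the final Write indicates incremental execution.
        assert!(interleave(&[Write, Write, Read, Write]));
        // Reads that only happen after every Write indicate bounded behavior.
        assert!(!interleave(&[Write, Write, Write, Read]));
    }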
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index d3cf06c34..f40258219 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -83,6 +83,7 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
             limit: None,
             table_partition_cols: vec![],
             output_ordering: None,
+            infinite_source: false,
         },
         None,
         None,
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index e62124a33..4d2830ad8 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -69,6 +69,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec {
             limit: None,
             table_partition_cols: vec![],
             output_ordering: None,
+            infinite_source: false,
         },
         Some(filter),
         None,
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 1f071febe..0652961b0 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -113,6 +113,7 @@ async fn get_exec(
                 limit,
                 table_partition_cols: vec![],
                 output_ordering: None,
+                infinite_source: false,
             },
             &[],
         )
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 311ed46ff..01d8e31b3 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -377,6 +377,7 @@ pub fn parse_protobuf_file_scan_config(
         limit: proto.limit.as_ref().map(|sl| sl.limit as usize),
         table_partition_cols,
         output_ordering,
+        infinite_source: false,
     })
 }
 
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index c122b5f43..bd3614df0 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -1477,6 +1477,7 @@ mod roundtrip_tests {
             limit: None,
             table_partition_cols: vec![],
             output_ordering: None,
+            infinite_source: false,
         };
 
         let predicate = datafusion::prelude::col("col").eq(datafusion::prelude::lit("1"));
diff --git a/parquet-test-utils/src/lib.rs b/parquet-test-utils/src/lib.rs
index 5e8f15c0a..eb36b11df 100644
--- a/parquet-test-utils/src/lib.rs
+++ b/parquet-test-utils/src/lib.rs
@@ -144,6 +144,7 @@ impl TestParquetFile {
             limit: None,
             table_partition_cols: vec![],
             output_ordering: None,
+            infinite_source: false,
         };
 
         let df_schema = self.schema.clone().to_dfschema_ref()?;