Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/25 10:12:32 UTC

[arrow-datafusion] branch master updated: move sql tests from `context.rs` to corresponding test files in `tests/sql` (#2329)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 2237c7ed4 move sql tests from `context.rs` to corresponding test files in `tests/sql` (#2329)
2237c7ed4 is described below

commit 2237c7ed43547150219cf7ac6dbf5d07d6c1b75c
Author: DuRipeng <45...@qq.com>
AuthorDate: Mon Apr 25 18:12:25 2022 +0800

    move sql tests from `context.rs` to corresponding test files in `tests/sql` (#2329)
    
    * move context.rs ut out
    
    * fix clippy
    
    * fix fmt
---
 datafusion/core/src/execution/context.rs | 1288 +-----------------------------
 datafusion/core/tests/sql/aggregates.rs  |  303 +++++++
 datafusion/core/tests/sql/functions.rs   |  237 ++++++
 datafusion/core/tests/sql/group_by.rs    |  196 +++++
 datafusion/core/tests/sql/joins.rs       |  107 +++
 datafusion/core/tests/sql/limit.rs       |   74 ++
 datafusion/core/tests/sql/mod.rs         |   95 +++
 datafusion/core/tests/sql/select.rs      |  146 ++++
 datafusion/core/tests/sql/window.rs      |  156 ++++
 9 files changed, 1344 insertions(+), 1258 deletions(-)
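The relocated tests use the shared helpers in `datafusion/core/tests/sql/mod.rs` rather than the in-module helpers removed from `context.rs`; for example, the removed `execute(sql, partition_count)` helper corresponds to `execute_with_partition` in the new test files, and `test::table_with_decimal()` becomes the module-level `table_with_decimal()`. A minimal sketch of the pattern, taken from the `aggregates.rs` hunk further down (helper and macro names are assumed to be in scope via the `tests/sql` module):

    // datafusion/core/tests/sql/aggregates.rs (after the move)
    #[tokio::test]
    async fn aggregate() -> Result<()> {
        // previously: execute("SELECT SUM(c1), SUM(c2) FROM test", 4) inside context.rs tests
        let results = execute_with_partition("SELECT SUM(c1), SUM(c2) FROM test", 4).await?;
        assert_eq!(results.len(), 1);

        let expected = vec![
            "+--------------+--------------+",
            "| SUM(test.c1) | SUM(test.c2) |",
            "+--------------+--------------+",
            "| 60           | 220          |",
            "+--------------+--------------+",
        ];
        assert_batches_sorted_eq!(expected, &results);

        Ok(())
    }
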

diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index eae531970..5378e38da 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1602,7 +1602,6 @@ impl FunctionRegistry for TaskContext {
 mod tests {
     use super::*;
     use crate::execution::context::QueryPlanner;
-    use crate::from_slice::FromSlice;
     use crate::logical_plan::{binary_expr, lit, Operator};
     use crate::physical_plan::functions::make_scalar_function;
     use crate::test;
@@ -1611,15 +1610,8 @@ mod tests {
         assert_batches_eq, assert_batches_sorted_eq,
         logical_plan::{col, create_udf, sum, Expr},
     };
-    use crate::{
-        datasource::MemTable, logical_plan::create_udaf,
-        physical_plan::expressions::AvgAccumulator,
-    };
-    use arrow::array::{
-        Array, ArrayRef, DictionaryArray, Float32Array, Float64Array, Int16Array,
-        Int32Array, Int64Array, Int8Array, LargeStringArray, UInt16Array, UInt32Array,
-        UInt64Array, UInt8Array,
-    };
+    use crate::{logical_plan::create_udaf, physical_plan::expressions::AvgAccumulator};
+    use arrow::array::ArrayRef;
     use arrow::datatypes::*;
     use arrow::record_batch::RecordBatch;
     use async_trait::async_trait;
@@ -1663,783 +1655,48 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn create_variable_expr() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let partition_count = 4;
-        let mut ctx = create_ctx(&tmp_dir, partition_count).await?;
-
-        let variable_provider = test::variable::SystemVar::new();
-        ctx.register_variable(VarType::System, Arc::new(variable_provider));
-        let variable_provider = test::variable::UserDefinedVar::new();
-        ctx.register_variable(VarType::UserDefined, Arc::new(variable_provider));
-
-        let provider = test::create_table_dual();
-        ctx.register_table("dual", provider)?;
-
-        let results =
-            plan_and_collect(&ctx, "SELECT @@version, @name, @integer + 1 FROM dual")
-                .await?;
-
-        let expected = vec![
-            "+----------------------+------------------------+------------------------+",
-            "| @@version            | @name                  | @integer Plus Int64(1) |",
-            "+----------------------+------------------------+------------------------+",
-            "| system-var-@@version | user-defined-var-@name | 42                     |",
-            "+----------------------+------------------------+------------------------+",
-        ];
-        assert_batches_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn register_deregister() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let partition_count = 4;
-        let ctx = create_ctx(&tmp_dir, partition_count).await?;
-
-        let provider = test::create_table_dual();
-        ctx.register_table("dual", provider)?;
-
-        assert!(ctx.deregister_table("dual")?.is_some());
-        assert!(ctx.deregister_table("dual")?.is_none());
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn left_join_using() -> Result<()> {
-        let results = execute(
-            "SELECT t1.c1, t2.c2 FROM test t1 JOIN test t2 USING (c2) ORDER BY t2.c2",
-            1,
-        )
-        .await?;
-        assert_eq!(results.len(), 1);
-
-        let expected = vec![
-            "+----+----+",
-            "| c1 | c2 |",
-            "+----+----+",
-            "| 0  | 1  |",
-            "| 0  | 2  |",
-            "| 0  | 3  |",
-            "| 0  | 4  |",
-            "| 0  | 5  |",
-            "| 0  | 6  |",
-            "| 0  | 7  |",
-            "| 0  | 8  |",
-            "| 0  | 9  |",
-            "| 0  | 10 |",
-            "+----+----+",
-        ];
-
-        assert_batches_eq!(expected, &results);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn left_join_using_join_key_projection() -> Result<()> {
-        let results = execute(
-            "SELECT t1.c1, t1.c2, t2.c2 FROM test t1 JOIN test t2 USING (c2) ORDER BY t2.c2",
-            1,
-        )
-            .await?;
-        assert_eq!(results.len(), 1);
-
-        let expected = vec![
-            "+----+----+----+",
-            "| c1 | c2 | c2 |",
-            "+----+----+----+",
-            "| 0  | 1  | 1  |",
-            "| 0  | 2  | 2  |",
-            "| 0  | 3  | 3  |",
-            "| 0  | 4  | 4  |",
-            "| 0  | 5  | 5  |",
-            "| 0  | 6  | 6  |",
-            "| 0  | 7  | 7  |",
-            "| 0  | 8  | 8  |",
-            "| 0  | 9  | 9  |",
-            "| 0  | 10 | 10 |",
-            "+----+----+----+",
-        ];
-
-        assert_batches_eq!(expected, &results);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn left_join() -> Result<()> {
-        let results = execute(
-            "SELECT t1.c1, t1.c2, t2.c2 FROM test t1 JOIN test t2 ON t1.c2 = t2.c2 ORDER BY t1.c2",
-            1,
-        )
-            .await?;
-        assert_eq!(results.len(), 1);
-
-        let expected = vec![
-            "+----+----+----+",
-            "| c1 | c2 | c2 |",
-            "+----+----+----+",
-            "| 0  | 1  | 1  |",
-            "| 0  | 2  | 2  |",
-            "| 0  | 3  | 3  |",
-            "| 0  | 4  | 4  |",
-            "| 0  | 5  | 5  |",
-            "| 0  | 6  | 6  |",
-            "| 0  | 7  | 7  |",
-            "| 0  | 8  | 8  |",
-            "| 0  | 9  | 9  |",
-            "| 0  | 10 | 10 |",
-            "+----+----+----+",
-        ];
-
-        assert_batches_eq!(expected, &results);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn window() -> Result<()> {
-        let results = execute(
-            "SELECT \
-            c1, \
-            c2, \
-            SUM(c2) OVER (), \
-            COUNT(c2) OVER (), \
-            MAX(c2) OVER (), \
-            MIN(c2) OVER (), \
-            AVG(c2) OVER () \
-            FROM test \
-            ORDER BY c1, c2 \
-            LIMIT 5",
-            4,
-        )
-        .await?;
-        // result in one batch, although e.g. having 2 batches do not change
-        // result semantics, having a len=1 assertion upfront keeps surprises
-        // at bay
-        assert_eq!(results.len(), 1);
-
-        let expected = vec![
-            "+----+----+--------------+----------------+--------------+--------------+--------------+",
-            "| c1 | c2 | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |",
-            "+----+----+--------------+----------------+--------------+--------------+--------------+",
-            "| 0  | 1  | 220          | 40             | 10           | 1            | 5.5          |",
-            "| 0  | 2  | 220          | 40             | 10           | 1            | 5.5          |",
-            "| 0  | 3  | 220          | 40             | 10           | 1            | 5.5          |",
-            "| 0  | 4  | 220          | 40             | 10           | 1            | 5.5          |",
-            "| 0  | 5  | 220          | 40             | 10           | 1            | 5.5          |",
-            "+----+----+--------------+----------------+--------------+--------------+--------------+",
-        ];
-
-        // window function shall respect ordering
-        assert_batches_eq!(expected, &results);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn window_order_by() -> Result<()> {
-        let results = execute(
-            "SELECT \
-            c1, \
-            c2, \
-            ROW_NUMBER() OVER (ORDER BY c1, c2), \
-            FIRST_VALUE(c2) OVER (ORDER BY c1, c2), \
-            LAST_VALUE(c2) OVER (ORDER BY c1, c2), \
-            NTH_VALUE(c2, 2) OVER (ORDER BY c1, c2), \
-            SUM(c2) OVER (ORDER BY c1, c2), \
-            COUNT(c2) OVER (ORDER BY c1, c2), \
-            MAX(c2) OVER (ORDER BY c1, c2), \
-            MIN(c2) OVER (ORDER BY c1, c2), \
-            AVG(c2) OVER (ORDER BY c1, c2) \
-            FROM test \
-            ORDER BY c1, c2 \
-            LIMIT 5",
-            4,
-        )
-        .await?;
-        // result in one batch, although e.g. having 2 batches do not change
-        // result semantics, having a len=1 assertion upfront keeps surprises
-        // at bay
-        assert_eq!(results.len(), 1);
-
-        let expected = vec![
-            "+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+",
-            "| c1 | c2 | ROW_NUMBER() | FIRST_VALUE(test.c2) | LAST_VALUE(test.c2) | NTH_VALUE(test.c2,Int64(2)) | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |",
-            "+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+",
-            "| 0  | 1  | 1            | 1                    | 1                   |                             | 1            | 1              | 1            | 1            | 1            |",
-            "| 0  | 2  | 2            | 1                    | 2                   | 2                           | 3            | 2              | 2            | 1            | 1.5          |",
-            "| 0  | 3  | 3            | 1                    | 3                   | 2                           | 6            | 3              | 3            | 1            | 2            |",
-            "| 0  | 4  | 4            | 1                    | 4                   | 2                           | 10           | 4              | 4            | 1            | 2.5          |",
-            "| 0  | 5  | 5            | 1                    | 5                   | 2                           | 15           | 5              | 5            | 1            | 3            |",
-            "+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+",
-        ];
-
-        // window function shall respect ordering
-        assert_batches_eq!(expected, &results);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn window_partition_by() -> Result<()> {
-        let results = execute(
-            "SELECT \
-            c1, \
-            c2, \
-            SUM(c2) OVER (PARTITION BY c2), \
-            COUNT(c2) OVER (PARTITION BY c2), \
-            MAX(c2) OVER (PARTITION BY c2), \
-            MIN(c2) OVER (PARTITION BY c2), \
-            AVG(c2) OVER (PARTITION BY c2) \
-            FROM test \
-            ORDER BY c1, c2 \
-            LIMIT 5",
-            4,
-        )
-        .await?;
-
-        let expected = vec![
-            "+----+----+--------------+----------------+--------------+--------------+--------------+",
-            "| c1 | c2 | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |",
-            "+----+----+--------------+----------------+--------------+--------------+--------------+",
-            "| 0  | 1  | 4            | 4              | 1            | 1            | 1            |",
-            "| 0  | 2  | 8            | 4              | 2            | 2            | 2            |",
-            "| 0  | 3  | 12           | 4              | 3            | 3            | 3            |",
-            "| 0  | 4  | 16           | 4              | 4            | 4            | 4            |",
-            "| 0  | 5  | 20           | 4              | 5            | 5            | 5            |",
-            "+----+----+--------------+----------------+--------------+--------------+--------------+",
-        ];
-
-        // window function shall respect ordering
-        assert_batches_eq!(expected, &results);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn window_partition_by_order_by() -> Result<()> {
-        let results = execute(
-            "SELECT \
-            c1, \
-            c2, \
-            ROW_NUMBER() OVER (PARTITION BY c2 ORDER BY c1), \
-            FIRST_VALUE(c2 + c1) OVER (PARTITION BY c2 ORDER BY c1), \
-            LAST_VALUE(c2 + c1) OVER (PARTITION BY c2 ORDER BY c1), \
-            NTH_VALUE(c2 + c1, 1) OVER (PARTITION BY c2 ORDER BY c1), \
-            SUM(c2) OVER (PARTITION BY c2 ORDER BY c1), \
-            COUNT(c2) OVER (PARTITION BY c2 ORDER BY c1), \
-            MAX(c2) OVER (PARTITION BY c2 ORDER BY c1), \
-            MIN(c2) OVER (PARTITION BY c2 ORDER BY c1), \
-            AVG(c2) OVER (PARTITION BY c2 ORDER BY c1) \
-            FROM test \
-            ORDER BY c1, c2 \
-            LIMIT 5",
-            4,
-        )
-        .await?;
-
-        let expected = vec![
-            "+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+",
-            "| c1 | c2 | ROW_NUMBER() | FIRST_VALUE(test.c2 + test.c1) | LAST_VALUE(test.c2 + test.c1) | NTH_VALUE(test.c2 + test.c1,Int64(1)) | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |",
-            "+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+",
-            "| 0  | 1  | 1            | 1                              | 1                             | 1                                     | 1            | 1              | 1            | 1            | 1            |",
-            "| 0  | 2  | 1            | 2                              | 2                             | 2                                     | 2            | 1              | 2            | 2            | 2            |",
-            "| 0  | 3  | 1            | 3                              | 3                             | 3                                     | 3            | 1              | 3            | 3            | 3            |",
-            "| 0  | 4  | 1            | 4                              | 4                             | 4                                     | 4            | 1              | 4            | 4            | 4            |",
-            "| 0  | 5  | 1            | 5                              | 5                             | 5                                     | 5            | 1              | 5            | 5            | 5            |",
-            "+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+",
-        ];
-
-        // window function shall respect ordering
-        assert_batches_eq!(expected, &results);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn aggregate_decimal_min() -> Result<()> {
-        let ctx = SessionContext::new();
-        // the data type of c1 is decimal(10,3)
-        ctx.register_table("d_table", test::table_with_decimal())
-            .unwrap();
-        let result = plan_and_collect(&ctx, "select min(c1) from d_table")
-            .await
-            .unwrap();
-        let expected = vec![
-            "+-----------------+",
-            "| MIN(d_table.c1) |",
-            "+-----------------+",
-            "| -100.009        |",
-            "+-----------------+",
-        ];
-        assert_eq!(
-            &DataType::Decimal(10, 3),
-            result[0].schema().field(0).data_type()
-        );
-        assert_batches_sorted_eq!(expected, &result);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn aggregate_decimal_max() -> Result<()> {
-        let ctx = SessionContext::new();
-        // the data type of c1 is decimal(10,3)
-        ctx.register_table("d_table", test::table_with_decimal())
-            .unwrap();
-
-        let result = plan_and_collect(&ctx, "select max(c1) from d_table")
-            .await
-            .unwrap();
-        let expected = vec![
-            "+-----------------+",
-            "| MAX(d_table.c1) |",
-            "+-----------------+",
-            "| 110.009         |",
-            "+-----------------+",
-        ];
-        assert_eq!(
-            &DataType::Decimal(10, 3),
-            result[0].schema().field(0).data_type()
-        );
-        assert_batches_sorted_eq!(expected, &result);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn aggregate_decimal_sum() -> Result<()> {
-        let ctx = SessionContext::new();
-        // the data type of c1 is decimal(10,3)
-        ctx.register_table("d_table", test::table_with_decimal())
-            .unwrap();
-        let result = plan_and_collect(&ctx, "select sum(c1) from d_table")
-            .await
-            .unwrap();
-        let expected = vec![
-            "+-----------------+",
-            "| SUM(d_table.c1) |",
-            "+-----------------+",
-            "| 100.000         |",
-            "+-----------------+",
-        ];
-        assert_eq!(
-            &DataType::Decimal(20, 3),
-            result[0].schema().field(0).data_type()
-        );
-        assert_batches_sorted_eq!(expected, &result);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn aggregate_decimal_avg() -> Result<()> {
-        let ctx = SessionContext::new();
-        // the data type of c1 is decimal(10,3)
-        ctx.register_table("d_table", test::table_with_decimal())
-            .unwrap();
-        let result = plan_and_collect(&ctx, "select avg(c1) from d_table")
-            .await
-            .unwrap();
-        let expected = vec![
-            "+-----------------+",
-            "| AVG(d_table.c1) |",
-            "+-----------------+",
-            "| 5.0000000       |",
-            "+-----------------+",
-        ];
-        assert_eq!(
-            &DataType::Decimal(14, 7),
-            result[0].schema().field(0).data_type()
-        );
-        assert_batches_sorted_eq!(expected, &result);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn aggregate() -> Result<()> {
-        let results = execute("SELECT SUM(c1), SUM(c2) FROM test", 4).await?;
-        assert_eq!(results.len(), 1);
-
-        let expected = vec![
-            "+--------------+--------------+",
-            "| SUM(test.c1) | SUM(test.c2) |",
-            "+--------------+--------------+",
-            "| 60           | 220          |",
-            "+--------------+--------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn aggregate_empty() -> Result<()> {
-        // The predicate on this query purposely generates no results
-        let results = execute("SELECT SUM(c1), SUM(c2) FROM test where c1 > 100000", 4)
-            .await
-            .unwrap();
-
-        assert_eq!(results.len(), 1);
-
-        let expected = vec![
-            "+--------------+--------------+",
-            "| SUM(test.c1) | SUM(test.c2) |",
-            "+--------------+--------------+",
-            "|              |              |",
-            "+--------------+--------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn aggregate_avg() -> Result<()> {
-        let results = execute("SELECT AVG(c1), AVG(c2) FROM test", 4).await?;
-        assert_eq!(results.len(), 1);
-
-        let expected = vec![
-            "+--------------+--------------+",
-            "| AVG(test.c1) | AVG(test.c2) |",
-            "+--------------+--------------+",
-            "| 1.5          | 5.5          |",
-            "+--------------+--------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn aggregate_max() -> Result<()> {
-        let results = execute("SELECT MAX(c1), MAX(c2) FROM test", 4).await?;
-        assert_eq!(results.len(), 1);
-
-        let expected = vec![
-            "+--------------+--------------+",
-            "| MAX(test.c1) | MAX(test.c2) |",
-            "+--------------+--------------+",
-            "| 3            | 10           |",
-            "+--------------+--------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn aggregate_min() -> Result<()> {
-        let results = execute("SELECT MIN(c1), MIN(c2) FROM test", 4).await?;
-        assert_eq!(results.len(), 1);
-
-        let expected = vec![
-            "+--------------+--------------+",
-            "| MIN(test.c1) | MIN(test.c2) |",
-            "+--------------+--------------+",
-            "| 0            | 1            |",
-            "+--------------+--------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn aggregate_grouped() -> Result<()> {
-        let results = execute("SELECT c1, SUM(c2) FROM test GROUP BY c1", 4).await?;
-
-        let expected = vec![
-            "+----+--------------+",
-            "| c1 | SUM(test.c2) |",
-            "+----+--------------+",
-            "| 0  | 55           |",
-            "| 1  | 55           |",
-            "| 2  | 55           |",
-            "| 3  | 55           |",
-            "+----+--------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn aggregate_grouped_avg() -> Result<()> {
-        let results = execute("SELECT c1, AVG(c2) FROM test GROUP BY c1", 4).await?;
-
-        let expected = vec![
-            "+----+--------------+",
-            "| c1 | AVG(test.c2) |",
-            "+----+--------------+",
-            "| 0  | 5.5          |",
-            "| 1  | 5.5          |",
-            "| 2  | 5.5          |",
-            "| 3  | 5.5          |",
-            "+----+--------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn boolean_literal() -> Result<()> {
-        let results =
-            execute("SELECT c1, c3 FROM test WHERE c1 > 2 AND c3 = true", 4).await?;
-
-        let expected = vec![
-            "+----+------+",
-            "| c1 | c3   |",
-            "+----+------+",
-            "| 3  | true |",
-            "| 3  | true |",
-            "| 3  | true |",
-            "| 3  | true |",
-            "| 3  | true |",
-            "+----+------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn aggregate_grouped_empty() -> Result<()> {
-        let results =
-            execute("SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", 4).await?;
-
-        let expected = vec![
-            "+----+--------------+",
-            "| c1 | AVG(test.c2) |",
-            "+----+--------------+",
-            "+----+--------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn aggregate_grouped_max() -> Result<()> {
-        let results = execute("SELECT c1, MAX(c2) FROM test GROUP BY c1", 4).await?;
-
-        let expected = vec![
-            "+----+--------------+",
-            "| c1 | MAX(test.c2) |",
-            "+----+--------------+",
-            "| 0  | 10           |",
-            "| 1  | 10           |",
-            "| 2  | 10           |",
-            "| 3  | 10           |",
-            "+----+--------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn aggregate_grouped_min() -> Result<()> {
-        let results = execute("SELECT c1, MIN(c2) FROM test GROUP BY c1", 4).await?;
-
-        let expected = vec![
-            "+----+--------------+",
-            "| c1 | MIN(test.c2) |",
-            "+----+--------------+",
-            "| 0  | 1            |",
-            "| 1  | 1            |",
-            "| 2  | 1            |",
-            "| 3  | 1            |",
-            "+----+--------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn aggregate_avg_add() -> Result<()> {
-        let results = execute(
-            "SELECT AVG(c1), AVG(c1) + 1, AVG(c1) + 2, 1 + AVG(c1) FROM test",
-            4,
-        )
-        .await?;
-        assert_eq!(results.len(), 1);
-
-        let expected = vec![
-            "+--------------+----------------------------+----------------------------+----------------------------+",
-            "| AVG(test.c1) | AVG(test.c1) Plus Int64(1) | AVG(test.c1) Plus Int64(2) | Int64(1) Plus AVG(test.c1) |",
-            "+--------------+----------------------------+----------------------------+----------------------------+",
-            "| 1.5          | 2.5                        | 3.5                        | 2.5                        |",
-            "+--------------+----------------------------+----------------------------+----------------------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn join_partitioned() -> Result<()> {
-        // self join on partition id (workaround for duplicate column name)
-        let results = execute(
-            "SELECT 1 FROM test JOIN (SELECT c1 AS id1 FROM test) AS a ON c1=id1",
-            4,
-        )
-        .await?;
-
-        assert_eq!(
-            results.iter().map(|b| b.num_rows()).sum::<usize>(),
-            4 * 10 * 10
-        );
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn count_basic() -> Result<()> {
-        let results = execute("SELECT COUNT(c1), COUNT(c2) FROM test", 1).await?;
-        assert_eq!(results.len(), 1);
-
-        let expected = vec![
-            "+----------------+----------------+",
-            "| COUNT(test.c1) | COUNT(test.c2) |",
-            "+----------------+----------------+",
-            "| 10             | 10             |",
-            "+----------------+----------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn count_partitioned() -> Result<()> {
-        let results = execute("SELECT COUNT(c1), COUNT(c2) FROM test", 4).await?;
-        assert_eq!(results.len(), 1);
-
-        let expected = vec![
-            "+----------------+----------------+",
-            "| COUNT(test.c1) | COUNT(test.c2) |",
-            "+----------------+----------------+",
-            "| 40             | 40             |",
-            "+----------------+----------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn count_aggregated() -> Result<()> {
-        let results = execute("SELECT c1, COUNT(c2) FROM test GROUP BY c1", 4).await?;
-
-        let expected = vec![
-            "+----+----------------+",
-            "| c1 | COUNT(test.c2) |",
-            "+----+----------------+",
-            "| 0  | 10             |",
-            "| 1  | 10             |",
-            "| 2  | 10             |",
-            "| 3  | 10             |",
-            "+----+----------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn group_by_date_trunc() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let ctx = SessionContext::new();
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("c2", DataType::UInt64, false),
-            Field::new(
-                "t1",
-                DataType::Timestamp(TimeUnit::Microsecond, None),
-                false,
-            ),
-        ]));
-
-        // generate a partitioned file
-        for partition in 0..4 {
-            let filename = format!("partition-{}.{}", partition, "csv");
-            let file_path = tmp_dir.path().join(&filename);
-            let mut file = File::create(file_path)?;
+    async fn create_variable_expr() -> Result<()> {
+        let tmp_dir = TempDir::new()?;
+        let partition_count = 4;
+        let mut ctx = create_ctx(&tmp_dir, partition_count).await?;
 
-            // generate some data
-            for i in 0..10 {
-                let data = format!("{},2020-12-{}T00:00:00.000Z\n", i, i + 10);
-                file.write_all(data.as_bytes())?;
-            }
-        }
+        let variable_provider = test::variable::SystemVar::new();
+        ctx.register_variable(VarType::System, Arc::new(variable_provider));
+        let variable_provider = test::variable::UserDefinedVar::new();
+        ctx.register_variable(VarType::UserDefined, Arc::new(variable_provider));
 
-        ctx.register_csv(
-            "test",
-            tmp_dir.path().to_str().unwrap(),
-            CsvReadOptions::new().schema(&schema).has_header(false),
-        )
-        .await?;
+        let provider = test::create_table_dual();
+        ctx.register_table("dual", provider)?;
 
-        let results = plan_and_collect(
-            &ctx,
-            "SELECT date_trunc('week', t1) as week, SUM(c2) FROM test GROUP BY date_trunc('week', t1)",
-        ).await?;
+        let results =
+            plan_and_collect(&ctx, "SELECT @@version, @name, @integer + 1 FROM dual")
+                .await?;
 
         let expected = vec![
-            "+---------------------+--------------+",
-            "| week                | SUM(test.c2) |",
-            "+---------------------+--------------+",
-            "| 2020-12-07 00:00:00 | 24           |",
-            "| 2020-12-14 00:00:00 | 156          |",
-            "+---------------------+--------------+",
+            "+----------------------+------------------------+------------------------+",
+            "| @@version            | @name                  | @integer Plus Int64(1) |",
+            "+----------------------+------------------------+------------------------+",
+            "| system-var-@@version | user-defined-var-@name | 42                     |",
+            "+----------------------+------------------------+------------------------+",
         ];
-        assert_batches_sorted_eq!(expected, &results);
+        assert_batches_eq!(expected, &results);
 
         Ok(())
     }
 
     #[tokio::test]
-    async fn group_by_largeutf8() {
-        {
-            let ctx = SessionContext::new();
-
-            // input data looks like:
-            // A, 1
-            // B, 2
-            // A, 2
-            // A, 4
-            // C, 1
-            // A, 1
-
-            let str_array: LargeStringArray = vec!["A", "B", "A", "A", "C", "A"]
-                .into_iter()
-                .map(Some)
-                .collect();
-            let str_array = Arc::new(str_array);
-
-            let val_array: Int64Array = vec![1, 2, 2, 4, 1, 1].into();
-            let val_array = Arc::new(val_array);
-
-            let schema = Arc::new(Schema::new(vec![
-                Field::new("str", str_array.data_type().clone(), false),
-                Field::new("val", val_array.data_type().clone(), false),
-            ]));
-
-            let batch =
-                RecordBatch::try_new(schema.clone(), vec![str_array, val_array]).unwrap();
+    async fn register_deregister() -> Result<()> {
+        let tmp_dir = TempDir::new()?;
+        let partition_count = 4;
+        let ctx = create_ctx(&tmp_dir, partition_count).await?;
 
-            let provider = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap();
-            ctx.register_table("t", Arc::new(provider)).unwrap();
+        let provider = test::create_table_dual();
+        ctx.register_table("dual", provider)?;
 
-            let results =
-                plan_and_collect(&ctx, "SELECT str, count(val) FROM t GROUP BY str")
-                    .await
-                    .expect("ran plan correctly");
+        assert!(ctx.deregister_table("dual")?.is_some());
+        assert!(ctx.deregister_table("dual")?.is_none());
 
-            let expected = vec![
-                "+-----+--------------+",
-                "| str | COUNT(t.val) |",
-                "+-----+--------------+",
-                "| A   | 4            |",
-                "| B   | 1            |",
-                "| C   | 1            |",
-                "+-----+--------------+",
-            ];
-            assert_batches_sorted_eq!(expected, &results);
-        }
+        Ok(())
     }
 
     #[tokio::test]
@@ -2466,224 +1723,6 @@ mod tests {
         assert_batches_sorted_eq!(expected, &results);
     }
 
-    #[tokio::test]
-    async fn group_by_dictionary() {
-        async fn run_test_case<K: ArrowDictionaryKeyType>() {
-            let ctx = SessionContext::new();
-
-            // input data looks like:
-            // A, 1
-            // B, 2
-            // A, 2
-            // A, 4
-            // C, 1
-            // A, 1
-
-            let dict_array: DictionaryArray<K> =
-                vec!["A", "B", "A", "A", "C", "A"].into_iter().collect();
-            let dict_array = Arc::new(dict_array);
-
-            let val_array: Int64Array = vec![1, 2, 2, 4, 1, 1].into();
-            let val_array = Arc::new(val_array);
-
-            let schema = Arc::new(Schema::new(vec![
-                Field::new("dict", dict_array.data_type().clone(), false),
-                Field::new("val", val_array.data_type().clone(), false),
-            ]));
-
-            let batch = RecordBatch::try_new(schema.clone(), vec![dict_array, val_array])
-                .unwrap();
-
-            let provider = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap();
-            ctx.register_table("t", Arc::new(provider)).unwrap();
-
-            let results =
-                plan_and_collect(&ctx, "SELECT dict, count(val) FROM t GROUP BY dict")
-                    .await
-                    .expect("ran plan correctly");
-
-            let expected = vec![
-                "+------+--------------+",
-                "| dict | COUNT(t.val) |",
-                "+------+--------------+",
-                "| A    | 4            |",
-                "| B    | 1            |",
-                "| C    | 1            |",
-                "+------+--------------+",
-            ];
-            assert_batches_sorted_eq!(expected, &results);
-
-            // Now, use dict as an aggregate
-            let results =
-                plan_and_collect(&ctx, "SELECT val, count(dict) FROM t GROUP BY val")
-                    .await
-                    .expect("ran plan correctly");
-
-            let expected = vec![
-                "+-----+---------------+",
-                "| val | COUNT(t.dict) |",
-                "+-----+---------------+",
-                "| 1   | 3             |",
-                "| 2   | 2             |",
-                "| 4   | 1             |",
-                "+-----+---------------+",
-            ];
-            assert_batches_sorted_eq!(expected, &results);
-
-            // Now, use dict as an aggregate
-            let results = plan_and_collect(
-                &ctx,
-                "SELECT val, count(distinct dict) FROM t GROUP BY val",
-            )
-            .await
-            .expect("ran plan correctly");
-
-            let expected = vec![
-                "+-----+------------------------+",
-                "| val | COUNT(DISTINCT t.dict) |",
-                "+-----+------------------------+",
-                "| 1   | 2                      |",
-                "| 2   | 2                      |",
-                "| 4   | 1                      |",
-                "+-----+------------------------+",
-            ];
-            assert_batches_sorted_eq!(expected, &results);
-        }
-
-        run_test_case::<Int8Type>().await;
-        run_test_case::<Int16Type>().await;
-        run_test_case::<Int32Type>().await;
-        run_test_case::<Int64Type>().await;
-        run_test_case::<UInt8Type>().await;
-        run_test_case::<UInt16Type>().await;
-        run_test_case::<UInt32Type>().await;
-        run_test_case::<UInt64Type>().await;
-    }
-
-    async fn run_count_distinct_integers_aggregated_scenario(
-        partitions: Vec<Vec<(&str, u64)>>,
-    ) -> Result<Vec<RecordBatch>> {
-        let tmp_dir = TempDir::new()?;
-        let ctx = SessionContext::new();
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("c_group", DataType::Utf8, false),
-            Field::new("c_int8", DataType::Int8, false),
-            Field::new("c_int16", DataType::Int16, false),
-            Field::new("c_int32", DataType::Int32, false),
-            Field::new("c_int64", DataType::Int64, false),
-            Field::new("c_uint8", DataType::UInt8, false),
-            Field::new("c_uint16", DataType::UInt16, false),
-            Field::new("c_uint32", DataType::UInt32, false),
-            Field::new("c_uint64", DataType::UInt64, false),
-        ]));
-
-        for (i, partition) in partitions.iter().enumerate() {
-            let filename = format!("partition-{}.csv", i);
-            let file_path = tmp_dir.path().join(&filename);
-            let mut file = File::create(file_path)?;
-            for row in partition {
-                let row_str = format!(
-                    "{},{}\n",
-                    row.0,
-                    // Populate values for each of the integer fields in the
-                    // schema.
-                    (0..8)
-                        .map(|_| { row.1.to_string() })
-                        .collect::<Vec<_>>()
-                        .join(","),
-                );
-                file.write_all(row_str.as_bytes())?;
-            }
-        }
-        ctx.register_csv(
-            "test",
-            tmp_dir.path().to_str().unwrap(),
-            CsvReadOptions::new().schema(&schema).has_header(false),
-        )
-        .await?;
-
-        let results = plan_and_collect(
-            &ctx,
-            "
-              SELECT
-                c_group,
-                COUNT(c_uint64),
-                COUNT(DISTINCT c_int8),
-                COUNT(DISTINCT c_int16),
-                COUNT(DISTINCT c_int32),
-                COUNT(DISTINCT c_int64),
-                COUNT(DISTINCT c_uint8),
-                COUNT(DISTINCT c_uint16),
-                COUNT(DISTINCT c_uint32),
-                COUNT(DISTINCT c_uint64)
-              FROM test
-              GROUP BY c_group
-            ",
-        )
-        .await?;
-
-        Ok(results)
-    }
-
-    #[tokio::test]
-    async fn count_distinct_integers_aggregated_single_partition() -> Result<()> {
-        let partitions = vec![
-            // The first member of each tuple will be the value for the
-            // `c_group` column, and the second member will be the value for
-            // each of the int/uint fields.
-            vec![
-                ("a", 1),
-                ("a", 1),
-                ("a", 2),
-                ("b", 9),
-                ("c", 9),
-                ("c", 10),
-                ("c", 9),
-            ],
-        ];
-
-        let results = run_count_distinct_integers_aggregated_scenario(partitions).await?;
-
-        let expected = vec![
-            "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
-            "| c_group | COUNT(test.c_uint64) | COUNT(DISTINCT test.c_int8) | COUNT(DISTINCT test.c_int16) | COUNT(DISTINCT test.c_int32) | COUNT(DISTINCT test.c_int64) | COUNT(DISTINCT test.c_uint8) | COUNT(DISTINCT test.c_uint16) | COUNT(DISTINCT test.c_uint32) | COUNT(DISTINCT test.c_uint64) |",
-            "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
-            "| a       | 3                    | 2                           | 2                            | 2                            | 2                            | 2                            | 2                             | 2                             | 2                             |",
-            "| b       | 1                    | 1                           | 1                            | 1                            | 1                            | 1                            | 1                             | 1                             | 1                             |",
-            "| c       | 3                    | 2                           | 2                            | 2                            | 2                            | 2                            | 2                             | 2                             | 2                             |",
-            "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn count_distinct_integers_aggregated_multiple_partitions() -> Result<()> {
-        let partitions = vec![
-            // The first member of each tuple will be the value for the
-            // `c_group` column, and the second member will be the value for
-            // each of the int/uint fields.
-            vec![("a", 1), ("a", 1), ("a", 2), ("b", 9), ("c", 9)],
-            vec![("a", 1), ("a", 3), ("b", 8), ("b", 9), ("b", 10), ("b", 11)],
-        ];
-
-        let results = run_count_distinct_integers_aggregated_scenario(partitions).await?;
-
-        let expected = vec![
-            "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
-            "| c_group | COUNT(test.c_uint64) | COUNT(DISTINCT test.c_int8) | COUNT(DISTINCT test.c_int16) | COUNT(DISTINCT test.c_int32) | COUNT(DISTINCT test.c_int64) | COUNT(DISTINCT test.c_uint8) | COUNT(DISTINCT test.c_uint16) | COUNT(DISTINCT test.c_uint32) | COUNT(DISTINCT test.c_uint64) |",
-            "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
-            "| a       | 5                    | 3                           | 3                            | 3                            | 3                            | 3                            | 3                             | 3                             | 3                             |",
-            "| b       | 5                    | 4                           | 4                            | 4                            | 4                            | 4                            | 4                             | 4                             | 4                             |",
-            "| c       | 1                    | 1                           | 1                            | 1                            | 1                            | 1                            | 1                             | 1                             | 1                             |",
-            "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
     #[tokio::test]
     async fn aggregate_with_alias() -> Result<()> {
         let tmp_dir = TempDir::new()?;
@@ -2710,190 +1749,6 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn limit() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let ctx = create_ctx(&tmp_dir, 1).await?;
-        ctx.register_table("t", test::table_with_sequence(1, 1000).unwrap())
-            .unwrap();
-
-        let results = plan_and_collect(&ctx, "SELECT i FROM t ORDER BY i DESC limit 3")
-            .await
-            .unwrap();
-
-        let expected = vec![
-            "+------+", "| i    |", "+------+", "| 1000 |", "| 999  |", "| 998  |",
-            "+------+",
-        ];
-
-        assert_batches_eq!(expected, &results);
-
-        let results = plan_and_collect(&ctx, "SELECT i FROM t ORDER BY i limit 3")
-            .await
-            .unwrap();
-
-        let expected = vec![
-            "+---+", "| i |", "+---+", "| 1 |", "| 2 |", "| 3 |", "+---+",
-        ];
-
-        assert_batches_eq!(expected, &results);
-
-        let results = plan_and_collect(&ctx, "SELECT i FROM t limit 3")
-            .await
-            .unwrap();
-
-        // the actual rows are not guaranteed, so only check the count (should be 3)
-        let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
-        assert_eq!(num_rows, 3);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn limit_multi_partitions() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let ctx = create_ctx(&tmp_dir, 1).await?;
-
-        let partitions = vec![
-            vec![test::make_partition(0)],
-            vec![test::make_partition(1)],
-            vec![test::make_partition(2)],
-            vec![test::make_partition(3)],
-            vec![test::make_partition(4)],
-            vec![test::make_partition(5)],
-        ];
-        let schema = partitions[0][0].schema();
-        let provider = Arc::new(MemTable::try_new(schema, partitions).unwrap());
-
-        ctx.register_table("t", provider).unwrap();
-
-        // select all rows
-        let results = plan_and_collect(&ctx, "SELECT i FROM t").await.unwrap();
-
-        let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
-        assert_eq!(num_rows, 15);
-
-        for limit in 1..10 {
-            let query = format!("SELECT i FROM t limit {}", limit);
-            let results = plan_and_collect(&ctx, &query).await.unwrap();
-
-            let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
-            assert_eq!(num_rows, limit, "mismatch with query {}", query);
-        }
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn case_sensitive_identifiers_functions() {
-        let ctx = SessionContext::new();
-        ctx.register_table("t", test::table_with_sequence(1, 1).unwrap())
-            .unwrap();
-
-        let expected = vec![
-            "+-----------+",
-            "| sqrt(t.i) |",
-            "+-----------+",
-            "| 1         |",
-            "+-----------+",
-        ];
-
-        let results = plan_and_collect(&ctx, "SELECT sqrt(i) FROM t")
-            .await
-            .unwrap();
-
-        assert_batches_sorted_eq!(expected, &results);
-
-        let results = plan_and_collect(&ctx, "SELECT SQRT(i) FROM t")
-            .await
-            .unwrap();
-        assert_batches_sorted_eq!(expected, &results);
-
-        // Using double quotes allows specifying the function name with capitalization
-        let err = plan_and_collect(&ctx, "SELECT \"SQRT\"(i) FROM t")
-            .await
-            .unwrap_err();
-        assert_eq!(
-            err.to_string(),
-            "Error during planning: Invalid function 'SQRT'"
-        );
-
-        let results = plan_and_collect(&ctx, "SELECT \"sqrt\"(i) FROM t")
-            .await
-            .unwrap();
-        assert_batches_sorted_eq!(expected, &results);
-    }
-
-    #[tokio::test]
-    async fn case_builtin_math_expression() {
-        let ctx = SessionContext::new();
-
-        let type_values = vec![
-            (
-                DataType::Int8,
-                Arc::new(Int8Array::from_slice(&[1])) as ArrayRef,
-            ),
-            (
-                DataType::Int16,
-                Arc::new(Int16Array::from_slice(&[1])) as ArrayRef,
-            ),
-            (
-                DataType::Int32,
-                Arc::new(Int32Array::from_slice(&[1])) as ArrayRef,
-            ),
-            (
-                DataType::Int64,
-                Arc::new(Int64Array::from_slice(&[1])) as ArrayRef,
-            ),
-            (
-                DataType::UInt8,
-                Arc::new(UInt8Array::from_slice(&[1])) as ArrayRef,
-            ),
-            (
-                DataType::UInt16,
-                Arc::new(UInt16Array::from_slice(&[1])) as ArrayRef,
-            ),
-            (
-                DataType::UInt32,
-                Arc::new(UInt32Array::from_slice(&[1])) as ArrayRef,
-            ),
-            (
-                DataType::UInt64,
-                Arc::new(UInt64Array::from_slice(&[1])) as ArrayRef,
-            ),
-            (
-                DataType::Float32,
-                Arc::new(Float32Array::from_slice(&[1.0_f32])) as ArrayRef,
-            ),
-            (
-                DataType::Float64,
-                Arc::new(Float64Array::from_slice(&[1.0_f64])) as ArrayRef,
-            ),
-        ];
-
-        for (data_type, array) in type_values.iter() {
-            let schema =
-                Arc::new(Schema::new(vec![Field::new("v", data_type.clone(), false)]));
-            let batch =
-                RecordBatch::try_new(schema.clone(), vec![array.clone()]).unwrap();
-            let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
-            ctx.deregister_table("t").unwrap();
-            ctx.register_table("t", Arc::new(provider)).unwrap();
-            let expected = vec![
-                "+-----------+",
-                "| sqrt(t.v) |",
-                "+-----------+",
-                "| 1         |",
-                "+-----------+",
-            ];
-            let results = plan_and_collect(&ctx, "SELECT sqrt(v) FROM t")
-                .await
-                .unwrap();
-
-            assert_batches_sorted_eq!(expected, &results);
-        }
-    }
-
     #[tokio::test]
     async fn case_sensitive_identifiers_user_defined_functions() -> Result<()> {
         let mut ctx = SessionContext::new();
@@ -2935,46 +1790,6 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn case_sensitive_identifiers_aggregates() {
-        let ctx = SessionContext::new();
-        ctx.register_table("t", test::table_with_sequence(1, 1).unwrap())
-            .unwrap();
-
-        let expected = vec![
-            "+----------+",
-            "| MAX(t.i) |",
-            "+----------+",
-            "| 1        |",
-            "+----------+",
-        ];
-
-        let results = plan_and_collect(&ctx, "SELECT max(i) FROM t")
-            .await
-            .unwrap();
-
-        assert_batches_sorted_eq!(expected, &results);
-
-        let results = plan_and_collect(&ctx, "SELECT MAX(i) FROM t")
-            .await
-            .unwrap();
-        assert_batches_sorted_eq!(expected, &results);
-
-        // Using double quotes allows specifying the function name with capitalization
-        let err = plan_and_collect(&ctx, "SELECT \"MAX\"(i) FROM t")
-            .await
-            .unwrap_err();
-        assert_eq!(
-            err.to_string(),
-            "Error during planning: Invalid function 'MAX'"
-        );
-
-        let results = plan_and_collect(&ctx, "SELECT \"max\"(i) FROM t")
-            .await
-            .unwrap();
-        assert_batches_sorted_eq!(expected, &results);
-    }
-
     #[tokio::test]
     async fn case_sensitive_identifiers_user_defined_aggregates() -> Result<()> {
         let mut ctx = SessionContext::new();
@@ -3097,42 +1912,6 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn simple_avg() -> Result<()> {
-        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
-
-        let batch1 = RecordBatch::try_new(
-            Arc::new(schema.clone()),
-            vec![Arc::new(Int32Array::from_slice(&[1, 2, 3]))],
-        )?;
-        let batch2 = RecordBatch::try_new(
-            Arc::new(schema.clone()),
-            vec![Arc::new(Int32Array::from_slice(&[4, 5]))],
-        )?;
-
-        let ctx = SessionContext::new();
-
-        let provider =
-            MemTable::try_new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?;
-        ctx.register_table("t", Arc::new(provider))?;
-
-        let result = plan_and_collect(&ctx, "SELECT AVG(a) FROM t").await?;
-
-        let batch = &result[0];
-        assert_eq!(1, batch.num_columns());
-        assert_eq!(1, batch.num_rows());
-
-        let values = batch
-            .column(0)
-            .as_any()
-            .downcast_ref::<Float64Array>()
-            .expect("failed to cast version");
-        assert_eq!(values.len(), 1);
-        // avg(1,2,3,4,5) = 3.0
-        assert_eq!(values.value(0), 3.0_f64);
-        Ok(())
-    }
-
     #[tokio::test]
     async fn custom_query_planner() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
@@ -3385,13 +2164,6 @@ mod tests {
         ctx.sql(sql).await?.collect().await
     }
 
-    /// Execute SQL and return results
-    async fn execute(sql: &str, partition_count: usize) -> Result<Vec<RecordBatch>> {
-        let tmp_dir = TempDir::new()?;
-        let ctx = create_ctx(&tmp_dir, partition_count).await?;
-        plan_and_collect(&ctx, sql).await
-    }
-
     /// Generate CSV partitions within the supplied directory
     fn populate_csv_partitions(
         tmp_dir: &TempDir,
diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs
index 272bb82d6..f84a7c2b3 100644
--- a/datafusion/core/tests/sql/aggregates.rs
+++ b/datafusion/core/tests/sql/aggregates.rs
@@ -804,3 +804,306 @@ async fn aggregate_timestamps_avg() -> Result<()> {
     assert_eq!(results.to_string(), "Error during planning: The function Avg does not support inputs of type Timestamp(Nanosecond, None).");
     Ok(())
 }
+
+#[tokio::test]
+async fn aggregate_decimal_min() -> Result<()> {
+    let ctx = SessionContext::new();
+    // the data type of c1 is decimal(10,3)
+    ctx.register_table("d_table", table_with_decimal()).unwrap();
+    let result = plan_and_collect(&ctx, "select min(c1) from d_table")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+-----------------+",
+        "| MIN(d_table.c1) |",
+        "+-----------------+",
+        "| -100.009        |",
+        "+-----------------+",
+    ];
+    assert_eq!(
+        &DataType::Decimal(10, 3),
+        result[0].schema().field(0).data_type()
+    );
+    assert_batches_sorted_eq!(expected, &result);
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_decimal_max() -> Result<()> {
+    let ctx = SessionContext::new();
+    // the data type of c1 is decimal(10,3)
+    ctx.register_table("d_table", table_with_decimal()).unwrap();
+
+    let result = plan_and_collect(&ctx, "select max(c1) from d_table")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+-----------------+",
+        "| MAX(d_table.c1) |",
+        "+-----------------+",
+        "| 110.009         |",
+        "+-----------------+",
+    ];
+    assert_eq!(
+        &DataType::Decimal(10, 3),
+        result[0].schema().field(0).data_type()
+    );
+    assert_batches_sorted_eq!(expected, &result);
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_decimal_sum() -> Result<()> {
+    let ctx = SessionContext::new();
+    // the data type of c1 is decimal(10,3)
+    ctx.register_table("d_table", table_with_decimal()).unwrap();
+    let result = plan_and_collect(&ctx, "select sum(c1) from d_table")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+-----------------+",
+        "| SUM(d_table.c1) |",
+        "+-----------------+",
+        "| 100.000         |",
+        "+-----------------+",
+    ];
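+    // SUM is expected to widen the precision: the Decimal(10, 3) input produces the Decimal(20, 3) result asserted below, with the scale unchanged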
+    assert_eq!(
+        &DataType::Decimal(20, 3),
+        result[0].schema().field(0).data_type()
+    );
+    assert_batches_sorted_eq!(expected, &result);
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_decimal_avg() -> Result<()> {
+    let ctx = SessionContext::new();
+    // the data type of c1 is decimal(10,3)
+    ctx.register_table("d_table", table_with_decimal()).unwrap();
+    let result = plan_and_collect(&ctx, "select avg(c1) from d_table")
+        .await
+        .unwrap();
+    let expected = vec![
+        "+-----------------+",
+        "| AVG(d_table.c1) |",
+        "+-----------------+",
+        "| 5.0000000       |",
+        "+-----------------+",
+    ];
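+    // AVG is expected to widen both precision and scale: the Decimal(10, 3) input produces the Decimal(14, 7) result asserted below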
+    assert_eq!(
+        &DataType::Decimal(14, 7),
+        result[0].schema().field(0).data_type()
+    );
+    assert_batches_sorted_eq!(expected, &result);
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregate() -> Result<()> {
+    let results = execute_with_partition("SELECT SUM(c1), SUM(c2) FROM test", 4).await?;
+    assert_eq!(results.len(), 1);
+
+    let expected = vec![
+        "+--------------+--------------+",
+        "| SUM(test.c1) | SUM(test.c2) |",
+        "+--------------+--------------+",
+        "| 60           | 220          |",
+        "+--------------+--------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_empty() -> Result<()> {
+    // The predicate on this query purposely generates no results
+    let results =
+        execute_with_partition("SELECT SUM(c1), SUM(c2) FROM test where c1 > 100000", 4)
+            .await
+            .unwrap();
+
+    assert_eq!(results.len(), 1);
+
+    let expected = vec![
+        "+--------------+--------------+",
+        "| SUM(test.c1) | SUM(test.c2) |",
+        "+--------------+--------------+",
+        "|              |              |",
+        "+--------------+--------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_avg() -> Result<()> {
+    let results = execute_with_partition("SELECT AVG(c1), AVG(c2) FROM test", 4).await?;
+    assert_eq!(results.len(), 1);
+
+    let expected = vec![
+        "+--------------+--------------+",
+        "| AVG(test.c1) | AVG(test.c2) |",
+        "+--------------+--------------+",
+        "| 1.5          | 5.5          |",
+        "+--------------+--------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_max() -> Result<()> {
+    let results = execute_with_partition("SELECT MAX(c1), MAX(c2) FROM test", 4).await?;
+    assert_eq!(results.len(), 1);
+
+    let expected = vec![
+        "+--------------+--------------+",
+        "| MAX(test.c1) | MAX(test.c2) |",
+        "+--------------+--------------+",
+        "| 3            | 10           |",
+        "+--------------+--------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_min() -> Result<()> {
+    let results = execute_with_partition("SELECT MIN(c1), MIN(c2) FROM test", 4).await?;
+    assert_eq!(results.len(), 1);
+
+    let expected = vec![
+        "+--------------+--------------+",
+        "| MIN(test.c1) | MIN(test.c2) |",
+        "+--------------+--------------+",
+        "| 0            | 1            |",
+        "+--------------+--------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_grouped() -> Result<()> {
+    let results =
+        execute_with_partition("SELECT c1, SUM(c2) FROM test GROUP BY c1", 4).await?;
+
+    let expected = vec![
+        "+----+--------------+",
+        "| c1 | SUM(test.c2) |",
+        "+----+--------------+",
+        "| 0  | 55           |",
+        "| 1  | 55           |",
+        "| 2  | 55           |",
+        "| 3  | 55           |",
+        "+----+--------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_grouped_avg() -> Result<()> {
+    let results =
+        execute_with_partition("SELECT c1, AVG(c2) FROM test GROUP BY c1", 4).await?;
+
+    let expected = vec![
+        "+----+--------------+",
+        "| c1 | AVG(test.c2) |",
+        "+----+--------------+",
+        "| 0  | 5.5          |",
+        "| 1  | 5.5          |",
+        "| 2  | 5.5          |",
+        "| 3  | 5.5          |",
+        "+----+--------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_grouped_empty() -> Result<()> {
+    let results = execute_with_partition(
+        "SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1",
+        4,
+    )
+    .await?;
+
+    let expected = vec![
+        "+----+--------------+",
+        "| c1 | AVG(test.c2) |",
+        "+----+--------------+",
+        "+----+--------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_grouped_max() -> Result<()> {
+    let results =
+        execute_with_partition("SELECT c1, MAX(c2) FROM test GROUP BY c1", 4).await?;
+
+    let expected = vec![
+        "+----+--------------+",
+        "| c1 | MAX(test.c2) |",
+        "+----+--------------+",
+        "| 0  | 10           |",
+        "| 1  | 10           |",
+        "| 2  | 10           |",
+        "| 3  | 10           |",
+        "+----+--------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_grouped_min() -> Result<()> {
+    let results =
+        execute_with_partition("SELECT c1, MIN(c2) FROM test GROUP BY c1", 4).await?;
+
+    let expected = vec![
+        "+----+--------------+",
+        "| c1 | MIN(test.c2) |",
+        "+----+--------------+",
+        "| 0  | 1            |",
+        "| 1  | 1            |",
+        "| 2  | 1            |",
+        "| 3  | 1            |",
+        "+----+--------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_avg_add() -> Result<()> {
+    let results = execute_with_partition(
+        "SELECT AVG(c1), AVG(c1) + 1, AVG(c1) + 2, 1 + AVG(c1) FROM test",
+        4,
+    )
+    .await?;
+    assert_eq!(results.len(), 1);
+
+    let expected = vec![
+        "+--------------+----------------------------+----------------------------+----------------------------+",
+        "| AVG(test.c1) | AVG(test.c1) Plus Int64(1) | AVG(test.c1) Plus Int64(2) | Int64(1) Plus AVG(test.c1) |",
+        "+--------------+----------------------------+----------------------------+----------------------------+",
+        "| 1.5          | 2.5                        | 3.5                        | 2.5                        |",
+        "+--------------+----------------------------+----------------------------+----------------------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
diff --git a/datafusion/core/tests/sql/functions.rs b/datafusion/core/tests/sql/functions.rs
index 226bb8159..06ddea09d 100644
--- a/datafusion/core/tests/sql/functions.rs
+++ b/datafusion/core/tests/sql/functions.rs
@@ -357,3 +357,240 @@ async fn coalesce_mul_with_default_value() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn count_basic() -> Result<()> {
+    let results =
+        execute_with_partition("SELECT COUNT(c1), COUNT(c2) FROM test", 1).await?;
+    assert_eq!(results.len(), 1);
+
+    let expected = vec![
+        "+----------------+----------------+",
+        "| COUNT(test.c1) | COUNT(test.c2) |",
+        "+----------------+----------------+",
+        "| 10             | 10             |",
+        "+----------------+----------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+    Ok(())
+}
+
+#[tokio::test]
+async fn count_partitioned() -> Result<()> {
+    let results =
+        execute_with_partition("SELECT COUNT(c1), COUNT(c2) FROM test", 4).await?;
+    assert_eq!(results.len(), 1);
+
+    let expected = vec![
+        "+----------------+----------------+",
+        "| COUNT(test.c1) | COUNT(test.c2) |",
+        "+----------------+----------------+",
+        "| 40             | 40             |",
+        "+----------------+----------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+    Ok(())
+}
+
+#[tokio::test]
+async fn count_aggregated() -> Result<()> {
+    let results =
+        execute_with_partition("SELECT c1, COUNT(c2) FROM test GROUP BY c1", 4).await?;
+
+    let expected = vec![
+        "+----+----------------+",
+        "| c1 | COUNT(test.c2) |",
+        "+----+----------------+",
+        "| 0  | 10             |",
+        "| 1  | 10             |",
+        "| 2  | 10             |",
+        "| 3  | 10             |",
+        "+----+----------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+    Ok(())
+}
+
+#[tokio::test]
+async fn simple_avg() -> Result<()> {
+    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+
+    let batch1 = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![Arc::new(Int32Array::from_slice(&[1, 2, 3]))],
+    )?;
+    let batch2 = RecordBatch::try_new(
+        Arc::new(schema.clone()),
+        vec![Arc::new(Int32Array::from_slice(&[4, 5]))],
+    )?;
+
+    let ctx = SessionContext::new();
+
+    let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?;
+    ctx.register_table("t", Arc::new(provider))?;
+
+    let result = plan_and_collect(&ctx, "SELECT AVG(a) FROM t").await?;
+
+    let batch = &result[0];
+    assert_eq!(1, batch.num_columns());
+    assert_eq!(1, batch.num_rows());
+
+    let values = batch
+        .column(0)
+        .as_any()
+        .downcast_ref::<Float64Array>()
+        .expect("failed to cast version");
+    assert_eq!(values.len(), 1);
+    // avg(1,2,3,4,5) = 3.0
+    assert_eq!(values.value(0), 3.0_f64);
+    Ok(())
+}
+
+#[tokio::test]
+async fn case_sensitive_identifiers_functions() {
+    let ctx = SessionContext::new();
+    ctx.register_table("t", table_with_sequence(1, 1).unwrap())
+        .unwrap();
+
+    let expected = vec![
+        "+-----------+",
+        "| sqrt(t.i) |",
+        "+-----------+",
+        "| 1         |",
+        "+-----------+",
+    ];
+
+    let results = plan_and_collect(&ctx, "SELECT sqrt(i) FROM t")
+        .await
+        .unwrap();
+
+    assert_batches_sorted_eq!(expected, &results);
+
+    let results = plan_and_collect(&ctx, "SELECT SQRT(i) FROM t")
+        .await
+        .unwrap();
+    assert_batches_sorted_eq!(expected, &results);
+
+    // Double quotes make the identifier case-sensitive, so the capitalized name no longer matches the registered function
+    let err = plan_and_collect(&ctx, "SELECT \"SQRT\"(i) FROM t")
+        .await
+        .unwrap_err();
+    assert_eq!(
+        err.to_string(),
+        "Error during planning: Invalid function 'SQRT'"
+    );
+
+    let results = plan_and_collect(&ctx, "SELECT \"sqrt\"(i) FROM t")
+        .await
+        .unwrap();
+    assert_batches_sorted_eq!(expected, &results);
+}
+
+#[tokio::test]
+async fn case_builtin_math_expression() {
+    let ctx = SessionContext::new();
+
+    let type_values = vec![
+        (
+            DataType::Int8,
+            Arc::new(Int8Array::from_slice(&[1])) as ArrayRef,
+        ),
+        (
+            DataType::Int16,
+            Arc::new(Int16Array::from_slice(&[1])) as ArrayRef,
+        ),
+        (
+            DataType::Int32,
+            Arc::new(Int32Array::from_slice(&[1])) as ArrayRef,
+        ),
+        (
+            DataType::Int64,
+            Arc::new(Int64Array::from_slice(&[1])) as ArrayRef,
+        ),
+        (
+            DataType::UInt8,
+            Arc::new(UInt8Array::from_slice(&[1])) as ArrayRef,
+        ),
+        (
+            DataType::UInt16,
+            Arc::new(UInt16Array::from_slice(&[1])) as ArrayRef,
+        ),
+        (
+            DataType::UInt32,
+            Arc::new(UInt32Array::from_slice(&[1])) as ArrayRef,
+        ),
+        (
+            DataType::UInt64,
+            Arc::new(UInt64Array::from_slice(&[1])) as ArrayRef,
+        ),
+        (
+            DataType::Float32,
+            Arc::new(Float32Array::from_slice(&[1.0_f32])) as ArrayRef,
+        ),
+        (
+            DataType::Float64,
+            Arc::new(Float64Array::from_slice(&[1.0_f64])) as ArrayRef,
+        ),
+    ];
+
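+    // run the same sqrt query against a single-value table of each numeric type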
+    for (data_type, array) in type_values.iter() {
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("v", data_type.clone(), false)]));
+        let batch = RecordBatch::try_new(schema.clone(), vec![array.clone()]).unwrap();
+        let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
+        ctx.deregister_table("t").unwrap();
+        ctx.register_table("t", Arc::new(provider)).unwrap();
+        let expected = vec![
+            "+-----------+",
+            "| sqrt(t.v) |",
+            "+-----------+",
+            "| 1         |",
+            "+-----------+",
+        ];
+        let results = plan_and_collect(&ctx, "SELECT sqrt(v) FROM t")
+            .await
+            .unwrap();
+
+        assert_batches_sorted_eq!(expected, &results);
+    }
+}
+
+#[tokio::test]
+async fn case_sensitive_identifiers_aggregates() {
+    let ctx = SessionContext::new();
+    ctx.register_table("t", table_with_sequence(1, 1).unwrap())
+        .unwrap();
+
+    let expected = vec![
+        "+----------+",
+        "| MAX(t.i) |",
+        "+----------+",
+        "| 1        |",
+        "+----------+",
+    ];
+
+    let results = plan_and_collect(&ctx, "SELECT max(i) FROM t")
+        .await
+        .unwrap();
+
+    assert_batches_sorted_eq!(expected, &results);
+
+    let results = plan_and_collect(&ctx, "SELECT MAX(i) FROM t")
+        .await
+        .unwrap();
+    assert_batches_sorted_eq!(expected, &results);
+
+    // Double quotes make the identifier case-sensitive, so the capitalized name no longer matches the registered function
+    let err = plan_and_collect(&ctx, "SELECT \"MAX\"(i) FROM t")
+        .await
+        .unwrap_err();
+    assert_eq!(
+        err.to_string(),
+        "Error during planning: Invalid function 'MAX'"
+    );
+
+    let results = plan_and_collect(&ctx, "SELECT \"max\"(i) FROM t")
+        .await
+        .unwrap();
+    assert_batches_sorted_eq!(expected, &results);
+}
diff --git a/datafusion/core/tests/sql/group_by.rs b/datafusion/core/tests/sql/group_by.rs
index 78323310a..7f38fbda8 100644
--- a/datafusion/core/tests/sql/group_by.rs
+++ b/datafusion/core/tests/sql/group_by.rs
@@ -442,3 +442,199 @@ async fn csv_group_by_date() -> Result<()> {
     assert_batches_sorted_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn group_by_date_trunc() -> Result<()> {
+    let tmp_dir = TempDir::new()?;
+    let ctx = SessionContext::new();
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c2", DataType::UInt64, false),
+        Field::new(
+            "t1",
+            DataType::Timestamp(TimeUnit::Microsecond, None),
+            false,
+        ),
+    ]));
+
+    // generate a partitioned file
+    for partition in 0..4 {
+        let filename = format!("partition-{}.{}", partition, "csv");
+        let file_path = tmp_dir.path().join(&filename);
+        let mut file = File::create(file_path)?;
+
+        // generate some data
+        for i in 0..10 {
+            let data = format!("{},2020-12-{}T00:00:00.000Z\n", i, i + 10);
+            file.write_all(data.as_bytes())?;
+        }
+    }
+
+    ctx.register_csv(
+        "test",
+        tmp_dir.path().to_str().unwrap(),
+        CsvReadOptions::new().schema(&schema).has_header(false),
+    )
+    .await?;
+
+    let results = plan_and_collect(
+        &ctx,
+        "SELECT date_trunc('week', t1) as week, SUM(c2) FROM test GROUP BY date_trunc('week', t1)",
+    ).await?;
+
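+    // per partition, 2020-12-10..13 truncate to the week of 2020-12-07 (c2 = 0..3, sum 6)
+    // and 2020-12-14..19 to 2020-12-14 (c2 = 4..9, sum 39); over 4 partitions: 24 and 156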
+    let expected = vec![
+        "+---------------------+--------------+",
+        "| week                | SUM(test.c2) |",
+        "+---------------------+--------------+",
+        "| 2020-12-07 00:00:00 | 24           |",
+        "| 2020-12-14 00:00:00 | 156          |",
+        "+---------------------+--------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn group_by_largeutf8() {
+    {
+        let ctx = SessionContext::new();
+
+        // input data looks like:
+        // A, 1
+        // B, 2
+        // A, 2
+        // A, 4
+        // C, 1
+        // A, 1
+
+        let str_array: LargeStringArray = vec!["A", "B", "A", "A", "C", "A"]
+            .into_iter()
+            .map(Some)
+            .collect();
+        let str_array = Arc::new(str_array);
+
+        let val_array: Int64Array = vec![1, 2, 2, 4, 1, 1].into();
+        let val_array = Arc::new(val_array);
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("str", str_array.data_type().clone(), false),
+            Field::new("val", val_array.data_type().clone(), false),
+        ]));
+
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![str_array, val_array]).unwrap();
+
+        let provider = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap();
+        ctx.register_table("t", Arc::new(provider)).unwrap();
+
+        let results =
+            plan_and_collect(&ctx, "SELECT str, count(val) FROM t GROUP BY str")
+                .await
+                .expect("ran plan correctly");
+
+        let expected = vec![
+            "+-----+--------------+",
+            "| str | COUNT(t.val) |",
+            "+-----+--------------+",
+            "| A   | 4            |",
+            "| B   | 1            |",
+            "| C   | 1            |",
+            "+-----+--------------+",
+        ];
+        assert_batches_sorted_eq!(expected, &results);
+    }
+}
+
+#[tokio::test]
+async fn group_by_dictionary() {
+    async fn run_test_case<K: ArrowDictionaryKeyType>() {
+        let ctx = SessionContext::new();
+
+        // input data looks like:
+        // A, 1
+        // B, 2
+        // A, 2
+        // A, 4
+        // C, 1
+        // A, 1
+
+        let dict_array: DictionaryArray<K> =
+            vec!["A", "B", "A", "A", "C", "A"].into_iter().collect();
+        let dict_array = Arc::new(dict_array);
+
+        let val_array: Int64Array = vec![1, 2, 2, 4, 1, 1].into();
+        let val_array = Arc::new(val_array);
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("dict", dict_array.data_type().clone(), false),
+            Field::new("val", val_array.data_type().clone(), false),
+        ]));
+
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![dict_array, val_array]).unwrap();
+
+        let provider = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap();
+        ctx.register_table("t", Arc::new(provider)).unwrap();
+
+        let results =
+            plan_and_collect(&ctx, "SELECT dict, count(val) FROM t GROUP BY dict")
+                .await
+                .expect("ran plan correctly");
+
+        let expected = vec![
+            "+------+--------------+",
+            "| dict | COUNT(t.val) |",
+            "+------+--------------+",
+            "| A    | 4            |",
+            "| B    | 1            |",
+            "| C    | 1            |",
+            "+------+--------------+",
+        ];
+        assert_batches_sorted_eq!(expected, &results);
+
+        // Now aggregate over dict while grouping by val
+        let results =
+            plan_and_collect(&ctx, "SELECT val, count(dict) FROM t GROUP BY val")
+                .await
+                .expect("ran plan correctly");
+
+        let expected = vec![
+            "+-----+---------------+",
+            "| val | COUNT(t.dict) |",
+            "+-----+---------------+",
+            "| 1   | 3             |",
+            "| 2   | 2             |",
+            "| 4   | 1             |",
+            "+-----+---------------+",
+        ];
+        assert_batches_sorted_eq!(expected, &results);
+
+        // Finally, count the distinct dict values within each val group
+        let results = plan_and_collect(
+            &ctx,
+            "SELECT val, count(distinct dict) FROM t GROUP BY val",
+        )
+        .await
+        .expect("ran plan correctly");
+
+        let expected = vec![
+            "+-----+------------------------+",
+            "| val | COUNT(DISTINCT t.dict) |",
+            "+-----+------------------------+",
+            "| 1   | 2                      |",
+            "| 2   | 2                      |",
+            "| 4   | 1                      |",
+            "+-----+------------------------+",
+        ];
+        assert_batches_sorted_eq!(expected, &results);
+    }
+
+    run_test_case::<Int8Type>().await;
+    run_test_case::<Int16Type>().await;
+    run_test_case::<Int32Type>().await;
+    run_test_case::<Int64Type>().await;
+    run_test_case::<UInt8Type>().await;
+    run_test_case::<UInt16Type>().await;
+    run_test_case::<UInt32Type>().await;
+    run_test_case::<UInt64Type>().await;
+}
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index 6bfc3f384..aaa8adac5 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -1020,3 +1020,110 @@ async fn left_join_should_not_panic_with_empty_side() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn left_join_using_2() -> Result<()> {
+    let results = execute_with_partition(
+        "SELECT t1.c1, t2.c2 FROM test t1 JOIN test t2 USING (c2) ORDER BY t2.c2",
+        1,
+    )
+    .await?;
+    assert_eq!(results.len(), 1);
+
+    let expected = vec![
+        "+----+----+",
+        "| c1 | c2 |",
+        "+----+----+",
+        "| 0  | 1  |",
+        "| 0  | 2  |",
+        "| 0  | 3  |",
+        "| 0  | 4  |",
+        "| 0  | 5  |",
+        "| 0  | 6  |",
+        "| 0  | 7  |",
+        "| 0  | 8  |",
+        "| 0  | 9  |",
+        "| 0  | 10 |",
+        "+----+----+",
+    ];
+
+    assert_batches_eq!(expected, &results);
+    Ok(())
+}
+
+#[tokio::test]
+async fn left_join_using_join_key_projection() -> Result<()> {
+    let results = execute_with_partition(
+        "SELECT t1.c1, t1.c2, t2.c2 FROM test t1 JOIN test t2 USING (c2) ORDER BY t2.c2",
+        1,
+    )
+    .await?;
+    assert_eq!(results.len(), 1);
+
+    let expected = vec![
+        "+----+----+----+",
+        "| c1 | c2 | c2 |",
+        "+----+----+----+",
+        "| 0  | 1  | 1  |",
+        "| 0  | 2  | 2  |",
+        "| 0  | 3  | 3  |",
+        "| 0  | 4  | 4  |",
+        "| 0  | 5  | 5  |",
+        "| 0  | 6  | 6  |",
+        "| 0  | 7  | 7  |",
+        "| 0  | 8  | 8  |",
+        "| 0  | 9  | 9  |",
+        "| 0  | 10 | 10 |",
+        "+----+----+----+",
+    ];
+
+    assert_batches_eq!(expected, &results);
+    Ok(())
+}
+
+#[tokio::test]
+async fn left_join_2() -> Result<()> {
+    let results = execute_with_partition(
+        "SELECT t1.c1, t1.c2, t2.c2 FROM test t1 JOIN test t2 ON t1.c2 = t2.c2 ORDER BY t1.c2",
+        1,
+    )
+        .await?;
+    assert_eq!(results.len(), 1);
+
+    let expected = vec![
+        "+----+----+----+",
+        "| c1 | c2 | c2 |",
+        "+----+----+----+",
+        "| 0  | 1  | 1  |",
+        "| 0  | 2  | 2  |",
+        "| 0  | 3  | 3  |",
+        "| 0  | 4  | 4  |",
+        "| 0  | 5  | 5  |",
+        "| 0  | 6  | 6  |",
+        "| 0  | 7  | 7  |",
+        "| 0  | 8  | 8  |",
+        "| 0  | 9  | 9  |",
+        "| 0  | 10 | 10 |",
+        "+----+----+----+",
+    ];
+
+    assert_batches_eq!(expected, &results);
+    Ok(())
+}
+
+#[tokio::test]
+async fn join_partitioned() -> Result<()> {
+    // self join on partition id (workaround for duplicate column name)
+    let results = execute_with_partition(
+        "SELECT 1 FROM test JOIN (SELECT c1 AS id1 FROM test) AS a ON c1=id1",
+        4,
+    )
+    .await?;
+
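+    // each of the 4 distinct c1 values appears in 10 rows, so the self-join is expected to produce 4 * 10 * 10 rows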
+    assert_eq!(
+        results.iter().map(|b| b.num_rows()).sum::<usize>(),
+        4 * 10 * 10
+    );
+
+    Ok(())
+}
diff --git a/datafusion/core/tests/sql/limit.rs b/datafusion/core/tests/sql/limit.rs
index fc2dc4c95..e3b1466bc 100644
--- a/datafusion/core/tests/sql/limit.rs
+++ b/datafusion/core/tests/sql/limit.rs
@@ -89,3 +89,77 @@ async fn csv_query_limit_zero() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn limit() -> Result<()> {
+    let tmp_dir = TempDir::new()?;
+    let ctx = create_ctx_with_partition(&tmp_dir, 1).await?;
+    ctx.register_table("t", table_with_sequence(1, 1000).unwrap())
+        .unwrap();
+
+    let results = plan_and_collect(&ctx, "SELECT i FROM t ORDER BY i DESC limit 3")
+        .await
+        .unwrap();
+
+    let expected = vec![
+        "+------+", "| i    |", "+------+", "| 1000 |", "| 999  |", "| 998  |",
+        "+------+",
+    ];
+
+    assert_batches_eq!(expected, &results);
+
+    let results = plan_and_collect(&ctx, "SELECT i FROM t ORDER BY i limit 3")
+        .await
+        .unwrap();
+
+    let expected = vec![
+        "+---+", "| i |", "+---+", "| 1 |", "| 2 |", "| 3 |", "+---+",
+    ];
+
+    assert_batches_eq!(expected, &results);
+
+    let results = plan_and_collect(&ctx, "SELECT i FROM t limit 3")
+        .await
+        .unwrap();
+
+    // which rows are returned is not guaranteed, so only check the row count (should be 3)
+    let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
+    assert_eq!(num_rows, 3);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn limit_multi_partitions() -> Result<()> {
+    let tmp_dir = TempDir::new()?;
+    let ctx = create_ctx_with_partition(&tmp_dir, 1).await?;
+
+    let partitions = vec![
+        vec![make_partition(0)],
+        vec![make_partition(1)],
+        vec![make_partition(2)],
+        vec![make_partition(3)],
+        vec![make_partition(4)],
+        vec![make_partition(5)],
+    ];
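+    // make_partition(sz) yields sz rows, so these six partitions hold 0 + 1 + 2 + 3 + 4 + 5 = 15 rows in total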
+    let schema = partitions[0][0].schema();
+    let provider = Arc::new(MemTable::try_new(schema, partitions).unwrap());
+
+    ctx.register_table("t", provider).unwrap();
+
+    // select all rows
+    let results = plan_and_collect(&ctx, "SELECT i FROM t").await.unwrap();
+
+    let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
+    assert_eq!(num_rows, 15);
+
+    for limit in 1..10 {
+        let query = format!("SELECT i FROM t limit {}", limit);
+        let results = plan_and_collect(&ctx, &query).await.unwrap();
+
+        let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
+        assert_eq!(num_rows, limit, "mismatch with query {}", query);
+    }
+
+    Ok(())
+}
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 300ad0d9f..12570a419 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -46,6 +46,9 @@ use datafusion::{
 };
 use datafusion::{execution::context::SessionContext, physical_plan::displayable};
 use datafusion_expr::Volatility;
+use std::fs::File;
+use std::io::Write;
+use tempfile::TempDir;
 
 /// A macro to assert that some particular line contains two substrings
 ///
@@ -556,6 +559,98 @@ async fn execute(ctx: &SessionContext, sql: &str) -> Vec<Vec<String>> {
     result_vec(&execute_to_batches(ctx, sql).await)
 }
 
+/// Execute SQL against a fresh context backed by a partitioned CSV "test" table and return the results
+async fn execute_with_partition(
+    sql: &str,
+    partition_count: usize,
+) -> Result<Vec<RecordBatch>> {
+    let tmp_dir = TempDir::new()?;
+    let ctx = create_ctx_with_partition(&tmp_dir, partition_count).await?;
+    plan_and_collect(&ctx, sql).await
+}
+
+/// Generate partitioned CSV files in `tmp_dir` and register them as table "test" with a new execution context
+async fn create_ctx_with_partition(
+    tmp_dir: &TempDir,
+    partition_count: usize,
+) -> Result<SessionContext> {
+    let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(8));
+
+    let schema = populate_csv_partitions(tmp_dir, partition_count, ".csv")?;
+
+    // register csv file with the execution context
+    ctx.register_csv(
+        "test",
+        tmp_dir.path().to_str().unwrap(),
+        CsvReadOptions::new().schema(&schema),
+    )
+    .await?;
+
+    Ok(ctx)
+}
+
+/// Generate CSV partitions within the supplied directory
+fn populate_csv_partitions(
+    tmp_dir: &TempDir,
+    partition_count: usize,
+    file_extension: &str,
+) -> Result<SchemaRef> {
+    // define schema for data source (csv file)
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::UInt32, false),
+        Field::new("c2", DataType::UInt64, false),
+        Field::new("c3", DataType::Boolean, false),
+    ]));
+
+    // generate a partitioned file
+    for partition in 0..partition_count {
+        let filename = format!("partition-{}.{}", partition, file_extension);
+        let file_path = tmp_dir.path().join(&filename);
+        let mut file = File::create(file_path)?;
+
+        // generate some data
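+        // (with the default CsvReadOptions used by create_ctx_with_partition, the i = 0 row is consumed as the header, so each partition effectively contributes rows 1..=10)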
+        for i in 0..=10 {
+            let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
+            file.write_all(data.as_bytes())?;
+        }
+    }
+
+    Ok(schema)
+}
+
+/// Return a new table provider containing a single decimal column
+pub fn table_with_decimal() -> Arc<dyn TableProvider> {
+    let batch_decimal = make_decimal();
+    let schema = batch_decimal.schema();
+    let partitions = vec![vec![batch_decimal]];
+    Arc::new(MemTable::try_new(schema, partitions).unwrap())
+}
+
+fn make_decimal() -> RecordBatch {
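+    // c1 is a Decimal(10, 3) column: raw values 110000..110010 encode 110.000..110.009,
+    // and the negated 100000..100010 encode -100.000..-100.009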
+    let mut decimal_builder = DecimalBuilder::new(20, 10, 3);
+    for i in 110000..110010 {
+        decimal_builder.append_value(i as i128).unwrap();
+    }
+    for i in 100000..100010 {
+        decimal_builder.append_value(-i as i128).unwrap();
+    }
+    let array = decimal_builder.finish();
+    let schema = Schema::new(vec![Field::new("c1", array.data_type().clone(), true)]);
+    RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
+}
+
+/// Return a RecordBatch with a single Int32 array with values (0..sz)
+pub fn make_partition(sz: i32) -> RecordBatch {
+    let seq_start = 0;
+    let seq_end = sz;
+    let values = (seq_start..seq_end).collect::<Vec<_>>();
+    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
+    let arr = Arc::new(Int32Array::from(values));
+    let arr = arr as ArrayRef;
+
+    RecordBatch::try_new(schema, vec![arr]).unwrap()
+}
+
 /// Specialised String representation
 fn col_str(column: &ArrayRef, row_index: usize) -> String {
     if column.is_null(row_index) {
diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs
index 8ce74ebf6..f6cf74257 100644
--- a/datafusion/core/tests/sql/select.rs
+++ b/datafusion/core/tests/sql/select.rs
@@ -1008,3 +1008,149 @@ async fn query_empty_table() {
     let expected = vec!["++", "++"];
     assert_batches_sorted_eq!(expected, &result);
 }
+
+#[tokio::test]
+async fn boolean_literal() -> Result<()> {
+    let results =
+        execute_with_partition("SELECT c1, c3 FROM test WHERE c1 > 2 AND c3 = true", 4)
+            .await?;
+
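+    // only partition 3 satisfies c1 > 2, and c3 is true for the five even c2 values in that partition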
+    let expected = vec![
+        "+----+------+",
+        "| c1 | c3   |",
+        "+----+------+",
+        "| 3  | true |",
+        "| 3  | true |",
+        "| 3  | true |",
+        "| 3  | true |",
+        "| 3  | true |",
+        "+----+------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+async fn run_count_distinct_integers_aggregated_scenario(
+    partitions: Vec<Vec<(&str, u64)>>,
+) -> Result<Vec<RecordBatch>> {
+    let tmp_dir = TempDir::new()?;
+    let ctx = SessionContext::new();
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c_group", DataType::Utf8, false),
+        Field::new("c_int8", DataType::Int8, false),
+        Field::new("c_int16", DataType::Int16, false),
+        Field::new("c_int32", DataType::Int32, false),
+        Field::new("c_int64", DataType::Int64, false),
+        Field::new("c_uint8", DataType::UInt8, false),
+        Field::new("c_uint16", DataType::UInt16, false),
+        Field::new("c_uint32", DataType::UInt32, false),
+        Field::new("c_uint64", DataType::UInt64, false),
+    ]));
+
+    for (i, partition) in partitions.iter().enumerate() {
+        let filename = format!("partition-{}.csv", i);
+        let file_path = tmp_dir.path().join(&filename);
+        let mut file = File::create(file_path)?;
+        for row in partition {
+            let row_str = format!(
+                "{},{}\n",
+                row.0,
+                // Populate values for each of the integer fields in the
+                // schema.
+                (0..8)
+                    .map(|_| { row.1.to_string() })
+                    .collect::<Vec<_>>()
+                    .join(","),
+            );
+            file.write_all(row_str.as_bytes())?;
+        }
+    }
+    ctx.register_csv(
+        "test",
+        tmp_dir.path().to_str().unwrap(),
+        CsvReadOptions::new().schema(&schema).has_header(false),
+    )
+    .await?;
+
+    let results = plan_and_collect(
+        &ctx,
+        "
+          SELECT
+            c_group,
+            COUNT(c_uint64),
+            COUNT(DISTINCT c_int8),
+            COUNT(DISTINCT c_int16),
+            COUNT(DISTINCT c_int32),
+            COUNT(DISTINCT c_int64),
+            COUNT(DISTINCT c_uint8),
+            COUNT(DISTINCT c_uint16),
+            COUNT(DISTINCT c_uint32),
+            COUNT(DISTINCT c_uint64)
+          FROM test
+          GROUP BY c_group
+        ",
+    )
+    .await?;
+
+    Ok(results)
+}
+
+#[tokio::test]
+async fn count_distinct_integers_aggregated_single_partition() -> Result<()> {
+    let partitions = vec![
+        // The first member of each tuple will be the value for the
+        // `c_group` column, and the second member will be the value for
+        // each of the int/uint fields.
+        vec![
+            ("a", 1),
+            ("a", 1),
+            ("a", 2),
+            ("b", 9),
+            ("c", 9),
+            ("c", 10),
+            ("c", 9),
+        ],
+    ];
+
+    let results = run_count_distinct_integers_aggregated_scenario(partitions).await?;
+
+    let expected = vec![
+        "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
+        "| c_group | COUNT(test.c_uint64) | COUNT(DISTINCT test.c_int8) | COUNT(DISTINCT test.c_int16) | COUNT(DISTINCT test.c_int32) | COUNT(DISTINCT test.c_int64) | COUNT(DISTINCT test.c_uint8) | COUNT(DISTINCT test.c_uint16) | COUNT(DISTINCT test.c_uint32) | COUNT(DISTINCT test.c_uint64) |",
+        "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
+        "| a       | 3                    | 2                           | 2                            | 2                            | 2                            | 2                            | 2                             | 2                             | 2                             |",
+        "| b       | 1                    | 1                           | 1                            | 1                            | 1                            | 1                            | 1                             | 1                             | 1                             |",
+        "| c       | 3                    | 2                           | 2                            | 2                            | 2                            | 2                            | 2                             | 2                             | 2                             |",
+        "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn count_distinct_integers_aggregated_multiple_partitions() -> Result<()> {
+    let partitions = vec![
+        // The first member of each tuple will be the value for the
+        // `c_group` column, and the second member will be the value for
+        // each of the int/uint fields.
+        vec![("a", 1), ("a", 1), ("a", 2), ("b", 9), ("c", 9)],
+        vec![("a", 1), ("a", 3), ("b", 8), ("b", 9), ("b", 10), ("b", 11)],
+    ];
+
+    let results = run_count_distinct_integers_aggregated_scenario(partitions).await?;
+
+    let expected = vec![
+        "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
+        "| c_group | COUNT(test.c_uint64) | COUNT(DISTINCT test.c_int8) | COUNT(DISTINCT test.c_int16) | COUNT(DISTINCT test.c_int32) | COUNT(DISTINCT test.c_int64) | COUNT(DISTINCT test.c_uint8) | COUNT(DISTINCT test.c_uint16) | COUNT(DISTINCT test.c_uint32) | COUNT(DISTINCT test.c_uint64) |",
+        "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
+        "| a       | 5                    | 3                           | 3                            | 3                            | 3                            | 3                            | 3                             | 3                             | 3                             |",
+        "| b       | 5                    | 4                           | 4                            | 4                            | 4                            | 4                            | 4                             | 4                             | 4                             |",
+        "| c       | 1                    | 1                           | 1                            | 1                            | 1                            | 1                            | 1                             | 1                             | 1                             |",
+        "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
index ba493e6fe..7e36177fd 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -142,3 +142,159 @@ async fn csv_query_window_with_partition_by_order_by() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn window() -> Result<()> {
+    let results = execute_with_partition(
+        "SELECT \
+        c1, \
+        c2, \
+        SUM(c2) OVER (), \
+        COUNT(c2) OVER (), \
+        MAX(c2) OVER (), \
+        MIN(c2) OVER (), \
+        AVG(c2) OVER () \
+        FROM test \
+        ORDER BY c1, c2 \
+        LIMIT 5",
+        4,
+    )
+    .await?;
+    // the result should arrive in a single batch; more batches would not
+    // change the semantics, but asserting len == 1 up front keeps
+    // surprises at bay
+    assert_eq!(results.len(), 1);
+
+    let expected = vec![
+        "+----+----+--------------+----------------+--------------+--------------+--------------+",
+        "| c1 | c2 | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |",
+        "+----+----+--------------+----------------+--------------+--------------+--------------+",
+        "| 0  | 1  | 220          | 40             | 10           | 1            | 5.5          |",
+        "| 0  | 2  | 220          | 40             | 10           | 1            | 5.5          |",
+        "| 0  | 3  | 220          | 40             | 10           | 1            | 5.5          |",
+        "| 0  | 4  | 220          | 40             | 10           | 1            | 5.5          |",
+        "| 0  | 5  | 220          | 40             | 10           | 1            | 5.5          |",
+        "+----+----+--------------+----------------+--------------+--------------+--------------+",
+    ];
+
+    // the window function results should respect the ordering imposed by ORDER BY
+    assert_batches_eq!(expected, &results);
+    Ok(())
+}
+
+#[tokio::test]
+async fn window_order_by() -> Result<()> {
+    let results = execute_with_partition(
+        "SELECT \
+        c1, \
+        c2, \
+        ROW_NUMBER() OVER (ORDER BY c1, c2), \
+        FIRST_VALUE(c2) OVER (ORDER BY c1, c2), \
+        LAST_VALUE(c2) OVER (ORDER BY c1, c2), \
+        NTH_VALUE(c2, 2) OVER (ORDER BY c1, c2), \
+        SUM(c2) OVER (ORDER BY c1, c2), \
+        COUNT(c2) OVER (ORDER BY c1, c2), \
+        MAX(c2) OVER (ORDER BY c1, c2), \
+        MIN(c2) OVER (ORDER BY c1, c2), \
+        AVG(c2) OVER (ORDER BY c1, c2) \
+        FROM test \
+        ORDER BY c1, c2 \
+        LIMIT 5",
+        4,
+    )
+    .await?;
+    // the result should arrive in a single batch; more batches would not
+    // change the semantics, but asserting len == 1 up front keeps
+    // surprises at bay
+    assert_eq!(results.len(), 1);
+
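+    // NTH_VALUE(c2, 2) is empty on the first row because the running frame holds only one value at that point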
+    let expected = vec![
+        "+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+",
+        "| c1 | c2 | ROW_NUMBER() | FIRST_VALUE(test.c2) | LAST_VALUE(test.c2) | NTH_VALUE(test.c2,Int64(2)) | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |",
+        "+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+",
+        "| 0  | 1  | 1            | 1                    | 1                   |                             | 1            | 1              | 1            | 1            | 1            |",
+        "| 0  | 2  | 2            | 1                    | 2                   | 2                           | 3            | 2              | 2            | 1            | 1.5          |",
+        "| 0  | 3  | 3            | 1                    | 3                   | 2                           | 6            | 3              | 3            | 1            | 2            |",
+        "| 0  | 4  | 4            | 1                    | 4                   | 2                           | 10           | 4              | 4            | 1            | 2.5          |",
+        "| 0  | 5  | 5            | 1                    | 5                   | 2                           | 15           | 5              | 5            | 1            | 3            |",
+        "+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+",
+    ];
+
+    // the window function results should respect the ordering imposed by ORDER BY
+    assert_batches_eq!(expected, &results);
+    Ok(())
+}
+
+#[tokio::test]
+async fn window_partition_by() -> Result<()> {
+    let results = execute_with_partition(
+        "SELECT \
+        c1, \
+        c2, \
+        SUM(c2) OVER (PARTITION BY c2), \
+        COUNT(c2) OVER (PARTITION BY c2), \
+        MAX(c2) OVER (PARTITION BY c2), \
+        MIN(c2) OVER (PARTITION BY c2), \
+        AVG(c2) OVER (PARTITION BY c2) \
+        FROM test \
+        ORDER BY c1, c2 \
+        LIMIT 5",
+        4,
+    )
+    .await?;
+
+    let expected = vec![
+        "+----+----+--------------+----------------+--------------+--------------+--------------+",
+        "| c1 | c2 | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |",
+        "+----+----+--------------+----------------+--------------+--------------+--------------+",
+        "| 0  | 1  | 4            | 4              | 1            | 1            | 1            |",
+        "| 0  | 2  | 8            | 4              | 2            | 2            | 2            |",
+        "| 0  | 3  | 12           | 4              | 3            | 3            | 3            |",
+        "| 0  | 4  | 16           | 4              | 4            | 4            | 4            |",
+        "| 0  | 5  | 20           | 4              | 5            | 5            | 5            |",
+        "+----+----+--------------+----------------+--------------+--------------+--------------+",
+    ];
+
+    // the window function results should respect the ordering imposed by ORDER BY
+    assert_batches_eq!(expected, &results);
+    Ok(())
+}
+
+#[tokio::test]
+async fn window_partition_by_order_by() -> Result<()> {
+    let results = execute_with_partition(
+        "SELECT \
+        c1, \
+        c2, \
+        ROW_NUMBER() OVER (PARTITION BY c2 ORDER BY c1), \
+        FIRST_VALUE(c2 + c1) OVER (PARTITION BY c2 ORDER BY c1), \
+        LAST_VALUE(c2 + c1) OVER (PARTITION BY c2 ORDER BY c1), \
+        NTH_VALUE(c2 + c1, 1) OVER (PARTITION BY c2 ORDER BY c1), \
+        SUM(c2) OVER (PARTITION BY c2 ORDER BY c1), \
+        COUNT(c2) OVER (PARTITION BY c2 ORDER BY c1), \
+        MAX(c2) OVER (PARTITION BY c2 ORDER BY c1), \
+        MIN(c2) OVER (PARTITION BY c2 ORDER BY c1), \
+        AVG(c2) OVER (PARTITION BY c2 ORDER BY c1) \
+        FROM test \
+        ORDER BY c1, c2 \
+        LIMIT 5",
+        4,
+    )
+    .await?;
+
+    let expected = vec![
+        "+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+",
+        "| c1 | c2 | ROW_NUMBER() | FIRST_VALUE(test.c2 + test.c1) | LAST_VALUE(test.c2 + test.c1) | NTH_VALUE(test.c2 + test.c1,Int64(1)) | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |",
+        "+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+",
+        "| 0  | 1  | 1            | 1                              | 1                             | 1                                     | 1            | 1              | 1            | 1            | 1            |",
+        "| 0  | 2  | 1            | 2                              | 2                             | 2                                     | 2            | 1              | 2            | 2            | 2            |",
+        "| 0  | 3  | 1            | 3                              | 3                             | 3                                     | 3            | 1              | 3            | 3            | 3            |",
+        "| 0  | 4  | 1            | 4                              | 4                             | 4                                     | 4            | 1              | 4            | 4            | 4            |",
+        "| 0  | 5  | 1            | 5                              | 5                             | 5                                     | 5            | 1              | 5            | 5            | 5            |",
+        "+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+",
+    ];
+
+    // the window function results should respect the ordering imposed by ORDER BY
+    assert_batches_eq!(expected, &results);
+    Ok(())
+}