You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/21 11:16:22 UTC

[arrow-datafusion] branch master updated: Start update tests in sql.rs to use `assert_batches_eq!` (#760)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new eccd074  Start update tests in sql.rs to use `assert_batches_eq!` (#760)
eccd074 is described below

commit eccd074827e7625960e56313420168755ee4a564
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed Jul 21 07:16:16 2021 -0400

    Start update tests in sql.rs to use `assert_batches_eq!` (#760)
    
    * Start update tests in sql.rs to use `assert_batches_eq!`
    
    * fix whitespace
---
 datafusion/tests/sql.rs | 741 +++++++++++++++++++++++++-----------------------
 1 file changed, 389 insertions(+), 352 deletions(-)

diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 0ef8b4c..d9f7c6e 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -15,6 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//! This module contains end-to-end tests of running SQL queries using
+//! DataFusion.
+
 use std::convert::TryFrom;
 use std::sync::Arc;
 
@@ -36,6 +39,7 @@ use arrow::{
 };
 
 use datafusion::assert_batches_eq;
+use datafusion::assert_batches_sorted_eq;
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::prelude::*;
 use datafusion::{
@@ -262,17 +266,21 @@ async fn csv_select_nested() -> Result<()> {
                    ORDER BY c2 ASC, c3 ASC
                  )
                )";
-    let actual = execute(&mut ctx, sql).await;
+    let actual = execute_to_batches(&mut ctx, sql).await;
     let expected = vec![
-        vec!["a", "5", "-101"],
-        vec!["a", "5", "-54"],
-        vec!["a", "5", "-38"],
-        vec!["a", "5", "65"],
-        vec!["a", "6", "-101"],
-        vec!["a", "6", "-31"],
-        vec!["a", "6", "36"],
+        "+----+----+------+",
+        "| o1 | o2 | c3   |",
+        "+----+----+------+",
+        "| a  | 5  | -101 |",
+        "| a  | 5  | -54  |",
+        "| a  | 5  | -38  |",
+        "| a  | 5  | 65   |",
+        "| a  | 6  | -101 |",
+        "| a  | 6  | -31  |",
+        "| a  | 6  | 36   |",
+        "+----+----+------+",
     ];
-    assert_eq!(expected, actual);
+    assert_batches_eq!(expected, &actual);
     Ok(())
 }
 
@@ -281,9 +289,15 @@ async fn csv_count_star() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT COUNT(*), COUNT(1) AS c, COUNT(c1) FROM aggregate_test_100";
-    let actual = execute(&mut ctx, sql).await;
-    let expected = vec![vec!["100", "100", "100"]];
-    assert_eq!(expected, actual);
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+-----------------+-----+-----------+",
+        "| COUNT(UInt8(1)) | c   | COUNT(c1) |",
+        "+-----------------+-----+-----------+",
+        "| 100             | 100 | 100       |",
+        "+-----------------+-----+-----------+",
+    ];
+    assert_batches_eq!(expected, &actual);
     Ok(())
 }
 
@@ -292,12 +306,16 @@ async fn csv_query_with_predicate() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT c1, c12 FROM aggregate_test_100 WHERE c12 > 0.376 AND c12 < 0.4";
-    let actual = execute(&mut ctx, sql).await;
+    let actual = execute_to_batches(&mut ctx, sql).await;
     let expected = vec![
-        vec!["e", "0.39144436569161134"],
-        vec!["d", "0.38870280983958583"],
+        "+----+---------------------+",
+        "| c1 | c12                 |",
+        "+----+---------------------+",
+        "| e  | 0.39144436569161134 |",
+        "| d  | 0.38870280983958583 |",
+        "+----+---------------------+",
     ];
-    assert_eq!(expected, actual);
+    assert_batches_eq!(expected, &actual);
     Ok(())
 }
 
@@ -306,9 +324,16 @@ async fn csv_query_with_negative_predicate() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT c1, c4 FROM aggregate_test_100 WHERE c3 < -55 AND -c4 > 30000";
-    let actual = execute(&mut ctx, sql).await;
-    let expected = vec![vec!["e", "-31500"], vec!["c", "-30187"]];
-    assert_eq!(expected, actual);
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+----+--------+",
+        "| c1 | c4     |",
+        "+----+--------+",
+        "| e  | -31500 |",
+        "| c  | -30187 |",
+        "+----+--------+",
+    ];
+    assert_batches_eq!(expected, &actual);
     Ok(())
 }
 
@@ -317,9 +342,15 @@ async fn csv_query_with_negated_predicate() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT COUNT(1) FROM aggregate_test_100 WHERE NOT(c1 != 'a')";
-    let actual = execute(&mut ctx, sql).await;
-    let expected = vec![vec!["21"]];
-    assert_eq!(expected, actual);
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+-----------------+",
+        "| COUNT(UInt8(1)) |",
+        "+-----------------+",
+        "| 21              |",
+        "+-----------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
     Ok(())
 }
 
@@ -328,9 +359,15 @@ async fn csv_query_with_is_not_null_predicate() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT COUNT(1) FROM aggregate_test_100 WHERE c1 IS NOT NULL";
-    let actual = execute(&mut ctx, sql).await;
-    let expected = vec![vec!["100"]];
-    assert_eq!(expected, actual);
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+-----------------+",
+        "| COUNT(UInt8(1)) |",
+        "+-----------------+",
+        "| 100             |",
+        "+-----------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
     Ok(())
 }
 
@@ -339,9 +376,15 @@ async fn csv_query_with_is_null_predicate() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT COUNT(1) FROM aggregate_test_100 WHERE c1 IS NULL";
-    let actual = execute(&mut ctx, sql).await;
-    let expected = vec![vec!["0"]];
-    assert_eq!(expected, actual);
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+-----------------+",
+        "| COUNT(UInt8(1)) |",
+        "+-----------------+",
+        "| 0               |",
+        "+-----------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
     Ok(())
 }
 
@@ -350,16 +393,19 @@ async fn csv_query_group_by_int_min_max() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT c2, MIN(c12), MAX(c12) FROM aggregate_test_100 GROUP BY c2";
-    let mut actual = execute(&mut ctx, sql).await;
-    actual.sort();
+    let actual = execute_to_batches(&mut ctx, sql).await;
     let expected = vec![
-        vec!["1", "0.05636955101974106", "0.9965400387585364"],
-        vec!["2", "0.16301110515739792", "0.991517828651004"],
-        vec!["3", "0.047343434291126085", "0.9293883502480845"],
-        vec!["4", "0.02182578039211991", "0.9237877978193884"],
-        vec!["5", "0.01479305307777301", "0.9723580396501548"],
+        "+----+----------------------+--------------------+",
+        "| c2 | MIN(c12)             | MAX(c12)           |",
+        "+----+----------------------+--------------------+",
+        "| 1  | 0.05636955101974106  | 0.9965400387585364 |",
+        "| 2  | 0.16301110515739792  | 0.991517828651004  |",
+        "| 3  | 0.047343434291126085 | 0.9293883502480845 |",
+        "| 4  | 0.02182578039211991  | 0.9237877978193884 |",
+        "| 5  | 0.01479305307777301  | 0.9723580396501548 |",
+        "+----+----------------------+--------------------+",
     ];
-    assert_eq!(expected, actual);
+    assert_batches_sorted_eq!(expected, &actual);
     Ok(())
 }
 
@@ -370,16 +416,20 @@ async fn csv_query_group_by_float32() -> Result<()> {
 
     let sql =
         "SELECT COUNT(*) as cnt, c1 FROM aggregate_simple GROUP BY c1 ORDER BY cnt DESC";
-    let actual = execute(&mut ctx, sql).await;
+    let actual = execute_to_batches(&mut ctx, sql).await;
 
     let expected = vec![
-        vec!["5", "0.00005"],
-        vec!["4", "0.00004"],
-        vec!["3", "0.00003"],
-        vec!["2", "0.00002"],
-        vec!["1", "0.00001"],
+        "+-----+---------+",
+        "| cnt | c1      |",
+        "+-----+---------+",
+        "| 5   | 0.00005 |",
+        "| 4   | 0.00004 |",
+        "| 3   | 0.00003 |",
+        "| 2   | 0.00002 |",
+        "| 1   | 0.00001 |",
+        "+-----+---------+",
     ];
-    assert_eq!(expected, actual);
+    assert_batches_eq!(expected, &actual);
 
     Ok(())
 }
@@ -418,45 +468,88 @@ async fn select_distinct() -> Result<()> {
 }
 
 #[tokio::test]
-async fn select_distinct_simple() -> Result<()> {
+async fn select_distinct_simple_1() {
     let mut ctx = ExecutionContext::new();
-    register_aggregate_simple_csv(&mut ctx)?;
+    register_aggregate_simple_csv(&mut ctx).unwrap();
 
     let sql = "SELECT DISTINCT c1 FROM aggregate_simple order by c1";
-    let actual = execute(&mut ctx, sql).await;
+    let actual = execute_to_batches(&mut ctx, sql).await;
 
     let expected = vec![
-        vec!["0.00001"],
-        vec!["0.00002"],
-        vec!["0.00003"],
-        vec!["0.00004"],
-        vec!["0.00005"],
+        "+---------+",
+        "| c1      |",
+        "+---------+",
+        "| 0.00001 |",
+        "| 0.00002 |",
+        "| 0.00003 |",
+        "| 0.00004 |",
+        "| 0.00005 |",
+        "+---------+",
     ];
-    assert_eq!(actual, expected);
+    assert_batches_eq!(expected, &actual);
+}
+
+#[tokio::test]
+async fn select_distinct_simple_2() {
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_simple_csv(&mut ctx).unwrap();
 
     let sql = "SELECT DISTINCT c1, c2 FROM aggregate_simple order by c1";
-    let actual = execute(&mut ctx, sql).await;
+    let actual = execute_to_batches(&mut ctx, sql).await;
 
     let expected = vec![
-        vec!["0.00001", "0.000000000001"],
-        vec!["0.00002", "0.000000000002"],
-        vec!["0.00003", "0.000000000003"],
-        vec!["0.00004", "0.000000000004"],
-        vec!["0.00005", "0.000000000005"],
+        "+---------+----------------+",
+        "| c1      | c2             |",
+        "+---------+----------------+",
+        "| 0.00001 | 0.000000000001 |",
+        "| 0.00002 | 0.000000000002 |",
+        "| 0.00003 | 0.000000000003 |",
+        "| 0.00004 | 0.000000000004 |",
+        "| 0.00005 | 0.000000000005 |",
+        "+---------+----------------+",
     ];
-    assert_eq!(actual, expected);
+    assert_batches_eq!(expected, &actual);
+}
+
+#[tokio::test]
+async fn select_distinct_simple_3() {
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_simple_csv(&mut ctx).unwrap();
 
     let sql = "SELECT distinct c3 FROM aggregate_simple order by c3";
-    let actual = execute(&mut ctx, sql).await;
+    let actual = execute_to_batches(&mut ctx, sql).await;
 
-    let expected = vec![vec!["false"], vec!["true"]];
-    assert_eq!(actual, expected);
+    let expected = vec![
+        "+-------+",
+        "| c3    |",
+        "+-------+",
+        "| false |",
+        "| true  |",
+        "+-------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+}
+
+#[tokio::test]
+async fn select_distinct_simple_4() {
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_simple_csv(&mut ctx).unwrap();
 
     let sql = "SELECT distinct c1+c2 as a FROM aggregate_simple";
-    let actual = execute(&mut ctx, sql).await;
+    let actual = execute_to_batches(&mut ctx, sql).await;
 
-    assert_eq!(actual.len(), 5);
-    Ok(())
+    let expected = vec![
+        "+-------------------------+",
+        "| a                       |",
+        "+-------------------------+",
+        "| 0.000030000002242136256 |",
+        "| 0.000040000002989515004 |",
+        "| 0.000010000000747378751 |",
+        "| 0.00005000000373689376  |",
+        "| 0.000020000001494757502 |",
+        "+-------------------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &actual);
 }
 
 #[tokio::test]
@@ -464,10 +557,10 @@ async fn projection_same_fields() -> Result<()> {
     let mut ctx = ExecutionContext::new();
 
     let sql = "select (1+1) as a from (select 1 as a);";
-    let actual = execute(&mut ctx, sql).await;
+    let actual = execute_to_batches(&mut ctx, sql).await;
 
-    let expected = vec![vec!["2"]];
-    assert_eq!(actual, expected);
+    let expected = vec!["+---+", "| a |", "+---+", "| 2 |", "+---+"];
+    assert_batches_eq!(expected, &actual);
 
     Ok(())
 }
@@ -479,16 +572,20 @@ async fn csv_query_group_by_float64() -> Result<()> {
 
     let sql =
         "SELECT COUNT(*) as cnt, c2 FROM aggregate_simple GROUP BY c2 ORDER BY cnt DESC";
-    let actual = execute(&mut ctx, sql).await;
+    let actual = execute_to_batches(&mut ctx, sql).await;
 
     let expected = vec![
-        vec!["5", "0.000000000005"],
-        vec!["4", "0.000000000004"],
-        vec!["3", "0.000000000003"],
-        vec!["2", "0.000000000002"],
-        vec!["1", "0.000000000001"],
+        "+-----+----------------+",
+        "| cnt | c2             |",
+        "+-----+----------------+",
+        "| 5   | 0.000000000005 |",
+        "| 4   | 0.000000000004 |",
+        "| 3   | 0.000000000003 |",
+        "| 2   | 0.000000000002 |",
+        "| 1   | 0.000000000001 |",
+        "+-----+----------------+",
     ];
-    assert_eq!(expected, actual);
+    assert_batches_eq!(expected, &actual);
 
     Ok(())
 }
@@ -500,10 +597,17 @@ async fn csv_query_group_by_boolean() -> Result<()> {
 
     let sql =
         "SELECT COUNT(*) as cnt, c3 FROM aggregate_simple GROUP BY c3 ORDER BY cnt DESC";
-    let actual = execute(&mut ctx, sql).await;
+    let actual = execute_to_batches(&mut ctx, sql).await;
 
-    let expected = vec![vec!["9", "true"], vec!["6", "false"]];
-    assert_eq!(expected, actual);
+    let expected = vec![
+        "+-----+-------+",
+        "| cnt | c3    |",
+        "+-----+-------+",
+        "| 9   | true  |",
+        "| 6   | false |",
+        "+-----+-------+",
+    ];
+    assert_batches_eq!(expected, &actual);
 
     Ok(())
 }
@@ -513,36 +617,39 @@ async fn csv_query_group_by_two_columns() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT c1, c2, MIN(c3) FROM aggregate_test_100 GROUP BY c1, c2";
-    let mut actual = execute(&mut ctx, sql).await;
-    actual.sort();
+    let actual = execute_to_batches(&mut ctx, sql).await;
     let expected = vec![
-        vec!["a", "1", "-85"],
-        vec!["a", "2", "-48"],
-        vec!["a", "3", "-72"],
-        vec!["a", "4", "-101"],
-        vec!["a", "5", "-101"],
-        vec!["b", "1", "12"],
-        vec!["b", "2", "-60"],
-        vec!["b", "3", "-101"],
-        vec!["b", "4", "-117"],
-        vec!["b", "5", "-82"],
-        vec!["c", "1", "-24"],
-        vec!["c", "2", "-117"],
-        vec!["c", "3", "-2"],
-        vec!["c", "4", "-90"],
-        vec!["c", "5", "-94"],
-        vec!["d", "1", "-99"],
-        vec!["d", "2", "93"],
-        vec!["d", "3", "-76"],
-        vec!["d", "4", "5"],
-        vec!["d", "5", "-59"],
-        vec!["e", "1", "36"],
-        vec!["e", "2", "-61"],
-        vec!["e", "3", "-95"],
-        vec!["e", "4", "-56"],
-        vec!["e", "5", "-86"],
+        "+----+----+---------+",
+        "| c1 | c2 | MIN(c3) |",
+        "+----+----+---------+",
+        "| a  | 1  | -85     |",
+        "| a  | 2  | -48     |",
+        "| a  | 3  | -72     |",
+        "| a  | 4  | -101    |",
+        "| a  | 5  | -101    |",
+        "| b  | 1  | 12      |",
+        "| b  | 2  | -60     |",
+        "| b  | 3  | -101    |",
+        "| b  | 4  | -117    |",
+        "| b  | 5  | -82     |",
+        "| c  | 1  | -24     |",
+        "| c  | 2  | -117    |",
+        "| c  | 3  | -2      |",
+        "| c  | 4  | -90     |",
+        "| c  | 5  | -94     |",
+        "| d  | 1  | -99     |",
+        "| d  | 2  | 93      |",
+        "| d  | 3  | -76     |",
+        "| d  | 4  | 5       |",
+        "| d  | 5  | -59     |",
+        "| e  | 1  | 36      |",
+        "| e  | 2  | -61     |",
+        "| e  | 3  | -95     |",
+        "| e  | 4  | -56     |",
+        "| e  | 5  | -86     |",
+        "+----+----+---------+",
     ];
-    assert_eq!(expected, actual);
+    assert_batches_sorted_eq!(expected, &actual);
     Ok(())
 }
 
@@ -551,10 +658,16 @@ async fn csv_query_group_by_and_having() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT c1, MIN(c3) AS m FROM aggregate_test_100 GROUP BY c1 HAVING m < -100 AND MAX(c3) > 70";
-    let mut actual = execute(&mut ctx, sql).await;
-    actual.sort();
-    let expected = vec![vec!["a", "-101"], vec!["c", "-117"]];
-    assert_eq!(expected, actual);
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+----+------+",
+        "| c1 | m    |",
+        "+----+------+",
+        "| a  | -101 |",
+        "| c  | -117 |",
+        "+----+------+",
+    ];
+    assert_batches_sorted_eq!(expected, &actual);
     Ok(())
 }
 
@@ -567,10 +680,15 @@ async fn csv_query_group_by_and_having_and_where() -> Result<()> {
                WHERE c1 IN ('a', 'b')
                GROUP BY c1
                HAVING m < -100 AND MAX(c3) > 70";
-    let mut actual = execute(&mut ctx, sql).await;
-    actual.sort();
-    let expected = vec![vec!["a", "-101"]];
-    assert_eq!(expected, actual);
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+----+------+",
+        "| c1 | m    |",
+        "+----+------+",
+        "| a  | -101 |",
+        "+----+------+",
+    ];
+    assert_batches_eq!(expected, &actual);
     Ok(())
 }
 
@@ -581,10 +699,9 @@ async fn all_where_empty() -> Result<()> {
     let sql = "SELECT *
                FROM aggregate_test_100
                WHERE 1=2";
-    let mut actual = execute(&mut ctx, sql).await;
-    actual.sort();
-    let expected: Vec<Vec<String>> = vec![];
-    assert_eq!(expected, actual);
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec!["++", "++"];
+    assert_batches_eq!(expected, &actual);
     Ok(())
 }
 
@@ -593,16 +710,19 @@ async fn csv_query_having_without_group_by() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT c1, c2, c3 FROM aggregate_test_100 HAVING c2 >= 4 AND c3 > 90";
-    let mut actual = execute(&mut ctx, sql).await;
-    actual.sort();
+    let actual = execute_to_batches(&mut ctx, sql).await;
     let expected = vec![
-        vec!["c", "4", "123"],
-        vec!["c", "5", "118"],
-        vec!["d", "4", "102"],
-        vec!["e", "4", "96"],
-        vec!["e", "4", "97"],
+        "+----+----+-----+",
+        "| c1 | c2 | c3  |",
+        "+----+----+-----+",
+        "| c  | 4  | 123 |",
+        "| c  | 5  | 118 |",
+        "| d  | 4  | 102 |",
+        "| e  | 4  | 96  |",
+        "| e  | 4  | 97  |",
+        "+----+----+-----+",
     ];
-    assert_eq!(expected, actual);
+    assert_batches_sorted_eq!(expected, &actual);
     Ok(())
 }
 
@@ -721,16 +841,19 @@ async fn csv_query_group_by_avg() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT c1, avg(c12) FROM aggregate_test_100 GROUP BY c1";
-    let mut actual = execute(&mut ctx, sql).await;
-    actual.sort();
+    let actual = execute_to_batches(&mut ctx, sql).await;
     let expected = vec![
-        vec!["a", "0.48754517466109415"],
-        vec!["b", "0.41040709263815384"],
-        vec!["c", "0.6600456536439784"],
-        vec!["d", "0.48855379387549824"],
-        vec!["e", "0.48600669271341534"],
+        "+----+---------------------+",
+        "| c1 | AVG(c12)            |",
+        "+----+---------------------+",
+        "| a  | 0.48754517466109415 |",
+        "| b  | 0.41040709263815384 |",
+        "| c  | 0.6600456536439784  |",
+        "| d  | 0.48855379387549824 |",
+        "| e  | 0.48600669271341534 |",
+        "+----+---------------------+",
     ];
-    assert_eq!(expected, actual);
+    assert_batches_sorted_eq!(expected, &actual);
     Ok(())
 }
 
@@ -739,16 +862,19 @@ async fn csv_query_group_by_avg_with_projection() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT avg(c12), c1 FROM aggregate_test_100 GROUP BY c1";
-    let mut actual = execute(&mut ctx, sql).await;
-    actual.sort();
+    let actual = execute_to_batches(&mut ctx, sql).await;
     let expected = vec![
-        vec!["0.41040709263815384", "b"],
-        vec!["0.48600669271341534", "e"],
-        vec!["0.48754517466109415", "a"],
-        vec!["0.48855379387549824", "d"],
-        vec!["0.6600456536439784", "c"],
+        "+---------------------+----+",
+        "| AVG(c12)            | c1 |",
+        "+---------------------+----+",
+        "| 0.41040709263815384 | b  |",
+        "| 0.48600669271341534 | e  |",
+        "| 0.48754517466109415 | a  |",
+        "| 0.48855379387549824 | d  |",
+        "| 0.6600456536439784  | c  |",
+        "+---------------------+----+",
     ];
-    assert_eq!(expected, actual);
+    assert_batches_sorted_eq!(expected, &actual);
     Ok(())
 }
 
@@ -800,9 +926,15 @@ async fn csv_query_count() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT count(c12) FROM aggregate_test_100";
-    let actual = execute(&mut ctx, sql).await;
-    let expected = vec![vec!["100"]];
-    assert_eq!(expected, actual);
+    let actual = execute_to_batches(&mut ctx, sql).await;
+    let expected = vec![
+        "+------------+",
+        "| COUNT(c12) |",
+        "+------------+",
+        "| 100        |",
+        "+------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
     Ok(())
 }
 
@@ -811,65 +943,29 @@ async fn csv_query_window_with_empty_over() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "select \
-    c9, \
-    count(c5) over (), \
-    max(c5) over (), \
-    min(c5) over (), \
-    first_value(c5) over (), \
-    last_value(c5) over (), \
-    nth_value(c5, 2) over () \
-    from aggregate_test_100 \
-    order by c9 \
-    limit 5";
-    let actual = execute(&mut ctx, sql).await;
+               c9, \
+               count(c5) over (), \
+               max(c5) over (), \
+               min(c5) over (), \
+               first_value(c5) over (), \
+               last_value(c5) over (), \
+               nth_value(c5, 2) over () \
+               from aggregate_test_100 \
+               order by c9 \
+               limit 5";
+    let actual = execute_to_batches(&mut ctx, sql).await;
     let expected = vec![
-        vec![
-            "28774375",
-            "100",
-            "2143473091",
-            "-2141999138",
-            "2033001162",
-            "61035129",
-            "706441268",
-        ],
-        vec![
-            "63044568",
-            "100",
-            "2143473091",
-            "-2141999138",
-            "2033001162",
-            "61035129",
-            "706441268",
-        ],
-        vec![
-            "141047417",
-            "100",
-            "2143473091",
-            "-2141999138",
-            "2033001162",
-            "61035129",
-            "706441268",
-        ],
-        vec![
-            "141680161",
-            "100",
-            "2143473091",
-            "-2141999138",
-            "2033001162",
-            "61035129",
-            "706441268",
-        ],
-        vec![
-            "145294611",
-            "100",
-            "2143473091",
-            "-2141999138",
-            "2033001162",
-            "61035129",
-            "706441268",
-        ],
+        "+-----------+-----------+------------+-------------+-----------------+----------------+------------------------+",
+        "| c9        | COUNT(c5) | MAX(c5)    | MIN(c5)     | FIRST_VALUE(c5) | LAST_VALUE(c5) | NTH_VALUE(c5,Int64(2)) |",
+        "+-----------+-----------+------------+-------------+-----------------+----------------+------------------------+",
+        "| 28774375  | 100       | 2143473091 | -2141999138 | 2033001162      | 61035129       | 706441268              |",
+        "| 63044568  | 100       | 2143473091 | -2141999138 | 2033001162      | 61035129       | 706441268              |",
+        "| 141047417 | 100       | 2143473091 | -2141999138 | 2033001162      | 61035129       | 706441268              |",
+        "| 141680161 | 100       | 2143473091 | -2141999138 | 2033001162      | 61035129       | 706441268              |",
+        "| 145294611 | 100       | 2143473091 | -2141999138 | 2033001162      | 61035129       | 706441268              |",
+        "+-----------+-----------+------------+-------------+-----------------+----------------+------------------------+",
     ];
-    assert_eq!(expected, actual);
+    assert_batches_eq!(expected, &actual);
     Ok(())
 }
 
@@ -878,62 +974,31 @@ async fn csv_query_window_with_partition_by() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "select \
-    c9, \
-    sum(cast(c4 as Int)) over (partition by c3), \
-    avg(cast(c4 as Int)) over (partition by c3), \
-    count(cast(c4 as Int)) over (partition by c3), \
-    max(cast(c4 as Int)) over (partition by c3), \
-    min(cast(c4 as Int)) over (partition by c3), \
-    first_value(cast(c4 as Int)) over (partition by c3), \
-    last_value(cast(c4 as Int)) over (partition by c3), \
-    nth_value(cast(c4 as Int), 2) over (partition by c3) \
-    from aggregate_test_100 \
-    order by c9 \
-    limit 5";
-    let actual = execute(&mut ctx, sql).await;
+               c9, \
+               sum(cast(c4 as Int)) over (partition by c3), \
+               avg(cast(c4 as Int)) over (partition by c3), \
+               count(cast(c4 as Int)) over (partition by c3), \
+               max(cast(c4 as Int)) over (partition by c3), \
+               min(cast(c4 as Int)) over (partition by c3), \
+               first_value(cast(c4 as Int)) over (partition by c3), \
+               last_value(cast(c4 as Int)) over (partition by c3), \
+               nth_value(cast(c4 as Int), 2) over (partition by c3) \
+               from aggregate_test_100 \
+               order by c9 \
+               limit 5";
+    let actual = execute_to_batches(&mut ctx, sql).await;
     let expected = vec![
-        vec![
-            "28774375", "-16110", "-16110", "1", "-16110", "-16110", "-16110", "-16110",
-            "NULL",
-        ],
-        vec![
-            "63044568", "3917", "3917", "1", "3917", "3917", "3917", "3917", "NULL",
-        ],
-        vec![
-            "141047417",
-            "-38455",
-            "-19227.5",
-            "2",
-            "-16974",
-            "-21481",
-            "-16974",
-            "-21481",
-            "NULL",
-        ],
-        vec![
-            "141680161",
-            "-1114",
-            "-1114",
-            "1",
-            "-1114",
-            "-1114",
-            "-1114",
-            "-1114",
-            "NULL",
-        ],
-        vec![
-            "145294611",
-            "15673",
-            "15673",
-            "1",
-            "15673",
-            "15673",
-            "15673",
-            "15673",
-            "NULL",
-        ],
+        "+-----------+------------------------+------------------------+--------------------------+------------------------+------------------------+--------------------------------+-------------------------------+---------------------------------------+",
+        "| c9        | SUM(CAST(c4 AS Int32)) | AVG(CAST(c4 AS Int32)) | COUNT(CAST(c4 AS Int32)) | MAX(CAST(c4 AS Int32)) | MIN(CAST(c4 AS Int32)) | FIRST_VALUE(CAST(c4 AS Int32)) | LAST_VALUE(CAST(c4 AS Int32)) | NTH_VALUE(CAST(c4 AS Int32),Int64(2)) |",
+        "+-----------+------------------------+------------------------+--------------------------+------------------------+------------------------+--------------------------------+-------------------------------+---------------------------------------+",
+        "| 28774375  | -16110                 | -16110                 | 1                        | -16110                 | -16110                 | -16110                         | -16110                        |                                       |",
+        "| 63044568  | 3917                   | 3917                   | 1                        | 3917                   | 3917                   | 3917                           | 3917                          |                                       |",
+        "| 141047417 | -38455                 | -19227.5               | 2                        | -16974                 | -21481                 | -16974                         | -21481                        |                                       |",
+        "| 141680161 | -1114                  | -1114                  | 1                        | -1114                  | -1114                  | -1114                          | -1114                         |                                       |",
+        "| 145294611 | 15673                  | 15673                  | 1                        | 15673                  | 15673                  | 15673                          | 15673                         |                                       |",
+        "+-----------+------------------------+------------------------+--------------------------+------------------------+------------------------+--------------------------------+-------------------------------+---------------------------------------+",
     ];
-    assert_eq!(expected, actual);
+    assert_batches_eq!(expected, &actual);
     Ok(())
 }
 
@@ -942,70 +1007,31 @@ async fn csv_query_window_with_order_by() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "select \
-    c9, \
-    sum(c5) over (order by c9), \
-    avg(c5) over (order by c9), \
-    count(c5) over (order by c9), \
-    max(c5) over (order by c9), \
-    min(c5) over (order by c9), \
-    first_value(c5) over (order by c9), \
-    last_value(c5) over (order by c9), \
-    nth_value(c5, 2) over (order by c9) \
-    from aggregate_test_100 \
-    order by c9 \
-    limit 5";
-    let actual = execute(&mut ctx, sql).await;
+               c9, \
+               sum(c5) over (order by c9), \
+               avg(c5) over (order by c9), \
+               count(c5) over (order by c9), \
+               max(c5) over (order by c9), \
+               min(c5) over (order by c9), \
+               first_value(c5) over (order by c9), \
+               last_value(c5) over (order by c9), \
+               nth_value(c5, 2) over (order by c9) \
+               from aggregate_test_100 \
+               order by c9 \
+               limit 5";
+    let actual = execute_to_batches(&mut ctx, sql).await;
     let expected = vec![
-        vec![
-            "28774375", "61035129", "61035129", "1", "61035129", "61035129", "61035129",
-            "61035129", "NULL",
-        ],
-        vec![
-            "63044568",
-            "-47938237",
-            "-23969118.5",
-            "2",
-            "61035129",
-            "-108973366",
-            "61035129",
-            "-108973366",
-            "-108973366",
-        ],
-        vec![
-            "141047417",
-            "575165281",
-            "191721760.33333334",
-            "3",
-            "623103518",
-            "-108973366",
-            "61035129",
-            "623103518",
-            "-108973366",
-        ],
-        vec![
-            "141680161",
-            "-1352462829",
-            "-338115707.25",
-            "4",
-            "623103518",
-            "-1927628110",
-            "61035129",
-            "-1927628110",
-            "-108973366",
-        ],
-        vec![
-            "145294611",
-            "-3251637940",
-            "-650327588",
-            "5",
-            "623103518",
-            "-1927628110",
-            "61035129",
-            "-1899175111",
-            "-108973366",
-        ],
+        "+-----------+-------------+--------------------+-----------+-----------+-------------+-----------------+----------------+------------------------+",
+        "| c9        | SUM(c5)     | AVG(c5)            | COUNT(c5) | MAX(c5)   | MIN(c5)     | FIRST_VALUE(c5) | LAST_VALUE(c5) | NTH_VALUE(c5,Int64(2)) |",
+        "+-----------+-------------+--------------------+-----------+-----------+-------------+-----------------+----------------+------------------------+",
+        "| 28774375  | 61035129    | 61035129           | 1         | 61035129  | 61035129    | 61035129        | 61035129       |                        |",
+        "| 63044568  | -47938237   | -23969118.5        | 2         | 61035129  | -108973366  | 61035129        | -108973366     | -108973366             |",
+        "| 141047417 | 575165281   | 191721760.33333334 | 3         | 623103518 | -108973366  | 61035129        | 623103518      | -108973366             |",
+        "| 141680161 | -1352462829 | -338115707.25      | 4         | 623103518 | -1927628110 | 61035129        | -1927628110    | -108973366             |",
+        "| 145294611 | -3251637940 | -650327588         | 5         | 623103518 | -1927628110 | 61035129        | -1899175111    | -108973366             |",
+        "+-----------+-------------+--------------------+-----------+-----------+-------------+-----------------+----------------+------------------------+",
     ];
-    assert_eq!(expected, actual);
+    assert_batches_eq!(expected, &actual);
     Ok(())
 }
 
@@ -1014,16 +1040,19 @@ async fn csv_query_group_by_int_count() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT c1, count(c12) FROM aggregate_test_100 GROUP BY c1";
-    let mut actual = execute(&mut ctx, sql).await;
-    actual.sort();
+    let actual = execute_to_batches(&mut ctx, sql).await;
     let expected = vec![
-        vec!["a", "21"],
-        vec!["b", "19"],
-        vec!["c", "21"],
-        vec!["d", "18"],
-        vec!["e", "21"],
+        "+----+------------+",
+        "| c1 | COUNT(c12) |",
+        "+----+------------+",
+        "| a  | 21         |",
+        "| b  | 19         |",
+        "| c  | 21         |",
+        "| d  | 18         |",
+        "| e  | 21         |",
+        "+----+------------+",
     ];
-    assert_eq!(expected, actual);
+    assert_batches_sorted_eq!(expected, &actual);
     Ok(())
 }
 
@@ -1032,16 +1061,19 @@ async fn csv_query_group_with_aliased_aggregate() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT c1, count(c12) AS count FROM aggregate_test_100 GROUP BY c1";
-    let mut actual = execute(&mut ctx, sql).await;
-    actual.sort();
+    let actual = execute_to_batches(&mut ctx, sql).await;
     let expected = vec![
-        vec!["a", "21"],
-        vec!["b", "19"],
-        vec!["c", "21"],
-        vec!["d", "18"],
-        vec!["e", "21"],
+        "+----+-------+",
+        "| c1 | count |",
+        "+----+-------+",
+        "| a  | 21    |",
+        "| b  | 19    |",
+        "| c  | 21    |",
+        "| d  | 18    |",
+        "| e  | 21    |",
+        "+----+-------+",
     ];
-    assert_eq!(expected, actual);
+    assert_batches_sorted_eq!(expected, &actual);
     Ok(())
 }
 
@@ -1050,19 +1082,24 @@ async fn csv_query_group_by_string_min_max() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT c1, MIN(c12), MAX(c12) FROM aggregate_test_100 GROUP BY c1";
-    let mut actual = execute(&mut ctx, sql).await;
-    actual.sort();
+    let actual = execute_to_batches(&mut ctx, sql).await;
     let expected = vec![
-        vec!["a", "0.02182578039211991", "0.9800193410444061"],
-        vec!["b", "0.04893135681998029", "0.9185813970744787"],
-        vec!["c", "0.0494924465469434", "0.991517828651004"],
-        vec!["d", "0.061029375346466685", "0.9748360509016578"],
-        vec!["e", "0.01479305307777301", "0.9965400387585364"],
+        "+----+----------------------+--------------------+",
+        "| c1 | MIN(c12)             | MAX(c12)           |",
+        "+----+----------------------+--------------------+",
+        "| a  | 0.02182578039211991  | 0.9800193410444061 |",
+        "| b  | 0.04893135681998029  | 0.9185813970744787 |",
+        "| c  | 0.0494924465469434   | 0.991517828651004  |",
+        "| d  | 0.061029375346466685 | 0.9748360509016578 |",
+        "| e  | 0.01479305307777301  | 0.9965400387585364 |",
+        "+----+----------------------+--------------------+",
     ];
-    assert_eq!(expected, actual);
+    assert_batches_sorted_eq!(expected, &actual);
     Ok(())
 }
 
+// --- End Test Porting ---
+
 #[tokio::test]
 async fn csv_query_cast() -> Result<()> {
     let mut ctx = ExecutionContext::new();
@@ -3850,11 +3887,11 @@ async fn test_physical_plan_display_indent() {
     let mut ctx = ExecutionContext::with_config(config);
     register_aggregate_csv(&mut ctx).unwrap();
     let sql = "SELECT c1, MAX(c12), MIN(c12) as the_min \
-         FROM aggregate_test_100 \
-         WHERE c12 < 10 \
-         GROUP BY c1 \
-         ORDER BY the_min DESC \
-         LIMIT 10";
+               FROM aggregate_test_100 \
+               WHERE c12 < 10 \
+               GROUP BY c1 \
+               ORDER BY the_min DESC \
+               LIMIT 10";
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
 
@@ -3872,7 +3909,7 @@ async fn test_physical_plan_display_indent() {
         "                  FilterExec: c12@1 < CAST(10 AS Float64)",
         "                    RepartitionExec: partitioning=RoundRobinBatch(3)",
         "                      CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true",
-        ];
+    ];
 
     let data_path = datafusion::test_util::arrow_test_data();
     let actual = format!("{}", displayable(physical_plan.as_ref()).indent())
@@ -3898,10 +3935,10 @@ async fn test_physical_plan_display_indent_multi_children() {
     register_aggregate_csv(&mut ctx).unwrap();
     let sql = "SELECT c1 \
                FROM (select c1 from aggregate_test_100)\
-                 JOIN\
-                    (select c1 as c2 from aggregate_test_100)\
-                 ON c1=c2\
-                 ";
+               JOIN\
+               (select c1 as c2 from aggregate_test_100)\
+               ON c1=c2\
+               ";
 
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();