You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2023/02/18 00:50:07 UTC

[spark] branch branch-3.4 updated: [SPARK-42481][CONNECT] Implement agg.{max,min,mean,count,avg,sum}

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 681559ea46e [SPARK-42481][CONNECT] Implement agg.{max,min,mean,count,avg,sum}
681559ea46e is described below

commit 681559ea46ee160f5ece8d2743aab186a7a45aa3
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Fri Feb 17 20:49:40 2023 -0400

    [SPARK-42481][CONNECT] Implement agg.{max,min,mean,count,avg,sum}
    
    ### What changes were proposed in this pull request?
    
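    Add more aggregation APIs to the Spark Connect Scala client: `RelationalGroupedDataset.agg`
    over `Column`s plus `max`, `min`, `mean`, `count`, `avg`, and `sum`, and `Dataset.count()`.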
    
    ### Why are the changes needed?
    
    To improve API coverage of the Spark Connect Scala client.
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    Unit tests: new `PlanGenerationTestSuite` cases with golden query and explain files.
    
    Closes #40070 from amaliujia/rw-agg2.
    
    Authored-by: Rui Wang <ru...@databricks.com>
    Signed-off-by: Herman van Hovell <he...@databricks.com>
    (cherry picked from commit 74f53b8d008b8fd570439d5cc56a0c0753ff4910)
    Signed-off-by: Herman van Hovell <he...@databricks.com>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |   9 ++
 .../spark/sql/RelationalGroupedDataset.scala       | 157 +++++++++++++++------
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  42 ++++++
 .../explain-results/groupby_agg_columns.explain    |   2 +
 .../explain-results/groupby_avg.explain            |   2 +
 .../explain-results/groupby_count.explain          |   2 +
 .../explain-results/groupby_max.explain            |   2 +
 .../explain-results/groupby_mean.explain           |   2 +
 .../explain-results/groupby_min.explain            |   2 +
 .../explain-results/groupby_sum.explain            |   2 +
 .../resources/query-tests/queries/groupby_agg.json |   3 +-
 .../query-tests/queries/groupby_agg.proto.bin      | Bin 188 -> 186 bytes
 .../query-tests/queries/groupby_agg_columns.json   |  34 +++++
 .../queries/groupby_agg_columns.proto.bin          |   7 +
 .../resources/query-tests/queries/groupby_avg.json |  34 +++++
 .../query-tests/queries/groupby_avg.proto.bin      |   7 +
 .../query-tests/queries/groupby_count.json         |  30 ++++
 .../query-tests/queries/groupby_count.proto.bin    |   6 +
 .../resources/query-tests/queries/groupby_max.json |  34 +++++
 .../query-tests/queries/groupby_max.proto.bin      |   7 +
 .../query-tests/queries/groupby_mean.json          |  34 +++++
 .../query-tests/queries/groupby_mean.proto.bin     |   7 +
 .../resources/query-tests/queries/groupby_min.json |  34 +++++
 .../query-tests/queries/groupby_min.proto.bin      |   7 +
 .../resources/query-tests/queries/groupby_sum.json |  34 +++++
 .../query-tests/queries/groupby_sum.proto.bin      |   7 +
 26 files changed, 464 insertions(+), 43 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index c39fc6100f5..3c34b45fccb 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2065,6 +2065,15 @@ class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan:
     collectResult().iterator.asInstanceOf[java.util.Iterator[T]]
   }
 
+  /**
+   * Returns the number of rows in the Dataset.
+   * @group action
+   * @since 3.4.0
+   */
+  def count(): Long = {
+    groupBy().count().collect().head.getLong(0)
+  }
+
   private def buildRepartition(numPartitions: Int, shuffle: Boolean): Dataset[T] = {
     session.newDataset { builder =>
       builder.getRepartitionBuilder
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index a3dfcb01fdc..a6d3dc2e468 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -39,14 +39,14 @@ class RelationalGroupedDataset protected[sql] (
     private[sql] val df: DataFrame,
     private[sql] val groupingExprs: Seq[proto.Expression]) {
 
-  private[this] def toDF(aggExprs: Seq[proto.Expression]): DataFrame = {
+  private[this] def toDF(aggExprs: Seq[Column]): DataFrame = {
     // TODO: support other GroupByType such as Rollup, Cube, Pivot.
     df.session.newDataset { builder =>
       builder.getAggregateBuilder
         .setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
         .setInput(df.plan.getRoot)
         .addAllGroupingExpressions(groupingExprs.asJava)
-        .addAllAggregateExpressions(aggExprs.asJava)
+        .addAllAggregateExpressions(aggExprs.map(e => e.expr).asJava)
     }
   }
 
@@ -67,7 +67,7 @@ class RelationalGroupedDataset protected[sql] (
    */
   def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
     toDF((aggExpr +: aggExprs).map { case (colName, expr) =>
-      strToExpr(expr, df(colName).expr)
+      strToColumn(expr, df(colName))
     })
   }
 
@@ -88,7 +88,7 @@ class RelationalGroupedDataset protected[sql] (
    */
   def agg(exprs: Map[String, String]): DataFrame = {
     toDF(exprs.map { case (colName, expr) =>
-      strToExpr(expr, df(colName).expr)
+      strToColumn(expr, df(colName))
     }.toSeq)
   }
 
@@ -109,44 +109,119 @@ class RelationalGroupedDataset protected[sql] (
     agg(exprs.asScala.toMap)
   }
 
-  private[this] def strToExpr(expr: String, inputExpr: proto.Expression): proto.Expression = {
-    val builder = proto.Expression.newBuilder()
-
+  private[this] def strToColumn(expr: String, inputExpr: Column): Column = {
     expr.toLowerCase(Locale.ROOT) match {
-      // We special handle a few cases that have alias that are not in function registry.
-      case "avg" | "average" | "mean" =>
-        builder.getUnresolvedFunctionBuilder
-          .setFunctionName("avg")
-          .addArguments(inputExpr)
-          .setIsDistinct(false)
-      case "stddev" | "std" =>
-        builder.getUnresolvedFunctionBuilder
-          .setFunctionName("stddev")
-          .addArguments(inputExpr)
-          .setIsDistinct(false)
-      // Also special handle count because we need to take care count(*).
-      case "count" | "size" =>
-        // Turn count(*) into count(1)
-        inputExpr match {
-          case s if s.hasUnresolvedStar =>
-            val exprBuilder = proto.Expression.newBuilder
-            exprBuilder.getLiteralBuilder.setInteger(1)
-            builder.getUnresolvedFunctionBuilder
-              .setFunctionName("count")
-              .addArguments(exprBuilder)
-              .setIsDistinct(false)
-          case _ =>
-            builder.getUnresolvedFunctionBuilder
-              .setFunctionName("count")
-              .addArguments(inputExpr)
-              .setIsDistinct(false)
-        }
-      case name =>
-        builder.getUnresolvedFunctionBuilder
-          .setFunctionName(name)
-          .addArguments(inputExpr)
-          .setIsDistinct(false)
+      case "avg" | "average" | "mean" => functions.avg(inputExpr)
+      case "stddev" | "std" => functions.stddev(inputExpr)
+      case "count" | "size" => functions.count(inputExpr)
+      case name => Column.fn(name, inputExpr)
     }
-    builder.build()
+  }
+
+  /**
+   * Compute aggregates by specifying a series of aggregate columns. Note that this function by
+   * default retains the grouping columns in its output. To not retain grouping columns, set
+   * `spark.sql.retainGroupColumns` to false.
+   *
+   * The available aggregate methods are defined in [[org.apache.spark.sql.functions]].
+   *
+   * {{{
+   *   // Selects the age of the oldest employee and the aggregate expense for each department
+   *
+   *   // Scala:
+   *   import org.apache.spark.sql.functions._
+   *   df.groupBy("department").agg(max("age"), sum("expense"))
+   *
+   *   // Java:
+   *   import static org.apache.spark.sql.functions.*;
+   *   df.groupBy("department").agg(max("age"), sum("expense"));
+   * }}}
+   *
+   * Note that before Spark 1.4, the default behavior is to NOT retain grouping columns. To change
+   * to that behavior, set config variable `spark.sql.retainGroupColumns` to `false`.
+   * {{{
+   *   // Scala, 1.3.x:
+   *   df.groupBy("department").agg($"department", max("age"), sum("expense"))
+   *
+   *   // Java, 1.3.x:
+   *   df.groupBy("department").agg(col("department"), max("age"), sum("expense"));
+   * }}}
+   *
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def agg(expr: Column, exprs: Column*): DataFrame = {
+    toDF((expr +: exprs).map { case c =>
+      c
+    // TODO: deal with typed columns.
+    })
+  }
+
+  /**
+   * Count the number of rows for each group. The resulting `DataFrame` will also contain the
+   * grouping columns.
+   *
+   * @since 3.4.0
+   */
+  def count(): DataFrame = toDF(Seq(functions.count(functions.lit(1)).alias("count")))
+
+  /**
+   * Compute the average value for each numeric column for each group. This is an alias for
+   * `avg`. The resulting `DataFrame` will also contain the grouping columns. When specified
+   * columns are given, only compute the average values for them.
+   *
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def mean(colNames: String*): DataFrame = {
+    toDF(colNames.map(colName => functions.mean(colName)))
+  }
+
+  /**
+   * Compute the max value for each numeric column for each group. The resulting `DataFrame` will
+   * also contain the grouping columns. When specified columns are given, only compute the max
+   * values for them.
+   *
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def max(colNames: String*): DataFrame = {
+    toDF(colNames.map(colName => functions.max(colName)))
+  }
+
+  /**
+   * Compute the mean value for each numeric column for each group. The resulting `DataFrame`
+   * will also contain the grouping columns. When specified columns are given, only compute the
+   * mean values for them.
+   *
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def avg(colNames: String*): DataFrame = {
+    toDF(colNames.map(colName => functions.avg(colName)))
+  }
+
+  /**
+   * Compute the min value for each numeric column for each group. The resulting `DataFrame` will
+   * also contain the grouping columns. When specified columns are given, only compute the min
+   * values for them.
+   *
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def min(colNames: String*): DataFrame = {
+    toDF(colNames.map(colName => functions.min(colName)))
+  }
+
+  /**
+   * Compute the sum for each numeric column for each group. The resulting `DataFrame` will also
+   * contain the grouping columns. When specified columns are given, only compute the sum for
+   * them.
+   *
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def sum(colNames: String*): DataFrame = {
+    toDF(colNames.map(colName => functions.sum(colName)))
   }
 }
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index b591daef391..c0b3bef4fbc 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -1201,6 +1201,48 @@ class PlanGenerationTestSuite extends ConnectFunSuite with BeforeAndAfterAll wit
         "a" -> "count")
   }
 
+  test("groupby agg columns") {
+    simple
+      .groupBy(Column("id"))
+      .agg(functions.max("a"), functions.sum("b"))
+  }
+
+  test("groupby max") {
+    simple
+      .groupBy(Column("id"))
+      .max("a", "b")
+  }
+
+  test("groupby min") {
+    simple
+      .groupBy(Column("id"))
+      .min("a", "b")
+  }
+
+  test("groupby mean") {
+    simple
+      .groupBy(Column("id"))
+      .mean("a", "b")
+  }
+
+  test("groupby avg") {
+    simple
+      .groupBy(Column("id"))
+      .avg("a", "b")
+  }
+
+  test("groupby sum") {
+    simple
+      .groupBy(Column("id"))
+      .sum("a", "b")
+  }
+
+  test("groupby count") {
+    simple
+      .groupBy(Column("id"))
+      .count()
+  }
+
   test("function lit") {
     select(
       fn.lit(fn.col("id")),
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_agg_columns.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_agg_columns.explain
new file mode 100644
index 00000000000..86b919a3919
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_agg_columns.explain
@@ -0,0 +1,2 @@
+Aggregate [id#0L], [id#0L, max(a#0) AS max(a)#0, sum(b#0) AS sum(b)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_avg.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_avg.explain
new file mode 100644
index 00000000000..e7c559a1bf6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_avg.explain
@@ -0,0 +1,2 @@
+Aggregate [id#0L], [id#0L, avg(a#0) AS avg(a)#0, avg(b#0) AS avg(b)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_count.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_count.explain
new file mode 100644
index 00000000000..dd08ec3bd59
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_count.explain
@@ -0,0 +1,2 @@
+Aggregate [id#0L], [id#0L, count(1) AS count#0L]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_max.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_max.explain
new file mode 100644
index 00000000000..8f00ba848ca
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_max.explain
@@ -0,0 +1,2 @@
+Aggregate [id#0L], [id#0L, max(a#0) AS max(a)#0, max(b#0) AS max(b)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_mean.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_mean.explain
new file mode 100644
index 00000000000..e7c559a1bf6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_mean.explain
@@ -0,0 +1,2 @@
+Aggregate [id#0L], [id#0L, avg(a#0) AS avg(a)#0, avg(b#0) AS avg(b)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_min.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_min.explain
new file mode 100644
index 00000000000..b46adbdc263
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_min.explain
@@ -0,0 +1,2 @@
+Aggregate [id#0L], [id#0L, min(a#0) AS min(a)#0, min(b#0) AS min(b)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_sum.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_sum.explain
new file mode 100644
index 00000000000..5d6b075bbe6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/groupby_sum.explain
@@ -0,0 +1,2 @@
+Aggregate [id#0L], [id#0L, sum(a#0) AS sum(a)#0L, sum(b#0) AS sum(b)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg.json b/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg.json
index 7838a89974d..4cf2ae0c8c2 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg.json
@@ -69,8 +69,7 @@
       "unresolvedFunction": {
         "functionName": "count",
         "arguments": [{
-          "literal": {
-            "integer": 1
+          "unresolvedStar": {
           }
         }]
       }
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg.proto.bin
index 9c6d1cca8a4..eed57649c45 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg_columns.json b/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg_columns.json
new file mode 100644
index 00000000000..fd2264fd2ae
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg_columns.json
@@ -0,0 +1,34 @@
+{
+  "aggregate": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "groupType": "GROUP_TYPE_GROUPBY",
+    "groupingExpressions": [{
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "id"
+      }
+    }],
+    "aggregateExpressions": [{
+      "unresolvedFunction": {
+        "functionName": "max",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }]
+      }
+    }, {
+      "unresolvedFunction": {
+        "functionName": "sum",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "b"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg_columns.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg_columns.proto.bin
new file mode 100644
index 00000000000..b12dd5229db
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/groupby_agg_columns.proto.bin
@@ -0,0 +1,7 @@
+JP
+$Z" struct<id:bigint,a:int,b:double>
+id"
+max
+a"
+sum
+b
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/groupby_avg.json b/connector/connect/common/src/test/resources/query-tests/queries/groupby_avg.json
new file mode 100644
index 00000000000..df4216bdd51
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/groupby_avg.json
@@ -0,0 +1,34 @@
+{
+  "aggregate": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "groupType": "GROUP_TYPE_GROUPBY",
+    "groupingExpressions": [{
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "id"
+      }
+    }],
+    "aggregateExpressions": [{
+      "unresolvedFunction": {
+        "functionName": "avg",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }]
+      }
+    }, {
+      "unresolvedFunction": {
+        "functionName": "avg",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "b"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/groupby_avg.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/groupby_avg.proto.bin
new file mode 100644
index 00000000000..33cbb49f1fe
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/groupby_avg.proto.bin
@@ -0,0 +1,7 @@
+JP
+$Z" struct<id:bigint,a:int,b:double>
+id"
+avg
+a"
+avg
+b
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/groupby_count.json b/connector/connect/common/src/test/resources/query-tests/queries/groupby_count.json
new file mode 100644
index 00000000000..c28c167f21b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/groupby_count.json
@@ -0,0 +1,30 @@
+{
+  "aggregate": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "groupType": "GROUP_TYPE_GROUPBY",
+    "groupingExpressions": [{
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "id"
+      }
+    }],
+    "aggregateExpressions": [{
+      "alias": {
+        "expr": {
+          "unresolvedFunction": {
+            "functionName": "count",
+            "arguments": [{
+              "literal": {
+                "integer": 1
+              }
+            }]
+          }
+        },
+        "name": ["count"]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/groupby_count.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/groupby_count.proto.bin
new file mode 100644
index 00000000000..d3920650eb5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/groupby_count.proto.bin
@@ -0,0 +1,6 @@
+JL
+$Z" struct<id:bigint,a:int,b:double>
+id"2
+
+count
+0count
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/groupby_max.json b/connector/connect/common/src/test/resources/query-tests/queries/groupby_max.json
new file mode 100644
index 00000000000..262232063dd
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/groupby_max.json
@@ -0,0 +1,34 @@
+{
+  "aggregate": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "groupType": "GROUP_TYPE_GROUPBY",
+    "groupingExpressions": [{
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "id"
+      }
+    }],
+    "aggregateExpressions": [{
+      "unresolvedFunction": {
+        "functionName": "max",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }]
+      }
+    }, {
+      "unresolvedFunction": {
+        "functionName": "max",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "b"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/groupby_max.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/groupby_max.proto.bin
new file mode 100644
index 00000000000..e43c9e3e325
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/groupby_max.proto.bin
@@ -0,0 +1,7 @@
+JP
+$Z" struct<id:bigint,a:int,b:double>
+id"
+max
+a"
+max
+b
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/groupby_mean.json b/connector/connect/common/src/test/resources/query-tests/queries/groupby_mean.json
new file mode 100644
index 00000000000..df4216bdd51
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/groupby_mean.json
@@ -0,0 +1,34 @@
+{
+  "aggregate": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "groupType": "GROUP_TYPE_GROUPBY",
+    "groupingExpressions": [{
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "id"
+      }
+    }],
+    "aggregateExpressions": [{
+      "unresolvedFunction": {
+        "functionName": "avg",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }]
+      }
+    }, {
+      "unresolvedFunction": {
+        "functionName": "avg",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "b"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/groupby_mean.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/groupby_mean.proto.bin
new file mode 100644
index 00000000000..33cbb49f1fe
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/groupby_mean.proto.bin
@@ -0,0 +1,7 @@
+JP
+$Z" struct<id:bigint,a:int,b:double>
+id"
+avg
+a"
+avg
+b
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/groupby_min.json b/connector/connect/common/src/test/resources/query-tests/queries/groupby_min.json
new file mode 100644
index 00000000000..c6c55dde8b4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/groupby_min.json
@@ -0,0 +1,34 @@
+{
+  "aggregate": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "groupType": "GROUP_TYPE_GROUPBY",
+    "groupingExpressions": [{
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "id"
+      }
+    }],
+    "aggregateExpressions": [{
+      "unresolvedFunction": {
+        "functionName": "min",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }]
+      }
+    }, {
+      "unresolvedFunction": {
+        "functionName": "min",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "b"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/groupby_min.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/groupby_min.proto.bin
new file mode 100644
index 00000000000..c7ad1785cd8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/groupby_min.proto.bin
@@ -0,0 +1,7 @@
+JP
+$Z" struct<id:bigint,a:int,b:double>
+id"
+min
+a"
+min
+b
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/groupby_sum.json b/connector/connect/common/src/test/resources/query-tests/queries/groupby_sum.json
new file mode 100644
index 00000000000..48d3820dea9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/groupby_sum.json
@@ -0,0 +1,34 @@
+{
+  "aggregate": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "groupType": "GROUP_TYPE_GROUPBY",
+    "groupingExpressions": [{
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "id"
+      }
+    }],
+    "aggregateExpressions": [{
+      "unresolvedFunction": {
+        "functionName": "sum",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }]
+      }
+    }, {
+      "unresolvedFunction": {
+        "functionName": "sum",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "b"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/groupby_sum.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/groupby_sum.proto.bin
new file mode 100644
index 00000000000..673b7452703
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/groupby_sum.proto.bin
@@ -0,0 +1,7 @@
+JP
+$Z" struct<id:bigint,a:int,b:double>
+id"
+sum
+a"
+sum
+b
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org