Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/02 11:41:29 UTC

[GitHub] [spark] grundprinzip opened a new pull request, #38883: [SPARK-41366][CONNECT] DF.groupby.agg() should be compatible

grundprinzip opened a new pull request, #38883:
URL: https://github.com/apache/spark/pull/38883

   ### What changes were proposed in this pull request?
   This patch fixes small inconsistencies between the PySpark API and Spark Connect.
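A minimal sketch of the kind of argument dispatch this change gives `agg()`, using stand-in types. `FakeColumn` and `normalize_agg_args` are hypothetical names for illustration only and are not part of PySpark; the sketch merely shows how a single dict argument can be expanded into aggregate expressions while a varargs call of columns is passed through.

```python
from typing import Dict, List, Union


class FakeColumn:
    """Hypothetical stand-in for a Column; it only carries a display name."""

    def __init__(self, name: str) -> None:
        self.name = name


def normalize_agg_args(*exprs: Union[FakeColumn, Dict[str, str]]) -> List[str]:
    """Return aggregate expressions as display strings for either call style."""
    if not exprs:
        raise ValueError("exprs should not be empty")
    if len(exprs) == 1 and isinstance(exprs[0], dict):
        # Dict form: the key is the column name, the value is the aggregate function.
        return [f"{func}({col})" for col, func in exprs[0].items()]
    # Varargs form: every argument must already be a column expression.
    if not all(isinstance(e, FakeColumn) for e in exprs):
        raise TypeError("all exprs should be Column")
    return [e.name for e in exprs]


print(normalize_agg_args({"age": "min"}))
print(normalize_agg_args(FakeColumn("min(age)"), FakeColumn("max(age)")))
```

Both call styles end up as the same kind of list, so the rest of the method would not need to know which one the caller used.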
   
   ### Why are the changes needed?
   Compatibility with the existing PySpark API.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Unit tests (UT).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] grundprinzip commented on a diff in pull request #38883: [SPARK-41366][CONNECT] DF.groupby.agg() should be compatible

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38883:
URL: https://github.com/apache/spark/pull/38883#discussion_r1040470566


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -55,8 +59,109 @@ def __init__(self, df: "DataFrame", *grouping_cols: Union[Column, str]) -> None:
         self._df = df
         self._grouping_cols = [x if isinstance(x, Column) else df[x] for x in grouping_cols]
 
-    def agg(self, measures: Sequence[Column]) -> "DataFrame":
-        assert len(measures) > 0, "exprs should not be empty"
+    @overload
+    def agg(self, *exprs: Column) -> "DataFrame":
+        ...
+
+    @overload
+    def agg(self, __exprs: Dict[str, str]) -> "DataFrame":
+        ...
+
+    def agg(self, *exprs: Union[Column, Dict[str, str]]) -> "DataFrame":
+        """Compute aggregates and returns the result as a :class:`DataFrame`.
+
+        The available aggregate functions can be:
+
+        1. built-in aggregation functions, such as `avg`, `max`, `min`, `sum`, `count`
+
+        2. group aggregate pandas UDFs, created with :func:`pyspark.sql.functions.pandas_udf`
+
+           .. note:: There is no partial aggregation with group aggregate UDFs, i.e.,
+               a full shuffle is required. Also, all the data of a group will be loaded into
+               memory, so the user should be aware of the potential OOM risk if data is skewed
+               and certain groups are too large to fit in memory.
+
+           .. seealso:: :func:`pyspark.sql.functions.pandas_udf`
+
+        If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
+        is the column to perform aggregation on, and the value is the aggregate function.
+
+        Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.
+
+        .. versionadded:: 1.3.0

Review Comment:
   Done





[GitHub] [spark] AmplabJenkins commented on pull request #38883: [SPARK-41366][CONNECT] DF.groupby.agg() should be compatible

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #38883:
URL: https://github.com/apache/spark/pull/38883#issuecomment-1336002584

   Can one of the admins verify this patch?




[GitHub] [spark] zhengruifeng commented on a diff in pull request #38883: [SPARK-41366][CONNECT] DF.groupby.agg() should be compatible

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38883:
URL: https://github.com/apache/spark/pull/38883#discussion_r1038160665


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -55,8 +60,109 @@ def __init__(self, df: "DataFrame", *grouping_cols: Union[Column, str]) -> None:
         self._df = df
         self._grouping_cols = [x if isinstance(x, Column) else df[x] for x in grouping_cols]
 
-    def agg(self, measures: Sequence[Column]) -> "DataFrame":
-        assert len(measures) > 0, "exprs should not be empty"
+    @overload
+    def agg(self, *exprs: Column) -> "DataFrame":
+        ...
+
+    @overload
+    def agg(self, __exprs: Dict[str, str]) -> "DataFrame":
+        ...
+
+    def agg(self, *exprs: Union[Column, Dict[str, str]]) -> "DataFrame":
+        """Compute aggregates and returns the result as a :class:`DataFrame`.
+
+        The available aggregate functions can be:
+
+        1. built-in aggregation functions, such as `avg`, `max`, `min`, `sum`, `count`
+
+        2. group aggregate pandas UDFs, created with :func:`pyspark.sql.functions.pandas_udf`
+
+           .. note:: There is no partial aggregation with group aggregate UDFs, i.e.,
+               a full shuffle is required. Also, all the data of a group will be loaded into
+               memory, so the user should be aware of the potential OOM risk if data is skewed
+               and certain groups are too large to fit in memory.
+
+           .. seealso:: :func:`pyspark.sql.functions.pandas_udf`
+
+        If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
+        is the column to perform aggregation on, and the value is the aggregate function.
+
+        Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.
+
+        .. versionadded:: 1.3.0
+
+        Parameters
+        ----------
+        exprs : dict
+            a dict mapping from column name (string) to aggregate functions (string),
+            or a list of :class:`Column`.
+
+        Notes
+        -----
+        Built-in aggregation functions and group aggregate pandas UDFs cannot be mixed
+        in a single call to this function.
+
+        Examples
+        --------
+        >>> from pyspark.sql import functions as F
+        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> df = spark.createDataFrame(
+        ...      [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"])
+        >>> df.show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        |  3|Alice|
+        |  5|  Bob|
+        | 10|  Bob|
+        +---+-----+
+
+        Group-by name, and count each group.
+
+        >>> df.groupBy(df.name).agg({"*": "count"}).sort("name").show()
+        +-----+--------+
+        | name|count(1)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       2|
+        +-----+--------+
+
+        Group-by name, and calculate the minimum age.
+
+        >>> df.groupBy(df.name).agg(F.min(df.age)).sort("name").show()
+        +-----+--------+
+        | name|min(age)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       5|
+        +-----+--------+
+
+        Same as above but uses pandas UDF.
+
+        >>> @pandas_udf('int', PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
+        ... def min_udf(v):
+        ...     return v.min()
+        ...
+        >>> df.groupBy(df.name).agg(min_udf(df.age)).sort("name").show()  # doctest: +SKIP
+        +-----+------------+
+        | name|min_udf(age)|
+        +-----+------------+
+        |Alice|           2|
+        |  Bob|           5|
+        +-----+------------+
+        """
+        assert exprs, "exprs should not be empty"
+        if len(exprs) == 1 and isinstance(exprs[0], dict):
+            from pyspark.sql.connect.function_builder import functions as FB
+
+            # Convert the dict into key value pairs
+            measures = [Column(ScalarFunctionExpression(exprs[0][k], col(k))) for k in exprs[0]]

Review Comment:
   I think this PR needs rebasing now.





[GitHub] [spark] cloud-fan commented on a diff in pull request #38883: [SPARK-41366][CONNECT] DF.groupby.agg() should be compatible

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38883:
URL: https://github.com/apache/spark/pull/38883#discussion_r1040844793


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -397,7 +397,7 @@ class SparkConnectPlanner(session: SparkSession) {
   }
 
   private def transformUnresolvedExpression(exp: proto.Expression): UnresolvedAttribute = {
-    UnresolvedAttribute(exp.getUnresolvedAttribute.getUnparsedIdentifier)
+    UnresolvedAttribute.quotedString(exp.getUnresolvedAttribute.getUnparsedIdentifier)

Review Comment:
   This is tricky: under the hood it comes down to `CatalystSqlParser.parseMultipartIdentifier` vs. `parseAttributeName`. Ideally we would use `parseMultipartIdentifier`, which is more standard, but following the existing PySpark behavior is more important.
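To illustrate the kind of name handling at stake, here is a simplified Python sketch of splitting an attribute name on dots with backtick quoting. It is not Spark's implementation: `split_attribute_name` is a hypothetical helper, and the rule it applies (dots separate name parts, a backtick-quoted part may contain dots, a doubled backtick inside quotes stands for a literal backtick) is an assumption made for this example.

```python
from typing import List


def split_attribute_name(name: str) -> List[str]:
    """Split a possibly backtick-quoted attribute name into its parts."""
    parts: List[str] = []
    buf: List[str] = []
    in_quotes = False
    i = 0
    while i < len(name):
        ch = name[i]
        if in_quotes:
            if ch == "`":
                if i + 1 < len(name) and name[i + 1] == "`":
                    # A doubled backtick inside quotes is a literal backtick.
                    buf.append("`")
                    i += 1
                else:
                    in_quotes = False
            else:
                buf.append(ch)
        elif ch == "`":
            in_quotes = True
        elif ch == ".":
            # An unquoted dot ends the current name part.
            parts.append("".join(buf))
            buf = []
        else:
            buf.append(ch)
        i += 1
    if in_quotes:
        raise ValueError(f"unbalanced backtick in {name!r}")
    parts.append("".join(buf))
    return parts


for candidate in ["a.b", "`a.b`.c", "count(1)"]:
    print(candidate, "->", split_attribute_name(candidate))
```

A lenient splitter like this keeps a name such as `count(1)` as a single part, whereas a stricter grammar-based parser may reject it as an identifier; that difference is why the choice between the two parsers matters for compatibility.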





[GitHub] [spark] grundprinzip commented on a diff in pull request #38883: [SPARK-41366][CONNECT] DF.groupby.agg() should be compatible

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38883:
URL: https://github.com/apache/spark/pull/38883#discussion_r1038165826


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -55,8 +60,109 @@ def __init__(self, df: "DataFrame", *grouping_cols: Union[Column, str]) -> None:
         self._df = df
         self._grouping_cols = [x if isinstance(x, Column) else df[x] for x in grouping_cols]
 
-    def agg(self, measures: Sequence[Column]) -> "DataFrame":
-        assert len(measures) > 0, "exprs should not be empty"
+    @overload
+    def agg(self, *exprs: Column) -> "DataFrame":
+        ...
+
+    @overload
+    def agg(self, __exprs: Dict[str, str]) -> "DataFrame":
+        ...
+
+    def agg(self, *exprs: Union[Column, Dict[str, str]]) -> "DataFrame":
+        """Compute aggregates and returns the result as a :class:`DataFrame`.
+
+        The available aggregate functions can be:
+
+        1. built-in aggregation functions, such as `avg`, `max`, `min`, `sum`, `count`
+
+        2. group aggregate pandas UDFs, created with :func:`pyspark.sql.functions.pandas_udf`
+
+           .. note:: There is no partial aggregation with group aggregate UDFs, i.e.,
+               a full shuffle is required. Also, all the data of a group will be loaded into
+               memory, so the user should be aware of the potential OOM risk if data is skewed
+               and certain groups are too large to fit in memory.
+
+           .. seealso:: :func:`pyspark.sql.functions.pandas_udf`
+
+        If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
+        is the column to perform aggregation on, and the value is the aggregate function.
+
+        Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.
+
+        .. versionadded:: 1.3.0
+
+        Parameters
+        ----------
+        exprs : dict
+            a dict mapping from column name (string) to aggregate functions (string),
+            or a list of :class:`Column`.
+
+        Notes
+        -----
+        Built-in aggregation functions and group aggregate pandas UDFs cannot be mixed
+        in a single call to this function.
+
+        Examples
+        --------
+        >>> from pyspark.sql import functions as F
+        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> df = spark.createDataFrame(
+        ...      [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"])
+        >>> df.show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        |  3|Alice|
+        |  5|  Bob|
+        | 10|  Bob|
+        +---+-----+
+
+        Group-by name, and count each group.
+
+        >>> df.groupBy(df.name).agg({"*": "count"}).sort("name").show()
+        +-----+--------+
+        | name|count(1)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       2|
+        +-----+--------+
+
+        Group-by name, and calculate the minimum age.
+
+        >>> df.groupBy(df.name).agg(F.min(df.age)).sort("name").show()
+        +-----+--------+
+        | name|min(age)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       5|
+        +-----+--------+
+
+        Same as above but uses pandas UDF.
+
+        >>> @pandas_udf('int', PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
+        ... def min_udf(v):
+        ...     return v.min()
+        ...
+        >>> df.groupBy(df.name).agg(min_udf(df.age)).sort("name").show()  # doctest: +SKIP
+        +-----+------------+
+        | name|min_udf(age)|
+        +-----+------------+
+        |Alice|           2|
+        |  Bob|           5|
+        +-----+------------+
+        """
+        assert exprs, "exprs should not be empty"
+        if len(exprs) == 1 and isinstance(exprs[0], dict):
+            from pyspark.sql.connect.function_builder import functions as FB
+
+            # Convert the dict into key value pairs
+            measures = [Column(ScalarFunctionExpression(exprs[0][k], col(k))) for k in exprs[0]]

Review Comment:
   Do you mean anything in particular? I had just built this on top of recent master.





[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #38883: [SPARK-41366][CONNECT] DF.groupby.agg() should be compatible

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on code in PR #38883:
URL: https://github.com/apache/spark/pull/38883#discussion_r1042916333


##########
python/pyspark/sql/tests/connect/test_connect_basic.py:
##########
@@ -21,6 +21,7 @@
 
 import grpc  # type: ignore
 
+from pyspark.sql.connect.column import Column

Review Comment:
   This line should be under `if have_pandas`. Here is a follow-up PR.
   https://github.com/apache/spark/pull/38976
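A minimal sketch of the guarded-import pattern referred to above. The flag name `have_pandas` comes from the comment; how it is computed here (via `importlib.util.find_spec`) and the `describe` helper are assumptions for illustration, not the test module's actual code.

```python
import importlib.util

# True when pandas can be located in this environment.
have_pandas = importlib.util.find_spec("pandas") is not None

if have_pandas:
    # Imports that depend on pandas only run when pandas is available.
    import pandas as pd

    def describe() -> str:
        return f"pandas {pd.__version__} available"

else:

    def describe() -> str:
        return "pandas not installed; pandas-dependent imports skipped"


print(describe())
```

Keeping the dependent import inside the guard means the module can still be imported, and its tests skipped cleanly, on machines without pandas.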





[GitHub] [spark] amaliujia commented on a diff in pull request #38883: [SPARK-41366][CONNECT] DF.groupby.agg() should be compatible

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #38883:
URL: https://github.com/apache/spark/pull/38883#discussion_r1040141502


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -55,8 +59,109 @@ def __init__(self, df: "DataFrame", *grouping_cols: Union[Column, str]) -> None:
         self._df = df
         self._grouping_cols = [x if isinstance(x, Column) else df[x] for x in grouping_cols]
 
-    def agg(self, measures: Sequence[Column]) -> "DataFrame":
-        assert len(measures) > 0, "exprs should not be empty"
+    @overload
+    def agg(self, *exprs: Column) -> "DataFrame":
+        ...
+
+    @overload
+    def agg(self, __exprs: Dict[str, str]) -> "DataFrame":
+        ...
+
+    def agg(self, *exprs: Union[Column, Dict[str, str]]) -> "DataFrame":
+        """Compute aggregates and returns the result as a :class:`DataFrame`.
+
+        The available aggregate functions can be:
+
+        1. built-in aggregation functions, such as `avg`, `max`, `min`, `sum`, `count`
+
+        2. group aggregate pandas UDFs, created with :func:`pyspark.sql.functions.pandas_udf`
+
+           .. note:: There is no partial aggregation with group aggregate UDFs, i.e.,
+               a full shuffle is required. Also, all the data of a group will be loaded into
+               memory, so the user should be aware of the potential OOM risk if data is skewed
+               and certain groups are too large to fit in memory.
+
+           .. seealso:: :func:`pyspark.sql.functions.pandas_udf`
+
+        If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
+        is the column to perform aggregation on, and the value is the aggregate function.
+
+        Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.
+
+        .. versionadded:: 1.3.0
+
+        Parameters
+        ----------
+        exprs : dict
+            a dict mapping from column name (string) to aggregate functions (string),
+            or a list of :class:`Column`.
+
+        Notes
+        -----
+        Built-in aggregation functions and group aggregate pandas UDFs cannot be mixed
+        in a single call to this function.
+
+        Examples
+        --------
+        >>> from pyspark.sql import functions as F
+        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> df = spark.createDataFrame(
+        ...      [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"])
+        >>> df.show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        |  3|Alice|
+        |  5|  Bob|
+        | 10|  Bob|
+        +---+-----+
+
+        Group-by name, and count each group.
+
+        >>> df.groupBy(df.name).agg({"*": "count"}).sort("name").show()
+        +-----+--------+
+        | name|count(1)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       2|
+        +-----+--------+
+
+        Group-by name, and calculate the minimum age.
+
+        >>> df.groupBy(df.name).agg(F.min(df.age)).sort("name").show()
+        +-----+--------+
+        | name|min(age)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       5|
+        +-----+--------+
+
+        Same as above but uses pandas UDF.
+
+        >>> @pandas_udf('int', PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
+        ... def min_udf(v):
+        ...     return v.min()
+        ...
+        >>> df.groupBy(df.name).agg(min_udf(df.age)).sort("name").show()  # doctest: +SKIP
+        +-----+------------+
+        | name|min_udf(age)|
+        +-----+------------+
+        |Alice|           2|
+        |  Bob|           5|
+        +-----+------------+
+        """
+        assert exprs, "exprs should not be empty"
+        if len(exprs) == 1 and isinstance(exprs[0], dict):
+            from pyspark.sql.connect.function_builder import functions as FB
+
+            # Convert the dict into key value pairs
+            measures = [scalar_function(exprs[0][k], col(k)) for k in exprs[0]]
+        else:
+            # Columns
+            assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column"

Review Comment:
   Nit:
   
   Do we need to put the full package name in the error message, given that there are two `Column` classes: `pyspark.sql.column` and `pyspark.sql.connect.column`?
   
   Do you foresee users of the Connect client being confused by the two `Column` classes, or will that no longer be an issue once we have a proper packaging/release implementation?
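One way the suggestion could look, sketched with a stand-in class. `Column` below is a local placeholder rather than either PySpark class, and `qualified_name` and `check_all_columns` are hypothetical helpers; the point is only that `__module__` and `__qualname__` make the message unambiguous when two classes share a short name.

```python
class Column:
    """Hypothetical stand-in for the Connect Column class."""


def qualified_name(cls: type) -> str:
    """Return the module-qualified name of a class."""
    return f"{cls.__module__}.{cls.__qualname__}"


def check_all_columns(*exprs: object) -> None:
    """Raise a TypeError naming the expected and the offending types in full."""
    bad = [type(e) for e in exprs if not isinstance(e, Column)]
    if bad:
        found = ", ".join(sorted({qualified_name(t) for t in bad}))
        raise TypeError(
            f"all exprs should be {qualified_name(Column)}, got: {found}"
        )


try:
    check_all_columns(Column(), "age")
except TypeError as err:
    print(err)
```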





[GitHub] [spark] amaliujia commented on a diff in pull request #38883: [SPARK-41366][CONNECT] DF.groupby.agg() should be compatible

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #38883:
URL: https://github.com/apache/spark/pull/38883#discussion_r1040143187


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -55,8 +59,109 @@ def __init__(self, df: "DataFrame", *grouping_cols: Union[Column, str]) -> None:
         self._df = df
         self._grouping_cols = [x if isinstance(x, Column) else df[x] for x in grouping_cols]
 
-    def agg(self, measures: Sequence[Column]) -> "DataFrame":
-        assert len(measures) > 0, "exprs should not be empty"
+    @overload
+    def agg(self, *exprs: Column) -> "DataFrame":
+        ...
+
+    @overload
+    def agg(self, __exprs: Dict[str, str]) -> "DataFrame":
+        ...
+
+    def agg(self, *exprs: Union[Column, Dict[str, str]]) -> "DataFrame":
+        """Compute aggregates and returns the result as a :class:`DataFrame`.
+
+        The available aggregate functions can be:
+
+        1. built-in aggregation functions, such as `avg`, `max`, `min`, `sum`, `count`
+
+        2. group aggregate pandas UDFs, created with :func:`pyspark.sql.functions.pandas_udf`
+
+           .. note:: There is no partial aggregation with group aggregate UDFs, i.e.,
+               a full shuffle is required. Also, all the data of a group will be loaded into
+               memory, so the user should be aware of the potential OOM risk if data is skewed
+               and certain groups are too large to fit in memory.
+
+           .. seealso:: :func:`pyspark.sql.functions.pandas_udf`
+
+        If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
+        is the column to perform aggregation on, and the value is the aggregate function.
+
+        Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.
+
+        .. versionadded:: 1.3.0
+
+        Parameters
+        ----------
+        exprs : dict
+            a dict mapping from column name (string) to aggregate functions (string),
+            or a list of :class:`Column`.
+
+        Notes
+        -----
+        Built-in aggregation functions and group aggregate pandas UDFs cannot be mixed
+        in a single call to this function.
+
+        Examples
+        --------
+        >>> from pyspark.sql import functions as F
+        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> df = spark.createDataFrame(
+        ...      [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"])
+        >>> df.show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        |  3|Alice|
+        |  5|  Bob|
+        | 10|  Bob|
+        +---+-----+
+
+        Group-by name, and count each group.
+
+        >>> df.groupBy(df.name).agg({"*": "count"}).sort("name").show()
+        +-----+--------+
+        | name|count(1)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       2|
+        +-----+--------+
+
+        Group-by name, and calculate the minimum age.
+
+        >>> df.groupBy(df.name).agg(F.min(df.age)).sort("name").show()
+        +-----+--------+
+        | name|min(age)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       5|
+        +-----+--------+
+
+        Same as above but uses pandas UDF.
+
+        >>> @pandas_udf('int', PandasUDFType.GROUPED_AGG)  # doctest: +SKIP

Review Comment:
   Actually, maybe we should remove this: it is not yet supported by the Connect Python client (and we probably don't have a timeline for it either).





[GitHub] [spark] hvanhovell commented on pull request #38883: [SPARK-41366][CONNECT] DF.groupby.agg() should be compatible

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on PR #38883:
URL: https://github.com/apache/spark/pull/38883#issuecomment-1340276233

   merging




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38883: [SPARK-41366][CONNECT] DF.groupby.agg() should be compatible

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38883:
URL: https://github.com/apache/spark/pull/38883#discussion_r1043011644


##########
python/pyspark/sql/tests/connect/test_connect_basic.py:
##########
@@ -21,6 +21,7 @@
 
 import grpc  # type: ignore
 
+from pyspark.sql.connect.column import Column

Review Comment:
   Thanks for following this up.





[GitHub] [spark] grundprinzip commented on a diff in pull request #38883: [SPARK-41366][CONNECT] DF.groupby.agg() should be compatible

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38883:
URL: https://github.com/apache/spark/pull/38883#discussion_r1038176485


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -55,8 +60,109 @@ def __init__(self, df: "DataFrame", *grouping_cols: Union[Column, str]) -> None:
         self._df = df
         self._grouping_cols = [x if isinstance(x, Column) else df[x] for x in grouping_cols]
 
-    def agg(self, measures: Sequence[Column]) -> "DataFrame":
-        assert len(measures) > 0, "exprs should not be empty"
+    @overload
+    def agg(self, *exprs: Column) -> "DataFrame":
+        ...
+
+    @overload
+    def agg(self, __exprs: Dict[str, str]) -> "DataFrame":
+        ...
+
+    def agg(self, *exprs: Union[Column, Dict[str, str]]) -> "DataFrame":
+        """Compute aggregates and returns the result as a :class:`DataFrame`.
+
+        The available aggregate functions can be:
+
+        1. built-in aggregation functions, such as `avg`, `max`, `min`, `sum`, `count`
+
+        2. group aggregate pandas UDFs, created with :func:`pyspark.sql.functions.pandas_udf`
+
+           .. note:: There is no partial aggregation with group aggregate UDFs, i.e.,
+               a full shuffle is required. Also, all the data of a group will be loaded into
+               memory, so the user should be aware of the potential OOM risk if data is skewed
+               and certain groups are too large to fit in memory.
+
+           .. seealso:: :func:`pyspark.sql.functions.pandas_udf`
+
+        If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
+        is the column to perform aggregation on, and the value is the aggregate function.
+
+        Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.
+
+        .. versionadded:: 1.3.0
+
+        Parameters
+        ----------
+        exprs : dict
+            a dict mapping from column name (string) to aggregate functions (string),
+            or a list of :class:`Column`.
+
+        Notes
+        -----
+        Built-in aggregation functions and group aggregate pandas UDFs cannot be mixed
+        in a single call to this function.
+
+        Examples
+        --------
+        >>> from pyspark.sql import functions as F
+        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> df = spark.createDataFrame(
+        ...      [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"])
+        >>> df.show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        |  3|Alice|
+        |  5|  Bob|
+        | 10|  Bob|
+        +---+-----+
+
+        Group-by name, and count each group.
+
+        >>> df.groupBy(df.name).agg({"*": "count"}).sort("name").show()
+        +-----+--------+
+        | name|count(1)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       2|
+        +-----+--------+
+
+        Group-by name, and calculate the minimum age.
+
+        >>> df.groupBy(df.name).agg(F.min(df.age)).sort("name").show()
+        +-----+--------+
+        | name|min(age)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       5|
+        +-----+--------+
+
+        Same as above but uses pandas UDF.
+
+        >>> @pandas_udf('int', PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
+        ... def min_udf(v):
+        ...     return v.min()
+        ...
+        >>> df.groupBy(df.name).agg(min_udf(df.age)).sort("name").show()  # doctest: +SKIP
+        +-----+------------+
+        | name|min_udf(age)|
+        +-----+------------+
+        |Alice|           2|
+        |  Bob|           5|
+        +-----+------------+
+        """
+        assert exprs, "exprs should not be empty"
+        if len(exprs) == 1 and isinstance(exprs[0], dict):
+            from pyspark.sql.connect.function_builder import functions as FB
+
+            # Convert the dict into key value pairs
+            measures = [Column(ScalarFunctionExpression(exprs[0][k], col(k))) for k in exprs[0]]

Review Comment:
   ok :( will do





[GitHub] [spark] amaliujia commented on a diff in pull request #38883: [SPARK-41366][CONNECT] DF.groupby.agg() should be compatible

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #38883:
URL: https://github.com/apache/spark/pull/38883#discussion_r1040143187


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -55,8 +59,109 @@ def __init__(self, df: "DataFrame", *grouping_cols: Union[Column, str]) -> None:
         self._df = df
         self._grouping_cols = [x if isinstance(x, Column) else df[x] for x in grouping_cols]
 
-    def agg(self, measures: Sequence[Column]) -> "DataFrame":
-        assert len(measures) > 0, "exprs should not be empty"
+    @overload
+    def agg(self, *exprs: Column) -> "DataFrame":
+        ...
+
+    @overload
+    def agg(self, __exprs: Dict[str, str]) -> "DataFrame":
+        ...
+
+    def agg(self, *exprs: Union[Column, Dict[str, str]]) -> "DataFrame":
+        """Compute aggregates and returns the result as a :class:`DataFrame`.
+
+        The available aggregate functions can be:
+
+        1. built-in aggregation functions, such as `avg`, `max`, `min`, `sum`, `count`
+
+        2. group aggregate pandas UDFs, created with :func:`pyspark.sql.functions.pandas_udf`
+
+           .. note:: There is no partial aggregation with group aggregate UDFs, i.e.,
+               a full shuffle is required. Also, all the data of a group will be loaded into
+               memory, so the user should be aware of the potential OOM risk if data is skewed
+               and certain groups are too large to fit in memory.
+
+           .. seealso:: :func:`pyspark.sql.functions.pandas_udf`
+
+        If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
+        is the column to perform aggregation on, and the value is the aggregate function.
+
+        Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.
+
+        .. versionadded:: 1.3.0
+
+        Parameters
+        ----------
+        exprs : dict
+            a dict mapping from column name (string) to aggregate functions (string),
+            or a list of :class:`Column`.
+
+        Notes
+        -----
+        Built-in aggregation functions and group aggregate pandas UDFs cannot be mixed
+        in a single call to this function.
+
+        Examples
+        --------
+        >>> from pyspark.sql import functions as F
+        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> df = spark.createDataFrame(
+        ...      [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"])
+        >>> df.show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        |  3|Alice|
+        |  5|  Bob|
+        | 10|  Bob|
+        +---+-----+
+
+        Group-by name, and count each group.
+
+        >>> df.groupBy(df.name).agg({"*": "count"}).sort("name").show()
+        +-----+--------+
+        | name|count(1)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       2|
+        +-----+--------+
+
+        Group-by name, and calculate the minimum age.
+
+        >>> df.groupBy(df.name).agg(F.min(df.age)).sort("name").show()
+        +-----+--------+
+        | name|min(age)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       5|
+        +-----+--------+
+
+        Same as above but uses pandas UDF.
+
+        >>> @pandas_udf('int', PandasUDFType.GROUPED_AGG)  # doctest: +SKIP

Review Comment:
   Actually, maybe we should modify this: the pandas UDF example is not supported by the Connect Python client yet.





[GitHub] [spark] zhengruifeng commented on a diff in pull request #38883: [Spark 41366][CONNECT] DF.groupby.agg() should be compatible

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38883:
URL: https://github.com/apache/spark/pull/38883#discussion_r1038173508


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -55,8 +60,109 @@ def __init__(self, df: "DataFrame", *grouping_cols: Union[Column, str]) -> None:
         self._df = df
         self._grouping_cols = [x if isinstance(x, Column) else df[x] for x in grouping_cols]
 
-    def agg(self, measures: Sequence[Column]) -> "DataFrame":
-        assert len(measures) > 0, "exprs should not be empty"
+    @overload
+    def agg(self, *exprs: Column) -> "DataFrame":
+        ...
+
+    @overload
+    def agg(self, __exprs: Dict[str, str]) -> "DataFrame":
+        ...
+
+    def agg(self, *exprs: Union[Column, Dict[str, str]]) -> "DataFrame":
+        """Compute aggregates and returns the result as a :class:`DataFrame`.
+
+        The available aggregate functions can be:
+
+        1. built-in aggregation functions, such as `avg`, `max`, `min`, `sum`, `count`
+
+        2. group aggregate pandas UDFs, created with :func:`pyspark.sql.functions.pandas_udf`
+
+           .. note:: There is no partial aggregation with group aggregate UDFs, i.e.,
+               a full shuffle is required. Also, all the data of a group will be loaded into
+               memory, so the user should be aware of the potential OOM risk if data is skewed
+               and certain groups are too large to fit in memory.
+
+           .. seealso:: :func:`pyspark.sql.functions.pandas_udf`
+
+        If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
+        is the column to perform aggregation on, and the value is the aggregate function.
+
+        Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.
+
+        .. versionadded:: 1.3.0
+
+        Parameters
+        ----------
+        exprs : dict
+            a dict mapping from column name (string) to aggregate functions (string),
+            or a list of :class:`Column`.
+
+        Notes
+        -----
+        Built-in aggregation functions and group aggregate pandas UDFs cannot be mixed
+        in a single call to this function.
+
+        Examples
+        --------
+        >>> from pyspark.sql import functions as F
+        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> df = spark.createDataFrame(
+        ...      [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"])
+        >>> df.show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        |  3|Alice|
+        |  5|  Bob|
+        | 10|  Bob|
+        +---+-----+
+
+        Group-by name, and count each group.
+
+        >>> df.groupBy(df.name).agg({"*": "count"}).sort("name").show()
+        +-----+--------+
+        | name|count(1)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       2|
+        +-----+--------+
+
+        Group-by name, and calculate the minimum age.
+
+        >>> df.groupBy(df.name).agg(F.min(df.age)).sort("name").show()
+        +-----+--------+
+        | name|min(age)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       5|
+        +-----+--------+
+
+        Same as above but uses pandas UDF.
+
+        >>> @pandas_udf('int', PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
+        ... def min_udf(v):
+        ...     return v.min()
+        ...
+        >>> df.groupBy(df.name).agg(min_udf(df.age)).sort("name").show()  # doctest: +SKIP
+        +-----+------------+
+        | name|min_udf(age)|
+        +-----+------------+
+        |Alice|           2|
+        |  Bob|           5|
+        +-----+------------+
+        """
+        assert exprs, "exprs should not be empty"
+        if len(exprs) == 1 and isinstance(exprs[0], dict):
+            from pyspark.sql.connect.function_builder import functions as FB
+
+            # Convert the dict into key value pairs
+            measures = [Column(ScalarFunctionExpression(exprs[0][k], col(k))) for k in exprs[0]]

Review Comment:
   `ScalarFunctionExpression` was just changed in https://github.com/apache/spark/commit/70ab135813fd224fdc80ab30622e57399a3a49f2





[GitHub] [spark] grundprinzip commented on a diff in pull request #38883: [SPARK-41366][CONNECT] DF.groupby.agg() should be compatible

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38883:
URL: https://github.com/apache/spark/pull/38883#discussion_r1040469381


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -55,8 +59,109 @@ def __init__(self, df: "DataFrame", *grouping_cols: Union[Column, str]) -> None:
         self._df = df
         self._grouping_cols = [x if isinstance(x, Column) else df[x] for x in grouping_cols]
 
-    def agg(self, measures: Sequence[Column]) -> "DataFrame":
-        assert len(measures) > 0, "exprs should not be empty"
+    @overload
+    def agg(self, *exprs: Column) -> "DataFrame":
+        ...
+
+    @overload
+    def agg(self, __exprs: Dict[str, str]) -> "DataFrame":
+        ...
+
+    def agg(self, *exprs: Union[Column, Dict[str, str]]) -> "DataFrame":
+        """Compute aggregates and returns the result as a :class:`DataFrame`.
+
+        The available aggregate functions can be:
+
+        1. built-in aggregation functions, such as `avg`, `max`, `min`, `sum`, `count`
+
+        2. group aggregate pandas UDFs, created with :func:`pyspark.sql.functions.pandas_udf`
+
+           .. note:: There is no partial aggregation with group aggregate UDFs, i.e.,
+               a full shuffle is required. Also, all the data of a group will be loaded into
+               memory, so the user should be aware of the potential OOM risk if data is skewed
+               and certain groups are too large to fit in memory.
+
+           .. seealso:: :func:`pyspark.sql.functions.pandas_udf`
+
+        If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
+        is the column to perform aggregation on, and the value is the aggregate function.
+
+        Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.
+
+        .. versionadded:: 1.3.0
+
+        Parameters
+        ----------
+        exprs : dict
+            a dict mapping from column name (string) to aggregate functions (string),
+            or a list of :class:`Column`.
+
+        Notes
+        -----
+        Built-in aggregation functions and group aggregate pandas UDFs cannot be mixed
+        in a single call to this function.
+
+        Examples
+        --------
+        >>> from pyspark.sql import functions as F
+        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> df = spark.createDataFrame(
+        ...      [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"])
+        >>> df.show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        |  3|Alice|
+        |  5|  Bob|
+        | 10|  Bob|
+        +---+-----+
+
+        Group-by name, and count each group.
+
+        >>> df.groupBy(df.name).agg({"*": "count"}).sort("name").show()
+        +-----+--------+
+        | name|count(1)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       2|
+        +-----+--------+
+
+        Group-by name, and calculate the minimum age.
+
+        >>> df.groupBy(df.name).agg(F.min(df.age)).sort("name").show()
+        +-----+--------+
+        | name|min(age)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       5|
+        +-----+--------+
+
+        Same as above but uses pandas UDF.
+
+        >>> @pandas_udf('int', PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
+        ... def min_udf(v):
+        ...     return v.min()
+        ...
+        >>> df.groupBy(df.name).agg(min_udf(df.age)).sort("name").show()  # doctest: +SKIP
+        +-----+------------+
+        | name|min_udf(age)|
+        +-----+------------+
+        |Alice|           2|
+        |  Bob|           5|
+        +-----+------------+
+        """
+        assert exprs, "exprs should not be empty"
+        if len(exprs) == 1 and isinstance(exprs[0], dict):
+            from pyspark.sql.connect.function_builder import functions as FB

Review Comment:
   Done





[GitHub] [spark] grundprinzip commented on a diff in pull request #38883: [SPARK-41366][CONNECT] DF.groupby.agg() should be compatible

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38883:
URL: https://github.com/apache/spark/pull/38883#discussion_r1041316676


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -397,7 +397,7 @@ class SparkConnectPlanner(session: SparkSession) {
   }
 
   private def transformUnresolvedExpression(exp: proto.Expression): UnresolvedAttribute = {
-    UnresolvedAttribute(exp.getUnresolvedAttribute.getUnparsedIdentifier)
+    UnresolvedAttribute.quotedString(exp.getUnresolvedAttribute.getUnparsedIdentifier)

Review Comment:
   > This is tricky... it's `CatalystSqlParser.parseMultipartIdentifier` vs. `parseAttributeName` under the hood. Ideally `parseMultipartIdentifier` is more standard, but following existing PySpark behavior is more important.
   
   This follows not just PySpark but the Column API in Scala as well. There, the string constructor uses exactly this method call.
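
A rough Python sketch of the lenient splitting rule, assuming `parseAttributeName` treats the string as a quoted identifier in which only `.` is special and backticks protect a literal `.`. `split_attribute_name` is a hypothetical helper written for illustration; it is not the Catalyst implementation and may differ from it in edge cases.

```python
from typing import List


def split_attribute_name(name: str) -> List[str]:
    """Split a column reference into name parts (illustrative only)."""
    parts: List[str] = []
    current: List[str] = []
    in_backtick = False
    i = 0
    while i < len(name):
        ch = name[i]
        if in_backtick:
            if ch == "`":
                if i + 1 < len(name) and name[i + 1] == "`":
                    # A doubled backtick inside quotes is an escaped backtick.
                    current.append("`")
                    i += 1
                else:
                    in_backtick = False
                    # A closing backtick must end the name or precede a dot.
                    if i + 1 < len(name) and name[i + 1] != ".":
                        raise ValueError(f"invalid attribute name: {name!r}")
            else:
                current.append(ch)
        elif ch == "`":
            # An opening backtick is only allowed at the start of a part.
            if current:
                raise ValueError(f"invalid attribute name: {name!r}")
            in_backtick = True
        elif ch == ".":
            # Reject leading, trailing, and consecutive dots.
            if i == 0 or name[i - 1] == "." or i == len(name) - 1:
                raise ValueError(f"invalid attribute name: {name!r}")
            parts.append("".join(current))
            current = []
        else:
            # Any other character is taken literally.
            current.append(ch)
        i += 1
    if in_backtick:
        raise ValueError(f"unterminated backtick in: {name!r}")
    parts.append("".join(current))
    return parts
```

Under this rule a name such as `count(1)` is accepted as a single part, while a stricter SQL identifier parser would likely reject it unless it is wrapped in backticks.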





[GitHub] [spark] amaliujia commented on a diff in pull request #38883: [SPARK-41366][CONNECT] DF.groupby.agg() should be compatible

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #38883:
URL: https://github.com/apache/spark/pull/38883#discussion_r1040141502


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -55,8 +59,109 @@ def __init__(self, df: "DataFrame", *grouping_cols: Union[Column, str]) -> None:
         self._df = df
         self._grouping_cols = [x if isinstance(x, Column) else df[x] for x in grouping_cols]
 
-    def agg(self, measures: Sequence[Column]) -> "DataFrame":
-        assert len(measures) > 0, "exprs should not be empty"
+    @overload
+    def agg(self, *exprs: Column) -> "DataFrame":
+        ...
+
+    @overload
+    def agg(self, __exprs: Dict[str, str]) -> "DataFrame":
+        ...
+
+    def agg(self, *exprs: Union[Column, Dict[str, str]]) -> "DataFrame":
+        """Compute aggregates and returns the result as a :class:`DataFrame`.
+
+        The available aggregate functions can be:
+
+        1. built-in aggregation functions, such as `avg`, `max`, `min`, `sum`, `count`
+
+        2. group aggregate pandas UDFs, created with :func:`pyspark.sql.functions.pandas_udf`
+
+           .. note:: There is no partial aggregation with group aggregate UDFs, i.e.,
+               a full shuffle is required. Also, all the data of a group will be loaded into
+               memory, so the user should be aware of the potential OOM risk if data is skewed
+               and certain groups are too large to fit in memory.
+
+           .. seealso:: :func:`pyspark.sql.functions.pandas_udf`
+
+        If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
+        is the column to perform aggregation on, and the value is the aggregate function.
+
+        Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.
+
+        .. versionadded:: 1.3.0
+
+        Parameters
+        ----------
+        exprs : dict
+            a dict mapping from column name (string) to aggregate functions (string),
+            or a list of :class:`Column`.
+
+        Notes
+        -----
+        Built-in aggregation functions and group aggregate pandas UDFs cannot be mixed
+        in a single call to this function.
+
+        Examples
+        --------
+        >>> from pyspark.sql import functions as F
+        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> df = spark.createDataFrame(
+        ...      [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"])
+        >>> df.show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        |  3|Alice|
+        |  5|  Bob|
+        | 10|  Bob|
+        +---+-----+
+
+        Group-by name, and count each group.
+
+        >>> df.groupBy(df.name).agg({"*": "count"}).sort("name").show()
+        +-----+--------+
+        | name|count(1)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       2|
+        +-----+--------+
+
+        Group-by name, and calculate the minimum age.
+
+        >>> df.groupBy(df.name).agg(F.min(df.age)).sort("name").show()
+        +-----+--------+
+        | name|min(age)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       5|
+        +-----+--------+
+
+        Same as above but uses pandas UDF.
+
+        >>> @pandas_udf('int', PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
+        ... def min_udf(v):
+        ...     return v.min()
+        ...
+        >>> df.groupBy(df.name).agg(min_udf(df.age)).sort("name").show()  # doctest: +SKIP
+        +-----+------------+
+        | name|min_udf(age)|
+        +-----+------------+
+        |Alice|           2|
+        |  Bob|           5|
+        +-----+------------+
+        """
+        assert exprs, "exprs should not be empty"
+        if len(exprs) == 1 and isinstance(exprs[0], dict):
+            from pyspark.sql.connect.function_builder import functions as FB
+
+            # Convert the dict into key value pairs
+            measures = [scalar_function(exprs[0][k], col(k)) for k in exprs[0]]
+        else:
+            # Columns
+            assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column"

Review Comment:
   Nit:
   
   Do we need to put the full package name into the error message, given that we have two `Column` classes: `pyspark.sql.column` and `pyspark.sql.connect.column`?
   
   Do you foresee users of the Connect client being confused by the two `Column` classes?
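
One possible shape for such a message, as a minimal sketch: `Column` below is a hypothetical stand-in class rather than the real Connect `Column`, and `qualified_name` and `check_exprs` are invented helpers. The idea is to derive the full module path from the class object so the message names exactly which `Column` is expected.

```python
from typing import Any, Tuple


class Column:
    """Hypothetical stand-in for the Connect client's Column class."""


def qualified_name(cls: type) -> str:
    # Build "package.module.ClassName" from the class object itself.
    return f"{cls.__module__}.{cls.__qualname__}"


def check_exprs(exprs: Tuple[Any, ...]) -> None:
    # Report the full class path so the two Column types can be told apart.
    bad = [e for e in exprs if not isinstance(e, Column)]
    if bad:
        raise TypeError(
            f"all exprs should be {qualified_name(Column)}, "
            f"got {qualified_name(type(bad[0]))}"
        )
```

Raising an exception instead of using `assert` is a separate design choice here: assertions are skipped when Python runs with `-O`, so an explicit check keeps the message available in that mode too.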



##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -55,8 +59,109 @@ def __init__(self, df: "DataFrame", *grouping_cols: Union[Column, str]) -> None:
         self._df = df
         self._grouping_cols = [x if isinstance(x, Column) else df[x] for x in grouping_cols]
 
-    def agg(self, measures: Sequence[Column]) -> "DataFrame":
-        assert len(measures) > 0, "exprs should not be empty"
+    @overload
+    def agg(self, *exprs: Column) -> "DataFrame":
+        ...
+
+    @overload
+    def agg(self, __exprs: Dict[str, str]) -> "DataFrame":
+        ...
+
+    def agg(self, *exprs: Union[Column, Dict[str, str]]) -> "DataFrame":
+        """Compute aggregates and returns the result as a :class:`DataFrame`.
+
+        The available aggregate functions can be:
+
+        1. built-in aggregation functions, such as `avg`, `max`, `min`, `sum`, `count`
+
+        2. group aggregate pandas UDFs, created with :func:`pyspark.sql.functions.pandas_udf`
+
+           .. note:: There is no partial aggregation with group aggregate UDFs, i.e.,
+               a full shuffle is required. Also, all the data of a group will be loaded into
+               memory, so the user should be aware of the potential OOM risk if data is skewed
+               and certain groups are too large to fit in memory.
+
+           .. seealso:: :func:`pyspark.sql.functions.pandas_udf`
+
+        If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
+        is the column to perform aggregation on, and the value is the aggregate function.
+
+        Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.
+
+        .. versionadded:: 1.3.0

Review Comment:
   ```suggestion
           .. versionadded:: 3.4.0
   ```



##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -55,8 +59,109 @@ def __init__(self, df: "DataFrame", *grouping_cols: Union[Column, str]) -> None:
         self._df = df
         self._grouping_cols = [x if isinstance(x, Column) else df[x] for x in grouping_cols]
 
-    def agg(self, measures: Sequence[Column]) -> "DataFrame":
-        assert len(measures) > 0, "exprs should not be empty"
+    @overload
+    def agg(self, *exprs: Column) -> "DataFrame":
+        ...
+
+    @overload
+    def agg(self, __exprs: Dict[str, str]) -> "DataFrame":
+        ...
+
+    def agg(self, *exprs: Union[Column, Dict[str, str]]) -> "DataFrame":
+        """Compute aggregates and returns the result as a :class:`DataFrame`.
+
+        The available aggregate functions can be:
+
+        1. built-in aggregation functions, such as `avg`, `max`, `min`, `sum`, `count`
+
+        2. group aggregate pandas UDFs, created with :func:`pyspark.sql.functions.pandas_udf`
+
+           .. note:: There is no partial aggregation with group aggregate UDFs, i.e.,
+               a full shuffle is required. Also, all the data of a group will be loaded into
+               memory, so the user should be aware of the potential OOM risk if data is skewed
+               and certain groups are too large to fit in memory.
+
+           .. seealso:: :func:`pyspark.sql.functions.pandas_udf`
+
+        If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
+        is the column to perform aggregation on, and the value is the aggregate function.
+
+        Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.
+
+        .. versionadded:: 1.3.0
+
+        Parameters
+        ----------
+        exprs : dict
+            a dict mapping from column name (string) to aggregate functions (string),
+            or a list of :class:`Column`.
+
+        Notes
+        -----
+        Built-in aggregation functions and group aggregate pandas UDFs cannot be mixed
+        in a single call to this function.
+
+        Examples
+        --------
+        >>> from pyspark.sql import functions as F
+        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> df = spark.createDataFrame(
+        ...      [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"])
+        >>> df.show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        |  3|Alice|
+        |  5|  Bob|
+        | 10|  Bob|
+        +---+-----+
+
+        Group-by name, and count each group.
+
+        >>> df.groupBy(df.name).agg({"*": "count"}).sort("name").show()
+        +-----+--------+
+        | name|count(1)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       2|
+        +-----+--------+
+
+        Group-by name, and calculate the minimum age.
+
+        >>> df.groupBy(df.name).agg(F.min(df.age)).sort("name").show()
+        +-----+--------+
+        | name|min(age)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       5|
+        +-----+--------+
+
+        Same as above but uses pandas UDF.
+
+        >>> @pandas_udf('int', PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
+        ... def min_udf(v):
+        ...     return v.min()
+        ...
+        >>> df.groupBy(df.name).agg(min_udf(df.age)).sort("name").show()  # doctest: +SKIP
+        +-----+------------+
+        | name|min_udf(age)|
+        +-----+------------+
+        |Alice|           2|
+        |  Bob|           5|
+        +-----+------------+
+        """
+        assert exprs, "exprs should not be empty"
+        if len(exprs) == 1 and isinstance(exprs[0], dict):
+            from pyspark.sql.connect.function_builder import functions as FB

Review Comment:
   This import is not used?





[GitHub] [spark] grundprinzip commented on a diff in pull request #38883: [SPARK-41366][CONNECT] DF.groupby.agg() should be compatible

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38883:
URL: https://github.com/apache/spark/pull/38883#discussion_r1040470307


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -55,8 +59,109 @@ def __init__(self, df: "DataFrame", *grouping_cols: Union[Column, str]) -> None:
         self._df = df
         self._grouping_cols = [x if isinstance(x, Column) else df[x] for x in grouping_cols]
 
-    def agg(self, measures: Sequence[Column]) -> "DataFrame":
-        assert len(measures) > 0, "exprs should not be empty"
+    @overload
+    def agg(self, *exprs: Column) -> "DataFrame":
+        ...
+
+    @overload
+    def agg(self, __exprs: Dict[str, str]) -> "DataFrame":
+        ...
+
+    def agg(self, *exprs: Union[Column, Dict[str, str]]) -> "DataFrame":
+        """Compute aggregates and returns the result as a :class:`DataFrame`.
+
+        The available aggregate functions can be:
+
+        1. built-in aggregation functions, such as `avg`, `max`, `min`, `sum`, `count`
+
+        2. group aggregate pandas UDFs, created with :func:`pyspark.sql.functions.pandas_udf`
+
+           .. note:: There is no partial aggregation with group aggregate UDFs, i.e.,
+               a full shuffle is required. Also, all the data of a group will be loaded into
+               memory, so the user should be aware of the potential OOM risk if data is skewed
+               and certain groups are too large to fit in memory.
+
+           .. seealso:: :func:`pyspark.sql.functions.pandas_udf`
+
+        If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
+        is the column to perform aggregation on, and the value is the aggregate function.
+
+        Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.
+
+        .. versionadded:: 1.3.0
+
+        Parameters
+        ----------
+        exprs : dict
+            a dict mapping from column name (string) to aggregate functions (string),
+            or a list of :class:`Column`.
+
+        Notes
+        -----
+        Built-in aggregation functions and group aggregate pandas UDFs cannot be mixed
+        in a single call to this function.
+
+        Examples
+        --------
+        >>> from pyspark.sql import functions as F
+        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> df = spark.createDataFrame(
+        ...      [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"])
+        >>> df.show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        |  3|Alice|
+        |  5|  Bob|
+        | 10|  Bob|
+        +---+-----+
+
+        Group-by name, and count each group.
+
+        >>> df.groupBy(df.name).agg({"*": "count"}).sort("name").show()
+        +-----+--------+
+        | name|count(1)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       2|
+        +-----+--------+
+
+        Group-by name, and calculate the minimum age.
+
+        >>> df.groupBy(df.name).agg(F.min(df.age)).sort("name").show()
+        +-----+--------+
+        | name|min(age)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       5|
+        +-----+--------+
+
+        Same as above but uses pandas UDF.
+
+        >>> @pandas_udf('int', PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
+        ... def min_udf(v):
+        ...     return v.min()
+        ...
+        >>> df.groupBy(df.name).agg(min_udf(df.age)).sort("name").show()  # doctest: +SKIP
+        +-----+------------+
+        | name|min_udf(age)|
+        +-----+------------+
+        |Alice|           2|
+        |  Bob|           5|
+        +-----+------------+
+        """
+        assert exprs, "exprs should not be empty"
+        if len(exprs) == 1 and isinstance(exprs[0], dict):
+            from pyspark.sql.connect.function_builder import functions as FB
+
+            # Convert the dict into key value pairs
+            measures = [scalar_function(exprs[0][k], col(k)) for k in exprs[0]]
+        else:
+            # Columns
+            assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column"

Review Comment:
   Right now this is the literal code from PySpark; we will probably have to revisit this once Hyukjin's patch is further along.





[GitHub] [spark] grundprinzip commented on a diff in pull request #38883: [SPARK-41366][CONNECT] DF.groupby.agg() should be compatible

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38883:
URL: https://github.com/apache/spark/pull/38883#discussion_r1040470477


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -55,8 +59,109 @@ def __init__(self, df: "DataFrame", *grouping_cols: Union[Column, str]) -> None:
         self._df = df
         self._grouping_cols = [x if isinstance(x, Column) else df[x] for x in grouping_cols]
 
-    def agg(self, measures: Sequence[Column]) -> "DataFrame":
-        assert len(measures) > 0, "exprs should not be empty"
+    @overload
+    def agg(self, *exprs: Column) -> "DataFrame":
+        ...
+
+    @overload
+    def agg(self, __exprs: Dict[str, str]) -> "DataFrame":
+        ...
+
+    def agg(self, *exprs: Union[Column, Dict[str, str]]) -> "DataFrame":
+        """Compute aggregates and returns the result as a :class:`DataFrame`.
+
+        The available aggregate functions can be:
+
+        1. built-in aggregation functions, such as `avg`, `max`, `min`, `sum`, `count`
+
+        2. group aggregate pandas UDFs, created with :func:`pyspark.sql.functions.pandas_udf`
+
+           .. note:: There is no partial aggregation with group aggregate UDFs, i.e.,
+               a full shuffle is required. Also, all the data of a group will be loaded into
+               memory, so the user should be aware of the potential OOM risk if data is skewed
+               and certain groups are too large to fit in memory.
+
+           .. seealso:: :func:`pyspark.sql.functions.pandas_udf`
+
+        If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
+        is the column to perform aggregation on, and the value is the aggregate function.
+
+        Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.
+
+        .. versionadded:: 1.3.0
+
+        Parameters
+        ----------
+        exprs : dict
+            a dict mapping from column name (string) to aggregate functions (string),
+            or a list of :class:`Column`.
+
+        Notes
+        -----
+        Built-in aggregation functions and group aggregate pandas UDFs cannot be mixed
+        in a single call to this function.
+
+        Examples
+        --------
+        >>> from pyspark.sql import functions as F
+        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> df = spark.createDataFrame(
+        ...      [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"])
+        >>> df.show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        |  3|Alice|
+        |  5|  Bob|
+        | 10|  Bob|
+        +---+-----+
+
+        Group-by name, and count each group.
+
+        >>> df.groupBy(df.name).agg({"*": "count"}).sort("name").show()
+        +-----+--------+
+        | name|count(1)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       2|
+        +-----+--------+
+
+        Group-by name, and calculate the minimum age.
+
+        >>> df.groupBy(df.name).agg(F.min(df.age)).sort("name").show()
+        +-----+--------+
+        | name|min(age)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       5|
+        +-----+--------+
+
+        Same as above but uses pandas UDF.
+
+        >>> @pandas_udf('int', PandasUDFType.GROUPED_AGG)  # doctest: +SKIP

Review Comment:
   removed
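
   The two `@overload` signatures in the hunk above can be sketched as follows. This is an illustration under assumptions, not the PR's implementation: `Column` and `GroupedData` are simplified stand-ins defined here, and `agg` returns a list of strings rather than a DataFrame so that the example runs without Spark.

```python
from typing import Dict, List, Union, overload


class Column:
    """Hypothetical stand-in for a Spark Column; holds only an expression string."""

    def __init__(self, expr: str) -> None:
        self.expr = expr


class GroupedData:
    """Hypothetical, simplified class that only reports what agg() was asked for."""

    @overload
    def agg(self, *exprs: Column) -> List[str]:
        ...

    @overload
    def agg(self, __exprs: Dict[str, str]) -> List[str]:
        ...

    def agg(self, *exprs: Union[Column, Dict[str, str]]) -> List[str]:
        if not exprs:
            raise ValueError("exprs should not be empty")
        if len(exprs) == 1 and isinstance(exprs[0], dict):
            # Dict form: key is the column name, value is the aggregate function.
            return [f"{func}({column})" for column, func in exprs[0].items()]
        # Column form: every argument must be a Column.
        if not all(isinstance(c, Column) for c in exprs):
            raise TypeError("all exprs should be Column")
        return [c.expr for c in exprs]


grouped = GroupedData()
print(grouped.agg({"age": "min"}))
print(grouped.agg(Column("min(age)"), Column("max(age)")))
```

   The sketch raises exceptions where the quoted code uses `assert`. That is a design choice of the example only: `assert` statements are skipped when Python runs with `-O`, so explicit exceptions keep the checks active.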





[GitHub] [spark] hvanhovell closed pull request #38883: [SPARK-41366][CONNECT] DF.groupby.agg() should be compatible

Posted by GitBox <gi...@apache.org>.
hvanhovell closed pull request #38883: [SPARK-41366][CONNECT] DF.groupby.agg() should be compatible
URL: https://github.com/apache/spark/pull/38883

