Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/22 23:56:45 UTC

[GitHub] [spark] itholic opened a new pull request #35615: [SPARK-38235][SQL][TESTS] Add test util for testing grouped aggregate pandas UDF.

itholic opened a new pull request #35615:
URL: https://github.com/apache/spark/pull/35615


   ### What changes were proposed in this pull request?
   
   This PR proposes to add a test util, `TestGroupedAggPandasUDF`, to make testing grouped aggregate pandas UDFs more convenient.
   
   ### Why are the changes needed?
   
   To improve testability by integrating various UDF tests with existing test utilities.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No, it's test only.
   
   ### How was this patch tested?
   
   Manually tested. This will be further exercised in SPARK-38107 when testing errors related to grouped aggregate pandas UDFs.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon closed pull request #35615: [SPARK-38235][SQL][TESTS] Add test util for testing grouped aggregate pandas UDF

Posted by GitBox <gi...@apache.org>.
HyukjinKwon closed pull request #35615:
URL: https://github.com/apache/spark/pull/35615


   




[GitHub] [spark] HyukjinKwon commented on a change in pull request #35615: [SPARK-38235][SQL][TESTS] Add test util for testing grouped aggregate pandas UDF.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35615:
URL: https://github.com/apache/spark/pull/35615#discussion_r812477288



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
##########
@@ -333,6 +367,56 @@ object IntegratedUDFTestUtils extends SQLHelper {
     val prettyName: String = "Scalar Pandas UDF"
   }
 
+  /**
+   * A Grouped Aggregate Pandas UDF that takes one column, executes the
+   * Python native function calculating the count of the column using pandas.
+   *
+   * Virtually equivalent to:
+   *
+   * {{{
+   *   import pandas as pd
+   *   from pyspark.sql.functions import pandas_udf
+   *
+   *   df = spark.createDataFrame(
+   *       [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
+   *
+   *   @pandas_udf("double")
+   *   def pandas_count(v: pd.Series) -> int:
+   *       return v.count()
+   *   count_col = pandas_count(df['v'])
+   * }}}
+   */
+  case class TestGroupedAggPandasUDF(name: String) extends TestUDF {
+    private[IntegratedUDFTestUtils] lazy val udf = new UserDefinedPythonFunction(
+      name = name,
+      func = PythonFunction(
+        command = pandasGroupedAggFunc,
+        envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, String]],
+        pythonIncludes = List.empty[String].asJava,
+        pythonExec = pythonExec,
+        pythonVer = pythonVer,
+        broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava,
+        accumulator = null),
+      dataType = IntegerType,
+      pythonEvalType = PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
+      udfDeterministic = true) {
+
+      override def builder(e: Seq[Expression]): Expression = {
+        assert(e.length == 1, "Defined UDF only has one column")
+        val expr = e.head
+        assert(expr.resolved, "column should be resolved to use the same type " +
+          "as input. Try df(name) or df.col(name)")
+        val pythonUDF = new PythonUDFWithoutId(
+          super.builder(Cast(expr, IntegerType) :: Nil).asInstanceOf[PythonUDF])
+        Cast(pythonUDF, expr.dataType)

Review comment:
       I don't think we need a cast here, because the UDF always returns an integer. We could probably just remove `builder`.
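To illustrate the point about the cast, here is a minimal pure-Python sketch of what the test UDF computes. It assumes the UDF mirrors pandas `Series.count()`, which skips nulls (modelled here as `None`). It is not Spark code, and `grouped_count` is a hypothetical helper. Whatever the input column type (floats in the docstring's sample data), each group yields an integer count, so casting the output back to the input type adds nothing.

```python
from typing import Any, Dict, Iterable, Optional, Tuple


def grouped_count(rows: Iterable[Tuple[Any, Optional[Any]]]) -> Dict[Any, int]:
    """Count non-null values of `v` per `id`, like pandas Series.count() per group."""
    counts: Dict[Any, int] = {}
    for key, value in rows:
        # Register every group, even one whose values are all null.
        counts.setdefault(key, 0)
        if value is not None:
            counts[key] += 1
    return counts


# Same sample data as the docstring in the diff: (id, v) pairs.
rows = [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)]
result = grouped_count(rows)
print(result)  # {1: 2, 2: 3}
# The output is an int per group regardless of the float input type.
print(all(isinstance(c, int) for c in result.values()))  # True
```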






[GitHub] [spark] HyukjinKwon commented on a change in pull request #35615: [SPARK-38235][SQL][TESTS] Add test util for testing grouped aggregate pandas UDF.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35615:
URL: https://github.com/apache/spark/pull/35615#discussion_r812476471



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
##########
@@ -333,6 +367,56 @@ object IntegratedUDFTestUtils extends SQLHelper {
     val prettyName: String = "Scalar Pandas UDF"
   }
 
+  /**
+   * A Grouped Aggregate Pandas UDF that takes one column, executes the
+   * Python native function calculating the count of the column using pandas.
+   *
+   * Virtually equivalent to:
+   *
+   * {{{
+   *   import pandas as pd
+   *   from pyspark.sql.functions import pandas_udf
+   *
+   *   df = spark.createDataFrame(
+   *       [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
+   *
+   *   @pandas_udf("double")
+   *   def pandas_count(v: pd.Series) -> int:
+   *       return v.count()

Review comment:
       ```suggestion
      *       return v.count()
      *
   ```






[GitHub] [spark] itholic commented on a change in pull request #35615: [SPARK-38235][SQL][TESTS] Add test util for testing grouped aggregate pandas UDF.

Posted by GitBox <gi...@apache.org>.
itholic commented on a change in pull request #35615:
URL: https://github.com/apache/spark/pull/35615#discussion_r812558248



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
##########
@@ -69,6 +70,15 @@ import org.apache.spark.sql.types.{DataType, StringType}
  *   df.select(expr("udf_name(id)")
  *   df.select(pandasTestUDF(df("id")))
  * }}}
+ *
+ * For Grouped Aggregate Pandas UDF, it defines an UDF that calculate the count using pandas.
+ * UDF returns the count of given column, so the input and output length could be different.
+ *
+ * To register Grouped Aggregate Pandas UDF in SQL:
+ * {{{
+ *   val groupedAggPandasTestUDF = TestGroupedAggPandasUDF(name = "udf_name")
+ *   registerTestUDF(groupedAggPandasTestUDF, spark)

Review comment:
       Sure, let me add it.






[GitHub] [spark] HyukjinKwon commented on a change in pull request #35615: [SPARK-38235][SQL][TESTS] Add test util for testing grouped aggregate pandas UDF.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35615:
URL: https://github.com/apache/spark/pull/35615#discussion_r812475789



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
##########
@@ -69,6 +70,15 @@ import org.apache.spark.sql.types.{DataType, StringType}
  *   df.select(expr("udf_name(id)")
  *   df.select(pandasTestUDF(df("id")))
  * }}}
+ *
+ * For Grouped Aggregate Pandas UDF, it defines an UDF that calculate the count using pandas.
+ * UDF returns the count of given column, so the input and output length could be different.

Review comment:
       ```suggestion
    * The UDF returns the count of the given column.
   ```






[GitHub] [spark] HyukjinKwon commented on a change in pull request #35615: [SPARK-38235][SQL][TESTS] Add test util for testing grouped aggregate pandas UDF.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35615:
URL: https://github.com/apache/spark/pull/35615#discussion_r812475371



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
##########
@@ -69,6 +70,15 @@ import org.apache.spark.sql.types.{DataType, StringType}
  *   df.select(expr("udf_name(id)")
  *   df.select(pandasTestUDF(df("id")))
  * }}}
+ *
+ * For Grouped Aggregate Pandas UDF, it defines an UDF that calculate the count using pandas.

Review comment:
       ```suggestion
    * For Grouped Aggregate Pandas UDF, it defines an UDF that calculates the count using pandas.
   ```






[GitHub] [spark] HyukjinKwon commented on a change in pull request #35615: [SPARK-38235][SQL][TESTS] Add test util for testing grouped aggregate pandas UDF.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35615:
URL: https://github.com/apache/spark/pull/35615#discussion_r812475306



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
##########
@@ -31,15 +31,16 @@ import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExprId, Pyth
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
 import org.apache.spark.sql.expressions.SparkUserDefinedFunction
-import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.sql.types.{DataType, IntegerType, StringType}
 
 /**
- * This object targets to integrate various UDF test cases so that Scalar UDF, Python UDF and
- * Scalar Pandas UDFs can be tested in SBT & Maven tests.
+ * This object targets to integrate various UDF test cases so that Scalar UDF, Python UDF,
+ * Scalar Pandas UDF and Grouped Aggregate Pandas UDF can be tested in SBT & Maven tests.
  *
- * The available UDFs are special. It defines an UDF wrapped by cast. So, the input column is
- * casted into string, UDF returns strings as are, and then output column is casted back to
- * the input column. In this way, UDF is virtually no-op.
+ * The available UDFs are special. For Scalar UDF, Python UDF and Scalar Pandas UDF,
+ * it defines an UDF wrapped by cast. So, the input column is casted into string,
+ * UDF returns strings as are, and then output column is casted back to the input column.
+ * In this way, UDF is virtually no-op.

Review comment:
       I think we should mention that the grouped aggregate UDF computes `count`, so it is not a no-op.
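The distinction in this comment can be made concrete with a hedged pure-Python sketch (not Spark code; both function names are hypothetical). The scalar test UDFs cast the input to string, return it unchanged, and cast back, so every row maps to an equal row. The grouped aggregate test UDF instead collapses each group to a single count, so both the values and the number of output rows change.

```python
from typing import Callable, Dict, List, Tuple, TypeVar

T = TypeVar("T")


def scalar_noop(values: List[T], cast_back: Callable[[str], T]) -> List[T]:
    """Model of the scalar test UDFs: cast to str, identity UDF, cast back."""
    as_strings = [str(v) for v in values]      # input column cast to string
    udf_output = list(as_strings)              # UDF returns strings as they are
    return [cast_back(s) for s in udf_output]  # output cast back to the input type


def grouped_agg_count(rows: List[Tuple[int, float]]) -> Dict[int, int]:
    """Model of the grouped aggregate test UDF: one count per group."""
    out: Dict[int, int] = {}
    for key, _ in rows:
        out[key] = out.get(key, 0) + 1
    return out


rows = [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)]
values = [v for _, v in rows]
print(scalar_noop(values, float) == values)  # True: same length and values
print(grouped_agg_count(rows))               # {1: 2, 2: 3}: one row per group
```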






[GitHub] [spark] HyukjinKwon commented on a change in pull request #35615: [SPARK-38235][SQL][TESTS] Add test util for testing grouped aggregate pandas UDF.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #35615:
URL: https://github.com/apache/spark/pull/35615#discussion_r812476068



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
##########
@@ -69,6 +70,15 @@ import org.apache.spark.sql.types.{DataType, StringType}
  *   df.select(expr("udf_name(id)")
  *   df.select(pandasTestUDF(df("id")))
  * }}}
+ *
+ * For Grouped Aggregate Pandas UDF, it defines an UDF that calculate the count using pandas.
+ * UDF returns the count of given column, so the input and output length could be different.
+ *
+ * To register Grouped Aggregate Pandas UDF in SQL:
+ * {{{
+ *   val groupedAggPandasTestUDF = TestGroupedAggPandasUDF(name = "udf_name")
+ *   registerTestUDF(groupedAggPandasTestUDF, spark)

Review comment:
       Can we add an example to use it in SQL too?
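A Spark SQL example cannot run standalone here, so as an analogy only, this sketch swaps in Python's built-in `sqlite3` module for Spark. It registers a user-defined aggregate that counts non-null values under the placeholder name `udf_name` and calls it in a `GROUP BY` query, which is presumably the shape an SQL usage example would take. The `PandasLikeCount` class, the table `t`, and `run_query` are hypothetical; `Connection.create_aggregate` is the standard `sqlite3` API, not a Spark one.

```python
import sqlite3
from typing import List, Tuple


class PandasLikeCount:
    """sqlite3 aggregate that counts non-null inputs, like pandas Series.count()."""

    def __init__(self) -> None:
        self.n = 0

    def step(self, value) -> None:
        # sqlite3 passes SQL NULL as None; skip it, as Series.count() skips NA.
        if value is not None:
            self.n += 1

    def finalize(self) -> int:
        return self.n


def run_query() -> List[Tuple[int, int]]:
    conn = sqlite3.connect(":memory:")
    try:
        # Register the aggregate under the placeholder name from the docs.
        conn.create_aggregate("udf_name", 1, PandasLikeCount)
        conn.execute("CREATE TABLE t (id INTEGER, v REAL)")
        conn.executemany(
            "INSERT INTO t VALUES (?, ?)",
            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        )
        return conn.execute(
            "SELECT id, udf_name(v) FROM t GROUP BY id ORDER BY id"
        ).fetchall()
    finally:
        conn.close()


print(run_query())  # [(1, 2), (2, 3)]
```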






[GitHub] [spark] HyukjinKwon commented on pull request #35615: [SPARK-38235][SQL][TESTS] Add test util for testing grouped aggregate pandas UDF

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #35615:
URL: https://github.com/apache/spark/pull/35615#issuecomment-1048593004


   Merged to master.

