Posted to user@spark.apache.org by AlstonWilliams <ps...@gmail.com> on 2021/08/12 03:20:11 UTC

Register an Aggregator as a UDAF for Spark SQL 3

Hi all,
    I use Spark 3.0.2. I have written an Aggregator function, and I want to
register it with Spark SQL so that I can call it through the Thrift Server.
    In Spark 2.4, I could extend `UserDefinedAggregateFunction` and use the
following statement to register it in the Spark SQL shell:
    ```
        CREATE FUNCTION funnel AS 'org.example.udaf.FunnelUDAF'
        USING JAR '/home/xxx/funnel-udaf-1.0-SNAPSHOT-jar-with-dependencies-spark2.jar';
    ```
    So my web application can execute the following SQL through the Thrift
Server to get the result:
    ```
        select funnel(args1, args2, args3) from table;
    ```
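
    (For context, since the code of the real `FunnelUDAF` is not included in
this mail: a minimal sketch of a Spark 2.4-style class built the same way,
using a simple sum over one column instead of the actual funnel logic over
three arguments, looks roughly like this.)
    ```
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._

    // Illustrative only: the real FunnelUDAF takes three arguments and computes a funnel;
    // this sketch just sums a single long column to show the Spark 2.4 API shape.
    class FunnelUDAF extends UserDefinedAggregateFunction {
      override def inputSchema: StructType = StructType(StructField("value", LongType) :: Nil)
      override def bufferSchema: StructType = StructType(StructField("sum", LongType) :: Nil)
      override def dataType: DataType = LongType
      override def deterministic: Boolean = true

      override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0L

      override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
        if (!input.isNullAt(0)) buffer(0) = buffer.getLong(0) + input.getLong(0)

      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
        buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)

      override def evaluate(buffer: Row): Any = buffer.getLong(0)
    }
    ```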

    But in Spark 3.0.2 something changed, so I have rewritten my UDAF to work
with it.

    First, I rewrote the UDAF to extend `Aggregator` instead of
`UserDefinedAggregateFunction`. Then I used the same statement to register it
in the Spark SQL shell, changing only the function name and class name:
    ```
       CREATE FUNCTION funnel_spark_3 AS 'org.example.udaf.FunnelUDAFSpark3'
       USING JAR '/home/xxx/funnel-udaf-1.0-SNAPSHOT-jar-with-dependencies-spark3.jar';
    ```
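
    (Again for context, since the real `FunnelUDAFSpark3` is not included
here: a minimal sketch of the Spark 3-style rewrite, with the same
illustrative sum in place of the real funnel logic, looks roughly like this.)
    ```
    import org.apache.spark.sql.{Encoder, Encoders}
    import org.apache.spark.sql.expressions.Aggregator

    // Illustrative only: the same sum-of-longs example, rewritten as a Spark 3 Aggregator.
    // Aggregator[IN, BUF, OUT]: input type, intermediate buffer type, output type.
    class FunnelUDAFSpark3 extends Aggregator[Long, Long, Long] {
      override def zero: Long = 0L
      override def reduce(buffer: Long, input: Long): Long = buffer + input
      override def merge(b1: Long, b2: Long): Long = b1 + b2
      override def finish(reduction: Long): Long = reduction
      override def bufferEncoder: Encoder[Long] = Encoders.scalaLong
      override def outputEncoder: Encoder[Long] = Encoders.scalaLong
    }
    ```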
     When I call this function in the Spark SQL shell, I get the following
exception:
    ```
       Error: Error operating EXECUTE_STATEMENT: org.apache.spark.sql.AnalysisException:
       No handler for UDF/UDAF/UDTF 'org.example.udaf.FunnelUDAFSpark3'; line 9 pos 0
         at org.apache.spark.sql.hive.HiveSessionCatalog.$anonfun$makeFunctionExpression$4(HiveSessionCatalog.scala:113)
         at scala.Option.getOrElse(Option.scala:189)
         at org.apache.spark.sql.hive.HiveSessionCatalog.$anonfun$makeFunctionExpression$3(HiveSessionCatalog.scala:113)
         at scala.util.Failure.getOrElse(Try.scala:222)
         at org.apache.spark.sql.hive.HiveSessionCatalog.$anonfun$makeFunctionExpression$1(HiveSessionCatalog.scala:72)
         at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:221)
         at org.apache.spark.sql.hive.HiveSessionCatalog.makeFunctionExpression(HiveSessionCatalog.scala:72)
         at org.apache.spark.sql.catalyst.catalog.SessionCatalog.$anonfun$makeFunctionBuilder$1(SessionCatalog.scala:1277)
         at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:121)
         at org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction(SessionCatalog.scala:1470)
         at org.apache.spark.sql.hive.HiveSessionCatalog.super$lookupFunction(HiveSessionCatalog.scala:135)
    ```

    I read the source code of `SessionCatalog.scala`, and I found that it only
accepts `UserDefinedAggregateFunction`.
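
    As far as I know, the route that does work for an Aggregator in Spark 3 is
the programmatic one: wrapping it with `functions.udaf` and registering it on
the SparkSession. But that only creates a temporary function for the current
session, so it does not give me a permanent function that the Thrift Server
can resolve like one created with `CREATE FUNCTION`. A sketch, using the
illustrative class above:
    ```
    import org.apache.spark.sql.{functions, SparkSession}

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

    // Registers the Aggregator as a temporary UDAF for this session only,
    // so it is not visible to queries submitted through the Thrift Server.
    spark.udf.register("funnel_spark_3", functions.udaf(new FunnelUDAFSpark3))
    ```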

    So my question is: is there a way to register an Aggregator so it can be
used through the Thrift Server in Spark 3?

    Please help me. Thanks.