Posted to user@spark.apache.org by AlstonWilliams <ps...@gmail.com> on 2021/08/12 03:20:11 UTC
Register an Aggregator as a UDAF for Spark SQL 3
Hi all,
I'm using Spark 3.0.2. I have written an `Aggregator` function, and I want to
register it with Spark SQL so that I can call it through the Thrift Server.
In Spark 2.4, I could extend `UserDefinedAggregateFunction` and use
the following statement to register it in the Spark SQL shell:
```
CREATE FUNCTION funnel AS 'org.example.udaf.FunnelUDAF' USING JAR
'/home/xxx/funnel-udaf-1.0-SNAPSHOT-jar-with-dependencies-spark2.jar';
```
My web application can then execute the following SQL through the
Thrift Server to get the result:
```
select funnel(args1, args2, args3) from table;
```
In Spark 3.0.2, however, `UserDefinedAggregateFunction` is deprecated,
so I have rewritten my UDAF for Spark 3.0.2.
First, I rewrote the UDAF to extend `Aggregator` instead of
`UserDefinedAggregateFunction`. Then I registered it in the Spark SQL shell
with the same statement, changing only the function name and the class
name:
```
CREATE FUNCTION funnel_spark_3 AS
'org.example.udaf.FunnelUDAFSpark3' USING JAR
'/home/xxx/funnel-udaf-1.0-SNAPSHOT-jar-with-dependencies-spark3.jar';
```
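For readers comparing the two APIs: a Spark 2.4 `UserDefinedAggregateFunction` implements `initialize`/`update`/`merge`/`evaluate` against a mutable row buffer, while a Spark 3 `org.apache.spark.sql.expressions.Aggregator[IN, BUF, OUT]` implements `zero`/`reduce`/`merge`/`finish` on a typed buffer, plus `bufferEncoder` and `outputEncoder`. The sketch below is a Spark-free model of that `Aggregator` lifecycle, so the encoders are left out. `SimpleAggregator`, `MaxStepReached` and `AggregatorDemo` are hypothetical names, and the "highest step reached" logic is only a stand-in, not the actual funnel implementation.

```scala
// Hypothetical, Spark-free model of the Aggregator[IN, BUF, OUT] contract.
// Spark's real class also requires bufferEncoder/outputEncoder (omitted here).
trait SimpleAggregator[IN, BUF, OUT] {
  def zero: BUF                    // empty buffer for a group or partition
  def reduce(b: BUF, a: IN): BUF   // fold one input value into a buffer
  def merge(b1: BUF, b2: BUF): BUF // combine partial buffers from partitions
  def finish(reduction: BUF): OUT  // turn the final buffer into the result
}

// Hypothetical stand-in logic: the highest funnel step reached.
object MaxStepReached extends SimpleAggregator[Int, Int, Int] {
  def zero: Int = 0
  def reduce(b: Int, a: Int): Int = math.max(b, a)
  def merge(b1: Int, b2: Int): Int = math.max(b1, b2)
  def finish(reduction: Int): Int = reduction
}

object AggregatorDemo {
  // Mimic distributed execution: reduce inside each partition,
  // then merge the partial buffers and call finish once.
  def run[IN, BUF, OUT](agg: SimpleAggregator[IN, BUF, OUT],
                        partitions: Seq[Seq[IN]]): OUT = {
    val partials = partitions.map(_.foldLeft(agg.zero)(agg.reduce))
    agg.finish(partials.foldLeft(agg.zero)(agg.merge))
  }

  def main(args: Array[String]): Unit =
    println(run(MaxStepReached, Seq(Seq(1, 2), Seq(3), Seq.empty[Int])))
}
```

Because `merge` can be called on buffers from any partition in any order, `zero` has to be a neutral element for both `reduce` and `merge`. That is why an empty partition above contributes `0` without changing the result.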
When I call `funnel_spark_3` in the Spark SQL shell, I get the
following exception:
```
Error: Error operating EXECUTE_STATEMENT:
org.apache.spark.sql.AnalysisException: No handler for UDF/UDAF/UDTF 'org.example.udaf.FunnelUDAFSpark3'; line 9 pos 0
    at org.apache.spark.sql.hive.HiveSessionCatalog.$anonfun$makeFunctionExpression$4(HiveSessionCatalog.scala:113)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.hive.HiveSessionCatalog.$anonfun$makeFunctionExpression$3(HiveSessionCatalog.scala:113)
    at scala.util.Failure.getOrElse(Try.scala:222)
    at org.apache.spark.sql.hive.HiveSessionCatalog.$anonfun$makeFunctionExpression$1(HiveSessionCatalog.scala:72)
    at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:221)
    at org.apache.spark.sql.hive.HiveSessionCatalog.makeFunctionExpression(HiveSessionCatalog.scala:72)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.$anonfun$makeFunctionBuilder$1(SessionCatalog.scala:1277)
    at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:121)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction(SessionCatalog.scala:1470)
    at org.apache.spark.sql.hive.HiveSessionCatalog.super$lookupFunction(HiveSessionCatalog.scala:135)
```
I read the source code of `SessionCatalog.scala` and found that it only
accepts classes that extend `UserDefinedAggregateFunction`.
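The behavior described above can be modeled as a single subtype check on the class that `CREATE FUNCTION` loaded. This is a Spark-free sketch that assumes the catalog dispatches with `Class.isAssignableFrom` against one supported base class. `LegacyUdafBase`, `AggregatorBase`, `OldStyleFunnel`, `NewStyleFunnel` and `HandlerLookup` are hypothetical stand-ins, not Spark's classes. The sketch shows why a class that extends a different base type falls through to the "no handler" branch.

```scala
// Hypothetical stand-ins for the two base classes (not Spark's real types).
abstract class LegacyUdafBase  // plays the role of the Spark 2.4 UDAF base
abstract class AggregatorBase  // plays the role of the Spark 3 Aggregator

class OldStyleFunnel extends LegacyUdafBase // like the Spark 2.4 function
class NewStyleFunnel extends AggregatorBase // like the Spark 3 rewrite

object HandlerLookup {
  // Accept a class only if it is a subtype of the single supported base;
  // anything else gets an error worded like the one in this post.
  def makeFunction(clazz: Class[_]): Either[String, String] =
    if (classOf[LegacyUdafBase].isAssignableFrom(clazz))
      Right(s"wrapped ${clazz.getName}")
    else
      Left(s"No handler for UDF/UDAF/UDTF '${clazz.getName}'")

  def main(args: Array[String]): Unit = {
    println(makeFunction(classOf[OldStyleFunnel])) // accepted
    println(makeFunction(classOf[NewStyleFunnel])) // rejected
  }
}
```

The check depends only on the class hierarchy, so a correctly written aggregator is still rejected if it does not inherit from the one base type the lookup knows about.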
So my question is: is there a way to register an `Aggregator` so that it
can be used through the Thrift Server in Spark 3?
Please help me. Thanks.