You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Thomas Wang <w...@datability.io> on 2023/04/23 21:42:52 UTC

Use Spark Aggregator in PySpark

Hi Spark Community,

I have implemented a custom Spark Aggregator (a subclass to
org.apache.spark.sql.expressions.Aggregator). Now I'm trying to use it in a
PySpark application, but for some reason, I'm not able to trigger the
function. Here is what I'm doing, could someone help me take a look? Thanks.

spark = self._gen_spark_session()
spark.udf.registerJavaFunction(
name="MyAggrator",
javaClassName="my.package.MyAggrator",
returnType=ArrayType(elementType=LongType()),
)

The above code runs successfully. However, to call it, I assume I should do
something like the following.

df = df.groupBy().agg(
functions.expr("MyAggrator(input)").alias("output"),
)

But this one gives me the following error:

pyspark.sql.utils.AnalysisException: UDF class my.package.MyAggrator
doesn't implement any UDF interface


My question is how can I use the Spark Aggregator defined in a jar file in
PySpark? Thanks.

Thomas

Re: Use Spark Aggregator in PySpark

Posted by Enrico Minack <in...@enrico.minack.dev>.
Hi,

For an aggregating UDF, use spark.udf.registerJavaUDAF(name, className).

Enrico



Am 23.04.23 um 23:42 schrieb Thomas Wang:
> Hi Spark Community,
>
> I have implemented a custom Spark Aggregator (a subclass to 
> |org.apache.spark.sql.expressions.Aggregator|). Now I'm trying to use 
> it in a PySpark application, but for some reason, I'm not able to 
> trigger the function. Here is what I'm doing, could someone help me 
> take a look? Thanks.
>
> spark = self._gen_spark_session()
> spark.udf.registerJavaFunction(
> name="MyAggrator",
> javaClassName="my.package.MyAggrator",
> returnType=ArrayType(elementType=LongType()),
> )
>
> The above code runs successfully. However, to call it, I assume I 
> should do something like the following.
>
> df = df.groupBy().agg(
> functions.expr("MyAggrator(input)").alias("output"),
> )
>
> But this one gives me the following error:
>
> pyspark.sql.utils.AnalysisException: UDF class my.package.MyAggrator doesn't implement any UDF interface
>
> My question is how can I use the Spark Aggregator defined in a jar 
> file in PySpark? Thanks.
>
> Thomas