Posted to user@flink.apache.org by Piyush Narang <p....@criteo.com> on 2019/06/25 19:16:05 UTC

open() setup method not being called for AggregateFunctions?

Hi folks,

I’ve created some Flink UDAFs that I’m invoking through the Table/SQL API. In these UDAFs I’ve overridden the open() method to perform some setup (in my case, initializing some metric counters). I noticed that open() isn’t invoked in either the DataSet or the DataStream version. By contrast, when I tested the same thing with a UDF, open() was invoked just fine. Does anyone know whether this is a known issue, or what I might be doing incorrectly?

I’ve written a short repro that demonstrates this: https://gist.github.com/piyushnarang/fe562060789ffeb01d59dcc3da375849

I’m not very familiar with the planner code, but I did dig into it a bit in a debugger for the DataSet scenario. It looks like AggregateUtil.createDataSetAggregateFunctions creates a DataSetAggFunction<https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala#L46>. However, DataSetAggFunction.open() doesn’t seem to invoke the open()/close() methods of the underlying functions (I’m not sure whether this is intended).
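The lifecycle gap described above can be modeled in plain Java. This is a minimal, hypothetical sketch of the behavior the thread expected: a wrapper whose own open()/close() forward to every wrapped aggregate function. SetupAware, MetricAgg and ForwardingAggWrapper are illustrative names only; they are not Flink types and this is not Flink's DataSetAggFunction code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical lifecycle interface: a stand-in for the open()/close() hooks
// a user-defined function exposes. Not a Flink type.
interface SetupAware {
    void open();
    void close();
}

// Hypothetical UDAF whose metric counter is only initialized in open().
class MetricAgg implements SetupAware {
    private Long accumulateCalls; // stays null if open() is never invoked

    @Override
    public void open() {
        accumulateCalls = 0L;
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }

    long accumulate(long acc, long value) {
        if (accumulateCalls == null) {
            throw new IllegalStateException("open() was never called");
        }
        accumulateCalls++;
        return acc + value;
    }

    long accumulateCalls() {
        return accumulateCalls;
    }
}

// Hypothetical wrapper that propagates its own lifecycle to every wrapped
// function, i.e. the behavior the thread expected from the runtime wrapper.
class ForwardingAggWrapper implements SetupAware {
    private final List<SetupAware> functions;

    ForwardingAggWrapper(SetupAware... functions) {
        this.functions = Arrays.asList(functions);
    }

    @Override
    public void open() {
        for (SetupAware f : functions) {
            f.open();
        }
    }

    @Override
    public void close() {
        // Close in reverse order of opening.
        for (int i = functions.size() - 1; i >= 0; i--) {
            functions.get(i).close();
        }
    }
}
```

If the wrapper's forwarding loop in open() is removed, MetricAgg.accumulate() throws IllegalStateException, which mirrors the symptom reported here: setup code in the UDAF's open() never runs.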

Thanks,

-- Piyush


Re: open() setup method not being called for AggregateFunctions?

Posted by Piyush Narang <p....@criteo.com>.
Circling back on this: I was able to dig a bit further into our specific use case (DataStream API, where we perform a window + group by). It seems the planner creates an AggregateAggFunction, which currently isn’t a RichFunction. From what I understand, not allowing rich aggregate functions on WindowedStreams is currently by design (https://issues.apache.org/jira/browse/FLINK-10250)?

Any suggestions for how we can track metrics in our AggregateFunctions when RichAggregateFunctions aren’t supported? For this particular application I can think of some hacks on our end, such as wrapping the returned values in a UDF and capturing metrics there, but I’d love something cleaner.
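The pass-through UDF workaround mentioned above can be sketched as follows, assuming it is acceptable to count results downstream of the aggregate rather than inside it. In Flink, such a class would extend ScalarFunction and create a Counter in open(FunctionContext) via context.getMetricGroup().counter(...); to keep the sketch self-contained, a plain AtomicLong stands in for that counter, and the class name ResultCountingPassThrough is hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the pass-through workaround. In Flink the class would
// extend ScalarFunction and create a metrics Counter in open(FunctionContext);
// a plain AtomicLong stands in for that counter here.
class ResultCountingPassThrough {
    private AtomicLong resultsSeen; // initialized in open(), like a metric counter

    public void open() {
        resultsSeen = new AtomicLong();
    }

    // Returns the aggregate's result unchanged, so query output is unaffected;
    // the only side effect is counting how many results flow through.
    public Long eval(Long aggregateResult) {
        resultsSeen.incrementAndGet();
        return aggregateResult;
    }

    public long resultsSeen() {
        return resultsSeen.get();
    }
}
```

In a query, the aggregate call would then be wrapped in the pass-through function (for example passThrough(myAgg(x)), both names hypothetical). The trade-off is that only emitted aggregate results are observable this way, not the individual accumulate() calls inside the UDAF.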

Thanks,

-- Piyush
