Posted to dev@flink.apache.org by Shaoxuan Wang <ws...@gmail.com> on 2017/05/12 09:53:27 UTC

[DISCUSS] Expose State Backend Interface for UDAGG

Hi everyone,

We made some progress in the implementation of UDAGG (FLINK-5564). However,
we realized that there are cases where users may want to use the state
backend to store the data. For instance, the built-in
MaxWithRetractAggFunction currently creates a HashMap to store the
historical data. This becomes a problem when the number of keys is large
enough, eventually leading to OOM.
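
To make the memory concern concrete, here is a minimal sketch in plain Java of a max-with-retraction accumulator that keeps every distinct value in a heap-resident HashMap. The class name HeapMaxWithRetract and its methods are hypothetical and only illustrate the pattern; this is not the actual MaxWithRetractAggFunction code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not Flink's MaxWithRetractAggFunction.
class HeapMaxWithRetract {
    // value -> how many times it is currently present; held entirely on the JVM heap
    private final Map<Long, Long> counts = new HashMap<>();

    void accumulate(long value) {
        counts.merge(value, 1L, Long::sum);
    }

    void retract(long value) {
        // decrement the count; returning null removes the entry once it reaches zero
        counts.computeIfPresent(value, (k, c) -> c > 1 ? Long.valueOf(c - 1) : null);
    }

    // current maximum, or null if every value has been retracted
    Long getValue() {
        return counts.isEmpty() ? null : Collections.max(counts.keySet());
    }

    // number of map entries, which grows with the number of distinct values seen
    int trackedEntries() {
        return counts.size();
    }

    public static void main(String[] args) {
        HeapMaxWithRetract acc = new HeapMaxWithRetract();
        acc.accumulate(3L);
        acc.accumulate(7L);
        acc.accumulate(7L);
        acc.retract(7L);
        acc.retract(7L);
        System.out.println(acc.getValue() + " / entries: " + acc.trackedEntries());
    }
}
```

Because retraction needs the full history of values, the map cannot be pruned, so its size follows the number of distinct values per key rather than staying constant.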

In FLINK-6544, we have proposed an approach to expose a state backend
interface for UDAGG. A brief design doc can be found at
https://docs.google.com/document/d/1g-wHOuFj5pMaYMJg90kVIO2IHbiQiO26nWscLIOn50c/edit

I am opening this discussion thread because I noticed some recently opened
JIRAs that aim to implement special aggregators, such as "count distinct".
IMO, "count distinct" is just a UDAGG. With the proposed FLINK-6544, we can
simply make it a built-in agg without changing the current UDAGG framework.
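
As a rough illustration of why "count distinct" fits the UDAGG shape, the sketch below writes it as an ordinary accumulate/retract aggregate over a pluggable map. The MapStore interface, HeapMapStore, and CountDistinct are hypothetical names invented for this example; the interface actually proposed in FLINK-6544 may look different.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical storage abstraction; FLINK-6544 may define something different.
interface MapStore<K, V> {
    V get(K key);
    void put(K key, V value);
    void remove(K key);
}

// Heap-backed implementation; a state-backend-backed one could implement the same interface.
class HeapMapStore<K, V> implements MapStore<K, V> {
    private final Map<K, V> map = new HashMap<>();
    public V get(K key) { return map.get(key); }
    public void put(K key, V value) { map.put(key, value); }
    public void remove(K key) { map.remove(key); }
}

// "count distinct" written as an ordinary aggregate with accumulate/retract.
class CountDistinct<T> {
    private final MapStore<T, Long> seen;   // value -> occurrence count
    private long distinct = 0;

    CountDistinct(MapStore<T, Long> seen) { this.seen = seen; }

    void accumulate(T value) {
        Long c = seen.get(value);
        if (c == null) { distinct++; seen.put(value, 1L); }
        else { seen.put(value, c + 1); }
    }

    void retract(T value) {
        Long c = seen.get(value);
        if (c == null) { return; }          // nothing to retract
        if (c == 1L) { distinct--; seen.remove(value); }
        else { seen.put(value, c - 1); }
    }

    long getValue() { return distinct; }

    public static void main(String[] args) {
        MapStore<String, Long> store = new HeapMapStore<String, Long>();
        CountDistinct<String> agg = new CountDistinct<String>(store);
        agg.accumulate("a");
        agg.accumulate("b");
        agg.accumulate("a");
        agg.retract("b");
        System.out.println(agg.getValue());
    }
}
```

The aggregate logic never touches a concrete map, so swapping the heap-backed store for one that reads and writes a state backend would not change the UDAGG itself.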

@Fabian, @Haohui, @Radu, please take a look at FLINK-6544 and let me know
what you think.
Btw, in our opinion this change does not need to be included in release 1.3.

Regards,
Shaoxuan

Re: [DISCUSS] Expose State Backend Interface for UDAGG

Posted by Shaoxuan Wang <ws...@gmail.com>.
Radu,
Thanks for the feedback. ProcessFunction is a lower-level execution
operator that Table API and SQL users cannot access directly.
FLINK-6544 is trying to create a generic interface that lets Table API &
SQL users access backend state via UDAGG. It will eventually be
code-generated into the ProcessFunction, similar to what you have described.

Regards,
Shaoxuan




RE: [DISCUSS] Expose State Backend Interface for UDAGG

Posted by Radu Tudoran <ra...@huawei.com>.
Hi,

In general I believe it is a good idea to expose the state backend to the functions. You can always optimize the data processing based on the data storage. Hence, if you are able to control the access to data at the level of the processing (aggregation here), you can implement this in a smart way. Moreover, we can also construct different data organizations, partition strategies, etc. based on the specific computation. I understand that this would be quite an effort, but at some point it is worth making.

Meanwhile, if it is not possible to have the aggregation function extend the rich interface, couldn't we supplement this with some extra logic in the process function that would provide the aggregates with the needed data, or at least pointers to the required state?

As far as I know it would be legal now to have something like:

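ProcessFunction () {

    ValueState state = ...

    processElement(newElement) {
        // hand the state handle to the accumulator on every element
        acc.accumulate(newElement, state);
    }
}

WeightedAvgAccum {

    public void accumulate(Row newElement, ValueState state) {
        // read and update the intermediate data through the state handle
        state.value....
    }
}
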
Would something like this at least partially solve the problem? It would allow you to manage the intermediate data directly in the state instead of in memory.
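
The pattern described above can be sketched in a runnable form without a Flink dependency. Everything here is an assumption made for illustration: SimpleValueState is a hypothetical stand-in for a value-state handle with value()/update() methods, InMemoryValueState only exists so the example runs, and StatefulOperatorSketch plays the role of the ProcessFunction.

```java
// Hypothetical stand-in for a keyed value-state handle (value()/update() only).
interface SimpleValueState<T> {
    T value();
    void update(T newValue);
}

// In-memory handle used only so the sketch runs without a Flink dependency.
class InMemoryValueState<T> implements SimpleValueState<T> {
    private T current;
    public T value() { return current; }
    public void update(T newValue) { current = newValue; }
}

// The aggregate keeps no fields of its own; all intermediate data lives behind the handle.
class WeightedAvgAccum {
    // state layout (an assumption for this sketch): {weighted sum, total weight}
    void accumulate(long value, long weight, SimpleValueState<long[]> state) {
        long[] s = state.value();
        if (s == null) { s = new long[] {0L, 0L}; }
        s[0] += value * weight;
        s[1] += weight;
        state.update(s);
    }

    double getValue(SimpleValueState<long[]> state) {
        long[] s = state.value();
        return (s == null || s[1] == 0) ? 0.0 : (double) s[0] / s[1];
    }
}

// Plays the role of the ProcessFunction: it owns the state and passes it on each element.
class StatefulOperatorSketch {
    private final SimpleValueState<long[]> state = new InMemoryValueState<long[]>();
    private final WeightedAvgAccum acc = new WeightedAvgAccum();

    double processElement(long value, long weight) {
        acc.accumulate(value, weight, state);
        return acc.getValue(state);
    }

    public static void main(String[] args) {
        StatefulOperatorSketch op = new StatefulOperatorSketch();
        op.processElement(10, 1);
        System.out.println(op.processElement(20, 3));
    }
}
```

The accumulator stays stateless in this arrangement, so the operator decides where the intermediate data is stored.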



Re: [DISCUSS] Expose State Backend Interface for UDAGG

Posted by Shaoxuan Wang <ws...@gmail.com>.
Fabian,
Thanks for your quick reply.
The goal of FLINK-6544 is not to expose the state backend in all UDAGG
cases. It is designed to provide an interface that lets users access the
state backend where that is allowed (yes, right now this is only allowed
in a ProcessFunction). The interface by itself does not make things
better. Instead, it provides a generic basis for later exposing backend
state in all the different UDAGG cases, and the current OVER aggregate and
unbounded group aggregate can already benefit from accessing the state
backend.

In the meantime, I am also curious why we cannot build AggregateFunction
on RichFunction. We will lose a lot of the benefit of having a state
backend for window aggregates if it does not provide a runtime context.
@Stephan, it would be much appreciated if you could share the concerns or
blocking reasons for not designing AggregateFunction on top of RichFunction.

Regards,
Shaoxuan



Re: [DISCUSS] Expose State Backend Interface for UDAGG

Posted by Fabian Hueske <fh...@gmail.com>.
Hi, thanks for the proposal.

I think exposing state to UDAGGs would be very difficult and a lot of work.

UDAGGs are called from ProcessFunctions (stream, OVER window and non-window
aggs), AggregateFunctions (stream, group-window aggs), CombineFunctions
(batch) and GroupReduceFunctions (batch). The batch functions do not
support state backends at all, ProcessFunctions can register state, and
AggregateFunction cannot.
Even when putting the batch case aside this is very hard.

AggregateFunctions support merging of windows. Right now, this only
involves merging of accumulators. If we allow AggregateFunctions to have
state, we would also need to provide logic to merge the state. Moreover, it
is not clearly defined when AggregateFunctions are called (similar to
Combiners in MapReduce) which would make state handling very complex.
Changing this would be a major effort in the DataStream API.
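
A small sketch of the merging point, in plain Java. SimpleAggregate is a local stand-in that only mirrors the general shape of an aggregate with mergeable accumulators; it is not the DataStream API interface, and AvgAcc and AverageAggregate are hypothetical names for this example.

```java
// Local stand-in for an aggregate with mergeable accumulators (not the Flink interface).
interface SimpleAggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC acc);
    OUT getResult(ACC acc);
    ACC merge(ACC a, ACC b);
}

// Self-contained accumulator: everything needed for merging is in these two fields.
class AvgAcc {
    long sum;
    long count;
}

class AverageAggregate implements SimpleAggregate<Long, AvgAcc, Double> {
    public AvgAcc createAccumulator() { return new AvgAcc(); }

    public AvgAcc add(Long value, AvgAcc acc) {
        acc.sum += value;
        acc.count++;
        return acc;
    }

    public Double getResult(AvgAcc acc) {
        return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;
    }

    public AvgAcc merge(AvgAcc a, AvgAcc b) {
        // Merging two windows only has to combine the accumulator fields.
        // If an accumulator also referred to external state, that state
        // would need its own merge logic at this point.
        a.sum += b.sum;
        a.count += b.count;
        return a;
    }

    public static void main(String[] args) {
        AverageAggregate agg = new AverageAggregate();
        AvgAcc w1 = agg.add(4L, agg.add(2L, agg.createAccumulator()));
        AvgAcc w2 = agg.add(9L, agg.createAccumulator());
        System.out.println(agg.getResult(agg.merge(w1, w2)));
    }
}
```

With state attached to the function, merge() would no longer be a pure operation on two small objects, which is the extra logic mentioned above.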

An alternative would be to reimplement the group-window logic in the Table
API, but this will be a huge effort as well (maybe we have to do it anyway
at some point though).

@Stephan knows more about the implications of allowing state in
AggregateFunctions.

Best, Fabian
