Posted to user@flink.apache.org by zhaifengwei <zh...@163.com> on 2017/12/15 07:04:45 UTC

Can flink aggregate in local TM,then aggregate in global TM?

I have a cluster environment and need to aggregate a DataStream on it.
I wonder whether I can aggregate on each local server (TaskManager) first, and
then aggregate globally.
When I aggregate the DataStream globally, network I/O increases quickly.
I want to reduce the network I/O, so I need to aggregate on the local server
first.
How can I do this?

DataStream<String> dataIn.... 
dataIn.map().filter().assignTimestampsAndWatermarks().keyBy().window().fold()



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Can flink aggregate in local TM,then aggregate in global TM?

Posted by zhaifengwei <zh...@163.com>.
Hi Fabian Hueske-2,
    Thanks for your reply. This is exactly what I was looking for.


Re: Can flink aggregate in local TM,then aggregate in global TM?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

(copying my answer from Stack Overflow)

The current release of Flink (Flink 1.4.0, Dec 2017) does not feature
built-in support for pre-aggregations.
However, there are efforts under way to add this for the next release
(1.5.0), see FLINK-7561 [4].

You can implement a pre-aggregation operation based on a ProcessFunction
[1]. The ProcessFunction could keep the pre-aggregates in a HashMap (of
fixed size) in memory and register timers (event-time and processing-time)
to periodically emit the pre-aggregates. The state (i.e., the content of the
`HashMap`) should be persisted in managed operator state [2] to prevent
data loss in case of a failure. When setting the timers, you need to
respect the window boundaries so that a pre-aggregate never mixes records
from two different windows.
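
The buffering logic described above can be sketched in plain Java, without
any Flink classes. This is only an illustration under stated assumptions, not
Flink API code: the class name `LocalPreAggregator`, the parameters `maxKeys`
and `windowSizeMs`, the choice of a per-key sum, and tumbling windows are all
hypothetical. It omits timers and state persistence, which a real
ProcessFunction would still need.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: bounded in-memory pre-aggregation of per-key sums. */
public class LocalPreAggregator {

    /** One partial sum for a key within one tumbling window. */
    public static final class Partial {
        public final long windowStart;
        public final String key;
        public final long sum;

        Partial(long windowStart, String key, long sum) {
            this.windowStart = windowStart;
            this.key = key;
            this.sum = sum;
        }
    }

    private final int maxKeys;        // assumed capacity of the in-memory map
    private final long windowSizeMs;  // assumed tumbling window size
    private final Map<String, Long> sums = new HashMap<>();
    private long bufferedWindowStart = Long.MIN_VALUE;

    public LocalPreAggregator(int maxKeys, long windowSizeMs) {
        this.maxKeys = maxKeys;
        this.windowSizeMs = windowSizeMs;
    }

    /** Adds one record and returns any partials that had to be emitted. */
    public List<Partial> add(String key, long value, long timestampMs) {
        List<Partial> out = new ArrayList<>();
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        // Respect window boundaries: never mix two windows in one buffer.
        if (!sums.isEmpty() && windowStart != bufferedWindowStart) {
            out.addAll(flush());
        }
        bufferedWindowStart = windowStart;
        sums.merge(key, value, Long::sum);
        // Keep the map bounded: emit everything once it holds maxKeys keys.
        if (sums.size() >= maxKeys) {
            out.addAll(flush());
        }
        return out;
    }

    /** Emits all buffered partials; a timer callback would call this. */
    public List<Partial> flush() {
        List<Partial> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : sums.entrySet()) {
            out.add(new Partial(bufferedWindowStart, e.getKey(), e.getValue()));
        }
        sums.clear();
        return out;
    }

    public static void main(String[] args) {
        LocalPreAggregator agg = new LocalPreAggregator(1000, 60_000L);
        agg.add("user-1", 1, 5_000L);
        agg.add("user-1", 1, 6_000L);
        for (Partial p : agg.flush()) {
            System.out.println(p.windowStart + " " + p.key + " " + p.sum);
        }
    }
}
```

Because several partials can be emitted for the same key and window, the
downstream keyed window still has to combine them (here, by summing). The
network saving comes from sending one partial per key instead of one record
per input element.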

Please note that FoldFunction has been deprecated and should be replaced by
AggregateFunction [3].

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html#the-processfunction
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#operator-state
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#aggregatefunction
[4] https://issues.apache.org/jira/browse/FLINK-7561
