Posted to user@flink.apache.org by Le Xu <sh...@gmail.com> on 2017/10/22 21:22:00 UTC

Local combiner on each mapper in Flink

Hello!

I'm new to Flink and I'm wondering if there is an explicit local combiner
for each mapper that I can use to perform a local reduce on each mapper? I
looked at
https://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html
but couldn't find anything that matches.


Thanks!

Le

Re: Local combiner on each mapper in Flink

Posted by Le Xu <sh...@gmail.com>.
Thanks for the help!  I’ll try out the ProcessFunction then.

Le

> On Oct 26, 2017, at 8:03 AM, Kien Truong <du...@gmail.com> wrote:
> 
> Hi,
> For the Streaming API, use a ProcessFunction, as Fabian suggested.
> You can pretty much do anything with a ProcessFunction :)
> 
> Best regards,
> 
> Kien


Re: Local combiner on each mapper in Flink

Posted by Kien Truong <du...@gmail.com>.
Hi,

For the Streaming API, use a ProcessFunction, as Fabian suggested.

You can pretty much do anything with a ProcessFunction :)


Best regards,

Kien


On 10/26/2017 8:01 PM, Le Xu wrote:
> Hi Kien:
>
> Is there a similar API for DataStream as well?
>
> Thanks!
>
> Le

Re: Local combiner on each mapper in Flink

Posted by Le Xu <sh...@gmail.com>.
Hi Kien:

Is there a similar API for DataStream as well?

Thanks!

Le


> On Oct 26, 2017, at 7:58 AM, Kien Truong <du...@gmail.com> wrote:
> 
> Hi,
> 
> For the batch API, you can use GroupReduceFunction, which gives you the
> same benefit as a MapReduce combiner:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html#combinable-groupreducefunctions
>
> Regards,
> Kien


Re: Local combiner on each mapper in Flink

Posted by Kien Truong <du...@gmail.com>.
Hi,

For the batch API, you can use GroupReduceFunction, which gives you the same
benefit as a MapReduce combiner.

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html#combinable-groupreducefunctions

Regards,

Kien
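
A minimal sketch of that pattern (illustrative names, assuming a word-count
style Tuple2<String, Integer> input): the GroupReduceFunction also
implements GroupCombineFunction, so Flink can run it as a local combiner on
each mapper before the shuffle.

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class CombinableWordCounter
        implements GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>,
                   GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    @Override
    public void reduce(Iterable<Tuple2<String, Integer>> values,
                       Collector<Tuple2<String, Integer>> out) {
        String key = null;
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            key = t.f0;
            sum += t.f1;
        }
        out.collect(new Tuple2<>(key, sum));
    }

    @Override
    public void combine(Iterable<Tuple2<String, Integer>> values,
                        Collector<Tuple2<String, Integer>> out) {
        // partial sums have the same shape as the final result,
        // so the combine step can reuse the reduce logic
        reduce(values, out);
    }
}

// usage: input.groupBy(0).reduceGroup(new CombinableWordCounter());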


On 10/26/2017 7:37 PM, Le Xu wrote:
> Thanks guys! That makes more sense now.
>
> So does it mean that once I start using a window operator, all operations
> on my WindowedStream would be global (across all partitions)? In that
> case, WindowedStream.aggregate (or sum) would apply to all data after
> shuffling instead of to each partition.
>
> If I understand this correctly, when I want to perform some sort of
> counting within each partition for different words (such as word
> count), I should really avoid using keyBy and instead keep some sort of
> counting map for each word, while also keeping track of the current
> timestamp, inside each mapper.
>
> Le

Re: Local combiner on each mapper in Flink

Posted by Le Xu <sh...@gmail.com>.
Thanks guys! That makes more sense now. 

So does it mean that once I start using a window operator, all operations on my WindowedStream would be global (across all partitions)? In that case, WindowedStream.aggregate (or sum) would apply to all data after shuffling instead of to each partition.

If I understand this correctly, when I want to perform some sort of counting within each partition for different words (such as word count), I should really avoid using keyBy and instead keep some sort of counting map for each word, while also keeping track of the current timestamp, inside each mapper.

Le




> On Oct 26, 2017, at 3:17 AM, Fabian Hueske <fh...@gmail.com> wrote:
> 
> Hi,
> 
> in a MapReduce context, combiners are used to reduce the amount of data 1) to shuffle and fully sort (to group the data by key) and 2) to reduce the impact of skewed data.
> 
> The question is: why do you need a combiner in your use case?
> - To reduce the data to shuffle: You should not use a window operator to preaggregate because keyBy implies a shuffle. Instead you could implement a ProcessFunction with operator state. In this solution you need to implement the windowing logic yourself, i.e., group data in windows based on their timestamps. Ensure you don't run out of memory (operator state is kept on the heap), etc. So this solution needs quite a bit of manual tuning.
> - To reduce the impact of skewed data: You can use a window aggregation if you don't mind the shuffle. However, you should add an additional artificial key attribute to spread out the computation of the same original key across more grouping keys. Note that Flink assigns grouping keys to workers by hash partitioning. This works well for many distinct keys, but might cause issues in case of low key cardinality. Also note that the state size grows, and the effectiveness decreases, with an increasing cardinality of the artificial key.
> 
> Hope this helps,
> Fabian


Re: Local combiner on each mapper in Flink

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

in a MapReduce context, combiners are used to reduce the amount of data 1)
to shuffle and fully sort (to group the data by key) and 2) to reduce the
impact of skewed data.

The question is: why do you need a combiner in your use case?
- To reduce the data to shuffle: You should not use a window operator to
preaggregate because keyBy implies a shuffle. Instead you could implement a
ProcessFunction with operator state. In this solution you need to implement
the windowing logic yourself, i.e., group data in windows based on their
timestamps. Ensure you don't run out of memory (operator state is kept on
the heap), etc. So this solution needs quite a bit of manual tuning.
- To reduce the impact of skewed data: You can use a window aggregation if
you don't mind the shuffle. However, you should add an additional
artificial key attribute to spread out the computation of the same original
key across more grouping keys. Note that Flink assigns grouping keys to
workers by hash partitioning. This works well for many distinct keys, but
might cause issues in case of low key cardinality. Also note that the state
size grows, and the effectiveness decreases, with an increasing cardinality
of the artificial key.

Hope this helps,
Fabian
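
To make the first option concrete, a minimal sketch (not Fabian's actual
code; it assumes the Flink 1.3-era API, a Tuple2<String, Long> word-count
input, and a simple count-based flush; the class and constant names are
illustrative, and a production version would also snapshot the map via
CheckpointedFunction so it survives failures):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Pre-aggregates word counts per parallel instance, *before* any keyBy,
// and emits partial sums every FLUSH_INTERVAL elements.
public class PreAggregator
        extends ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final int FLUSH_INTERVAL = 1000;

    private transient Map<String, Long> partialCounts;
    private transient int seen;

    @Override
    public void open(Configuration parameters) {
        partialCounts = new HashMap<>();
        seen = 0;
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx,
                               Collector<Tuple2<String, Long>> out) {
        partialCounts.merge(value.f0, value.f1, Long::sum);
        if (++seen >= FLUSH_INTERVAL) {
            // flush the partial sums downstream; the keyed window after the
            // shuffle then only has to combine the much smaller partial results
            for (Map.Entry<String, Long> e : partialCounts.entrySet()) {
                out.collect(new Tuple2<>(e.getKey(), e.getValue()));
            }
            partialCounts.clear();
            seen = 0;
        }
    }
}

// usage sketch:
// dataStream.process(new PreAggregator())
//           .keyBy(0)
//           .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
//           .sum(1);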

2017-10-26 3:32 GMT+02:00 Kurt Young <yk...@gmail.com>:

> Do you mean you want to keep the original window while also doing some
> combine operations inside the window at the same time?
> What kind of data do you expect the following operator to receive?
>
> Best,
> Kurt

Re: Local combiner on each mapper in Flink

Posted by Kurt Young <yk...@gmail.com>.
Do you mean you want to keep the original window while also doing some
combine operations inside the window at the same time?
What kind of data do you expect the following operator to receive?

Best,
Kurt

On Thu, Oct 26, 2017 at 5:29 AM, Le Xu <sh...@gmail.com> wrote:

> Thanks Kurt, I'm trying out WindowedStream aggregate right now. Just
> wondering, is there any way for me to preserve the window after
> aggregation? [...]

Re: Local combiner on each mapper in Flink

Posted by Le Xu <sh...@gmail.com>.
Thanks Kurt, I'm trying out WindowedStream aggregate right now. Just
wondering, is there any way for me to preserve the window after
aggregation? More specifically, originally I have something like:

WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowStream = dataStream
        .keyBy(0) // key by id
        .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))

and then for the reducer I can do:

windowStream.apply(...)

and expect the window information to be preserved.

If I were to use aggregate on the window stream, I would end up with
something like:

DataStream<Tuple2<String, Long>> windowStream = dataStream
        .keyBy(0) // key by id
        .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
        .aggregate(new AggregateFunction<Tuple2<String, Long>, Accumulator, Tuple2<String, Long>>() {
            @Override
            public Accumulator createAccumulator() {
                return null;
            }

            @Override
            public void add(Tuple2<String, Long> stringLong, Accumulator o) {

            }

            @Override
            public Tuple2<String, Long> getResult(Accumulator o) {
                return null;
            }

            @Override
            public Accumulator merge(Accumulator o, Accumulator acc1) {
                return null;
            }
        });

Because it looks like aggregate would only transform a WindowedStream into
a DataStream. But for a global aggregation phase (a reducer), should I
extract the window again?


Thanks! I apologize if that sounds like a very basic question.


Le






On Tue, Oct 24, 2017 at 4:14 AM, Kurt Young <yk...@gmail.com> wrote:

> I think you can use WindowedStream.aggregate
>
> Best,
> Kurt

Re: Local combiner on each mapper in Flink

Posted by Kurt Young <yk...@gmail.com>.
I think you can use WindowedStream.aggregate

Best,
Kurt
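
A minimal sketch of how that can look (illustrative, not Kurt's code; it
assumes the Flink 1.3 API, where add() returns void, and the aggregate
overload that also takes a WindowFunction, which addresses the
window-preservation question below: the AggregateFunction does the
incremental, combiner-style sum, and the WindowFunction re-attaches the key
and the TimeWindow to the result):

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

DataStream<Tuple2<String, Long>> result = dataStream
        .keyBy(0) // key by id
        .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
        .aggregate(
            // incremental sum: only the accumulator is kept per key and
            // window, not the individual elements
            new AggregateFunction<Tuple2<String, Long>, long[], Long>() {
                @Override
                public long[] createAccumulator() {
                    return new long[] {0L};
                }

                @Override
                public void add(Tuple2<String, Long> value, long[] acc) {
                    acc[0] += value.f1;
                }

                @Override
                public Long getResult(long[] acc) {
                    return acc[0];
                }

                @Override
                public long[] merge(long[] a, long[] b) {
                    a[0] += b[0];
                    return a;
                }
            },
            // receives the pre-aggregated result together with the key and
            // the TimeWindow, so the window info (e.g. window.getStart())
            // is not lost
            new WindowFunction<Long, Tuple2<String, Long>, Tuple, TimeWindow>() {
                @Override
                public void apply(Tuple key, TimeWindow window, Iterable<Long> input,
                                  Collector<Tuple2<String, Long>> out) {
                    String id = key.getField(0);
                    out.collect(new Tuple2<>(id, input.iterator().next()));
                }
            });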

On Tue, Oct 24, 2017 at 1:45 PM, Le Xu <sh...@gmail.com> wrote:

> Thanks Kurt. Maybe I wasn't clear before: I was wondering if Flink has an
> implementation of a combiner in DataStream (to use after keyBy and windowing).
>
> Thanks again!
>
> Le

Re: Local combiner on each mapper in Flink

Posted by Le Xu <sh...@gmail.com>.
Thanks Kurt. Maybe I wasn't clear before: I was wondering if Flink has an
implementation of a combiner in DataStream (to use after keyBy and windowing).

Thanks again!

Le

On Sun, Oct 22, 2017 at 8:52 PM, Kurt Young <yk...@gmail.com> wrote:

> Hi,
>
> The document you are looking at is pretty old; you can check the newest
> version here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html
>
> Regarding your question, you can use combineGroup.
>
> Best,
> Kurt

Re: Local combiner on each mapper in Flink

Posted by Kurt Young <yk...@gmail.com>.
Hi,

The document you are looking at is pretty old; you can check the newest
version here:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html

Regarding your question, you can use combineGroup.

Best,
Kurt
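
For illustration, a minimal word-count style sketch of combineGroup (names
and input types are assumed, not from the thread): the GroupCombineFunction
runs locally on each mapper and shrinks the data before the shuffle, and a
grouped sum then does the global aggregation.

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// local, pre-shuffle combine: executed greedily on each mapper's data
DataSet<Tuple2<String, Integer>> partialCounts = words
        .groupBy(0)
        .combineGroup(new GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void combine(Iterable<Tuple2<String, Integer>> values,
                                Collector<Tuple2<String, Integer>> out) {
                String key = null;
                int sum = 0;
                for (Tuple2<String, Integer> t : values) {
                    key = t.f0;
                    sum += t.f1;
                }
                out.collect(new Tuple2<>(key, sum)); // partial, per-mapper sum
            }
        });

// global aggregation after the shuffle
DataSet<Tuple2<String, Integer>> totals = partialCounts.groupBy(0).sum(1);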

On Mon, Oct 23, 2017 at 5:22 AM, Le Xu <sh...@gmail.com> wrote:

> Hello!
>
> I'm new to Flink and I'm wondering if there is an explicit local combiner
> for each mapper that I can use to perform a local reduce on each mapper? I
> looked at
> https://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html
> but couldn't find anything that matches.
>
>
> Thanks!
>
> Le
>