Posted to user@flink.apache.org by Le Xu <sh...@gmail.com> on 2017/10/22 21:22:00 UTC
Local combiner on each mapper in Flink
Hello!
I'm new to Flink and I'm wondering whether there is an explicit local combiner
for each mapper that I can use to perform a local reduce on each mapper. I looked at
https://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html
but couldn't find anything that matches.
Thanks!
Le
Re: Local combiner on each mapper in Flink
Posted by Le Xu <sh...@gmail.com>.
Thanks for the help! I’ll try out the ProcessFunction then.
Le
> On Oct 26, 2017, at 8:03 AM, Kien Truong <du...@gmail.com> wrote:
>
> Hi,
> For the Streaming API, use a ProcessFunction, as Fabian suggested.
> You can pretty much do anything with a ProcessFunction :)
>
> Best regards,
>
> Kien
>
>
> On 10/26/2017 8:01 PM, Le Xu wrote:
>> Hi Kien:
>>
>> Is there a similar API for DataStream as well?
>>
>> Thanks!
>>
>> Le
>>
>>
>>> On Oct 26, 2017, at 7:58 AM, Kien Truong <duckientruong@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> For the batch API, you can use a GroupReduceFunction, which gives you the same benefit as a MapReduce combiner.
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html#combinable-groupreducefunctions
>>>
>>> Regards,
>>> Kien
>>>
>>>
>>> On 10/26/2017 7:37 PM, Le Xu wrote:
>>>> Thanks guys! That makes more sense now.
>>>>
>>>> So does this mean that once I start using a window operator, all operations on my WindowedStream are global (across all partitions)? In that case, WindowedStream.aggregate (or sum) would apply to all data after shuffling rather than to each partition.
>>>>
>>>> If I understand this correctly, if I want to perform some sort of counting within each partition for different words (as in word count), I should avoid keyBy and instead keep a counting map for each word, while also keeping track of the current timestamp, inside each mapper.
>>>>
>>>> Le
>>>>
>>>>
>>>>
>>>>
>>>>> On Oct 26, 2017, at 3:17 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> In a MapReduce context, combiners are used 1) to reduce the amount of data to shuffle and fully sort (to group the data by key) and 2) to reduce the impact of skewed data.
>>>>>
>>>>> The question is: why do you need a combiner in your use case?
>>>>> - To reduce the data to shuffle: You should not use a window operator to preaggregate, because keyBy implies a shuffle. Instead, you could implement a ProcessFunction with operator state. In this solution you need to implement the windowing logic yourself, i.e., group data into windows based on their timestamps. You also need to ensure that you don't run out of memory (operator state is kept on the heap), etc., so this solution needs quite a bit of manual tuning.
>>>>> - To reduce the impact of skewed data: You can use a window aggregation if you don't mind the shuffle. However, you should add an additional artificial key attribute to spread the computation for the same original key across more grouping keys. Note that Flink assigns grouping keys to workers by hash partitioning. This works well for many distinct keys, but might cause issues with low key cardinality. Also note that the state size grows and the effectiveness decreases as the cardinality of the artificial key increases.
>>>>>
>>>>> Hope this helps,
>>>>> Fabian
>>>>>
>>>>> 2017-10-26 3:32 GMT+02:00 Kurt Young <ykt836@gmail.com>:
>>>>> Do you mean that you want to keep the original window while also doing some combine operations inside the window at the same time?
>>>>> What kind of data do you expect the following operator to receive?
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>> On Thu, Oct 26, 2017 at 5:29 AM, Le Xu <sharonxu65@gmail.com> wrote:
>>>>> Thanks, Kurt. I'm trying out WindowedStream aggregate right now. Just wondering: is there any way for me to preserve the window after aggregation? More specifically, I originally have something like:
>>>>>
>>>>> WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowStream = dataStream
>>>>>     .keyBy(0) // id
>>>>>     .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS));
>>>>>
>>>>> and then for the reducer I can do:
>>>>>
>>>>> windowStream.apply(...)
>>>>>
>>>>> and expect the window information to be preserved.
>>>>>
>>>>> If I were to use aggregate on the windowed stream, I would end up with something like:
>>>>>
>>>>> DataStream<Tuple2<String, Long>> windowStream = dataStream
>>>>>     .keyBy(0) // id
>>>>>     .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
>>>>>     .aggregate(new AggregateFunction<Tuple2<String, Long>, Accumulator, Tuple2<String, Long>>() {
>>>>>         @Override
>>>>>         public Accumulator createAccumulator() {
>>>>>             return null;
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void add(Tuple2<String, Long> stringLong, Accumulator o) {
>>>>>
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public Tuple2<String, Long> getResult(Accumulator o) {
>>>>>             return null;
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public Accumulator merge(Accumulator o, Accumulator acc1) {
>>>>>             return null;
>>>>>         }
>>>>>     });
>>>>>
>>>>> It looks like aggregate only transforms a WindowedStream into a DataStream. But for a global aggregation phase (a reducer), should I extract the window again?
>>>>>
>>>>>
>>>>> Thanks! I apologize if that sounds like a very intuitive question.
>>>>>
>>>>>
>>>>> Le
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Oct 24, 2017 at 4:14 AM, Kurt Young <ykt836@gmail.com> wrote:
>>>>> I think you can use WindowedStream.aggregate
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>> On Tue, Oct 24, 2017 at 1:45 PM, Le Xu <sharonxu65@gmail.com> wrote:
>>>>> Thanks Kurt. Maybe I wasn't clear before: I was wondering whether Flink has an implementation of a combiner for DataStream (to use after keyBy and windowing).
>>>>>
>>>>> Thanks again!
>>>>>
>>>>> Le
>>>>>
>>>>> On Sun, Oct 22, 2017 at 8:52 PM, Kurt Young <ykt836@gmail.com> wrote:
>>>>> Hi,
>>>>>
>>>>> The document you are looking at is pretty old. You can check the newest version here: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html
>>>>>
>>>>> Regarding your question, you can use combineGroup.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>> On Mon, Oct 23, 2017 at 5:22 AM, Le Xu <sharonxu65@gmail.com> wrote:
>>>>> Hello!
>>>>>
>>>>> I'm new to Flink and I'm wondering whether there is an explicit local combiner for each mapper that I can use to perform a local reduce on each mapper. I looked at https://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html but couldn't find anything that matches.
>>>>>
>>>>>
>>>>> Thanks!
>>>>>
>>>>> Le
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>
Re: Local combiner on each mapper in Flink
Posted by Kien Truong <du...@gmail.com>.
Hi,
For the Streaming API, use a ProcessFunction, as Fabian suggested.
You can pretty much do anything with a ProcessFunction :)
Best regards,
Kien
Re: Local combiner on each mapper in Flink
Posted by Le Xu <sh...@gmail.com>.
Hi Kien:
Is there a similar API for DataStream as well?
Thanks!
Le
Re: Local combiner on each mapper in Flink
Posted by Kien Truong <du...@gmail.com>.
Hi,
For the batch API, you can use a GroupReduceFunction, which gives you the
same benefit as a MapReduce combiner.
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html#combinable-groupreducefunctions
Regards,
Kien
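To make the combiner idea concrete, here is a minimal sketch of the combine-then-reduce pattern in plain Java. It deliberately does not use Flink's API: CombinerSketch, combine, and reduce are hypothetical names chosen for illustration, and word count is assumed as the aggregation. In the DataSet API, the combinable GroupReduceFunction described at the link above and the combineGroup transformation mentioned in this thread play the role of the combine step.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Framework-free sketch of a MapReduce-style combiner; all names are hypothetical. */
final class CombinerSketch {

    /** Combine step: runs on one partition and collapses its records into one partial count per word. */
    static Map<String, Long> combine(List<String> partition) {
        Map<String, Long> partial = new HashMap<>();
        for (String word : partition) {
            partial.merge(word, 1L, Long::sum);
        }
        return partial;
    }

    /** Reduce step: runs after the shuffle and merges the partial counts from all partitions. */
    static Map<String, Long> reduce(List<Map<String, Long>> partials) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((word, count) -> total.merge(word, count, Long::sum));
        }
        return total;
    }
}
```

Only the partial maps cross the network, so a partition that sees the same word many times ships a single record for it. This works because summing counts is associative and commutative; an aggregation without that property cannot be split into a combine and a reduce step this way.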
Re: Local combiner on each mapper in Flink
Posted by Le Xu <sh...@gmail.com>.
Thanks guys! That makes more sense now.
So does this mean that once I start using a window operator, all operations on my WindowedStream are global (across all partitions)? In that case, WindowedStream.aggregate (or sum) would apply to all data after shuffling rather than to each partition.
If I understand this correctly, if I want to perform some sort of counting within each partition for different words (as in word count), I should avoid keyBy and instead keep a counting map for each word, while also keeping track of the current timestamp, inside each mapper.
Le
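The per-mapper counting map described above can be sketched without any Flink dependency. The class below is a minimal illustration under stated assumptions, not Flink API: LocalWordCombiner, add, and flushUpTo are hypothetical names, and windows are assumed to be tumbling and aligned to multiples of the window size. In a real job, logic like this would live inside the ProcessFunction suggested in this thread, with the map kept in operator state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Framework-free sketch of per-partition pre-aggregation; all names are hypothetical. */
final class LocalWordCombiner {
    private final long windowSizeMs;
    // window start -> (word -> partial count); the TreeMap keeps windows ordered by start time
    private final TreeMap<Long, Map<String, Long>> partials = new TreeMap<>();

    LocalWordCombiner(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Adds one occurrence of a word to the tumbling window that contains its timestamp. */
    void add(String word, long timestampMs) {
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        partials.computeIfAbsent(windowStart, k -> new HashMap<>())
                .merge(word, 1L, Long::sum);
    }

    /**
     * Emits and drops every window that ends at or before the given time.
     * Each result has the form "windowStart,word,partialCount".
     */
    List<String> flushUpTo(long currentTimeMs) {
        List<String> out = new ArrayList<>();
        while (!partials.isEmpty() && partials.firstKey() + windowSizeMs <= currentTimeMs) {
            Map.Entry<Long, Map<String, Long>> window = partials.pollFirstEntry();
            // sort the words so the output order is deterministic
            Map<String, Long> sorted = new TreeMap<>(window.getValue());
            sorted.forEach((word, count) -> out.add(window.getKey() + "," + word + "," + count));
        }
        return out;
    }
}
```

Each parallel instance would call add for every record and flushUpTo whenever its notion of time advances, so that only one partial count per word and window is sent downstream for the final, keyed aggregation. The map grows with the number of distinct words in each open window, which is the memory concern raised in this thread.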
Re: Local combiner on each mapper in Flink
Posted by Fabian Hueske <fh...@gmail.com>.
Hi,
in a MapReduce context, combiners are used 1) to reduce the amount of data
that must be shuffled and fully sorted (to group the data by key) and 2) to
reduce the impact of skewed data.
The question is: why do you need a combiner in your use case?
- To reduce the data to shuffle: You should not use a window operator to
preaggregate, because keyBy implies a shuffle. Instead, you could implement a
ProcessFunction with operator state. In this solution you need to implement
the windowing logic yourself, i.e., group data into windows based on their
timestamps, and ensure you don't run out of memory (operator state is kept
on the heap), etc. So this solution needs quite a bit of manual tuning.
- To reduce the impact of skewed data: You can use a window aggregation if
you don't mind the shuffle. However, you should add an additional
artificial key attribute to spread the computation for the same original
key across more grouping keys. Note that Flink assigns grouping keys to
workers by hash partitioning. This works well for many distinct keys, but
might cause issues in case of low key cardinality. Also note that the state
size grows and the effectiveness decreases with an increasing cardinality of
the artificial key.
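A minimal sketch of the first option (local pre-aggregation before the
shuffle), written in plain Java so that it runs without Flink on the
classpath. It is not Flink API code: the class LocalPreAggregator, its Partial
record and the maxBufferedKeys cap are hypothetical names chosen for
illustration. In a real job the add and flush calls would presumably live
inside a ProcessFunction, with flush driven by watermarks and the buffers
registered as operator state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of per-task pre-aggregation; no Flink classes are used. */
class LocalPreAggregator {

    /** One partial count for a word inside one tumbling window. */
    static final class Partial {
        final long windowStart;
        final String word;
        final long count;

        Partial(long windowStart, String word, long count) {
            this.windowStart = windowStart;
            this.word = word;
            this.count = count;
        }
    }

    private final long windowSizeMs;
    private final int maxBufferedKeys; // hypothetical cap to bound heap usage
    // window start -> (word -> partial count), ordered by window start
    private final TreeMap<Long, Map<String, Long>> buffers = new TreeMap<>();
    private int bufferedKeys = 0;

    LocalPreAggregator(long windowSizeMs, int maxBufferedKeys) {
        this.windowSizeMs = windowSizeMs;
        this.maxBufferedKeys = maxBufferedKeys;
    }

    /** Buffers one record; returns early-flushed partials if the cap is exceeded. */
    List<Partial> add(String word, long timestampMs) {
        // Manual window assignment: align the timestamp to a tumbling window.
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        Map<String, Long> counts = buffers.computeIfAbsent(windowStart, k -> new HashMap<>());
        if (counts.merge(word, 1L, Long::sum) == 1L) {
            bufferedKeys++; // first time this word is seen in this window
        }
        if (bufferedKeys > maxBufferedKeys) {
            // Emitting early keeps memory bounded; downstream must sum the partials.
            return flush(Long.MAX_VALUE);
        }
        return new ArrayList<>();
    }

    /** Emits and drops every window that ends at or before the watermark. */
    List<Partial> flush(long watermarkMs) {
        List<Partial> out = new ArrayList<>();
        Iterator<Map.Entry<Long, Map<String, Long>>> it = buffers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<String, Long>> window = it.next();
            if (window.getKey() > watermarkMs - windowSizeMs) {
                break; // this window and all later ones are still open
            }
            for (Map.Entry<String, Long> e : window.getValue().entrySet()) {
                out.add(new Partial(window.getKey(), e.getKey(), e.getValue()));
            }
            bufferedKeys -= window.getValue().size();
            it.remove();
        }
        return out;
    }
}
```

The emitted partials still have to be keyed and summed per (window, word)
after the shuffle; the point is only that far fewer records cross the network.
Because an early flush can emit several partials for the same window and word,
the downstream aggregation must be a sum of partial counts, not a count of
records.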
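The artificial key from the second option can be sketched in plain Java as a
two-stage count. This is only an illustration of the idea, not Flink code: the
class SaltedCount, the "#" separator and the saltBuckets parameter are
assumptions made for the example. In a Flink job the salt would more likely be
an extra tuple field used in keyBy, followed by a second keyBy on the original
key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/** Plain-Java sketch of two-stage aggregation with an artificial (salted) key. */
class SaltedCount {

    /** Stage-1 grouping key: the original key plus a salt in [0, saltBuckets). */
    static String saltedKey(String key, int salt) {
        return key + "#" + salt;
    }

    /** Stage 1: count per salted key, so a hot key is spread over several groups. */
    static Map<String, Long> countPerSaltedKey(Iterable<String> keys, int saltBuckets) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : keys) {
            int salt = ThreadLocalRandom.current().nextInt(saltBuckets);
            partial.merge(saltedKey(key, salt), 1L, Long::sum);
        }
        return partial;
    }

    /** Stage 2: strip the salt and merge the partial counts per original key. */
    static Map<String, Long> mergeByOriginalKey(Map<String, Long> partial) {
        Map<String, Long> total = new HashMap<>();
        for (Map.Entry<String, Long> e : partial.entrySet()) {
            String salted = e.getKey();
            String original = salted.substring(0, salted.lastIndexOf('#'));
            total.merge(original, e.getValue(), Long::sum);
        }
        return total;
    }
}
```

With saltBuckets = 4, a hot key produces up to four partial groups instead of
one, which matches the trade-off described above: more buckets spread the load
further, but they also multiply the number of groups (and hence the state) and
leave more merging work for the second stage.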
Hope this helps,
Fabian
Re: Local combiner on each mapper in Flink
Posted by Kurt Young <yk...@gmail.com>.
Do you mean you want to keep the original window as well as do some
combine operations inside the window at the same time?
What kind of data do you expect the following operator to receive?
Best,
Kurt
Re: Local combiner on each mapper in Flink
Posted by Le Xu <sh...@gmail.com>.
Thanks Kurt, I'm trying out WindowedStream aggregate right now. Just
wondering, is there any way for me to preserve the window after
aggregation? More specifically, originally I have something like:
    WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowStream =
        dataStream
            .keyBy(0) // id
            .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS));
and then for the reducer I can do:
    windowStream.apply(...)
and expect the window information to be preserved.
If I were to use aggregate on the window stream, I would end up with
something like:
    DataStream<Tuple2<String, Long>> windowStream = dataStream
        .keyBy(0) // id
        .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
        .aggregate(
            new AggregateFunction<Tuple2<String, Long>, Accumulator, Tuple2<String, Long>>() {
                @Override
                public Accumulator createAccumulator() {
                    return null;
                }
                @Override
                public void add(Tuple2<String, Long> stringLong, Accumulator o) {
                }
                @Override
                public Tuple2<String, Long> getResult(Accumulator o) {
                    return null;
                }
                @Override
                public Accumulator merge(Accumulator o, Accumulator acc1) {
                    return null;
                }
            });
It looks like aggregate only turns a WindowedStream into a DataStream. But
for a global aggregation phase (a reducer), should I extract the window
again?
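One way to picture the question is to fill in the four stubbed methods and
tag the aggregated value with its window, so that a later global stage can
group by window again. The sketch below is plain Java and deliberately avoids
Flink classes: SketchAggregate only mirrors the four method names from the
snippet above (it is not Flink's interface), Map.Entry stands in for
Tuple2<String, Long>, and SumAccumulator, SumById and WindowedResult are
hypothetical names. In Flink itself, the window documentation describes
combining an incremental aggregation with a window function to reach the
window metadata; please check the docs for your version for the exact
signatures.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

/** Stand-in for the four-method contract quoted above; NOT Flink's interface. */
interface SketchAggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    void add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

/** Mutable accumulator: the id seen so far and the running sum. */
final class SumAccumulator {
    String id;
    long sum;
}

/** Sums the Long values of (id, value) pairs; Map.Entry stands in for Tuple2. */
final class SumById
        implements SketchAggregate<Map.Entry<String, Long>, SumAccumulator, Map.Entry<String, Long>> {

    @Override
    public SumAccumulator createAccumulator() {
        return new SumAccumulator(); // a fresh accumulator instead of null
    }

    @Override
    public void add(Map.Entry<String, Long> value, SumAccumulator acc) {
        acc.id = value.getKey();
        acc.sum += value.getValue();
    }

    @Override
    public Map.Entry<String, Long> getResult(SumAccumulator acc) {
        return new SimpleEntry<>(acc.id, acc.sum);
    }

    @Override
    public SumAccumulator merge(SumAccumulator a, SumAccumulator b) {
        SumAccumulator merged = new SumAccumulator();
        merged.id = (a.id != null) ? a.id : b.id;
        merged.sum = a.sum + b.sum;
        return merged;
    }
}

/** Aggregated value tagged with the window it came from. */
final class WindowedResult {
    final long windowStart;
    final long windowEnd;
    final String id;
    final long sum;

    private WindowedResult(long windowStart, long windowEnd, String id, long sum) {
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
        this.id = id;
        this.sum = sum;
    }

    /** Wraps one aggregate result together with its window bounds. */
    static WindowedResult of(long windowStart, long windowEnd, Map.Entry<String, Long> result) {
        return new WindowedResult(windowStart, windowEnd, result.getKey(), result.getValue());
    }
}
```

With the window bounds carried in the record, a downstream stage can key by
windowStart (or window again on the same timestamps) without recomputing
which window each partial result belonged to.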
Thanks! I apologize if that sounds like a very basic question.
Le
Re: Local combiner on each mapper in Flink
Posted by Kurt Young <yk...@gmail.com>.
I think you can use WindowedStream.aggregate
Best,
Kurt
Re: Local combiner on each mapper in Flink
Posted by Le Xu <sh...@gmail.com>.
Thanks Kurt. Maybe I wasn't clear before: I was wondering if Flink has an
implementation of a combiner for DataStream (to use after keyBy and windowing).
Thanks again!
Le
Re: Local combiner on each mapper in Flink
Posted by Kurt Young <yk...@gmail.com>.
Hi,
The document you are looking at is pretty old; you can check the newest
version here:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html
Regarding your question, you can use combineGroup.
Best,
Kurt