Posted to user@flink.apache.org by Andrew Roberts <ar...@fuze.com> on 2016/12/07 14:57:58 UTC

Parallelism and stateful mapping with Flink

Hello,

I’m trying to perform a stateful mapping of some objects coming in from Kafka in a parallelized Flink job (parallelism set on the job using env.setParallelism(3)). The data source is a Kafka topic, but the partitions aren’t meaningfully keyed for this operation (each Kafka message is flatMapped to between 0 and 2 objects, with potentially different keys). I have a keyBy() operator directly before my map(), but I’m seeing objects with the same key distributed to different parallel task instances, as reported by getRuntimeContext().getIndexOfThisSubtask().

My understanding of keyBy is that it segments the stream by key and guarantees that all data with a given key hits the same instance. Am I possibly seeing residual “keying” from the Kafka topic?

I’m running Flink 1.1.3 in Scala. Please let me know if I can add more info.

Thanks,

Andrew

Re: Parallelism and stateful mapping with Flink

Posted by Aljoscha Krettek <al...@apache.org>.
I commented on the issue with a way that should work.

On Fri, 9 Dec 2016 at 01:00 Chesnay Schepler <ch...@apache.org> wrote:

> Done. https://issues.apache.org/jira/browse/FLINK-5299

Re: Parallelism and stateful mapping with Flink

Posted by Chesnay Schepler <ch...@apache.org>.
Done. https://issues.apache.org/jira/browse/FLINK-5299

On 08.12.2016 16:50, Ufuk Celebi wrote:
> Would you like to open an issue for this for starters Chesnay? Would be good to fix for the upcoming release even.


Re: Parallelism and stateful mapping with Flink

Posted by Ufuk Celebi <uc...@apache.org>.
Would you like to open an issue for this for starters Chesnay? Would be good to fix for the upcoming release even.


On 8 December 2016 at 16:39:58, Chesnay Schepler (chesnay@apache.org) wrote:
> It would be neat if we could support arrays as keys directly; it should
> boil down to checking the key type and in case of an array injecting a
> KeySelector that calls Arrays.hashCode(array).
> This worked for me when I ran into the same issue while experimenting
> with some stuff.
>  
> The batch API can use arrays as keys as well, so it's also a matter of
> consistency imo.
>  
> Regards,
> Chesnay


Re: Parallelism and stateful mapping with Flink

Posted by Chesnay Schepler <ch...@apache.org>.
It would be neat if we could support arrays as keys directly; it should 
boil down to checking the key type and in case of an array injecting a 
KeySelector that calls Arrays.hashCode(array).
This worked for me when I ran into the same issue while experimenting
with some stuff.
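
The KeySelector idea above could be sketched roughly as follows. This is only an illustration under stated assumptions, not the change discussed in FLINK-5299: `Record`, `contentKey`, and `subtaskFor` are hypothetical names, and `subtaskFor` is a simplified stand-in for hash partitioning, not Flink's actual partitioner.

```scala
import java.util.Arrays

object ArrayHashKeySketch {
  // Hypothetical record type standing in for the job's records.
  final case class Record(rowKey: Array[Byte], payload: String)

  // Content-based key: equal byte sequences always produce the same Int,
  // regardless of which array instance holds them.
  // In the job from this thread, the same expression would presumably be
  // passed to keyBy, e.g. .keyBy(b => Arrays.hashCode(b.rowKey)).
  def contentKey(r: Record): Int = Arrays.hashCode(r.rowKey)

  // Simplified stand-in for hash partitioning (assumption, not Flink's code):
  // maps a key to a subtask index in the range [0, parallelism).
  def subtaskFor(key: Int, parallelism: Int): Int =
    java.lang.Math.floorMod(key, parallelism)

  def main(args: Array[String]): Unit = {
    val r1 = Record(Array[Byte](1, 2, 3), "first")
    val r2 = Record(Array[Byte](1, 2, 3), "second")
    // Both records carry the same bytes, so they map to the same subtask.
    println(subtaskFor(contentKey(r1), 3) == subtaskFor(contentKey(r2), 3))
  }
}
```

One trade-off of keying on the hash itself: two different arrays whose hashes collide would be treated as the same key and would share keyed state.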

The batch API can use arrays as keys as well, so it's also a matter of 
consistency imo.

Regards,
Chesnay

On 08.12.2016 16:23, Ufuk Celebi wrote:
> @Aljoscha: I remember that someone else ran into this, too. Should we address arrays as keys specifically in the API? Prohibit? Document this?
>
> – Ufuk


Re: Parallelism and stateful mapping with Flink

Posted by Ufuk Celebi <uc...@apache.org>.
@Aljoscha: I remember that someone else ran into this, too. Should we address arrays as keys specifically in the API? Prohibit? Document this?

– Ufuk

On 7 December 2016 at 17:41:40, Andrew Roberts (aroberts@fuze.com) wrote:
> Sure!
>  
> (Aside: it turns out that the issue was using an `Array[Byte]` as a key. Byte arrays don’t
> appear to have a stable hashCode. I’ll provide the skeleton for completeness, though.)
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setParallelism(Config.callAggregator.parallelism)
>
> env.addSource(kafkaSource)
>   .flatMap(transformToRecords(_))
>   .keyBy(b => new String(b.rowKey)) // rowKey is Array[Byte]
>   .map(new StatefulAggregator())
>   .addSink(hbaseSink)
>  
>  
> Again, wrapping my keyBy function in `new String()` has fixed my issue. Thanks!
>  
> -a


Re: Parallelism and stateful mapping with Flink

Posted by Andrew Roberts <ar...@fuze.com>.
Sure!

(Aside: it turns out that the issue was using an `Array[Byte]` as a key. Byte arrays don’t appear to have a stable hashCode. I’ll provide the skeleton for completeness, though.)

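import org.apache.flink.streaming.api.scala._ // Scala DataStream API and its implicits

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(Config.callAggregator.parallelism)

env.addSource(kafkaSource)
  .flatMap(transformToRecords(_))
  // Key on a String built from the bytes; a String hashes by content
  .keyBy(b => new String(b.rowKey)) // rowKey is Array[Byte]
  .map(new StatefulAggregator())
  .addSink(hbaseSink)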


Again, wrapping my keyBy function in `new String()` has fixed my issue. Thanks!
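
A minimal sketch of why the `new String()` wrapper helps, assuming the root cause is that JVM arrays inherit the identity-based `equals` and `hashCode` from `Object`. The object name `ArrayKeyHashDemo` and the sample bytes are made up for illustration, and nothing here depends on Flink.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Arrays

object ArrayKeyHashDemo {
  // Two distinct array instances with identical contents, as you would get
  // when the same row key is deserialized from two different records.
  val first: Array[Byte] = "row-1".getBytes(UTF_8)
  val second: Array[Byte] = "row-1".getBytes(UTF_8)

  // Arrays do not override equals, so == compares references here.
  val sameReference: Boolean = first == second

  // Content-based alternatives: equal bytes always give equal results.
  val sameContent: Boolean = Arrays.equals(first, second)
  val sameArraysHash: Boolean = Arrays.hashCode(first) == Arrays.hashCode(second)
  val sameStringHash: Boolean =
    new String(first, UTF_8).hashCode == new String(second, UTF_8).hashCode

  def main(args: Array[String]): Unit = {
    println(s"same reference:          $sameReference")
    println(s"same content:            $sameContent")
    println(s"same Arrays.hashCode:    $sameArraysHash")
    println(s"same String hashCode:    $sameStringHash")
  }
}
```

Note that `new String(bytes)` without an explicit charset uses the platform default, and bytes that are not valid in that charset are replaced during decoding, so two different binary keys could in principle end up as the same string key.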

-a



> On Dec 7, 2016, at 11:28 AM, Stefan Richter <s....@data-artisans.com> wrote:
> 
> Hi,
> 
> could you maybe provide the (minimal) code for the problematic job? Also, are you sure that the keyBy is working on the correct key attribute?
> 
> Best,
> Stefan


Re: Parallelism and stateful mapping with Flink

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

could you maybe provide the (minimal) code for the problematic job? Also, are you sure that the keyBy is working on the correct key attribute?

Best,
Stefan

> On 07.12.2016 at 15:57, Andrew Roberts <ar...@fuze.com> wrote:
> 
> Hello,
> 
> I’m trying to perform a stateful mapping of some objects coming in from Kafka in a parallelized Flink job (parallelism set on the job using env.setParallelism(3)). The data source is a Kafka topic, but the partitions aren’t meaningfully keyed for this operation (each Kafka message is flatMapped to between 0 and 2 objects, with potentially different keys). I have a keyBy() operator directly before my map(), but I’m seeing objects with the same key distributed to different parallel task instances, as reported by getRuntimeContext().getIndexOfThisSubtask().
> 
> My understanding of keyBy is that it segments the stream by key and guarantees that all data with a given key hits the same instance. Am I possibly seeing residual “keying” from the Kafka topic?
> 
> I’m running Flink 1.1.3 in Scala. Please let me know if I can add more info.
> 
> Thanks,
> 
> Andrew