You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Jin Yi <el...@gmail.com> on 2020/01/16 06:48:10 UTC

Filter with large key set

Hi there,

I have the following usecase:
a key set say [A,B,C,....] with around 10M entries, the type of the entries
can be one of the types in BasicTypeInfo, e.g. String, Long, Integer etc...

and each message looks like below:
message: {
   header: A
   body: {}
}

I would like to use Flink to filter each message' header field, to see if
the value present in the key set.

*The key set needs to be dynamic, meaning at any time, we can perform
add/read/delete operations on the key set. *

Any suggestions are very welcome!

Thanks a lot!
Eleanore

Re: Filter with large key set

Posted by Jin Yi <el...@gmail.com>.
Hi Fabian,

Thanks for the suggestion and sorry for the late reply, as I was trying
out.

The Broadcast Stream is the one I am looking for! And I should make myself
clear that the elements in the key set is one type only, e.g. they could be
all string, all int, etc, not some are string and some are int.

Thanks a lot!

Eleanore

On Fri, Jan 17, 2020 at 6:43 AM Fabian Hueske <fh...@gmail.com> wrote:

> Hi Eleanore,
>
> A dynamic filter like the one you need, is essentially a join operation.
> There is two ways to do this:
>
> * partitioning the key set and the message on the attribute. This would be
> done with a KeyedCoProcessFunction.
> * broadcasting the key set and just locally forwarding the messages. This
> would be done with a KeyedBroadcastProcessFunction.
>
> The challenge in your application is that the key set entries have
> different types which is something that Flink does not very well support.
> There is two ways to go about this:
>
> 1) route all data through the same operators that can handle all types.
> You can model this with an n-ary Either type. Flink only has a binary
> Either type, so you would need to implement the TypeInformation,
> serializer, and comparator yourself. The Either classes should give you
> good guidance for that.
> 2) have different operators and flows for each basic data type. This will
> fan out your job, but should be the easier approach.
>
> Best, Fabian
>
>
>
> Am Do., 16. Jan. 2020 um 07:48 Uhr schrieb Jin Yi <eleanore.jin@gmail.com
> >:
>
>> Hi there,
>>
>> I have the following usecase:
>> a key set say [A,B,C,....] with around 10M entries, the type of the
>> entries can be one of the types in BasicTypeInfo, e.g. String, Long,
>> Integer etc...
>>
>> and each message looks like below:
>> message: {
>>    header: A
>>    body: {}
>> }
>>
>> I would like to use Flink to filter each message' header field, to see if
>> the value present in the key set.
>>
>> *The key set needs to be dynamic, meaning at any time, we can perform
>> add/read/delete operations on the key set. *
>>
>> Any suggestions are very welcome!
>>
>> Thanks a lot!
>> Eleanore
>>
>

Re: Filter with large key set

Posted by Jin Yi <el...@gmail.com>.
Hi Fabian,

Thanks for the suggestion and sorry for the late reply, as I was trying
out.

The Broadcast Stream is the one I am looking for! And I should make myself
clear that the elements in the key set is one type only, e.g. they could be
all string, all int, etc, not some are string and some are int.

Thanks a lot!

Eleanore

On Fri, Jan 17, 2020 at 6:43 AM Fabian Hueske <fh...@gmail.com> wrote:

> Hi Eleanore,
>
> A dynamic filter like the one you need, is essentially a join operation.
> There is two ways to do this:
>
> * partitioning the key set and the message on the attribute. This would be
> done with a KeyedCoProcessFunction.
> * broadcasting the key set and just locally forwarding the messages. This
> would be done with a KeyedBroadcastProcessFunction.
>
> The challenge in your application is that the key set entries have
> different types which is something that Flink does not very well support.
> There is two ways to go about this:
>
> 1) route all data through the same operators that can handle all types.
> You can model this with an n-ary Either type. Flink only has a binary
> Either type, so you would need to implement the TypeInformation,
> serializer, and comparator yourself. The Either classes should give you
> good guidance for that.
> 2) have different operators and flows for each basic data type. This will
> fan out your job, but should be the easier approach.
>
> Best, Fabian
>
>
>
> Am Do., 16. Jan. 2020 um 07:48 Uhr schrieb Jin Yi <eleanore.jin@gmail.com
> >:
>
>> Hi there,
>>
>> I have the following usecase:
>> a key set say [A,B,C,....] with around 10M entries, the type of the
>> entries can be one of the types in BasicTypeInfo, e.g. String, Long,
>> Integer etc...
>>
>> and each message looks like below:
>> message: {
>>    header: A
>>    body: {}
>> }
>>
>> I would like to use Flink to filter each message' header field, to see if
>> the value present in the key set.
>>
>> *The key set needs to be dynamic, meaning at any time, we can perform
>> add/read/delete operations on the key set. *
>>
>> Any suggestions are very welcome!
>>
>> Thanks a lot!
>> Eleanore
>>
>

Re: Filter with large key set

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Eleanore,

A dynamic filter like the one you need, is essentially a join operation.
There is two ways to do this:

* partitioning the key set and the message on the attribute. This would be
done with a KeyedCoProcessFunction.
* broadcasting the key set and just locally forwarding the messages. This
would be done with a KeyedBroadcastProcessFunction.

The challenge in your application is that the key set entries have
different types which is something that Flink does not very well support.
There is two ways to go about this:

1) route all data through the same operators that can handle all types. You
can model this with an n-ary Either type. Flink only has a binary Either
type, so you would need to implement the TypeInformation, serializer, and
comparator yourself. The Either classes should give you good guidance for
that.
2) have different operators and flows for each basic data type. This will
fan out your job, but should be the easier approach.

Best, Fabian



Am Do., 16. Jan. 2020 um 07:48 Uhr schrieb Jin Yi <el...@gmail.com>:

> Hi there,
>
> I have the following usecase:
> a key set say [A,B,C,....] with around 10M entries, the type of the
> entries can be one of the types in BasicTypeInfo, e.g. String, Long,
> Integer etc...
>
> and each message looks like below:
> message: {
>    header: A
>    body: {}
> }
>
> I would like to use Flink to filter each message' header field, to see if
> the value present in the key set.
>
> *The key set needs to be dynamic, meaning at any time, we can perform
> add/read/delete operations on the key set. *
>
> Any suggestions are very welcome!
>
> Thanks a lot!
> Eleanore
>

Re: Filter with large key set

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Eleanore,

A dynamic filter like the one you need, is essentially a join operation.
There is two ways to do this:

* partitioning the key set and the message on the attribute. This would be
done with a KeyedCoProcessFunction.
* broadcasting the key set and just locally forwarding the messages. This
would be done with a KeyedBroadcastProcessFunction.

The challenge in your application is that the key set entries have
different types which is something that Flink does not very well support.
There is two ways to go about this:

1) route all data through the same operators that can handle all types. You
can model this with an n-ary Either type. Flink only has a binary Either
type, so you would need to implement the TypeInformation, serializer, and
comparator yourself. The Either classes should give you good guidance for
that.
2) have different operators and flows for each basic data type. This will
fan out your job, but should be the easier approach.

Best, Fabian



Am Do., 16. Jan. 2020 um 07:48 Uhr schrieb Jin Yi <el...@gmail.com>:

> Hi there,
>
> I have the following usecase:
> a key set say [A,B,C,....] with around 10M entries, the type of the
> entries can be one of the types in BasicTypeInfo, e.g. String, Long,
> Integer etc...
>
> and each message looks like below:
> message: {
>    header: A
>    body: {}
> }
>
> I would like to use Flink to filter each message' header field, to see if
> the value present in the key set.
>
> *The key set needs to be dynamic, meaning at any time, we can perform
> add/read/delete operations on the key set. *
>
> Any suggestions are very welcome!
>
> Thanks a lot!
> Eleanore
>