Posted to user@flink.apache.org by Jessy Ping <te...@gmail.com> on 2022/01/14 17:39:45 UTC

Flink (DataStream) in Kubernetes

Hi Team,

We are planning to run the pipeline below as a standalone Flink application
cluster on Kubernetes. It would be great if the community could share their
insights on the questions below.

[Attachment: pipeline diagram (image.png), not rendered in the plain-text archive.]
We can describe the pipeline as follows:

   1. Combine the real-time stream from S1 with the enrichment data from S2
   and S3 using the union operator, and partition the combined stream on
   value1 so that each key's enrichment data stays locally available.
   2. Broadcast the rules arriving from S4.
   3. Connect the two streams (1 & 2) and, inside a keyed broadcast process
   function, process the real-time events from S1 using the enrichment data
   from S2 and S3 (held in RocksDB keyed state) according to the rules held
   in broadcast state (a sketch of this wiring follows the note below).
   4. Produce the transformed results to a Kafka sink.

Note: Kafka Source S1 has 32 partitions. Suppose we have 1 million distinct
keys and expect 10k events/s from S1.
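
In code, the wiring would look roughly like this (a sketch only: Event, Rule
and Result stand in for our actual types, and s1..s4 and kafkaSink are the
already-built sources and sink):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

MapStateDescriptor<String, Rule> rulesDesc =
    new MapStateDescriptor<>("rules", Types.STRING, TypeInformation.of(Rule.class));

// S1 (real-time) and S2/S3 (enrichment) must share one record type for union().
DataStream<Event> combined = s1.union(s2, s3);
BroadcastStream<Rule> rules = s4.broadcast(rulesDesc);

combined
    .keyBy(event -> event.value1)              // co-locates enrichment data per key
    .connect(rules)
    .process(new KeyedBroadcastProcessFunction<String, Event, Rule, Result>() {
        @Override
        public void processElement(Event e, ReadOnlyContext ctx,
                                   Collector<Result> out) throws Exception {
            // read-only view of the broadcast rules; the per-key enrichment
            // data lives in keyed RocksDB state
            Rule rule = ctx.getBroadcastState(rulesDesc).get(e.ruleId);
            if (rule != null) {
                out.collect(rule.apply(e));
            }
        }

        @Override
        public void processBroadcastElement(Rule r, Context ctx,
                                            Collector<Result> out) throws Exception {
            ctx.getBroadcastState(rulesDesc).put(r.id, r);  // updated on every subtask
        }
    })
    .sinkTo(kafkaSink);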

Approach 1: Application cluster with 16 task managers. Each task manager
has 2 slots and 2 CPUs.
Approach 2: Application cluster with 2 task managers. Each task manager has
16 slots and 16 CPUs.

*Questions*

   - Which approach is suitable for a standalone deployment in Kubernetes?
   Are there any best practices for running Flink applications on K8s?
   - We plan to connect the sources S1, S2 and S3 using the union operator.
   These sources have different parallelism settings, each equal to its
   number of Kafka partitions, and the downstream process function has the
   same parallelism as the real-time Kafka source S1. Is it a good idea to
   apply union on streams with different parallelisms?
   - The size of the broadcast state is around 20 MB, so the checkpoint size
   of the broadcast state will be about 640 MB (operator parallelism * size,
   i.e. 32 * 20 MB). Every event requires the full rule set, so keeping the
   rules in keyed RocksDB state is not an option. Is it a good approach to
   keep such a large state in broadcast state?
   - Is it a good practice to use a singleton pattern in Flink to build a
   local cache of the rules inside the open method of the process function
   (a sketch of what I mean follows this list)? If the data is lost due to a
   restart, I can repopulate it with an external call. Can such local caches
   (created inside the open method) safely live for the entire lifetime of a
   particular pod/task manager?
   - Is there any relation between incremental checkpoints and the maximum
   number of retained completed checkpoints (state.checkpoints.num-retained)?
   - If I enable incremental checkpoints for my RocksDB state backend and
   set the number of retained checkpoints to 1, will the entire state still
   be checkpointed every time, irrespective of the delta between checkpoints?
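
What I mean by the singleton cache is roughly this (sketch only; names such
as loadRulesFromExternalService are placeholders, and the process methods
are elided):

import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;

public class EnrichAndApplyRules
        extends KeyedBroadcastProcessFunction<String, Event, Rule, Result> {

    // One copy per task manager JVM, shared by all slots of that TM. It
    // lives as long as the JVM/pod does, but is lost on a TM restart.
    private static volatile Map<String, Rule> ruleCache;

    @Override
    public void open(Configuration parameters) throws Exception {
        if (ruleCache == null) {
            synchronized (EnrichAndApplyRules.class) {
                if (ruleCache == null) {
                    // repopulate via an external call after a restart
                    ruleCache = loadRulesFromExternalService();
                }
            }
        }
    }

    // processElement(...) / processBroadcastElement(...) as sketched above
}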

Thanks
Jessy

Re: Flink (DataStream) in Kubernetes

Posted by Robert Metzger <me...@gmail.com>.
Hi Jessy,

> Which approach is suitable for a standalone deployment in Kubernetes? Are
> there any best practices for running Flink applications on K8s?


I would deploy Flink in Application Mode using the standalone K8s
deployment:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/
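
If I remember that page correctly, you build an image that contains your job
jar and then apply the example manifests from the docs, roughly:

$ kubectl create -f flink-configuration-configmap.yaml
$ kubectl create -f jobmanager-service.yaml
$ kubectl create -f jobmanager-application-non-ha.yaml
$ kubectl create -f taskmanager-job-deployment.yaml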

> We plan to connect the sources S1, S2 and S3 using the union operator.
> These sources have different parallelism settings, each equal to its
> number of Kafka partitions, and the downstream process function has the
> same parallelism as the real-time Kafka source S1. Is it a good idea to
> apply union on streams with different parallelisms?


I recommend starting with one global parallelism that is the same for all
operators. Only if you see performance issues should you start tweaking the
parallelism of individual operators incrementally.
Having inputs with different parallelisms feeding a union is not a problem.
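
For example (a sketch; the source names and the parallelism of S2 are made
up, and env/sourceS1/sourceS2 are assumed to be already built):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

DataStream<Event> s1 =
    env.fromSource(sourceS1, WatermarkStrategy.noWatermarks(), "S1")
       .setParallelism(32);  // matches the 32 Kafka partitions of S1
DataStream<Event> s2 =
    env.fromSource(sourceS2, WatermarkStrategy.noWatermarks(), "S2")
       .setParallelism(4);   // hypothetical partition count of S2

// union() is only a logical merge of the streams, not an operator with its
// own parallelism, so differing input parallelisms are fine.
DataStream<Event> combined = s1.union(s2);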

> Every event requires the full rule set, so keeping the rules in keyed
> RocksDB state is not an option. Is it a good approach to keep such a large
> state in broadcast state?


I would not consider 640 MB large state. That easily fits into memory.

> Is there any relation between incremental checkpoints and the maximum
> number of retained completed checkpoints (state.checkpoints.num-retained)?


I don't think so.

> If I enable incremental checkpoints for my RocksDB state backend and set
> the number of retained checkpoints to 1, will the entire state still be
> checkpointed every time, irrespective of the delta between checkpoints?


No, Flink will create incremental checkpoints.
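
To make sure incremental checkpointing is actually enabled (a sketch,
assuming Flink 1.13+):

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);                            // e.g. every 60 s
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental

// state.checkpoints.num-retained is a cluster option (flink-conf.yaml); it
// only controls how many completed checkpoints are retained, not how they
// are written.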


Re: Flink (DataStream) in Kubernetes

Posted by Jessy Ping <te...@gmail.com>.
Hi Team,
Any insights on my earlier mail would be helpful.

Thanks
Jessy
