Posted to user@flink.apache.org by Alexander Filipchik <af...@gmail.com> on 2020/06/24 18:52:49 UTC

Dynamic partitioner for Flink based on incoming load

Hello!

We are working on a Flink streaming job that reads data from multiple Kafka
topics and writes it to DFS. We are using StreamingFileSink with a custom
implementation for the GCS FS, and it generates a lot of files because the
streams are partitioned among multiple JMs. In the ideal case we should have
at most 1 file per Kafka topic per interval. We also have some heavy topics
and some pretty light ones, so the solution should also be smart enough to
use resources efficiently.

I was thinking we could partition based on how much data was ingested in the
last minute or so, to make sure that messages from the same topic are routed
to the same file (or a minimal number of files) if there are enough resources
to do so. Think bin packing.
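
A minimal sketch of the bin-packing idea, assuming per-topic rates for the
last interval are already known. It is plain Python rather than Flink code;
the function name pack_topics, the capacity parameter (how much load one
sink subtask can take) and the first-fit-decreasing strategy are all
illustrative assumptions, not something Flink provides.

```python
from typing import Dict, List


def pack_topics(rates: Dict[str, float], capacity: float, subtasks: int) -> Dict[str, List[int]]:
    """Assign each topic to as few subtasks as possible (first-fit decreasing)."""
    free = [capacity] * subtasks  # remaining capacity per subtask
    plan: Dict[str, List[int]] = {}
    # Heaviest topics first; ties are broken by topic name for determinism.
    for topic, rate in sorted(rates.items(), key=lambda kv: (-kv[1], kv[0])):
        plan[topic] = []
        remaining = rate
        # Prefer a single subtask that can take the whole topic.
        whole = next((i for i in range(subtasks) if free[i] >= remaining), None)
        if whole is not None:
            free[whole] -= remaining
            plan[topic].append(whole)
            continue
        # Otherwise split the topic, filling the emptiest subtasks first.
        for i in sorted(range(subtasks), key=lambda j: -free[j]):
            if remaining <= 0 or free[i] <= 0:
                break
            take = min(free[i], remaining)
            free[i] -= take
            remaining -= take
            plan[topic].append(i)
        # Everything is full: still give the topic one home rather than dropping it.
        if not plan[topic]:
            plan[topic].append(0)
    return plan
```

With this plan a light topic lands on exactly one subtask (so one part file
per interval), and only topics heavier than one subtask's capacity are
split.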

Is it a good idea? Is there a built-in way to achieve it? If not, is there
a way to push state into the partitioner (or even into the Kafka client, to
repartition in the source)? I was thinking that I could run a side stream
that calculates data volumes and then broadcasts them into the main
stream, so the partitioner can make a decision, but it feels a bit complex.
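
To make the side-stream idea concrete, here is a rough sketch of a router
whose table can be replaced while the job runs. It is plain Python, not the
Flink API: the class LoadAwareRouter and its update/route methods are
hypothetical names, and how the table would actually reach a partitioner in
a Flink job (for example via broadcast state) is left open.

```python
import zlib
from typing import Dict, List


class LoadAwareRouter:
    """Picks a sink subtask per record from a routing table that can be swapped at runtime."""

    def __init__(self, subtasks: int) -> None:
        self.subtasks = subtasks
        self.table: Dict[str, List[int]] = {}
        self._cursor: Dict[str, int] = {}

    def update(self, table: Dict[str, List[int]]) -> None:
        """Install a new plan, e.g. one computed from the side stream's volumes."""
        self.table = {topic: list(targets) for topic, targets in table.items()}
        self._cursor.clear()

    def route(self, topic: str) -> int:
        targets = self.table.get(topic)
        if not targets:
            # Unknown topic: fall back to a hash that is stable across runs
            # (Python's built-in hash() of a str is randomized per process).
            return zlib.crc32(topic.encode("utf-8")) % self.subtasks
        # Round-robin over the subtasks assigned to this topic.
        i = self._cursor.get(topic, 0)
        self._cursor[topic] = (i + 1) % len(targets)
        return targets[i]
```

A topic with a single target always goes to the same subtask, while a topic
assigned several targets is spread evenly across them.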

Another way is to modify the Kafka client to track messages per topic and
make the decision at that layer.

Am I on the right path?

Thank you

Re: Dynamic partitioner for Flink based on incoming load

Posted by Robert Metzger <rm...@apache.org>.
> This will mean 2 shuffles, and 1 node might bottleneck if 1 topic has too
> much data?

Yes

> Is there a way to avoid shuffle at all (or do only 1) and avoid a
> situation when 1 node will become a hotspot?

Do you know the amount of data per kafka topic beforehand, or does this
have to be dynamic?


On Thu, Jun 25, 2020 at 8:15 PM Alexander Filipchik <af...@gmail.com>
wrote:

> This will mean 2 shuffles, and 1 node might bottleneck if 1 topic has too
> much data? Is there a way to avoid shuffle at all (or do only 1) and avoid
> a situation when 1 node will become a hotspot?
>
> Alex

Re: Dynamic partitioner for Flink based on incoming load

Posted by Alexander Filipchik <af...@gmail.com>.
This will mean 2 shuffles, and 1 node might bottleneck if 1 topic has too
much data? Is there a way to avoid shuffle at all (or do only 1) and avoid
a situation when 1 node will become a hotspot?

Alex

On Thu, Jun 25, 2020 at 8:05 AM Kostas Kloudas <kk...@gmail.com> wrote:

> Hi Alexander,
>
> Routing of input data in Flink can be done through keying and this can
> guarantee collocation constraints. This means that you can send two
> records to the same node by giving them the same key, e.g. the topic
> name. Keep in mind that elements with different keys do not
> necessarily go to different nodes, as key assignment to nodes is
> random.
>
> Given this, you could initially key by topic, so that all records of a
> topic go to the same node. This node will compute statistics about the
> topic, e.g. elem/sec (t) and based on thresholds assign new keys to
> each record, e.g. TOPIC-1 if t < 1000, TOPIC-2 if t >= 1000 && t <
> 2000, etc and re-key. This will not guarantee that TOPIC-1 and TOPIC-2
> will go to different machines but the probability of this happening
> will increase with the parallelism of your job. Finally, based on your
> bucket assigner and the rolling policy, you can redirect the elements
> to the same bucket, e.g. TOPIC and tune how many part-files you will
> have based on part-file size and/or time.
>
> Will this help you with your use-case?
>
> Cheers,
> Kostas

Re: Dynamic partitioner for Flink based on incoming load

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi Alexander,

Routing of input data in Flink can be done through keying and this can
guarantee collocation constraints. This means that you can send two
records to the same node by giving them the same key, e.g. the topic
name. Keep in mind that elements with different keys do not
necessarily go to different nodes, as key assignment to nodes is
random.
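
As a rough illustration of why different keys can share a node: Flink maps
a key to a key group by hashing and then maps the key group to a subtask by
integer arithmetic. The sketch below mirrors that shape in plain Python.
Note the assumptions: crc32 stands in for the hash Flink really uses (a
murmur hash over the key's hashCode()), and max_parallelism=128 is an
assumed setting, so the concrete subtask numbers will not match a real job.

```python
import zlib


def subtask_for_key(key: str, parallelism: int, max_parallelism: int = 128) -> int:
    """Map a key to a subtask index via an intermediate key group."""
    # Stand-in hash (assumption): chosen only because it is stable and
    # dependency-free, not because Flink uses it.
    key_group = zlib.crc32(key.encode("utf-8")) % max_parallelism
    # Key groups are split into contiguous ranges, one range per subtask.
    return key_group * parallelism // max_parallelism
```

The same key always yields the same subtask, but with more keys than
subtasks at least two keys must collide, and collisions are common well
before that point.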

Given this, you could initially key by topic, so that all records of a
topic go to the same node. This node will compute statistics about the
topic, e.g. elem/sec (t) and based on thresholds assign new keys to
each record, e.g. TOPIC-1 if t < 1000, TOPIC-2 if t >= 1000 && t <
2000, etc and re-key. This will not guarantee that TOPIC-1 and TOPIC-2
will go to different machines but the probability of this happening
will increase with the parallelism of your job. Finally, based on your
bucket assigner and the rolling policy, you can redirect the elements
to the same bucket, e.g. TOPIC and tune how many part-files you will
have based on part-file size and/or time.
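
One possible reading of the threshold scheme, sketched in plain Python. The
assumption here is that the suffix expresses how many sub-keys a topic is
spread over (1 below 1000 elem/sec, 2 from 1000 up to 2000, and so on) and
that records are dealt round-robin over those sub-keys. The names fan_out,
rekey and bucket_id and the seq counter are hypothetical.

```python
def fan_out(rate: float, step: float = 1000.0) -> int:
    """1 for rate < step, 2 for step <= rate < 2 * step, and so on."""
    return int(rate // step) + 1


def rekey(topic: str, rate: float, seq: int, step: float = 1000.0) -> str:
    """Spread a topic's records over fan_out(rate) sub-keys, round-robin by seq."""
    n = fan_out(rate, step)
    return f"{topic}-{seq % n + 1}"


def bucket_id(key: str) -> str:
    """Strip the numeric suffix so every sub-key of a topic maps to one bucket."""
    return key.rsplit("-", 1)[0]
```

A light topic keeps a single sub-key and therefore a single writer, a heavy
topic gets several, and the bucket assigner sees only the topic name either
way.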

Will this help you with your use-case?

Cheers,
Kostas




On Thu, Jun 25, 2020 at 3:23 AM Alexander Filipchik
<af...@gmail.com> wrote:
>
> Maybe I'm misreading the documentation, but:
> "Data within the partition directories are split into part files. Each partition will contain at least one part file for each subtask of the sink that has received data for that partition."
>
> So, there is at least 1 part file per subtask for each partition. I'm trying to figure out how to dynamically adjust which subtask is getting the data to minimize the number of subtasks writing into a specific partition.
>
> Alex

Re: Dynamic partitioner for Flink based on incoming load

Posted by Alexander Filipchik <af...@gmail.com>.
Maybe I'm misreading the documentation, but:
"Data within the partition directories are split into part files. Each
partition will contain at least one part file for each subtask of the sink
that has received data for that partition."

So, there is at least 1 part file per subtask for each partition. I'm trying
to figure out how to dynamically adjust which subtask is getting the data to
minimize the number of subtasks writing into a specific partition.
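
To illustrate the quoted rule, a small plain-Python sketch that counts the
minimum number of part files per partition from a list of (partition,
subtask) deliveries. The function name min_part_files and the input shape
are assumptions made for this example; rolling policies can only add files
on top of this lower bound.

```python
from collections import defaultdict
from typing import Dict, Iterable, Set, Tuple


def min_part_files(deliveries: Iterable[Tuple[str, int]]) -> Dict[str, int]:
    """Count, per partition, how many distinct sink subtasks received data for it."""
    writers: Dict[str, Set[int]] = defaultdict(set)
    for partition, subtask in deliveries:
        writers[partition].add(subtask)
    # Each subtask that saw data for a partition writes at least one part file.
    return {partition: len(subtasks) for partition, subtasks in writers.items()}
```

Spreading eight records of one topic over four subtasks gives at least four
part files, while pinning them to one subtask gives one, which is the gap
this thread is trying to close.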

Alex

On Wed, Jun 24, 2020 at 3:55 PM Seth Wiesman <sj...@gmail.com> wrote:

> You can achieve this in Flink 1.10 using the StreamingFileSink.
>
> I’d also like to note that Flink 1.11 (which is currently going through
> release testing and should be available imminently) has support for exactly
> this functionality in the table API.
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html

Re: Dynamic partitioner for Flink based on incoming load

Posted by Seth Wiesman <sj...@gmail.com>.
You can achieve this in Flink 1.10 using the StreamingFileSink.

I’d also like to note that Flink 1.11 (which is currently going through
release testing and should be available imminently) has support for exactly
this functionality in the table API.

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html


On Wed, Jun 24, 2020 at 1:53 PM Alexander Filipchik <af...@gmail.com>
wrote:

> Hello!
>
> We are working on a Flink Streaming job that reads data from multiple
> Kafka topics and writes them to DFS. We are using StreamingFileSink with
> custom implementation for GCS FS and it generates a lot of files as streams
> are partitioned among multiple JMs. In the ideal case we should have at
> most 1 file per kafka topic per interval. We also have some heavy topics
> and some pretty light ones, so the solution should also be smart to utilize
> resources efficiently.
>
> I was thinking we can partition based on how much data is ingested in the
> last minute or so to make sure: messages from the same topic are routed to
> the same (or minimal number of) file if there are enough resources to do
> so. Think bin packing.
>
> Is it a good idea? Is there a built in way to achieve it? If not, is there
> a way to push state into the partitioner (or even kafka client to
> repartition in the source)? I was thinking that I can run a side stream
> that will calculate data volumes and then broadcast it into the main
> stream, so partitioner can make a decision, but it feels a bit complex.
>
> Another way is to modify kafka client to track messages per topic and
> make the decision at that layer.
>
> Am I on the right path?
>
> Thank you
>