Posted to user@flink.apache.org by Robert Metzger <rm...@apache.org> on 2020/07/03 11:32:40 UTC

Re: Dynamic partitioner for Flink based on incoming load

> This will mean 2 shuffles, and 1 node might bottleneck if 1 topic has too
much data?

Yes

> Is there a way to avoid shuffle at all (or do only 1) and avoid a
situation when 1 node will become a hotspot?

Do you know the amount of data per kafka topic beforehand, or does this
have to be dynamic?


On Thu, Jun 25, 2020 at 8:15 PM Alexander Filipchik <af...@gmail.com>
wrote:

> This will mean 2 shuffles, and 1 node might bottleneck if 1 topic has too
> much data? Is there a way to avoid shuffle at all (or do only 1) and avoid
> a situation when 1 node will become a hotspot?
>
> Alex
>
> On Thu, Jun 25, 2020 at 8:05 AM Kostas Kloudas <kk...@gmail.com> wrote:
>
>> Hi Alexander,
>>
>> Routing of input data in Flink can be done through keying and this can
>> guarantee collocation constraints. This means that you can send two
>> records to the same node by giving them the same key, e.g. the topic
>> name. Keep in mind that elements with different keys do not
>> necessarily go to different nodes, as key assignment to nodes is
>> hash-based and effectively arbitrary from the user's perspective.
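
[Editor's note: to see why distinct keys can still land on the same node, here is a toy model of hash-based key routing in plain Java. This deliberately simplifies Flink's actual mechanism (murmur-hashed key groups); the class name and the plain modulo scheme are assumptions for illustration only.]

```java
public class ToyKeyRouter {
    // Toy model of hash-based key routing: distinct keys can map to the
    // same parallel subtask. Flink's real scheme hashes keys into key
    // groups, but the collision behavior is the same in spirit.
    public static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }
}
```

With parallelism 2, the keys "a" and "c" happen to land on the same subtask here while "b" lands on the other; which keys collide is not under user control.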
>>
>> Given this, you could initially key by topic, so that all records of a
>> topic go to the same node. This node will compute statistics about the
>> topic, e.g. elements/sec (call it t), and based on thresholds assign a
>> new key to each record, e.g. TOPIC-1 if t < 1000, TOPIC-2 if t >= 1000
>> && t < 2000, etc., and re-key. This will not guarantee that TOPIC-1 and TOPIC-2
>> will go to different machines but the probability of this happening
>> will increase with the parallelism of your job. Finally, based on your
>> bucket assigner and the rolling policy, you can redirect the elements
>> to the same bucket, e.g. TOPIC and tune how many part-files you will
>> have based on part-file size and/or time.
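
[Editor's note: the threshold scheme above can be sketched as a plain function. This is a minimal sketch: the class name, the step size of 1000 elements/sec, and the TOPIC-n naming are taken from the example in the mail, not from any Flink API.]

```java
public class SubKeyAssigner {
    // Derive a sub-key from the topic's observed rate t (elements/sec):
    // t < 1000 -> TOPIC-1, 1000 <= t < 2000 -> TOPIC-2, and so on.
    // Records would then be re-keyed (keyBy) on this value, so hot
    // topics spread over more sub-keys and therefore more subtasks.
    public static String assign(String topic, long elementsPerSecond) {
        long bucket = elementsPerSecond / 1000; // 0 for t < 1000, 1 for 1000..1999, ...
        return topic + "-" + (bucket + 1);
    }
}
```

The bucket assigner of the sink can still strip the "-n" suffix so all sub-keys of a topic end up in the same output directory, while the number of part files follows the topic's load.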
>>
>> Will this help you with your use-case?
>>
>> Cheers,
>> Kostas
>>
>>
>>
>>
>> On Thu, Jun 25, 2020 at 3:23 AM Alexander Filipchik
>> <af...@gmail.com> wrote:
>> >
>> > Maybe I misreading the documentation, but:
>> > "Data within the partition directories are split into part files. Each
>> partition will contain at least one part file for each subtask of the sink
>> that has received data for that partition."
>> >
>> > So, each subtask that receives data for a partition writes its own
>> part file. I'm trying to figure out how to dynamically adjust which
>> subtask is getting the data to minimize the number of subtasks writing
>> into a specific partition.
>> >
>> > Alex
>> >
>> > On Wed, Jun 24, 2020 at 3:55 PM Seth Wiesman <sj...@gmail.com>
>> wrote:
>> >>
>> >> You can achieve this in Flink 1.10 using the StreamingFileSink.
>> >>
>> >> I’d also like to note that Flink 1.11 (which is currently going
>> through release testing and should be available imminently) has support for
>> exactly this functionality in the table API.
>> >>
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html
>> >>
>> >>
>> >> On Wed, Jun 24, 2020 at 1:53 PM Alexander Filipchik <
>> afilipchik@gmail.com> wrote:
>> >>>
>> >>> Hello!
>> >>>
>> >>> We are working on a Flink Streaming job that reads data from multiple
>> Kafka topics and writes them to DFS. We are using StreamingFileSink with a
>> custom implementation for GCS FS, and it generates a lot of files as streams
>> are partitioned among multiple TMs. In the ideal case we should have at
>> most 1 file per Kafka topic per interval. We also have some heavy topics
>> and some pretty light ones, so the solution should also be smart enough
>> to utilize resources efficiently.
>> >>>
>> >>> I was thinking we can partition based on how much data is ingested in
>> the last minute or so to make sure messages from the same topic are routed
>> to the same (or a minimal number of) files if there are enough resources to
>> do so. Think bin packing.
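
[Editor's note: the bin-packing idea can be sketched as a greedy assignment of topics to a fixed number of writer keys, heaviest topic first. This is a hypothetical sketch: the class name, the fixed writer count, and the greedy least-loaded heuristic are assumptions, not anything Flink provides out of the box.]

```java
import java.util.*;

public class TopicBinPacker {
    // Returns topic -> writer index, assigning each topic (heaviest
    // first) to the currently least-loaded of `writers` bins, so heavy
    // topics tend to get a writer to themselves while light topics share.
    public static Map<String, Integer> pack(Map<String, Long> ratePerTopic, int writers) {
        long[] load = new long[writers];
        Map<String, Integer> assignment = new HashMap<>();
        ratePerTopic.entrySet().stream()
            .sorted((a, b) -> Long.compare(b.getValue(), a.getValue()))
            .forEach(e -> {
                int best = 0;
                for (int i = 1; i < writers; i++) {
                    if (load[i] < load[best]) best = i;
                }
                load[best] += e.getValue();
                assignment.put(e.getKey(), best);
            });
        return assignment;
    }
}
```

The resulting writer index would be used as the key for keyBy, recomputed periodically from the observed rates; re-keying on a changing assignment is what makes the broadcast-state question below relevant.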
>> >>>
>> >>> Is it a good idea? Is there a built-in way to achieve it? If not, is
>> there a way to push state into the partitioner (or even into the Kafka
>> client to repartition at the source)? I was thinking that I could run a
>> side stream that calculates data volumes and then broadcast it into the
>> main stream so the partitioner can make a decision, but it feels a bit
>> complex.
>> >>>
>> >>> Another way is to modify the Kafka client to track messages per topic
>> and make the decision at that layer.
>> >>>
>> >>> Am I on the right path?
>> >>>
>> >>> Thank you
>>
>