Posted to dev@flink.apache.org by "Yordan Pavlov (Jira)" <ji...@apache.org> on 2023/04/04 13:08:00 UTC

[jira] [Created] (FLINK-31727) Source parallelism should match number of Kafka partitions

Yordan Pavlov created FLINK-31727:
-------------------------------------

             Summary: Source parallelism should match number of Kafka partitions
                 Key: FLINK-31727
                 URL: https://issues.apache.org/jira/browse/FLINK-31727
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.17.0
            Reporter: Yordan Pavlov
         Attachments: fill-topic.sh, main.scala

We seem to have hit a problem with how Flink fires windows; it presents itself on job recovery. To illustrate it, I am attaching a simple self-contained Flink job. The job consists of a KafkaSource consuming a topic with 3 partitions, running with a job parallelism of 1. Right after the source, the data goes through a TimeWindow, with watermarks constructed from the event data. For simplicity, the records in the Kafka topic are just integers, which are also used as the watermarks. The topic looks like so (a sketch of such a job follows the layout below):
{quote}partition 0: 0, 3, 6, 9, 12 ...

partition 1: 1, 4, 7, 10, 13 ...

partition 2: 2, 5, 8, 11, 14 ...
{quote}
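For reference, here is a minimal sketch of the kind of job described above; the attached [^main.scala] is the authoritative reproducer, and the topic name, broker address, consumer group and the 1 ms window size below are illustrative placeholders:
{code:scala}
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // a single source subtask reads all 3 partitions

    val source = KafkaSource.builder[String]()
      .setBootstrapServers("localhost:9092")   // placeholder
      .setTopics("numbers")                    // placeholder: topic with 3 partitions
      .setGroupId("flink-31727-repro")         // placeholder
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build()

    // The integer payload doubles as the event timestamp; within each partition
    // the values (and hence the per-partition watermarks) increase monotonically.
    val watermarks = WatermarkStrategy
      .forMonotonousTimestamps[String]()
      .withTimestampAssigner(new SerializableTimestampAssigner[String] {
        override def extractTimestamp(element: String, recordTimestamp: Long): Long =
          element.trim.toLong
      })

    env
      .fromSource(source, watermarks, "kafka-source")
      .map(_.trim.toLong)
      .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(1)))
      .reduce(_ min _)
      .print()

    env.execute("FLINK-31727 repro")
  }
}
{code}
Since the source generates watermarks per partition split and combines them by taking the minimum, the overall watermark of the single source subtask should only advance as far as the slowest of the 3 partitions, which is why windows fire for each consecutive number before the restart.
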
What we expect, and what is the case before a restart, is for Flink to wait for the watermarks to progress on each of the 3 partitions and only then trigger windows. When this holds, windows are triggered for each consecutive number.

The problem we observe happens on restart: Flink starts firing windows even though it appears to be reading data from only some of the partitions.

Please find the program attached; I am also attaching a simple Bash script used to generate the Kafka input data.

[^main.scala]

[^fill-topic.sh]

 

We do not see the problem if we set the parallelism to 3 and the job has 3 slots per task manager. Going through the documentation, I did not see such a requirement though, and it may be hard to enforce for all jobs.
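
For completeness, the workaround we currently apply amounts to the following (an illustrative sketch only; the slot count is configured via taskmanager.numberOfTaskSlots in flink-conf.yaml):
{code:scala}
import org.apache.flink.streaming.api.scala._

object WorkaroundSketch {
  def main(args: Array[String]): Unit = {
    // Match the job parallelism to the number of Kafka partitions so each source
    // subtask owns exactly one partition. The task managers also need enough
    // slots, e.g. taskmanager.numberOfTaskSlots: 3 in flink-conf.yaml.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(3)
    // ... same pipeline as in main.scala ...
  }
}
{code}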

Regards


