Posted to user@flink.apache.org by Timo Walther <tw...@apache.org> on 2018/05/15 12:36:18 UTC

Re: Flink does not read from some Kafka Partitions

Hi Ruby,

which Flink version are you using? If you look into the code of 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase, you 
can see that whether partition discovery is used at all depends on the 
Flink version.

Regards,
Timo


Am 15.05.18 um 02:01 schrieb Ruby Andrews:
> Hello,
>
> My team ran into some behavior we did not expect when we tried to get 
> an existing Flink app to read from a re-sized Kafka topic. Here are the 
> highlights:
> - We are using the FlinkKafkaConsumer010.
> - We re-partitioned (added partitions to) an existing topic that our 
> Flink app reads, so that the topic has 8 partitions. Following that, 
> we re-deployed our task managers. We thought that the task managers 
> would start reading the new partitions.
> - 8 task managers read from the topic, but they did NOT read all of 
> the partitions. 3 of the partitions had 2 task managers reading from 
> them and 3 of the partitions had 0 task managers reading from them. My 
> team had expected that Flink would automatically read from all 
> partitions, 1 task manager per partition.
> - To force the app to read from all partitions, we added this property 
> to our Kafka consumer properties: 
> flink.partition-discovery.interval-millis (see the sketch after this 
> message) and re-deployed the task managers. We expected this property 
> to cause Flink to discover (and start reading) all partitions.
> - We did not see a change in the Kafka readers: there were still 3 
> partitions not being read.
> - Finally, we changed the ID of the Flink operator that reads the 
> Kafka topic and re-deployed the task managers again.
> - After changing the ID, the app started reading from all partitions.
>
> What is the correct way to pick up partitions after re-partitioning a 
> Kafka topic?
>
> Thanks,
> Ruby
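
For reference, here is a rough sketch of the kind of setup described above, 
assuming Flink 1.4+ with the 0.10 connector; the broker address, group id, 
topic name, and uid below are placeholders, not values from this thread:

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaDiscoverySketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-consumer-group");       // placeholder

        // Only honoured by the Flink 1.4+ connector; older versions ignore it.
        props.setProperty("flink.partition-discovery.interval-millis", "10000");

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);

        env.addSource(consumer)
                .uid("kafka-source") // keep this stable so the consumer's state is restored
                .print();

        env.execute("kafka-discovery-sketch");
    }
}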



Re: Flink does not read from some Kafka Partitions

Posted by Ruby Andrews <ra...@newrelic.com>.
Thank you both for your responses. Looks like I may have inadvertently used
1.3.1 libraries instead of 1.4.
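
One quick way to confirm which connector version actually ends up on the 
classpath is a manifest check along these lines (just a sketch; it assumes 
the connector jar still carries an Implementation-Version entry, otherwise 
it prints null):

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class ConnectorVersionCheck {
    public static void main(String[] args) {
        // Reads the Implementation-Version from the manifest of the jar
        // that provides the consumer class.
        Package pkg = FlinkKafkaConsumer010.class.getPackage();
        System.out.println("flink-connector-kafka-0.10 version: "
                + (pkg != null ? pkg.getImplementationVersion() : "unknown"));
    }
}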

Ruby

On Wed, May 16, 2018 at 3:12 AM Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi,
>
> Timo is correct - partition discovery is supported by the consumer only
> starting from Flink 1.4.
>
> The expected behaviour without partition discovery enabled is that the list
> of partitions picked up on the first execution of the job will be the list
> of subscribed partitions across all executions.
> When restoring from a savepoint / checkpoint, discovery for new partitions
> will not occur.
> The reason the new partitions were discovered after you changed the UID of
> the consumer operator is that the consumer is then treated as a completely
> new operator without any restored state.
>
> Since Flink 1.4, you can choose to enable partition discovery by setting
> flink.partition-discovery.interval-millis.
> This can be turned on / off at the start of any execution attempt.
> For example, you can have it off initially, take a savepoint, and when
> restoring change that configuration to enable discovery.
>
> Cheers,
> Gordon

Re: Flink does not read from some Kafka Partitions

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

Timo is correct - partition discovery is supported by the consumer only
starting from Flink 1.4.

The expected behaviour without partition discovery enabled is that the list
of partitions picked up on the first execution of the job will be the list
of subscribed partitions across all executions.
When restoring from a savepoint / checkpoint, discovery for new partitions
will not occur.
The reason the new partitions were discovered after you changed the UID of
the consumer operator is that the consumer is then treated as a completely
new operator without any restored state.

Since Flink 1.4, you can choose to enable partition discovery by setting
flink.partition-discovery.interval-millis.
This can be turned on / off at the start of any execution attempt.
For example, you can have it off initially, take a savepoint, and when
restoring change that configuration to enable discovery.
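
Concretely, that toggle amounts to something like the following helper used
across two submissions of the same job (broker address, group id, and
interval are placeholders); the consumer operator's uid in the job graph has
to stay unchanged so its partition state is restored:

import java.util.Properties;

public class DiscoveryToggle {

    // Consumer properties for one execution of the job. The restored run is
    // identical to the first one except that discovery is now enabled; the
    // consumer operator's uid(...) must stay the same across both runs.
    static Properties consumerProps(boolean enableDiscovery) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-consumer-group");       // placeholder
        if (enableDiscovery) {
            // Flink 1.4+ only: check for new partitions every 10 seconds.
            props.setProperty("flink.partition-discovery.interval-millis", "10000");
        }
        return props;
    }
}

The first run would use consumerProps(false); after taking a savepoint, the
job is resubmitted with consumerProps(true) and restored from that savepoint
(e.g. via flink run -s <savepointPath>).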

Cheers,
Gordon


