Posted to user@flink.apache.org by Juho Autio <ju...@rovio.com> on 2018/03/22 12:02:35 UTC

Kafka Consumers Partition Discovery doesn't work

According to the docs*, flink.partition-discovery.interval-millis can be
set to enable automatic partition discovery.

I'm testing this, but apparently it doesn't work.

I'm using Flink Version: 1.5-SNAPSHOT Commit: 8395508
and FlinkKafkaConsumer010.

I had my flink stream running, consuming an existing topic with 3
partitions, among some other topics.
I modified partitions of an existing topic: 3 -> 4**.
I checked the consumer offsets committed by Secor: it's now consuming all 4 partitions.
I checked the consumer offsets committed by my Flink stream: it's still consuming only
the 3 original partitions.

I also checked the Task Metrics of this job in the Flink UI, and it only
offers Kafka-related metrics for 3 partitions (0, 1 & 2).

According to Flink UI > Job Manager > Configuration:
flink.partition-discovery.interval-millis=60000
– so that's just 1 minute. It's already more than 20 minutes since I added
the new partition, so Flink should've picked it up.

How to debug?


Btw, this job has external checkpoints enabled, done once per minute. Those
are also succeeding.

*)
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery

**)

~/kafka$ bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --describe --topic my_topic
Topic:my_topic PartitionCount:3 ReplicationFactor:1 Configs:

~/kafka$ bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --alter --topic my_topic --partitions 4
Adding partitions succeeded!
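As a side note for readers of the archive, one way to cross-check which partitions a consumer group is actually tracking on the broker side is Kafka's consumer-groups tool (the broker address and group name below are placeholders, not values from this thread; Flink commits offsets under its configured group.id, so a newly discovered partition should show up here once the consumer picks it up):

```shell
# Describe the group's current partition assignments and committed offsets.
# $KAFKA_BROKER and my-consumer-group are placeholder assumptions.
bin/kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKER:9092" \
  --describe --group my-consumer-group
```

On older Kafka versions the tool may instead need the --zookeeper form or a --new-consumer flag.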

Re: Kafka Consumers Partition Discovery doesn't work

Posted by Juho Autio <ju...@rovio.com>.
Thanks Gordon, here's the ticket:
https://issues.apache.org/jira/browse/FLINK-9334

If you'd like me to have a stab at it, feel free to assign the ticket to me.

On Thu, Apr 12, 2018 at 10:28 PM, Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi Juno,
>
> Thanks for reporting back, glad to know that it's not an issue :)
>
> In general, connector specific configurations should always happen at the
> connector level, per-connector.
> The flink-conf.yaml file is usually for cluster wide configurations.
>
> And yes, it might be helpful to have a code snippet to demonstrate the
> configuration for partition discovery.
> Could you open a JIRA for that?
>
> Cheers,
> Gordon
>
> On Tue, Apr 10, 2018, 8:44 AM Juho Autio <ju...@rovio.com> wrote:
>
>> Ahhh looks like I had simply misunderstood where that property should go.
>>
>> The docs correctly say:
>> > To enable it, set a non-negative value for flink.partition-discovery.interval-millis
>> in the __provided properties config__
>>
>> So it should be set in the Properties that are passed in the constructor
>> of FlinkKafkaConsumer!
>>
>> I had somehow assumed that this should go to flink-conf.yaml (maybe
>> because it starts with "flink."?), and obviously the FlinkKafkaConsumer
>> doesn't read that.
>>
>> Sorry for the trouble. If anything, I guess a piece of example code
>> might've helped me avoid this mistake. The docs are clear though, I just
>> had become blind to this detail as I thought I had already read it.
>>
>> On Thu, Apr 5, 2018 at 10:26 AM, Juho Autio <ju...@rovio.com> wrote:
>>
>>> Still not working after I had a fresh build from https://github.com/
>>> apache/flink/tree/release-1.5.
>>>
>>> When the job starts this is logged:
>>>
>>> 2018-04-05 09:29:38,157 INFO  org.apache.flink.configuration.GlobalConfiguration
>>>           - Loading configuration property: flink.partition-discovery.interval-millis,
>>> 60000
>>>
>>> So that's 1 minute.
>>>
>>> As before, I added one more partition to a topic that is being consumed.
>>> Secor started consuming it as expected, but Flink didn't – or at least it
>>> isn't reporting anything about doing so. The new partition is not shown in
>>> Flink task metrics or consumer offsets committed by Flink.
>>>
>>> How could I investigate this further? How about that additional logging
>>> for partition discovery?
>>>
>>> On Thu, Mar 22, 2018 at 3:09 PM, Tzu-Li (Gordon) Tai <
>>> tzulitai@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> I think you’ve made a good point: there is currently no logs that tell
>>>> anything about discovering a new partition. We should probably add this.
>>>>
>>>> And yes, it would be great if you can report back on this using either
>>>> the latest master, release-1.5 or release-1.4 branches.
>>>>
>>>> On 22 March 2018 at 10:24:09 PM, Juho Autio (juho.autio@rovio.com)
>>>> wrote:
>>>>
>>>> Thanks, that sounds promising. I don't know how to check if it's
>>>> consuming all partitions? For example I couldn't find any logs about
>>>> discovering a new partition. However, did I understand correctly that this
>>>> is also fixed in Flink dev? If yes, I could rebuild my 1.5-SNAPSHOT and try
>>>> again.
>>>>
>>>> On Thu, Mar 22, 2018 at 4:18 PM, Tzu-Li (Gordon) Tai <
>>>> tzulitai@apache.org> wrote:
>>>>
>>>>> Hi Juho,
>>>>>
>>>>> Can you confirm that the new partition is consumed, but only that
>>>>> Flink’s reported metrics do not include them?
>>>>> If yes, then I think your observations can be explained by this issue:
>>>>> https://issues.apache.org/jira/browse/FLINK-8419
>>>>>
>>>>> <https://issues.apache.org/jira/browse/FLINK-8419>
>>>>> This issue should have been fixed in the recently released 1.4.2
>>>>> version.
>>>>>
>>>>> Cheers,
>>>>> Gordon
>>>>>
>>>>> On 22 March 2018 at 8:02:40 PM, Juho Autio (juho.autio@rovio.com)
>>>>> wrote:
>>>>>
>>>>> According to the docs*, flink.partition-discovery.interval-millis can
>>>>> be set to enable automatic partition discovery.
>>>>>
>>>>> I'm testing this, apparently it doesn't work.
>>>>>
>>>>> I'm using Flink Version: 1.5-SNAPSHOT Commit: 8395508
>>>>> and FlinkKafkaConsumer010.
>>>>>
>>>>> I had my flink stream running, consuming an existing topic with 3
>>>>> partitions, among some other topics.
>>>>> I modified partitions of an existing topic: 3 -> 4**.
>>>>> I checked consumer offsets by secor: it's now consuming all 4
>>>>> partitions.
>>>>> I checked consumer offset by my flink stream: it's still consuming
>>>>> only the 3 original partitions.
>>>>>
>>>>> I also checked the Task Metrics of this job from Flink UI and it only
>>>>> offers Kafka related metrics to be added for 3 partitions (0,1 & 2).
>>>>>
>>>>> According to Flink UI > Job Manager > Configuration:
>>>>> flink.partition-discovery.interval-millis=60000
>>>>> – so that's just 1 minute. It's already more than 20 minutes since I
>>>>> added the new partition, so Flink should've picked it up.
>>>>>
>>>>> How to debug?
>>>>>
>>>>>
>>>>> Btw, this job has external checkpoints enabled, done once per minute.
>>>>> Those are also succeeding.
>>>>>
>>>>> *) https://ci.apache.org/projects/flink/flink-docs-
>>>>> master/dev/connectors/kafka.html#kafka-consumers-topic-
>>>>> and-partition-discovery
>>>>>
>>>>> **)
>>>>>
>>>>> ~/kafka$ bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --describe
>>>>> --topic my_topic
>>>>> Topic:my_topic PartitionCount:3 ReplicationFactor:1 Configs:
>>>>>
>>>>> ~/kafka$ bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --alter
>>>>> --topic my_topic --partitions 4
>>>>> Adding partitions succeeded!
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>

Re: Kafka Consumers Partition Discovery doesn't work

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Juho,

Thanks for reporting back, glad to know that it's not an issue :)

In general, connector-specific configurations should always be set at the
connector level, per connector.
The flink-conf.yaml file is usually for cluster-wide configurations.

And yes, it might be helpful to have a code snippet to demonstrate the
configuration for partition discovery.
Could you open a JIRA for that?

Cheers,
Gordon


Re: Kafka Consumers Partition Discovery doesn't work

Posted by Juho Autio <ju...@rovio.com>.
Ahhh looks like I had simply misunderstood where that property should go.

The docs correctly say:
> To enable it, set a non-negative value for
flink.partition-discovery.interval-millis in the __provided properties
config__

So it should be set in the Properties that are passed in the constructor of
FlinkKafkaConsumer!

I had somehow assumed that this should go to flink-conf.yaml (maybe because
it starts with "flink."?), and obviously the FlinkKafkaConsumer doesn't
read that.

Sorry for the trouble. If anything, I guess a piece of example code
might've helped me avoid this mistake. The docs are clear though, I just
had become blind to this detail as I thought I had already read it.
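For the archive, here is a minimal sketch of the configuration described above: the discovery key goes into the Properties passed to the Kafka consumer's constructor, not into flink-conf.yaml. The broker address, group id, and topic name are placeholder assumptions; the commented-out constructor call marks where the Properties object would be handed to the connector.

```java
import java.util.Properties;

public class PartitionDiscoveryConfig {

    // Builds the consumer Properties. The discovery key belongs HERE;
    // the Kafka connector does not read it from flink-conf.yaml.
    static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder address
        props.setProperty("group.id", "my-flink-job");        // placeholder group id
        // A non-negative value (in milliseconds) enables periodic partition discovery:
        props.setProperty("flink.partition-discovery.interval-millis", "60000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties();
        // These props would then be passed to the connector constructor, e.g.:
        // new FlinkKafkaConsumer010<>("my_topic", new SimpleStringSchema(), props);
        System.out.println(props.getProperty("flink.partition-discovery.interval-millis"));
    }
}
```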


Re: Kafka Consumers Partition Discovery doesn't work

Posted by Juho Autio <ju...@rovio.com>.
Still not working after I had a fresh build from
https://github.com/apache/flink/tree/release-1.5.

When the job starts this is logged:

2018-04-05 09:29:38,157 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: flink.partition-discovery.interval-millis, 60000

So that's 1 minute.

As before, I added one more partition to a topic that is being consumed.
Secor started consuming it as expected, but Flink didn't – or at least it
isn't reporting anything about doing so. The new partition is not shown in
Flink task metrics or consumer offsets committed by Flink.

How could I investigate this further? How about that additional logging for
partition discovery?


Re: Kafka Consumers Partition Discovery doesn't work

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

I think you’ve made a good point: there are currently no logs that say anything about discovering a new partition. We should probably add this.

And yes, it would be great if you could report back on this using the latest master, release-1.5, or release-1.4 branch.


Re: Kafka Consumers Partition Discovery doesn't work

Posted by Juho Autio <ju...@rovio.com>.
Thanks, that sounds promising. I don't know how to check whether it's consuming
all partitions, though. For example, I couldn't find any logs about discovering a
new partition. However, did I understand correctly that this is also fixed
in Flink dev? If yes, I could rebuild my 1.5-SNAPSHOT and try again.


Re: Kafka Consumers Partition Discovery doesn't work

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Juho,

Can you confirm that the new partition is in fact consumed, and that it's only Flink’s reported metrics that do not include it?
If yes, then I think your observations can be explained by this issue: https://issues.apache.org/jira/browse/FLINK-8419

This issue should have been fixed in the recently released 1.4.2 version.

Cheers,
Gordon
