Posted to user@flink.apache.org by Sidney Feiner <si...@startapp.com> on 2020/03/15 15:27:46 UTC

KafkaConsumer keeps getting InstanceAlreadyExistsException

Hey,
I've been using Flink for a while now without any problems when running apps with a FlinkKafkaConsumer.
All my apps share the same overall logic (consume from Kafka -> transform event -> write to file); they differ only in the topic they read (the rest of the Kafka config is identical) and in how they transform the event.
But suddenly I've started getting the following error:


2020-03-15 12:13:56,911 WARN  org.apache.kafka.common.utils.AppInfoParser                   - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-1
       at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
       at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
       at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
       at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
       at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
       at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
       at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
       at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:805)
       at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)
       at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)
       at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
       at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
       at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
       at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:416)
       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
       at java.lang.Thread.run(Thread.java:748)


I've tried setting the "client.id" on my consumer to a random UUID to make sure I don't have any duplicates, but that didn't help either.
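
For reference, this is roughly how I build the consumer (a trimmed sketch - the topic, brokers and group id below are placeholders):

import java.util.Properties;
import java.util.UUID;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder
props.setProperty("group.id", "my-app");                     // placeholder
// the random UUID attempt mentioned above
props.setProperty("client.id", "my-app-" + UUID.randomUUID());

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
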
Any idea what could be causing this?

Thanks 🙂

Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp



Re: KafkaConsumer keeps getting InstanceAlreadyExistsException

Posted by Becket Qin <be...@gmail.com>.
Hi Rong,

The issue here is that the PartitionDiscoverer has an internal
KafkaConsumer which reuses the client.id that the user set for the actual
fetching KafkaConsumer. KafkaConsumers distinguish their metrics by
client.id, so if two KafkaConsumers in the same JVM share the same
client.id, their metrics collide with each other. This is why the
exception was reported.
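
The collision can actually be reproduced outside of Flink with two plain
KafkaConsumers (a minimal sketch - broker and group are placeholders, and
no broker needs to be reachable, since the MBean is registered in the
constructor):

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ClientIdCollision {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "demo-group");              // placeholder
        props.setProperty("client.id", "consumer-1");
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());

        // The first consumer registers kafka.consumer:type=app-info,id=consumer-1.
        KafkaConsumer<String, String> first = new KafkaConsumer<>(props);
        // The second one tries to register the same MBean name, and Kafka logs
        // the WARN with InstanceAlreadyExistsException seen in this thread.
        KafkaConsumer<String, String> second = new KafkaConsumer<>(props);

        second.close();
        first.close();
    }
}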

Thanks,

Jiangjie (Becket) Qin


Re: KafkaConsumer keeps getting InstanceAlreadyExistsException

Posted by Rong Rong <wa...@gmail.com>.
Hi Becket/Till,

Thanks for the detailed explanation. Just to confirm:
the issue in FLINK-8093 refers to multiple Kafka consumers within the same
TM - thus the fix should be to make the consumer client.id unique across
different tasks?
and the issue here is internal to the Kafka consumer, where both the
polling consumer thread and the MBean JMX reporter thread share the same
client.id - thus we should fix this at the Kafka level?

If this is the correct understanding, I think we should separate them,
since they are in fact two different issues.

--
Rong


Re: KafkaConsumer keeps getting InstanceAlreadyExistsException

Posted by Becket Qin <be...@gmail.com>.
Actually, it might be better to create another ticket; FLINK-8093 was
mainly complaining about the JMX bean collision when there are multiple
tasks running in the same TM.

Jiangjie (Becket) Qin

>>>>>

Re: KafkaConsumer keeps getting InstanceAlreadyExistsException

Posted by Becket Qin <be...@gmail.com>.
Hi Till,

It looks like FLINK-8093 <https://issues.apache.org/jira/browse/FLINK-8093>
reports the same issue, although the reported information is not exactly
correct, as this should not cause the producer to fail. I'll take care of
the ticket.

Thanks,

Jiangjie (Becket) Qin

>>>>

Re: KafkaConsumer keeps getting InstanceAlreadyExistsException

Posted by Till Rohrmann <tr...@apache.org>.
@Becket do we already have a JIRA ticket to track this effort?

Cheers,
Till


Re: KafkaConsumer keeps getting InstanceAlreadyExistsException

Posted by Becket Qin <be...@gmail.com>.
Hi Sidney,

The WARN log you saw came from the AbstractPartitionDiscoverer, which is
created by FlinkKafkaConsumer itself. It has an internal consumer that
shares the client.id of the actual consumer fetching data. This is a bug
that we should fix.
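
A rough sketch of the kind of fix I have in mind - deriving a dedicated
client.id for the discoverer's internal consumer instead of reusing the
user's (hypothetical names; the real change would live inside the Kafka
connector):

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

class DiscovererClientIdFix {
    // userProperties: the Kafka properties the user hands to FlinkKafkaConsumer.
    static KafkaConsumer<byte[], byte[]> createDiscovererConsumer(Properties userProperties) {
        Properties props = new Properties();
        props.putAll(userProperties);
        // A unique client.id keeps the discoverer's app-info MBean from
        // colliding with the fetching consumer's MBean.
        String userClientId = props.getProperty("client.id", "flink-kafka-consumer");
        props.setProperty("client.id", userClientId + "-partition-discoverer");
        props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
        props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }
}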

As Rong said, this won't affect the normal operation of the consumer. It
is just an AppInfo MBean for reporting some information. There might be a
slight impact on the accuracy of the consumer metrics, but it should be
almost negligible because the partition discoverer is quite inactive
compared with the actual consumer.

Thanks,

Jiangjie (Becket) Qin

>>
>>

Re: KafkaConsumer keeps getting InstanceAlreadyExistsException

Posted by Rong Rong <wa...@gmail.com>.
We have also seen this issue before when running Flink apps in a shared
cluster environment.

Basically, Kafka is trying to register a JMX MBean[1] for application
monitoring.
This is only a WARN suggesting that you are registering more than one
MBean with the same client id "consumer-1"; it should not affect your
normal application behavior.
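
If you want to check what is already registered in the TM's JVM, you can
query the platform MBean server directly (a small diagnostic sketch using
only standard JMX APIs):

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ListAppInfoMBeans {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Kafka consumers register kafka.consumer:type=app-info,id=<client.id>;
        // a duplicate id here is exactly what produces the WARN above.
        for (ObjectName name :
                server.queryNames(new ObjectName("kafka.consumer:type=app-info,*"), null)) {
            System.out.println(name);
        }
    }
}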

This most likely occurs when you have more than one Kafka consumer within
the same JVM - are you using a session cluster[2]? Can you share more of
your application configuration, including parallelism and slot configs?
Also, based on the log, the "client.id" is not being picked up correctly:
which config key are you using, and could you also share your full Kafka
properties map?


--
Rong

[1] https://docs.oracle.com/javase/tutorial/jmx/mbeans/standard.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#flink-yarn-session
