Posted to user-zh@flink.apache.org by 董鹏 <do...@imdada.cn> on 2019/02/20 07:02:31 UTC

kafka consumer exception

Hi Flink experts, while using Flink with Kafka (version 1.0) we hit the exception below.
It does not affect the job or its results; it is just logged occasionally. Have any of you run into this issue?


[org.apache.kafka.common.utils.AppInfoParser] [AppInfoParser.java:60] - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-31
  at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
  at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
  at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
  at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
  at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
  at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
  at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:709)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)
  at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:58)
  at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:469)
  at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
  at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
  at java.lang.Thread.run(Thread.java:748)
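
For reference, the warning itself is generic JMX behavior rather than anything Kafka-specific: registering a second MBean under an ObjectName that is already taken throws InstanceAlreadyExistsException. A standalone sketch of that mechanism (the MBean class and ObjectName here are illustrative placeholders, not Kafka's own):

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class MBeanCollisionDemo {
    // A trivial standard MBean: the management interface name must be
    // the implementation class name plus the "MBean" suffix.
    public interface DemoMBean { }
    public static class Demo implements DemoMBean { }

    // Registers the same ObjectName twice; returns true if the second
    // registration fails with InstanceAlreadyExistsException.
    static boolean collides(ObjectName name) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        server.registerMBean(new Demo(), name);
        try {
            server.registerMBean(new Demo(), name);
            return false;
        } catch (InstanceAlreadyExistsException expected) {
            return true;
        } finally {
            server.unregisterMBean(name); // clean up the first registration
        }
    }

    public static void main(String[] args) throws Exception {
        ObjectName name = new ObjectName("kafka.consumer:type=app-info,id=consumer-31");
        System.out.println("collision: " + collides(name)); // prints "collision: true"
    }
}
```

This is exactly what AppInfoParser runs into when two KafkaConsumer instances share the same client id: both try to register kafka.consumer:type=app-info,id=<client-id>.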

Re: kafka consumer exception

Posted by 董鹏 <do...@imdada.cn>.
Thanks a lot. That is the approach we are currently taking as well,
but unfortunately it has not solved the problem yet.
 
 

Re: kafka consumer exception

Posted by ForwardXu <x1...@qq.com>.
Hi 董鹏,


   This problem is most likely caused by setting a client-id in your Kafka consumer configuration: when Flink runs multiple consumer threads, they all use that same client-id when requesting data from Kafka. For details, see this JIRA issue:
https://issues.apache.org/jira/browse/KAFKA-3992. If you have configured a client-id, remove it and leave it empty. Kafka will then generate a fresh client id for each thread, in the form "consumer-" plus an auto-incremented id.
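
A minimal sketch of that fix, assuming a standard java.util.Properties setup for the Flink Kafka consumer (the broker address and group name below are illustrative): simply do not set client.id, so the Kafka client auto-generates a unique one per consumer instance and the MBean names no longer collide.

```java
import java.util.Properties;

public class KafkaConsumerConfig {
    // Build consumer properties WITHOUT an explicit client.id, so each
    // Kafka consumer thread gets an auto-generated id ("consumer-" + n).
    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // NOTE: deliberately no props.setProperty("client.id", ...) here.
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties("localhost:9092", "my-group");
        // client.id is absent, so the Kafka client will generate one per instance.
        System.out.println(props.containsKey("client.id")); // prints "false"
    }
}
```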




ForwardXu

