Posted to commits@samza.apache.org by "xiezhi (JIRA)" <ji...@apache.org> on 2018/03/07 07:45:00 UTC
[jira] [Commented] (SAMZA-1027) InstanceAlreadyExistsException while starting up
[ https://issues.apache.org/jira/browse/SAMZA-1027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16389197#comment-16389197 ]
xiezhi commented on SAMZA-1027:
-------------------------------
I have hit the same problem in a different setup: I configured log4j to send application logs to Kafka.
There is only one producer, no others, so I can't figure out what happened.
----------------log4j.properties---------------------------------
log4j.rootLogger=INFO, kafka
#appender kafka
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=UAT_APP
log4j.appender.A1.Threshold=INFO
log4j.appender.kafka.syncSend=false
# multiple brokers are separated by comma ",".
log4j.appender.kafka.brokerList=localhost:9091,localhost:9092,localhost:9093,
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %t %c (%F:%L) - %m%n
-----------------end log4j.properties---------------------------------------
The error log is below.
2018-03-07 14:54:57 INFO localhost-startStop-1 org.apache.kafka.common.utils.AppInfoParser (AppInfoParser.java:109) - Kafka version : 1.0.0
2018-03-07 14:54:57 INFO localhost-startStop-1 org.apache.kafka.common.utils.AppInfoParser (AppInfoParser.java:110) - Kafka commitId : aaa7af6d4a11b29d
2018-03-07 14:54:57 WARN localhost-startStop-1 org.apache.kafka.common.utils.AppInfoParser (AppInfoParser.java:66) - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-1
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:427)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
at org.apache.kafka.log4jappender.KafkaLog4jAppender.getKafkaProducer(KafkaLog4jAppender.java:246)
at org.apache.kafka.log4jappender.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.java:240)
at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:809)
at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:735)
at org.apache.log4j.PropertyConfigurator.configureRootCategory(PropertyConfigurator.java:615)
at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:502)
at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:547)
at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:483)
at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
at org.apache.log4j.Logger.getLogger(Logger.java:104)
at org.apache.commons.logging.impl.Log4JLogger.getLogger(Log4JLogger.java:289)
at org.apache.commons.logging.impl.Log4JLogger.<init>(Log4JLogger.java:109)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.commons.logging.impl.LogFactoryImpl.createLogFromClass(LogFactoryImpl.java:1040)
at org.apache.commons.logging.impl.LogFactoryImpl.discoverLogImplementation(LogFactoryImpl.java:838)
at org.apache.commons.logging.impl.LogFactoryImpl.newInstance(LogFactoryImpl.java:601)
at org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:333)
at org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:307)
at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:645)
at org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:184)
at org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:47)
at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:5118)
at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5634)
at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:145)
at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:899)
at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:875)
at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:652)
at org.apache.catalina.startup.HostConfig.deployWAR(HostConfig.java:1092)
at org.apache.catalina.startup.HostConfig$DeployWar.run(HostConfig.java:1984)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
> InstanceAlreadyExistsException while starting up
> ------------------------------------------------
>
> Key: SAMZA-1027
> URL: https://issues.apache.org/jira/browse/SAMZA-1027
> Project: Samza
> Issue Type: Bug
> Components: kafka
> Affects Versions: 0.11.0
> Reporter: Nicolas Colomer
> Priority: Major
>
> After upgrading Samza, I started to see the following WARN log while starting a Samza job:
> {code}
> 2016-09-29 10:30:09 AppInfoParser [WARN] task[] ssp[] offset[] Error registering AppInfo mbean
> javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=samza_producer-my_awesome_job-123
> at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
> at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
> at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
> at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
> at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
> at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
> at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
> at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:328)
> at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163)
> at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.apply(KafkaSystemFactory.scala:89)
> at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.apply(KafkaSystemFactory.scala:89)
> at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:124)
> at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.send(CoordinatorStreamSystemProducer.java:113)
> at org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager.send(AbstractCoordinatorStreamManager.java:72)
> at org.apache.samza.container.LocalityManager.writeContainerToHostMapping(LocalityManager.java:134)
> at org.apache.samza.container.SamzaContainer.startLocalityManager(SamzaContainer.scala:739)
> at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:651)
> at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:116)
> at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:90)
> at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> {code}
> More precisely, in my situation, this log occurs twice during startup of the job.
> I figured out that it follows the creation of 2 producers: {{samza_producer}} and {{samza_checkpoint_manager}}, with client IDs equal to {{samza_producer-my_awesome_job-123}} and {{samza_checkpoint_manager-my_awesome_job-123}} respectively.
> This issue seems to be directly related to SAMZA-981, which removed the discriminating timestamp and unique counter value from the {{client.id}} string.
> According to KAFKA-3992, this error occurs when multiple producers / consumers are created with the same {{client.id}} setting.
> Looking at the source code, there is a [lock|https://github.com/ncolomer/samza/blob/17e65d1cbdda1ad436f47c15fa4c86332e229a93/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala#L119-L127] in {{KafkaSystemProducer}} that should prevent any race condition where this could happen. But is there any case where this class (or the {{getProducer}} lambda function) may be instantiated or reused multiple times in the same JVM?
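
For anyone trying to reproduce the mechanism described in the quoted report, here is a minimal sketch in plain JMX, with no Kafka involved. It only shows that the platform MBean server rejects a second registration under an ObjectName that is already taken, which is what the stack traces above end in. The class DuplicateMBeanDemo, the AppInfo bean, and the "demo.producer" domain are made up for illustration; this is not the code Kafka's AppInfoParser runs.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class DuplicateMBeanDemo {

    // Standard MBean convention: the management interface is named
    // <ImplementationClass>MBean and must be public.
    public interface AppInfoMBean {
        String getVersion();
    }

    // Hypothetical stand-in for a per-client "app-info" bean.
    public static class AppInfo implements AppInfoMBean {
        @Override
        public String getVersion() {
            return "demo";
        }
    }

    /**
     * Registers one bean under {@code first}, then tries to register another
     * under {@code second}. Returns true if the second registration is
     * rejected with InstanceAlreadyExistsException. Both names are
     * unregistered again before returning.
     */
    public static boolean secondRegistrationFails(String first, String second)
            throws JMException {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName firstName = new ObjectName(first);
        ObjectName secondName = new ObjectName(second);

        server.registerMBean(new AppInfo(), firstName);
        try {
            server.registerMBean(new AppInfo(), secondName);
            server.unregisterMBean(secondName);
            return false;
        } catch (InstanceAlreadyExistsException e) {
            // Same situation as in the WARN logs: the name is already in use.
            return true;
        } finally {
            server.unregisterMBean(firstName);
        }
    }

    public static void main(String[] args) throws JMException {
        String id1 = "demo.producer:type=app-info,id=producer-1";
        String id2 = "demo.producer:type=app-info,id=producer-2";
        System.out.println("same id rejected: " + secondRegistrationFails(id1, id1));
        System.out.println("distinct ids rejected: " + secondRegistrationFails(id1, id2));
    }
}
```

The platform MBean server is shared by the whole JVM, so the collision depends only on the ObjectName, not on which component or classloader created the client. Two clients that end up with the same id in one JVM would be enough to trigger it.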
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)