Posted to commits@samza.apache.org by "Fred Ji (JIRA)" <ji...@apache.org> on 2018/01/04 23:12:00 UTC

[jira] [Updated] (SAMZA-1537) StreamAppender can deadlock due to locks held by Kafka and Log4j

     [ https://issues.apache.org/jira/browse/SAMZA-1537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fred Ji updated SAMZA-1537:
---------------------------
    Fix Version/s:     (was: 0.15.0)
                   0.14.0

> StreamAppender can deadlock due to locks held by Kafka and Log4j
> ----------------------------------------------------------------
>
>                 Key: SAMZA-1537
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1537
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Jake Maes
>            Assignee: Jake Maes
>             Fix For: 0.14.0
>
>
> The thread dumps of the 2 offending threads are below, but the basics are:
> 1. AppInfoParser in Kafka uses static synchronized methods
> 2. Log4j synchronizes per Category
> So a deadlock occurs when the StreamAppender (holding the Category monitor) tries to create a new KafkaProducer, which calls the static synchronized AppInfoParser methods, while another thread inside AppInfoParser (holding the static lock) tries to log to the same Category.
> {noformat}
> "kafka-producer-network-thread | kafka_producer-samza_xxx_yyy-i001" daemon prio=5 tid=23 BLOCKED
> 	at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:57)
> 	   Local Variable: java.lang.String#326563
> 	   Local Variable: java.lang.String#329864
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336)
> 	   Local Variable: org.apache.kafka.common.config.AbstractConfig$RecordingMap#12
> 	   Local Variable: java.util.ArrayList#265184
> 	   Local Variable: org.apache.kafka.common.metrics.MetricConfig#9
> 	   Local Variable: java.util.LinkedHashMap#991
> 	   Local Variable: org.apache.kafka.common.internals.ClusterResourceListeners#9
> 	   Local Variable: java.util.ArrayList#265353
> 	   Local Variable: org.apache.kafka.clients.NetworkClient#9
> 	   Local Variable: org.apache.kafka.common.network.SslChannelBuilder#9
> 	   Local Variable: java.util.ArrayList#265374
> 	   Local Variable: org.apache.kafka.clients.producer.ProducerConfig#3
> 	   Local Variable: java.lang.String#309971
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:182)
> 	   Local Variable: org.apache.kafka.common.config.AbstractConfig$RecordingMap#11
> 	   Local Variable: org.apache.kafka.clients.producer.KafkaProducer#3
> 	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#7
> 	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#8
> 	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:159)
> 	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerConfig#2
> 	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:137)
> 	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#10
> 	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#9
> 	   Local Variable: java.util.Properties#38
> 	   Local Variable: com.linkedin.samza.system.kafka.serializers.NoOpSegmentSerializer#3
> 	   Local Variable: com.linkedin.kafka.liclients.auditing.NoOpAuditor#9
> 	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#3
> 	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.createLiKafkaProducer(SamzaRawLiKafkaSystemProducer.java:84)
> 	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:224)
> 	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.send(SamzaRawLiKafkaSystemProducer.java:167)
> 	   Local Variable: java.lang.String#326561
> 	   Local Variable: java.lang.IllegalStateException#2
> 	   Local Variable: java.lang.String#330077
> 	   Local Variable: org.apache.samza.system.SystemProducerException#4
> 	   Local Variable: java.lang.Integer#15116
> 	at org.apache.samza.logging.log4j.StreamAppender.append(StreamAppender.java:115)
> 	at com.linkedin.atc.log4j.SafeStreamAppender.streamAppend(SafeStreamAppender.java:32)
> 	at com.linkedin.atc.log4j.SafeStreamAppender.append(SafeStreamAppender.java:23)
> 	   Local Variable: com.linkedin.atc.log4j.SafeStreamAppender#1
> 	at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> 	at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> 	   Local Variable: org.apache.log4j.helpers.AppenderAttachableImpl#1
> 	at org.apache.log4j.Category.callAppenders(Category.java:206)
> 	   Local Variable: org.apache.log4j.spi.LoggingEvent#24
> 	   Local Variable: org.apache.log4j.Logger#4
> 	at org.apache.log4j.Category.forcedLog(Category.java:391)
> 	at org.apache.log4j.Category.log(Category.java:856)
> 	at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:323)
> 	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.close(LiKafkaProducerImpl.java:313)
> 	   Local Variable: java.util.concurrent.TimeUnit$3#1
> 	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:220)
> 	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#4
> 	   Local Variable: com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer#2
> 	   Local Variable: java.lang.Boolean#1
> 	   Local Variable: org.apache.samza.system.SystemProducerException#2
> 	   Local Variable: java.lang.Object#203455
> 	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.lambda$send$17(SamzaRawLiKafkaSystemProducer.java:157)
> 	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer$$Lambda$12.onCompletion(<unknown string>)
> 	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:362)
> 	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback#58
> 	at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:162)
> 	   Local Variable: org.apache.kafka.clients.producer.internals.RecordBatch$Thunk#58
> 	   Local Variable: org.apache.kafka.common.errors.TimeoutException#2
> 	   Local Variable: java.util.ArrayList$Itr#6
> 	at org.apache.kafka.clients.producer.internals.RecordBatch.expirationDone(RecordBatch.java:282)
> 	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:277)
> 	   Local Variable: java.util.ArrayList#263984
> 	   Local Variable: org.apache.kafka.clients.producer.internals.RecordAccumulator#4
> 	   Local Variable: java.util.ArrayList$Itr#5
> 	   Local Variable: org.apache.kafka.clients.producer.internals.RecordBatch#34
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:210)
> 	   Local Variable: org.apache.kafka.common.Cluster#2
> 	   Local Variable: java.util.Collections$EmptyMap#1
> 	   Local Variable: org.apache.kafka.clients.producer.internals.RecordAccumulator$ReadyCheckResult#2
> 	   Local Variable: java.util.HashMap$KeyIterator#2
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
> 	   Local Variable: org.apache.kafka.clients.producer.internals.Sender#4
> 	at java.lang.Thread.run(Thread.java:745)
> "kafka-producer-network-thread | kafka_producer-samza_xxx_yyy-i001" daemon prio=5 tid=35 BLOCKED
> 	at org.apache.log4j.Category.callAppenders(Category.java:204)
> 	   Local Variable: org.apache.log4j.spi.LoggingEvent#26
> 	   Local Variable: org.apache.log4j.Logger#15
> 	at org.apache.log4j.Category.forcedLog(Category.java:391)
> 	at org.apache.log4j.Category.log(Category.java:856)
> 	at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:304)
> 	at org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:87)
> 	at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:59)
> 	   Local Variable: javax.management.ObjectName#162
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336)
> 	   Local Variable: org.apache.kafka.common.network.SslChannelBuilder#7
> 	   Local Variable: java.util.ArrayList#264895
> 	   Local Variable: org.apache.kafka.common.internals.ClusterResourceListeners#7
> 	   Local Variable: java.lang.String#308990
> 	   Local Variable: org.apache.kafka.common.config.AbstractConfig$RecordingMap#8
> 	   Local Variable: java.util.LinkedHashMap#854
> 	   Local Variable: java.util.ArrayList#264889
> 	   Local Variable: org.apache.kafka.common.metrics.MetricConfig#7
> 	   Local Variable: java.util.ArrayList#264910
> 	   Local Variable: org.apache.kafka.clients.producer.ProducerConfig#2
> 	   Local Variable: org.apache.kafka.clients.NetworkClient#7
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:182)
> 	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#5
> 	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#6
> 	   Local Variable: org.apache.kafka.common.config.AbstractConfig$RecordingMap#6
> 	   Local Variable: org.apache.kafka.clients.producer.KafkaProducer#2
> 	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:159)
> 	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerConfig#1
> 	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:137)
> 	   Local Variable: java.util.Properties#67
> 	   Local Variable: com.linkedin.kafka.liclients.auditing.NoOpAuditor#8
> 	   Local Variable: com.linkedin.samza.system.kafka.serializers.NoOpSegmentSerializer#2
> 	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#2
> 	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#14
> 	   Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#13
> 	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.createLiKafkaProducer(SamzaRawLiKafkaSystemProducer.java:84)
> 	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:224)
> 	   Local Variable: org.apache.samza.system.SystemProducerException#1
> 	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#5
> 	   Local Variable: com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer#3
> 	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.lambda$send$17(SamzaRawLiKafkaSystemProducer.java:157)
> 	at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer$$Lambda$12.onCompletion(<unknown string>)
> 	at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:362)
> 	   Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback#27
> 	at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:162)
> 	   Local Variable: org.apache.kafka.clients.producer.internals.RecordBatch$Thunk#27
> 	   Local Variable: java.util.ArrayList$Itr#2
> 	   Local Variable: org.apache.kafka.common.errors.TimeoutException#1
> 	at org.apache.kafka.clients.producer.internals.RecordBatch.expirationDone(RecordBatch.java:282)
> 	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:277)
> 	   Local Variable: org.apache.kafka.clients.producer.internals.RecordAccumulator#10
> 	   Local Variable: java.util.ArrayList$Itr#1
> 	   Local Variable: java.util.ArrayList#263305
> 	   Local Variable: org.apache.kafka.clients.producer.internals.RecordBatch#21
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:210)
> 	   Local Variable: org.apache.kafka.common.Cluster#1
> 	   Local Variable: org.apache.kafka.clients.producer.internals.RecordAccumulator$ReadyCheckResult#1
> 	   Local Variable: java.util.HashMap$KeyIterator#1
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
> 	   Local Variable: org.apache.kafka.clients.producer.internals.Sender#7
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
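> The lock-order inversion above can be reproduced in isolation. Below is a minimal, self-contained sketch (not Samza code): two plain Objects stand in for the AppInfoParser class lock and the log4j Category monitor, two daemon threads acquire them in opposite order, and ThreadMXBean confirms the resulting deadlock.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

public class DeadlockDemo {
    static final Object STATIC_LOCK = new Object();   // stands in for AppInfoParser's class lock
    static final Object CATEGORY_LOCK = new Object(); // stands in for the log4j Category monitor

    public static void main(String[] args) throws Exception {
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        // Appender-like path: holds the Category monitor, then needs the static lock
        // (Category.callAppenders -> StreamAppender.append -> new KafkaProducer -> registerAppInfo).
        Thread appenderPath = daemon(() -> {
            synchronized (CATEGORY_LOCK) {
                arrive(bothHoldFirstLock);
                synchronized (STATIC_LOCK) { }
            }
        });

        // registerAppInfo-like path: holds the static lock, then logs,
        // which needs the same Category monitor.
        Thread registerPath = daemon(() -> {
            synchronized (STATIC_LOCK) {
                arrive(bothHoldFirstLock);
                synchronized (CATEGORY_LOCK) { }
            }
        });

        appenderPath.start();
        registerPath.start();
        Thread.sleep(1000); // give both threads time to block on each other's monitor

        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long[] deadlocked = mx.findDeadlockedThreads();
        System.out.println(deadlocked != null && deadlocked.length == 2
                ? "deadlock detected between 2 threads"
                : "no deadlock");
    }

    private static Thread daemon(Runnable r) {
        Thread t = new Thread(r);
        t.setDaemon(true); // the deadlocked threads never finish; let the JVM exit anyway
        return t;
    }

    private static void arrive(CountDownLatch latch) {
        latch.countDown();
        try { latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}
```

> The CountDownLatch guarantees both threads hold their first monitor before either attempts the second, so the deadlock is deterministic rather than timing-dependent.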
> After some discussion with [~pmaheshwari], we felt that making the StreamAppender asynchronous was the only reliable solution. There are 2 approaches to this:
> 1. Use log4j2, which supports asynchronous logging. The downside is that we'd have to rewrite the StreamAppender and JmxAppender as log4j2 plugins instead of simply extending AppenderSkeleton.
> 2. Add a queue and a background thread to the StreamAppender, such that new events are added to the queue with some timeout and the thread consumes from the queue and sends to the configured SystemProducer.
> Option 2 is preferable right now because it's quicker to implement and test. It can also be easily replaced later by option 1, which is already a goal because we want to leverage the performance benefits of log4j2.
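> A minimal sketch of option 2 (not the actual patch; the class name, the String events, and the `sent` list standing in for the SystemProducer are all illustrative): append() only enqueues with a bounded wait, so the logging thread never blocks inside a producer call while holding the Category monitor; a single daemon thread drains the queue and does the sends.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class AsyncAppenderSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
    final List<String> sent = new CopyOnWriteArrayList<>(); // stand-in for SystemProducer.send

    public AsyncAppenderSketch() {
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    // Only this thread touches the producer, outside any log4j monitor.
                    sent.add(queue.take());
                }
            } catch (InterruptedException e) { /* shutdown */ }
        });
        consumer.setDaemon(true);
        consumer.start();
    }

    // Called from the logging thread: bounded wait, then drop the event,
    // so a slow or wedged producer can never deadlock the logger.
    public boolean append(String event) throws InterruptedException {
        return queue.offer(event, 10, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        AsyncAppenderSketch appender = new AsyncAppenderSketch();
        for (int i = 0; i < 5; i++) {
            appender.append("event-" + i);
        }
        Thread.sleep(200); // let the consumer drain the queue
        System.out.println("sent " + appender.sent.size() + " events");
    }
}
```

> The key property is that the timeout on offer() bounds how long a logging call can stall; dropped events would presumably be counted in a metric rather than retried.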



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)