You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Onur Karaman (JIRA)" <ji...@apache.org> on 2017/06/01 00:51:04 UTC

[jira] [Comment Edited] (KAFKA-1595) Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount

    [ https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16032276#comment-16032276 ] 

Onur Karaman edited comment on KAFKA-1595 at 6/1/17 12:50 AM:
--------------------------------------------------------------

I think [~ijuma]'s PR (https://github.com/apache/kafka/pull/83) is worth reinvestigating as a means of improving the controller (especially with respect to controller initialization time).

I can help out if needed.


was (Author: onurkaraman):
I think [~ijuma]'s PR (https://github.com/apache/kafka/pull/83) is worth reinvestigating as a means of improving the controller (especially with respect to controller initialization time).

> Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-1595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1595
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.8.1.1
>            Reporter: Jagbir
>            Assignee: Ismael Juma
>              Labels: newbie
>
> The following issue is created as a follow up suggested by Jun Rao
> in a kafka news group message with the Subject
> "Blocking Recursive parsing from kafka.consumer.TopicCount$.constructTopicCount"
> SUMMARY:
> An issue was detected in a typical cluster of 3 kafka instances backed
> by 3 zookeeper instances (kafka version 0.8.1.1, scala version 2.10.3,
> java version 1.7.0_65). On consumer end, when consumers get recycled,
> there is a troubling JSON parsing recursion which takes a busy lock and
> blocks consumers thread pool.
> In 0.8.1.1 scala client library ZookeeperConsumerConnector.scala:355 takes
> a global lock (0x00000000d3a7e1d0) during the rebalance, and fires an
> expensive JSON parsing, while keeping the other consumers from shutting
> down, see, e.g,
> at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
> The deep recursive JSON parsing should be deprecated in favor
> of a better JSON parser, see, e.g,
> http://engineering.ooyala.com/blog/comparing-scala-json-libraries?
> DETAILS:
> The first dump is for a recursive blocking thread holding the lock for 0x00000000d3a7e1d0
> and the subsequent dump is for a waiting thread.
> (Please grep for 0x00000000d3a7e1d0 to see the locked object.)
> Â 
> -------------------------8<-----------------------------------------
> "Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor"
> prio=10 tid=0x00007f24dc285800 nid=0xda9 runnable [0x00007f249e40b000]
> java.lang.Thread.State: RUNNABLE
> at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722)
> at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726)
> at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737)
> at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.lexical.Scanners$Scanner.<init>(Scanners.scala:49)
> at scala.util.parsing.combinator.lexical.Scanners$Scanner.rest(Scanners.scala:60)
> at scala.util.parsing.combinator.lexical.Scanners$Scanner.rest(Scanners.scala:44)
> at scala.util.parsing.combinator.Parsers$$anonfun$acceptIf$1.apply(Parsers.scala:608)
> at scala.util.parsing.combinator.Parsers$$anonfun$acceptIf$1.apply(Parsers.scala:606)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:736)
> at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
> at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
> at scala.util.parsing.json.JSON$.parseRaw(JSON.scala:54)
> at scala.util.parsing.json.JSON$.parseFull(JSON.scala:68)
> at kafka.utils.Json$.liftedTree1$1(Json.scala:37)
> at kafka.utils.Json$.parseFull(Json.scala:36)
> - locked <0x00000000c5a7cdd8> (a java.lang.Object)
> at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:56)
> at kafka.utils.ZkUtils$$anonfun$getConsumersPerTopic$1.apply(ZkUtils.scala:678)
> at kafka.utils.ZkUtils$$anonfun$getConsumersPerTopic$1.apply(ZkUtils.scala:677)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at kafka.utils.ZkUtils$.getConsumersPerTopic(ZkUtils.scala:677)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:437)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:408)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:402)
> - locked <0x00000000d3a7e1d0> (a java.lang.Object)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:355)
> --------------------------------------------------------------------
> Many BLOCKED Threads had stack trace similar to the following stack waiting to lock 0x00000000d3a7e1d0
> -------------------------8<-----------------------------------------
> "application-my-context-105" prio=10 tid=0x00007f24e45aa800 nid=0x3494 waiting for monitor entry [0x00007f24afe26000]
> java.lang.Thread.State: BLOCKED (on object monitor)
> at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
> - waiting to lock <0x00000000d3a7e1d0> (a java.lang.Object)
> at kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110)
> at com.custom1.Consumer.cancel(Consumer.java:312)
> at com.custom1.Consumer.run(Consumer.java:302)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> -------------------------------------------------------------------



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)