Posted to dev@kafka.apache.org by "Jason Gustafson (JIRA)" <ji...@apache.org> on 2016/10/04 03:33:20 UTC

[jira] [Updated] (KAFKA-3994) Deadlock between consumer heartbeat expiration and offset commit.

     [ https://issues.apache.org/jira/browse/KAFKA-3994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-3994:
-----------------------------------
    Fix Version/s:     (was: 0.10.1.0)
                   0.10.1.1

> Deadlock between consumer heartbeat expiration and offset commit.
> -----------------------------------------------------------------
>
>                 Key: KAFKA-3994
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3994
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.10.0.0
>            Reporter: Jiangjie Qin
>            Assignee: Jason Gustafson
>             Fix For: 0.10.0.2, 0.10.1.1
>
>
> I got the following stack traces from ConsumerBounceTest:
> {code}
> ...
> "Test worker" #12 prio=5 os_prio=0 tid=0x00007fbb28b7f000 nid=0x427c runnable [0x00007fbb06445000]
>    java.lang.Thread.State: RUNNABLE
>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>         - locked <0x00000003d48bcbc0> (a sun.nio.ch.Util$2)
>         - locked <0x00000003d48bcbb0> (a java.util.Collections$UnmodifiableSet)
>         - locked <0x00000003d48bbd28> (a sun.nio.ch.EPollSelectorImpl)
>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>         at org.apache.kafka.common.network.Selector.select(Selector.java:454)
>         at org.apache.kafka.common.network.Selector.poll(Selector.java:277)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:411)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1086)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1054)
>         at kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:103)
>         at kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:70)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:483)
>         at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>         at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>         at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>         at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>         at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>         at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>         at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>         at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>         at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>         at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>         at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>         at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>         at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>         at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>         at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>         at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
>         at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
>         at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
>         at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:49)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:483)
>         at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>         at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>         at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>         at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
> ...
> Found one Java-level deadlock:
> =============================
> "executor-Heartbeat":
>   waiting to lock monitor 0x00007fb9b00fb028 (object 0x00000003d4fa0f48, a java.util.LinkedList),
>   which is held by "kafka-request-handler-7"
> "kafka-request-handler-7":
>   waiting to lock monitor 0x00007fba6c01e2f8 (object 0x00000003d4617a48, a kafka.coordinator.GroupMetadata),
>   which is held by "executor-Heartbeat"
> Java stack information for the threads listed above:
> ===================================================
> "executor-Heartbeat":
>         at kafka.server.DelayedOperationPurgatory$Watchers.watch(DelayedOperation.scala:301)
>         - waiting to lock <0x00000003d4fa0f48> (a java.util.LinkedList)
>         at kafka.server.DelayedOperationPurgatory$$anonfun$kafka$server$DelayedOperationPurgatory$$watchForOperation$1.apply$mcV$sp(DelayedOperation.scala:262)
>         at kafka.server.DelayedOperationPurgatory$$anonfun$kafka$server$DelayedOperationPurgatory$$watchForOperation$1.apply(DelayedOperation.scala:260)
>         at kafka.server.DelayedOperationPurgatory$$anonfun$kafka$server$DelayedOperationPurgatory$$watchForOperation$1.apply(DelayedOperation.scala:260)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
>         at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:237)
>         at kafka.server.DelayedOperationPurgatory.kafka$server$DelayedOperationPurgatory$$watchForOperation(DelayedOperation.scala:260)
>         at kafka.server.DelayedOperationPurgatory$$anonfun$tryCompleteElseWatch$2.apply(DelayedOperation.scala:199)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:195)
>         at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:353)
>         at kafka.coordinator.GroupMetadataManager.store(GroupMetadataManager.scala:239)
>         at kafka.coordinator.GroupCoordinator$$anonfun$onCompleteJoin$6.apply(GroupCoordinator.scala:701)
>         at kafka.coordinator.GroupCoordinator$$anonfun$onCompleteJoin$6.apply(GroupCoordinator.scala:701)
>         at scala.Option.foreach(Option.scala:236)
>         at kafka.coordinator.GroupCoordinator.onCompleteJoin(GroupCoordinator.scala:701)
>         at kafka.coordinator.DelayedJoin.onComplete(DelayedJoin.scala:39)
>         at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>         at kafka.coordinator.DelayedJoin$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedJoin.scala:37)
>         at kafka.coordinator.GroupCoordinator.tryCompleteJoin(GroupCoordinator.scala:647)
>         - locked <0x00000003d4617a48> (a kafka.coordinator.GroupMetadata)
>         at kafka.coordinator.DelayedJoin.tryComplete(DelayedJoin.scala:37)
>         at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:190)
>         - locked <0x00000003d505b5b0> (a kafka.coordinator.DelayedJoin)
>         at kafka.coordinator.GroupCoordinator.prepareRebalance(GroupCoordinator.scala:631)
>         at kafka.coordinator.GroupCoordinator.kafka$coordinator$GroupCoordinator$$maybePrepareRebalance(GroupCoordinator.scala:616)
>         - locked <0x00000003d4617a48> (a kafka.coordinator.GroupMetadata)
>         at kafka.coordinator.GroupCoordinator.onMemberFailure(GroupCoordinator.scala:639)
>         at kafka.coordinator.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:715)
>         - locked <0x00000003d4617a48> (a kafka.coordinator.GroupMetadata)
>         at kafka.coordinator.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:33)
>         at kafka.server.DelayedOperation.run(DelayedOperation.scala:107)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> "kafka-request-handler-7":
>         at kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$putCacheCallback$2(GroupMetadataManager.scala:288)
>         - waiting to lock <0x00000003d4617a48> (a kafka.coordinator.GroupMetadata)
>         at kafka.coordinator.GroupMetadataManager$$anonfun$prepareStoreOffsets$1.apply(GroupMetadataManager.scala:336)
>         at kafka.coordinator.GroupMetadataManager$$anonfun$prepareStoreOffsets$1.apply(GroupMetadataManager.scala:336)
>         at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:123)
>         at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>         at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:105)
>         at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:315)
>         - locked <0x00000003d4fa0f80> (a kafka.server.DelayedProduce)
>         - locked <0x00000003d4fa0f48> (a java.util.LinkedList)
>         at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:234)
>         at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:201)
>         at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:374)
>         at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:293)
>         at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:242)
>         at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:868)
>         at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:865)
>         at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
>         at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:865)
>         at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:473)
>         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:503)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:78)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>         at java.lang.Thread.run(Thread.java:745)
> Found 1 deadlock.
> {code}
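> It looks like the issue is that when a consumer's heartbeat expires while an OffsetCommitRequest for the same group is still pending in the produce purgatory, the DelayedHeartbeat and the DelayedProduce for the offset commit acquire the same two locks in opposite orders. The heartbeat-expiration thread holds the GroupMetadata lock and then waits for the purgatory's watcher list (a java.util.LinkedList) in Watchers.watch(). Meanwhile the request handler, while processing a follower fetch that completes the pending DelayedProduce, holds that watcher list in Watchers.tryCompleteWatched() and then waits for the GroupMetadata lock in putCacheCallback.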
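
The lock-order inversion described in the report can be reproduced outside Kafka with two plain Java monitors. The sketch below is a hypothetical illustration, not Kafka code and not the fix adopted for this ticket: `groupLock` and `watchersLock` are invented stand-ins for the GroupMetadata monitor and the purgatory's watcher list, and `LockOrderingDemo`, `deadlocks` and the thread names are likewise illustrative. It uses `ThreadMXBean.findMonitorDeadlockedThreads()` to detect the same kind of monitor cycle shown in the thread dump above, and contrasts it with a single global lock order, which is one general way to rule out this class of deadlock.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Illustrative lock-order inversion; hypothetical names, not Kafka code. */
final class LockOrderingDemo {

    /**
     * Starts two threads that each need both locks and reports whether the JVM
     * finds them deadlocked. With sameOrder == false the second thread takes the
     * locks in the opposite order, mirroring the two stacks in the report.
     */
    static boolean deadlocks(boolean sameOrder) {
        final Object groupLock = new Object();    // hypothetical stand-in for the GroupMetadata monitor
        final Object watchersLock = new Object(); // hypothetical stand-in for the watcher LinkedList
        // Forces both threads to hold their first lock before either asks for its
        // second, so the inverted case deadlocks every time instead of occasionally.
        final CountDownLatch firstLocksHeld = new CountDownLatch(2);

        Thread heartbeat = new Thread(() -> {
            synchronized (groupLock) {            // like onExpireHeartbeat: group lock first
                if (!sameOrder) rendezvous(firstLocksHeld);
                synchronized (watchersLock) {     // ...then the watcher list, as in watch()
                    // work that needs both locks
                }
            }
        }, "heartbeat-expiration");

        Thread handler = new Thread(() -> {
            Object first = sameOrder ? groupLock : watchersLock;
            Object second = sameOrder ? watchersLock : groupLock;
            synchronized (first) {                // inverted: watcher list first, as in tryCompleteWatched
                if (!sameOrder) rendezvous(firstLocksHeld);
                synchronized (second) {           // inverted: then the group, as in putCacheCallback
                    // work that needs both locks
                }
            }
        }, "request-handler");

        // Daemon threads, so a deliberately deadlocked pair cannot keep the JVM alive.
        heartbeat.setDaemon(true);
        handler.setDaemon(true);
        heartbeat.start();
        handler.start();

        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (System.nanoTime() < deadline) {
            long[] ids = mx.findMonitorDeadlockedThreads(); // null when no monitor cycle exists
            if (ids != null && containsBoth(ids, heartbeat.getId(), handler.getId())) {
                return true;
            }
            if (!heartbeat.isAlive() && !handler.isAlive()) {
                return false;                     // both threads finished: no deadlock
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    private static void rendezvous(CountDownLatch latch) {
        latch.countDown();
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static boolean containsBoth(long[] ids, long a, long b) {
        boolean foundA = false;
        boolean foundB = false;
        for (long id : ids) {
            foundA |= id == a;
            foundB |= id == b;
        }
        return foundA && foundB;
    }

    public static void main(String[] args) {
        System.out.println("opposite order deadlocks: " + deadlocks(false));
        System.out.println("same order deadlocks: " + deadlocks(true));
    }
}
```

Running `main` should report a deadlock only for the opposite-order case. A fixed global lock order is just one option; another common approach is to avoid calling into the purgatory while the group lock is held. The ticket itself does not say which approach was taken.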



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)