You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Armin Braun (JIRA)" <ji...@apache.org> on 2017/02/22 06:49:44 UTC

[jira] [Commented] (KAFKA-4569) Transient failure in org.apache.kafka.clients.consumer.KafkaConsumerTest.testWakeupWithFetchDataAvailable

    [ https://issues.apache.org/jira/browse/KAFKA-4569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15877598#comment-15877598 ] 

Armin Braun commented on KAFKA-4569:
------------------------------------

Just for future reference. Tried to debug a few issues around this that I found and figured out that this is coming from the heartbeat thread.

Basically it's getting enabled without any delay in `org.apache.kafka.clients.consumer.internals.AbstractCoordinator#initiateJoinGroup` and then in some cases polls quickly enough to make `org.apache.kafka.clients.consumer.KafkaConsumer#pollOnce` break out here:

{code}
    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        coordinator.poll(time.milliseconds());

        // fetch positions if we have partitions we're subscribed to that we
        // don't know the offset for
        if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions());

        // if data is available already, return it immediately
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty())
            return records;
{code}

Not quite sure how to fix this, but you can reproduce it by running it in an endless loop (like 2ms per run and dies after 500 or so on average :)) -> red ... goes green once you outcomment 
this enable section

{code}
                        if (heartbeatThread != null)
                            heartbeatThread.enable();
                    }
                }
{code}

in `org.apache.kafka.clients.consumer.internals.AbstractCoordinator#initiateJoinGroup`

> Transient failure in org.apache.kafka.clients.consumer.KafkaConsumerTest.testWakeupWithFetchDataAvailable
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4569
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4569
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: unit tests
>            Reporter: Guozhang Wang
>            Assignee: Umesh Chaudhary
>              Labels: newbie
>             Fix For: 0.10.3.0
>
>
> One example is:
> https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/370/testReport/junit/org.apache.kafka.clients.consumer/KafkaConsumerTest/testWakeupWithFetchDataAvailable/
> {code}
> Stacktrace
> java.lang.AssertionError
> 	at org.junit.Assert.fail(Assert.java:86)
> 	at org.junit.Assert.fail(Assert.java:95)
> 	at org.apache.kafka.clients.consumer.KafkaConsumerTest.testWakeupWithFetchDataAvailable(KafkaConsumerTest.java:679)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 	at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
> 	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
> 	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
> 	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
> 	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
> 	at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> 	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> 	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
> 	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
> 	at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
> 	at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:109)
> 	at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> 	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> 	at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:377)
> 	at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
> 	at org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)