Posted to jira@kafka.apache.org by "Guozhang Wang (Jira)" <ji...@apache.org> on 2020/03/31 01:46:00 UTC

[jira] [Commented] (KAFKA-8677) Flakey test GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl

    [ https://issues.apache.org/jira/browse/KAFKA-8677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17071383#comment-17071383 ] 

Guozhang Wang commented on KAFKA-8677:
--------------------------------------

I'm going to investigate the newly reported error here; it has also been reported at least one other time as

{code}
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading array of size 65541, only 5 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110)
at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:345)
at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:725)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:839)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:558)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1310)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1250)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1218)
at kafka.utils.TestUtils$.$anonfun$pollRecordsUntilTrue$1(TestUtils.scala:819)
at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:864)
at kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1382)
at kafka.api.EndToEndAuthorizationTest.consumeRecords(EndToEndAuthorizationTest.scala:537)
at kafka.api.EndToEndAuthorizationTest.consumeRecordsIgnoreOneAuthorizationException(EndToEndAuthorizationTest.scala:556)
at kafka.api.EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(EndToEndAuthorizationTest.scala:376)
{code}
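For context, the Kafka wire format prefixes an array with a 4-byte size field. A minimal sketch (illustrative only, not the real parsing code; the class and method names are invented) of how a truncated or misaligned response buffer produces the message in the trace above:

```java
import java.nio.ByteBuffer;

public class ArraySizeCheck {
    public static String readArrayHeader(ByteBuffer buf) {
        // An array is encoded as a 4-byte size followed by its elements.
        int size = buf.getInt();
        if (size > buf.remaining()) {
            // Mirrors the SchemaException message in the trace above.
            return "Error reading array of size " + size + ", only "
                    + buf.remaining() + " bytes available";
        }
        return "ok";
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(9);
        buf.putInt(65541);       // claimed array size
        buf.put(new byte[5]);    // ...but only 5 payload bytes follow
        buf.flip();
        System.out.println(readArrayHeader(buf));
        // prints: Error reading array of size 65541, only 5 bytes available
    }
}
```

A size of 65541 against 5 remaining bytes suggests the parser started reading at the wrong offset, e.g. interpreting payload bytes as a length field.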

And in some soak tests, we saw

{code}
[2020-03-30T15:32:21-07:00] (streams-soak-2-5_soak_i-002fe2f777065e280_streamslog) [2020-03-30 22:32:20,765] ERROR [stream-soak-test-7da4ed8b-dc6c-4fd4-b14c-5fe68c71559e-StreamThread-1] stream-thread [stream-soak-test-7da4ed8b-dc6c-4fd4-b14c-5fe68c71559e-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: (org.apache.kafka.streams.processor.internals.StreamThread)
[2020-03-30T15:32:21-07:00] (streams-soak-2-5_soak_i-002fe2f777065e280_streamslog) org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110)
        at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeVersion(ConsumerProtocol.java:124)
        at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:304)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:349)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
{code}

{code}
[2020-03-27T21:13:01-05:00] (streams-soak-2-5_soak_i-054b83e98b7ed6285_streamslog) [2020-03-28 02:13:00,513] INFO [stream-soak-test-bda42f54-3438-4e1e-8a40-8b3dffdcc23c-StreamThread-1] [Consumer instanceId=ip-172-31-31-29.us-west-2.compute.internal-1, clientId=stream-soak-test-bda42f54-3438-4e1e-8a40-8b3dffdcc23c-StreamThread-1-consumer, groupId=stream-soak-test] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-03-27T21:13:01-05:00] (streams-soak-2-5_soak_i-054b83e98b7ed6285_streamslog) [2020-03-28 02:13:00,513] ERROR [stream-soak-test-bda42f54-3438-4e1e-8a40-8b3dffdcc23c-StreamThread-1] stream-thread [stream-soak-test-bda42f54-3438-4e1e-8a40-8b3dffdcc23c-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: (org.apache.kafka.streams.processor.internals.StreamThread)
[2020-03-27T21:13:01-05:00] (streams-soak-2-5_soak_i-054b83e98b7ed6285_streamslog) org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
	at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110)
	at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeVersion(ConsumerProtocol.java:124)
	at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:304)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:349)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
[2020-03-27T21:13:01-05:00] (streams-soak-2-5_soak_i-054b83e98b7ed6285_streamslog) [2020-03-28 02:13:00,514] INFO [stream-soak-test-bda42f54-3438-4e1e-8a40-8b3dffdcc23c-StreamThread-1] stream-thread [stream-soak-test-bda42f54-3438-4e1e-8a40-8b3dffdcc23c-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread)
[2020-03-27T21:13:01-05:00] (streams-soak-2-5_soak_i-054b83e98b7ed6285_streamslog) [2020-03-28 02:13:00,514] INFO [stream-soak-test-bda42f54-3438-4e1e-8a40-8b3dffdcc23c-StreamThread-1] stream-thread [stream-soak-test-bda42f54-3438-4e1e-8a40-8b3dffdcc23c-StreamThread-1] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread)
{code}
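In the soak-test traces, {{deserializeVersion}} fails instead: the serialized assignment begins with a 2-byte version field, so reading it from an empty or truncated buffer underflows. A minimal sketch (assumed structure for illustration, not the real {{ConsumerProtocol}} code) of that failure mode:

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

public class VersionRead {
    public static short readVersion(ByteBuffer assignment) {
        // The serialized assignment starts with a 2-byte version field;
        // reading it from a truncated buffer underflows, as in the trace.
        try {
            return assignment.getShort();
        } catch (BufferUnderflowException e) {
            throw new RuntimeException(
                    "Error reading field 'version': " + e, e);
        }
    }

    public static void main(String[] args) {
        try {
            readVersion(ByteBuffer.allocate(0)); // empty assignment bytes
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

An empty assignment here would point at the member receiving no (or a corrupted) assignment blob from the group leader rather than a network-level parsing problem.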

> Flakey test GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8677
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8677
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, security, unit tests
>    Affects Versions: 2.5.0
>            Reporter: Boyang Chen
>            Assignee: Guozhang Wang
>            Priority: Critical
>              Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6325/console]
>  
> *18:43:39* kafka.api.GroupEndToEndAuthorizationTest > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl STARTED
> *18:44:00* kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl.test.stdout
> *18:44:00* kafka.api.GroupEndToEndAuthorizationTest > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl FAILED
> *18:44:00*     org.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout instead of the expected 1 records
> ---------------------------
> I found this flaky test is actually exposing a real bug in the consumer: within {{KafkaConsumer.poll}}, we have an optimization that tries to send the next fetch request before returning the data, in order to pipeline the fetch requests:
> {code}
>                 if (!records.isEmpty()) {
>                     // before returning the fetched records, we can send off the next round of fetches
>                     // and avoid block waiting for their responses to enable pipelining while the user
>                     // is handling the fetched records.
>                     //
>                     // NOTE: since the consumed position has already been updated, we must not allow
>                     // wakeups or any other errors to be triggered prior to returning the fetched records.
>                     if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
>                         client.pollNoWakeup();
>                     }
>                     return this.interceptors.onConsume(new ConsumerRecords<>(records));
>                 }
> {code}
> As the NOTE says, this pollNoWakeup should NOT throw any exceptions, since at this point the fetch position has already been updated. If an exception is thrown here and the caller decides to catch it and continue, those records will never be returned again, causing data loss.
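To make the hazard concrete, here is a hypothetical toy model (class, fields, and failure injection invented for illustration; this is not the real client) of a consumer whose poll() advances the consumed position before the pipelined network call. If that call throws and the caller swallows the exception, the drained records are gone even though the position says they were consumed:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class ToyConsumer {
    private final Deque<String> fetched = new ArrayDeque<>(List.of("r1", "r2"));
    private long position = 0;
    private boolean failPipelinedPoll = true;

    // Mimics the poll() path quoted above: drain fetched records, advance
    // the consumed position, then issue the pipelined network poll.
    public List<String> poll() {
        List<String> records = new ArrayList<>(fetched);
        fetched.clear();
        position += records.size();        // position already advanced...
        if (!records.isEmpty() && failPipelinedPoll) {
            failPipelinedPoll = false;
            // ...so an exception here makes `records` unrecoverable.
            throw new RuntimeException("error during pipelined poll");
        }
        return records;
    }

    public long position() {
        return position;
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();
        try {
            consumer.poll();               // throws after draining r1, r2
        } catch (RuntimeException e) {
            // Caller catches and continues, as the NOTE warns against.
        }
        List<String> next = consumer.poll();
        System.out.println("position=" + consumer.position()
                + " recovered=" + next);
        // prints: position=2 recovered=[]
    }
}
```

The position ends at 2 while zero records ever reached the application, which is exactly the silent data loss the test's "Consumed 0 records before timeout" failure would surface.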



--
This message was sent by Atlassian Jira
(v8.3.4#803005)