Posted to jira@kafka.apache.org by "Rajini Sivaram (JIRA)" <ji...@apache.org> on 2018/07/24 08:30:00 UTC

[jira] [Resolved] (KAFKA-7194) Error deserializing assignment after rebalance

     [ https://issues.apache.org/jira/browse/KAFKA-7194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-7194.
-----------------------------------
       Resolution: Fixed
         Reviewer: Konstantine Karantasis
    Fix Version/s: 2.0.0

> Error deserializing assignment after rebalance
> ----------------------------------------------
>
>                 Key: KAFKA-7194
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7194
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Konstantine Karantasis
>            Assignee: Jason Gustafson
>            Priority: Major
>             Fix For: 2.0.0
>
>
> A simple sink connector task is failing in a test with the following exception: 
> {noformat}
> [2018-07-02 12:31:13,200] ERROR WorkerSinkTask{id=verifiable-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
>         at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:77)
>         at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:243)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:353)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:338)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
>         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748){noformat}
>  
> After dumping the consumer offsets partition that this consumer group writes to, using:
> {noformat}
> bin/kafka-dump-log.sh --offsets-decoder --files ./00000000000000000000.log {noformat}
> we get: 
> {noformat}
> Dumping ./00000000000000000000.log
> Starting offset: 0
> offset: 0 position: 0 CreateTime: 1530534673177 isvalid: true keysize: 27 valuesize: 217 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: {"metadata":"connect-verifiable-sink"} payload: {"protocolType":"consumer","protocol":"range","generationId":1,"assignment":"{consumer-4-bad84955-e702-44fe-a018-677bd3b3a9d4=[test-0]}"}
> offset: 1 position: 314 CreateTime: 1530534673206 isvalid: true keysize: 27 valuesize: 32 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: {"metadata":"connect-verifiable-sink"} payload: {"protocolType":"consumer","protocol":null,"generationId":2,"assignment":"{}"}{noformat}
>  
> Since the broker appears to send a non-empty assignment to the consumer, the response buffer may be consumed more than once at some point while the client parses the response.
> Here is what kafka-request.log shows the broker sending to the client in the `SYNC_GROUP` response that triggers the error:
> {noformat}
> [2018-07-02 12:31:13,185] DEBUG Completed request:RequestHeader(apiKey=SYNC_GROUP, apiVersion=2, clientId=consumer-4, correlationId=5) -- {group_id=connect-verifiable-sink,generation_id=1,member_id=consumer-4-bad84955-e702-44fe-a018-677bd3b3a9d4,group_assignment=[{member_id=consumer-4-bad84955-e702-44fe-a018-677bd3b3a9d4,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=24 cap=24]}]},response:{throttle_time_ms=0,error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=24 cap=24]} from connection 172.31.40.44:9092-172.31.35.189:49191-25;totalTime:8.904,requestQueueTime:0.063,localTime:8.558,remoteTime:0.0,throttleTime:0.03,responseQueueTime:0.037,sendTime:0.245,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger){noformat}
>  
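
The double-consumption hypothesis in the description can be illustrated with a minimal sketch. It uses only java.nio and does not touch Kafka's classes. The class and method names are hypothetical, and the byte layout is an assumption based on the version 0 consumer assignment format (int16 version, an int32-counted list of topics each with an int16-length-prefixed name and an int32-counted list of int32 partitions, then int32-length-prefixed user data). Under that assumption, an assignment of test-0 with empty user data encodes to 24 bytes, which matches the lim=24 buffer in the request log. Decoding the same buffer twice without rewinding fails on the very first field, consistent with the "Error reading field 'version'" message. This does not show where in the client or broker the buffer was actually shared, nor what the committed fix was.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; not Kafka's ConsumerProtocol implementation.
class AssignmentBufferSketch {

    // Hand-encodes an assignment of a single topic partition using the assumed
    // version 0 layout, with empty user data.
    static ByteBuffer encodeAssignment(String topic, int partition) {
        byte[] name = topic.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + 4 + 2 + name.length + 4 + 4 + 4);
        buf.putShort((short) 0);           // version
        buf.putInt(1);                     // number of topics
        buf.putShort((short) name.length); // topic name length
        buf.put(name);                     // topic name
        buf.putInt(1);                     // number of partitions
        buf.putInt(partition);             // partition id
        buf.putInt(0);                     // user data length (empty)
        buf.flip();                        // position=0, limit=bytes written
        return buf;
    }

    // Decodes with relative reads, so the buffer's position ends at its limit.
    static String decodeAssignment(ByteBuffer buf) {
        short version = buf.getShort();
        int topics = buf.getInt();
        StringBuilder sb = new StringBuilder("v" + version + ":");
        for (int t = 0; t < topics; t++) {
            byte[] name = new byte[buf.getShort()];
            buf.get(name);
            int partitions = buf.getInt();
            for (int p = 0; p < partitions; p++) {
                sb.append(' ')
                  .append(new String(name, StandardCharsets.UTF_8))
                  .append('-')
                  .append(buf.getInt());
            }
        }
        int userDataLength = buf.getInt();
        buf.position(buf.position() + userDataLength); // skip user data
        return sb.toString();
    }

    // Returns true if decoding the same buffer a second time underflows.
    static boolean secondReadUnderflows(ByteBuffer buf) {
        decodeAssignment(buf);
        try {
            decodeAssignment(buf);
            return false;
        } catch (BufferUnderflowException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ByteBuffer assignment = encodeAssignment("test", 0);
        System.out.println("encoded bytes: " + assignment.remaining());
        // Decoding a duplicate leaves the original's position untouched.
        System.out.println(decodeAssignment(assignment.duplicate()));
        System.out.println(decodeAssignment(assignment.duplicate()));
        // Decoding the original twice exhausts it on the second pass.
        System.out.println("second read underflows: " + secondReadUnderflows(assignment));
    }
}
```

Decoding a `duplicate()` gives each reader its own position and limit over the same bytes, which is one common way to avoid this class of failure when a buffer may be read more than once.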


