You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Yuriy Badalyantc (Jira)" <ji...@apache.org> on 2020/04/13 03:52:00 UTC

[jira] [Commented] (KAFKA-5875) Consumer group repeatedly fails to join, even across JVM restarts: BufferUnderFlowException reading the {{version}} field in the consumer protocol header

    [ https://issues.apache.org/jira/browse/KAFKA-5875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17082026#comment-17082026 ] 

Yuriy Badalyantc commented on KAFKA-5875:
-----------------------------------------

I have a similar issue. I have a kafka-streams application and sometimes it falls with this:
```
o.a.k.c.p.t.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
  at o.a.k.c.p.t.Schema.read(Schema.java:110)
  at o.a.k.c.c.i.ConsumerProtocol.deserializeVersion(ConsumerProtocol.java:124)
  at o.a.k.c.c.i.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:304)
  at o.a.k.c.c.i.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:340)
  at o.a.k.c.c.i.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
  at o.a.k.c.c.i.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
  at o.a.k.c.c.i.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
  at o.a.k.c.c.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
  at o.a.k.c.c.KafkaConsumer.poll(KafkaConsumer.java:1231)
  at o.a.k.c.c.KafkaConsumer.poll(KafkaConsumer.java:1211)
  at o.a.k.s.p.i.StreamThread.pollRequests(StreamThread.java:843)
  at o.a.k.s.p.i.StreamThread.runOnce(StreamThread.java:743)
  at o.a.k.s.p.i.StreamThread.runLoop(StreamThread.java:698)
  at o.a.k.s.p.i.StreamThread.run(StreamThread.java:671)
Wrapped by: o.a.k.s.e.StreamsException: KafkaStreams will be stopped after uncaught exception in  StreamsThread threadId: market.pacman.production-StreamThread-3
TaskManager
  MetadataState:
    GlobalMetadata: []
    GlobalStores: []
    My HostInfo: HostInfo{host='unknown', port=-1}
    null
  Active tasks:
    Running:
    Running Partitions:
    New:
    Restoring:
    Restoring Partitions:
    Restored Partitions:
    Suspended:
  Standby tasks:
    Running:
    Running Partitions:
    New:

  at r.d.m.c.k.s.u.KafkaStreamsProcess.$anonfun$stopped$1(KafkaStreamsProcess.scala:31)
  at c.e.i.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:142)
  ... 8 frames excluded
  at s.r.j.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
  at s.c.BlockContext$.withBlockContext(BlockContext.scala:94)
  ... 5 frames excluded
  at c.e.c.Deferred$ConcurrentDeferred.$anonfun$unsafeRegister$1(Deferred.scala:201)
  at c.e.c.Deferred$ConcurrentDeferred.$anonfun$unsafeRegister$1$adapted(Deferred.scala:201)
  at c.e.c.Deferred$ConcurrentDeferred.$anonfun$notifyReadersLoop$1(Deferred.scala:236)
  at s.r.j.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
  ... 5 frames excluded
  at r.d.m.c.u.ExecutionContexts$$anon$1$$anon$2.run(ExecutionContexts.scala:110)
  at j.u.c.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
  at j.u.c.ForkJoinTask.doExec(ForkJoinTask.java:290)
  at j.u.c.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
  at j.u.c.ForkJoinPool.scan(ForkJoinPool.java:1656)
  at j.u.c.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
  at j.u.c.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
```

But after few restarts (I'm using kubernetes too) this issue automatically resolves. I'm using 2.4.0 client library and broker version is 2.3.1. 
I saw some resolved issues like this, but the problem definitely not resolved completely. Maybe it's a symptom of some other, unrelated problem?

> Consumer group repeatedly fails to join, even across JVM restarts: BufferUnderFlowException reading the {{version}} field in the consumer protocol header
> ---------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5875
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5875
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: Evan Pollan
>            Priority: Major
>
> I've seen this maybe once a month in our large cluster Kubernetes-based Kafka consumers & producers.  Every once in a while a consumer in a Kubernetes "pod" get this error trying to join a consumer group:
> {code}
> {"level":"INFO","@timestamp":"2017-09-12T13:45:42.173+0000","logger":"org.apache.kafka.common.utils.AppInfoParser","message":"Kafka version : 0.11.0.0","exception":""}
> {"level":"INFO","@timestamp":"2017-09-12T13:45:42.173+0000","logger":"org.apache.kafka.common.utils.AppInfoParser","message":"Kafka commitId : cb8625948210849f","exception":""}
> {"level":"INFO","@timestamp":"2017-09-12T13:45:42.178+0000","logger":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","message":"Revoking previously assigned partitions [] for group conv-fetch-jobs-runner-for-internal","exception":""}
> {"level":"INFO","@timestamp":"2017-09-12T13:45:42.178+0000","logger":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"(Re-)joining group conv-fetch-jobs-runner-for-internal","exception":""}
> {"level":"INFO","@timestamp":"2017-09-12T13:45:43.588+0000","logger":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"Successfully joined group conv-fetch-jobs-runner-for-internal with generation 17297","exception":""}
> {"errorType":"Error reading field 'version': java.nio.BufferUnderflowException","level":"ERROR","message":"Died!","operation":"Died!","stacktrace":"org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException\n\tat org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:220)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)\n\tat com.spredfast.common.kafka.consumer.RunnableConsumer.pollOnce(RunnableConsumer.java:141)\n\tat com.spredfast.common.kafka.consumer.RunnableConsumer.access$000(RunnableConsumer.java:28)\n\tat com.spredfast.common.kafka.consumer.RunnableConsumer$Loop.iterate(RunnableConsumer.java:125)\n\tat com.spredfast.common.kafka.consumer.RunnableConsumer.run(RunnableConsumer.java:78)\n\tat java.lang.Thread.run(Thread.java:745)\n","trackingId":"dead-consumer","logger":"com.spredfast.common.kafka.consumer.RunnableConsumer","loggingVersion":"UNKNOWN","component":"MetricsFetch","pid":25,"host":"fetch-2420457278-sh4f5","@timestamp":"2017-09-12T13:45:43.613Z"}
> {code}
> Pardon the log format -- these get sucked into logstash, thus the JSON.
> Here's the raw stacktrace: 
> {code}
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
> 	at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:220)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> 	at com.spredfast.common.kafka.consumer.RunnableConsumer.pollOnce(RunnableConsumer.java:141)
> 	at com.spredfast.common.kafka.consumer.RunnableConsumer.access$000(RunnableConsumer.java:28)
> 	at com.spredfast.common.kafka.consumer.RunnableConsumer$Loop.iterate(RunnableConsumer.java:125)
> 	at com.spredfast.common.kafka.consumer.RunnableConsumer.run(RunnableConsumer.java:78)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> What's fascinating about this is:
> * We have a liveness probe (Kubernetes term for a healthcheck whose failure will cause the container backing the "pod" to be killed and restarted) attached to the existence of dead consumers.  When this situation happens, it _never_ resolves itself. Today, I found a pod that had been restarted 1023 times due to this error.
> * The only way to make it go away is to _delete_ the Kubernetes pod.  This causes it to be replaced by a pod on another Kubernetes host ("minion") using the same docker image and configuration.  Invariably, this pod comes up and all consumers join just fine
> * We have _not_ tried restarting the brokers when this happens.  
> * There must be something about the pod, container, or Kubernetes host that is consistent across pod crash loops that factors into the consumer group join process -- MAC?  hostname?  Can't be anything that is recomputed on JVM restart, though...
> Seems like there's either:
> # a bug in the client (i.e. in its assumption that it can deserialize a protocol header on successful return of that join future).  maybe there's a flavor of broker response that doesn't include this header?
> # a bug in the broker in that it's sending an empty or undersized response to a group join command in some situtations.
> It's worth noting that the severity of this issue is magnified by the fact that it requires manual intervention.  It wouldn't be so bad if our healthcheck failure tripped a pod restart, and the new JVM's consumers would join OK.  But, the fact that even a JVM restart doesn't do it means most resiliency plays won't work.
> BTW, I see a similar schema read failure in https://issues.apache.org/jira/browse/KAFKA-4349, although the client code is completely different (admin {{ConsumerGroupCommand}})



--
This message was sent by Atlassian Jira
(v8.3.4#803005)