You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Evan Pollan (JIRA)" <ji...@apache.org> on 2017/09/12 15:05:00 UTC

[jira] [Created] (KAFKA-5875) Consumer group repeatedly fails to join, even across JVM restarts: BufferUnderFlowException reading the {{version}} field in the consumer protocol header

Evan Pollan created KAFKA-5875:
----------------------------------

             Summary: Consumer group repeatedly fails to join, even across JVM restarts: BufferUnderFlowException reading the {{version}} field in the consumer protocol header
                 Key: KAFKA-5875
                 URL: https://issues.apache.org/jira/browse/KAFKA-5875
             Project: Kafka
          Issue Type: Bug
            Reporter: Evan Pollan


I've seen this maybe once a month in our large cluster Kubernetes-based Kafka consumers & producers.  Every once in a while a consumer in a Kubernetes "pod" get this error trying to join a consumer group:

{code}
{"level":"INFO","@timestamp":"2017-09-12T13:45:42.173+0000","logger":"org.apache.kafka.common.utils.AppInfoParser","message":"Kafka version : 0.11.0.0","exception":""}
{"level":"INFO","@timestamp":"2017-09-12T13:45:42.173+0000","logger":"org.apache.kafka.common.utils.AppInfoParser","message":"Kafka commitId : cb8625948210849f","exception":""}
{"level":"INFO","@timestamp":"2017-09-12T13:45:42.178+0000","logger":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","message":"Revoking previously assigned partitions [] for group conv-fetch-jobs-runner-for-internal","exception":""}
{"level":"INFO","@timestamp":"2017-09-12T13:45:42.178+0000","logger":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"(Re-)joining group conv-fetch-jobs-runner-for-internal","exception":""}
{"level":"INFO","@timestamp":"2017-09-12T13:45:43.588+0000","logger":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"Successfully joined group conv-fetch-jobs-runner-for-internal with generation 17297","exception":""}
{"errorType":"Error reading field 'version': java.nio.BufferUnderflowException","level":"ERROR","message":"Died!","operation":"Died!","stacktrace":"org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException\n\tat org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:220)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)\n\tat com.spredfast.common.kafka.consumer.RunnableConsumer.pollOnce(RunnableConsumer.java:141)\n\tat com.spredfast.common.kafka.consumer.RunnableConsumer.access$000(RunnableConsumer.java:28)\n\tat com.spredfast.common.kafka.consumer.RunnableConsumer$Loop.iterate(RunnableConsumer.java:125)\n\tat com.spredfast.common.kafka.consumer.RunnableConsumer.run(RunnableConsumer.java:78)\n\tat java.lang.Thread.run(Thread.java:745)\n","trackingId":"dead-consumer","logger":"com.spredfast.common.kafka.consumer.RunnableConsumer","loggingVersion":"UNKNOWN","component":"MetricsFetch","pid":25,"host":"fetch-2420457278-sh4f5","@timestamp":"2017-09-12T13:45:43.613Z"}
{code}

Pardon the log format -- these get sucked into logstash, thus the JSON.

Here's the raw stacktrace: 
{code}
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
	at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)
	at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:220)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
	at com.spredfast.common.kafka.consumer.RunnableConsumer.pollOnce(RunnableConsumer.java:141)
	at com.spredfast.common.kafka.consumer.RunnableConsumer.access$000(RunnableConsumer.java:28)
	at com.spredfast.common.kafka.consumer.RunnableConsumer$Loop.iterate(RunnableConsumer.java:125)
	at com.spredfast.common.kafka.consumer.RunnableConsumer.run(RunnableConsumer.java:78)
	at java.lang.Thread.run(Thread.java:745)
{code}

What's fascinating about this is:
* We have a liveness probe (Kubernetes term for a healthcheck whose failure will cause the container backing the "pod" to be killed and restarted) attached to the existence of dead consumers.  When this situation happens, it _never_ resolves itself. Today, I found a pod that had been restarted 1023 times due to this error.
* The only way to make it go away is to _delete_ the Kubernetes pod.  This causes it to be replaced by a pod on another Kubernetes host ("minion") using the same docker image and configuration.  Invariably, this pod comes up and all consumers join just fine
* We have _not_ tried restarting the brokers when this happens.  
* There must be something about the pod, container, or Kubernetes host that is consistent across pod crash loops that factors into the consumer group join process -- MAC?  hostname?  Can't be anything that is recomputed on JVM restart, though...

Seems like there's either:
# a bug in the client (i.e. in its assumption that it can deserialize a protocol header on successful return of that join future).  maybe there's a flavor of broker response that doesn't include this header?
# a bug in the broker in that it's sending an empty or undersized response to a group join command in some situtations.

It's worth noting that the severity of this issue is magnified by the fact that it requires manual intervention.  It wouldn't be so bad if our healthcheck failure tripped a pod restart, and the new JVM's consumers would join OK.  But, the fact that even a JVM restart doesn't do it means most resiliency plays won't work.

BTW, I see a similar schema read failure in https://issues.apache.org/jira/browse/KAFKA-4349, although the client code is completely different (admin {{ConsumerGroupCommand}})



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)