Posted to dev@kafka.apache.org by "Ismael Juma (JIRA)" <ji...@apache.org> on 2016/11/30 15:50:59 UTC
[jira] [Updated] (KAFKA-4086) long processing consumer restart will stall
[ https://issues.apache.org/jira/browse/KAFKA-4086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismael Juma updated KAFKA-4086:
-------------------------------
Labels: consumer reliability (was: consumer)
> long processing consumer restart will stall
> -------------------------------------------
>
> Key: KAFKA-4086
> URL: https://issues.apache.org/jira/browse/KAFKA-4086
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.10.0.0
> Reporter: Dale Jin
> Labels: consumer, reliability
>
> [~hachikuji]
> We have a long-processing consumer. Whenever a new consumer tries to join the group while the long-processing consumer is busy processing, the new consumer stalls.
> If we kill the long-processing consumer and restart it, both consumers stall.
> When we kill the long-processing consumer, it tries to send a LeaveGroup request, but the request fails, seemingly because of the client request timeout.
> When we start the long-processing consumer again, it seems to send topic metadata to the broker and then issue a JoinGroup request, which returns a future. However, when I check the server log, I don't see the corresponding request in kafka-request.log.
> When we construct the consumer, we use the following code (based on kafka-python library):
> {code}
> self.consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
>                               value_deserializer=deserializer,
>                               group_id=self.user_defined_sub_name,
>                               heartbeat_interval_ms=10000,
>                               session_timeout_ms=300000,
>                               enable_auto_commit=False)
> {code}
> On the server side, we use 0.10.0.0 with default settings.
> It looks like a `RebalanceInProgressError` is thrown:
> {code}
> 2016-08-22 20:39:08,984 - kafka.coordinator - INFO - Discovered coordinator 100 for group v1.user.queue
> 2016-08-22 20:39:08,984 - kafka.coordinator.consumer - INFO - Revoking previously assigned partitions set() for group v1.user.queue
> 2016-08-22 20:39:08,990 - kafka.cluster - DEBUG - Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 1)
> 2016-08-22 20:39:08,990 - kafka.coordinator - INFO - (Re-)joining group v1.user.queue
> 2016-08-22 20:39:08,990 - kafka.coordinator - DEBUG - Sending JoinGroup (JoinGroupRequest_v0(group='v1.user.queue', session_timeout=300000, member_id='', protocol_type='consumer', group_protocols=[(protocol_name='range', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00'), (protocol_name='roundrobin', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00')])) to coordinator 100
> 2016-08-22 20:39:08,991 - kafka.conn - DEBUG - <BrokerConnection host=10.128.64.81/10.128.64.81 port=9092> Request 5: JoinGroupRequest_v0(group='v1.user.queue', session_timeout=300000, member_id='', protocol_type='consumer', group_protocols=[(protocol_name='range', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00'), (protocol_name='roundrobin', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00')])
> 2016-08-22 20:43:04,576 - kafka.conn - WARNING - <BrokerConnection host=10.128.64.81/10.128.64.81 port=9092> timed out after 40000 ms. Closing connection.
> 2016-08-22 20:43:04,576 - kafka.client - WARNING - Node 100 connection failed -- refreshing metadata
> 2016-08-22 20:43:04,576 - kafka.coordinator - ERROR - Error sending JoinGroupRequest_v0 to node 100 [Error 7 RequestTimedOutError: Request timed out after 40000 ms]
> 2016-08-22 20:43:04,576 - kafka.coordinator - WARNING - Marking the coordinator dead (node 100) for group v1.user.queue: None.
> 2016-08-22 20:43:04,678 - kafka.coordinator - DEBUG - Sending group coordinator request for group v1.user.queue to broker 100
> {code}
> FYI, we enabled the following loggers in log4j on the broker:
> {code}
> log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
> log4j.additivity.kafka.server.KafkaApis=true
> log4j.logger.kafka.request.logger=TRACE, requestAppender
> log4j.additivity.kafka.request.logger=true
> log4j.logger.kafka.controller=TRACE, controllerAppender
> log4j.additivity.kafka.controller=true
> log4j.logger.state.change.logger=TRACE, stateChangeAppender
> log4j.additivity.state.change.logger=true
> {code}
> Please let us know how we can proceed to find the root cause.
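One thing that may be worth checking while investigating, offered as a sketch and not as a confirmed diagnosis: the client log above reports a request timeout of 40000 ms, while the consumer is constructed with session_timeout_ms=300000. On 0.10.0 brokers the coordinator can hold a JoinGroup response until the other group members rejoin or their session timeout expires, so a client request timeout shorter than the session timeout could expire first. The helper below is hypothetical (it is not part of kafka-python) and only compares the three numbers that appear in this report; the one-third heartbeat guideline is the usual recommendation from the Kafka consumer configuration docs.

```python
def check_consumer_timeouts(session_timeout_ms, heartbeat_interval_ms, request_timeout_ms):
    """Return a list of warnings about timeout settings.

    Hypothetical helper for illustration only; it is not a kafka-python API.
    """
    warnings = []

    # A JoinGroup response may be delayed by the coordinator for up to the
    # session timeout, so the client request timeout should exceed it.
    if request_timeout_ms <= session_timeout_ms:
        warnings.append(
            "request_timeout_ms (%d) should be larger than session_timeout_ms (%d)"
            % (request_timeout_ms, session_timeout_ms)
        )

    # Common guideline: heartbeat interval no higher than 1/3 of the session timeout.
    if heartbeat_interval_ms * 3 > session_timeout_ms:
        warnings.append(
            "heartbeat_interval_ms (%d) should be at most one third of session_timeout_ms (%d)"
            % (heartbeat_interval_ms, session_timeout_ms)
        )

    return warnings


# Values taken from this report: session and heartbeat settings from the
# KafkaConsumer constructor, request timeout from the "timed out after 40000 ms" log line.
reported = check_consumer_timeouts(
    session_timeout_ms=300000,
    heartbeat_interval_ms=10000,
    request_timeout_ms=40000,
)
for message in reported:
    print(message)
```

If the warning about request_timeout_ms applies, one experiment would be to rerun the consumer with a request timeout above the session timeout and see whether the JoinGroup request still times out.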
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)