Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/04/19 16:20:00 UTC

[jira] [Commented] (KAFKA-6782) GlobalKTable GlobalStateStore never finishes restoring when consuming aborted messages

    [ https://issues.apache.org/jira/browse/KAFKA-6782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444331#comment-16444331 ] 

ASF GitHub Bot commented on KAFKA-6782:
---------------------------------------

Gitomain opened a new pull request #4900: KAFKA-6782: solved the bug of restoration of aborted messages for GlobalStateStore and KGlobalTable
URL: https://github.com/apache/kafka/pull/4900
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> GlobalKTable GlobalStateStore never finishes restoring when consuming aborted messages
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6782
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6782
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0, 1.0.1
>            Reporter: Lingxiao WANG
>            Priority: Major
>
> Same problem as https://issues.apache.org/jira/browse/KAFKA-6190, but the solution proposed there (below) only works for successfully committed transactional messages. When the topic contains aborted messages, the restore loop never terminates. Here is that proposal:
> {code:java}
> while (offset < highWatermark) {
>     final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
>     for (ConsumerRecord<byte[], byte[]> record : records) {
>         if (record.key() != null) {
>             stateRestoreCallback.restore(record.key(), record.value());
>         }
>         // only reached when poll() returned at least one record
>         offset = consumer.position(topicPartition);
>     }
> }
> {code}
> Concretely, when the consumer reaches a set of aborted messages, poll() returns 0 records, so 'offset = consumer.position(topicPartition)' never executes. offset then stays below highWatermark, even though the consumer's position has already moved past the aborted messages.
> So I propose moving 'offset = consumer.position(topicPartition)' outside the for loop, so that offset is updated after every poll, even when no records are returned.
> {code:java}
> while (offset < highWatermark) {
>     final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
>     for (ConsumerRecord<byte[], byte[]> record : records) {
>         if (record.key() != null) {
>             stateRestoreCallback.restore(record.key(), record.value());
>         }
>     }
>     // updated after every poll, even when it returned no records
>     offset = consumer.position(topicPartition);
> }
> {code}
>  
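The difference between the two loops can be reproduced without a broker. The Java sketch below is a hypothetical simulation, not Kafka code: `SimulatedPartition` stands in for a consumer assigned to one partition with `isolation.level=read_committed`, whose `poll()` drops aborted records but still advances `position()`, as described in the issue. The names `Kafka6782Sketch`, `SimulatedPartition`, `restoreOriginal`, `restoreFixed`, the fixed batch size, and the `maxPolls` cap (added only so the buggy loop can end in a demo) are all assumptions for illustration. Real fetch behaviour is more complex than fixed-size batches.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, broker-free simulation of the two restore loops in KAFKA-6782.
 * None of these names are Kafka APIs.
 */
class Kafka6782Sketch {

    /** Hypothetical stand-in for a read_committed consumer assigned to one partition. */
    static final class SimulatedPartition {
        private final boolean[] committed; // committed[i] == false means offset i is aborted
        private final int batchSize;
        private int position = 0;

        SimulatedPartition(boolean[] committed, int batchSize) {
            this.committed = committed;
            this.batchSize = batchSize;
        }

        int highWatermark() { return committed.length; }

        int position() { return position; }

        /** Returns committed offsets from the next batch; aborted ones are dropped, but position still advances. */
        List<Integer> poll() {
            List<Integer> records = new ArrayList<>();
            int end = Math.min(position + batchSize, committed.length);
            for (int o = position; o < end; o++) {
                if (committed[o]) {
                    records.add(o);
                }
            }
            position = end;
            return records;
        }
    }

    /** KAFKA-6190 proposal: offset refreshed only inside the for loop. Returns polls used, or -1 if maxPolls is hit. */
    static int restoreOriginal(SimulatedPartition consumer, List<Integer> restored, int maxPolls) {
        int offset = consumer.position();
        int polls = 0;
        while (offset < consumer.highWatermark()) {
            if (polls == maxPolls) {
                return -1; // the real loop has no cap and would spin forever here
            }
            polls++;
            for (int record : consumer.poll()) {
                restored.add(record);
                offset = consumer.position(); // skipped entirely when poll() is empty
            }
        }
        return polls;
    }

    /** KAFKA-6782 proposal: offset refreshed after every poll. Returns polls used, or -1 if maxPolls is hit. */
    static int restoreFixed(SimulatedPartition consumer, List<Integer> restored, int maxPolls) {
        int offset = consumer.position();
        int polls = 0;
        while (offset < consumer.highWatermark()) {
            if (polls == maxPolls) {
                return -1;
            }
            polls++;
            for (int record : consumer.poll()) {
                restored.add(record);
            }
            offset = consumer.position(); // runs even when poll() returned nothing
        }
        return polls;
    }

    public static void main(String[] args) {
        // Offsets 0-1 are committed; offsets 2-3 belong to an aborted transaction.
        boolean[] log = {true, true, false, false};

        List<Integer> a = new ArrayList<>();
        System.out.println("original: " + restoreOriginal(new SimulatedPartition(log, 2), a, 10));

        List<Integer> b = new ArrayList<>();
        int polls = restoreFixed(new SimulatedPartition(log, 2), b, 10);
        System.out.println("fixed: " + polls + " polls, restored " + b);
    }
}
```

Running `main` prints `original: -1` (the demo cap was hit, i.e. the uncapped loop would never exit) and `fixed: 2 polls, restored [0, 1]`. Both loops restore the same committed records; the only difference is where `offset` is refreshed, which mirrors the change proposed in the issue.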



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)