You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Matthias J. Sax (JIRA)" <ji...@apache.org> on 2018/06/12 20:31:00 UTC

[jira] [Resolved] (KAFKA-6782) GlobalKTable GlobalStateStore never finishes restoring when consuming aborted messages

     [ https://issues.apache.org/jira/browse/KAFKA-6782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax resolved KAFKA-6782.
------------------------------------
       Resolution: Fixed
    Fix Version/s: 1.1.1
                   1.0.2
                   0.11.0.3
                   2.0.0

> GlobalKTable GlobalStateStore never finishes restoring when consuming aborted messages
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6782
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6782
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0, 1.0.1
>            Reporter: Lingxiao WANG
>            Assignee: Lingxiao WANG
>            Priority: Major
>             Fix For: 2.0.0, 0.11.0.3, 1.0.2, 1.1.1
>
>
> Same problem with https://issues.apache.org/jira/browse/KAFKA-6190, but his solution which is below, works for the succeed transactional messages. But when there are aborted messages, it will be in infinite loop. Here is his proposition :
> {code:java}
> while (offset < highWatermark) {
>  final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
>  for (ConsumerRecord<byte[], byte[]> record : records) {
>  if (record.key() != null) {
>    stateRestoreCallback.restore(record.key(), record.value());
>  }
>  offset = consumer.position(topicPartition);
>  }
>  }{code}
> Concretely, when the consumer consume a set of aborted messages, it polls 0 records, and the code 'offset = consumer.position(topicPartition)' doesn't have any opportunity to execute.
>  So I propose to move the code 'offset = consumer.position(topicPartition)' outside of the cycle to guarantee that event if no records are polled, the offset can always be updated.
> {code:java}
> while (offset < highWatermark) {
>  final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
>  for (ConsumerRecord<byte[], byte[]> record : records) {
>  if (record.key() != null) {
>    stateRestoreCallback.restore(record.key(), record.value());
>  }
>  }
>  offset = consumer.position(topicPartition);
>  }{code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)