Posted to dev@kafka.apache.org by "Matthias J. Sax (JIRA)" <ji...@apache.org> on 2018/06/12 20:31:00 UTC
[jira] [Resolved] (KAFKA-6782) GlobalKTable GlobalStateStore never finishes restoring when consuming aborted messages
[ https://issues.apache.org/jira/browse/KAFKA-6782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax resolved KAFKA-6782.
------------------------------------
Resolution: Fixed
Fix Version/s: 1.1.1, 1.0.2, 0.11.0.3, 2.0.0
> GlobalKTable GlobalStateStore never finishes restoring when consuming aborted messages
> --------------------------------------------------------------------------------------
>
> Key: KAFKA-6782
> URL: https://issues.apache.org/jira/browse/KAFKA-6782
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.1.0, 1.0.1
> Reporter: Lingxiao WANG
> Assignee: Lingxiao WANG
> Priority: Major
> Fix For: 2.0.0, 0.11.0.3, 1.0.2, 1.1.1
>
>
> This is the same problem as https://issues.apache.org/jira/browse/KAFKA-6190. The solution proposed there, shown below, works for successfully committed transactional messages, but when there are aborted messages the restore loop never terminates. Here is that proposal:
> {code:java}
> while (offset < highWatermark) {
>     final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
>     for (ConsumerRecord<byte[], byte[]> record : records) {
>         if (record.key() != null) {
>             stateRestoreCallback.restore(record.key(), record.value());
>         }
>         offset = consumer.position(topicPartition);
>     }
> }{code}
> Concretely, when the consumer reads a batch that contains only aborted messages, poll() returns 0 records, so the statement 'offset = consumer.position(topicPartition)' inside the for loop never executes and offset never reaches highWatermark.
> So I propose moving 'offset = consumer.position(topicPartition)' out of the for loop, so that the offset is always updated even if no records are polled.
> {code:java}
> while (offset < highWatermark) {
>     final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
>     for (ConsumerRecord<byte[], byte[]> record : records) {
>         if (record.key() != null) {
>             stateRestoreCallback.restore(record.key(), record.value());
>         }
>     }
>     offset = consumer.position(topicPartition);
> }{code}
>
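The difference between the two loops quoted above can be reproduced without a broker. The following is only a minimal sketch under stated assumptions: FakeConsumer, RestoreLoopSketch, the batch size of 2 and the maxPolls guard are all hypothetical names and values made up for illustration, and FakeConsumer is not the Kafka client API. It only models the behaviour described in the report, namely that aborted records are never returned by poll() while the consumer position still advances past them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a read_committed consumer; NOT the Kafka client API.
class FakeConsumer {
    // aborted[i] == true means the record at offset i belongs to an aborted transaction.
    private final boolean[] aborted;
    private int position = 0;

    FakeConsumer(boolean[] aborted) {
        this.aborted = aborted;
    }

    // Scans up to batchSize offsets and returns only the committed ones.
    // Aborted offsets are skipped, but the position still moves past them.
    List<Integer> poll(int batchSize) {
        List<Integer> committed = new ArrayList<>();
        int end = Math.min(position + batchSize, aborted.length);
        while (position < end) {
            if (!aborted[position]) {
                committed.add(position);
            }
            position++;
        }
        return committed;
    }

    int position() {
        return position;
    }
}

class RestoreLoopSketch {
    // Runs the restore loop and returns the number of polls it needed,
    // or -1 if maxPolls (a hypothetical safety guard) was reached without finishing.
    static int restore(boolean[] log, boolean updateOutsideForLoop, int maxPolls) {
        FakeConsumer consumer = new FakeConsumer(log);
        int highWatermark = log.length;
        int offset = 0;
        int polls = 0;
        while (offset < highWatermark) {
            if (polls++ == maxPolls) {
                return -1; // the loop is stuck: offset no longer changes
            }
            List<Integer> records = consumer.poll(2);
            for (Integer record : records) {
                // a real restore callback would be invoked here
                if (!updateOutsideForLoop) {
                    offset = consumer.position(); // original placement: skipped on empty polls
                }
            }
            if (updateOutsideForLoop) {
                offset = consumer.position(); // proposed placement: runs after every poll
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        // Two committed records followed by two aborted ones.
        boolean[] log = {false, false, true, true};
        System.out.println(restore(log, false, 1000)); // prints -1
        System.out.println(restore(log, true, 1000));  // prints 2
    }
}
```

With the update inside the for loop, the second poll returns an empty batch, offset stays at 2, and the loop only stops because of the guard. With the update after the for loop, offset reaches the high watermark on the second poll.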
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)