Posted to jira@kafka.apache.org by "Lingxiao WANG (JIRA)" <ji...@apache.org> on 2018/04/12 16:51:00 UTC

[jira] [Updated] (KAFKA-6782) GlobalStateStore never finishes restoring when consuming transactional messages

     [ https://issues.apache.org/jira/browse/KAFKA-6782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lingxiao WANG updated KAFKA-6782:
---------------------------------
    Description: 
Same problem as https://issues.apache.org/jira/browse/KAFKA-6190, but the fix proposed there:
{code:java}
while (offset < highWatermark) {
    final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
    for (ConsumerRecord<byte[], byte[]> record : records) {
        if (record.key() != null) {
            stateRestoreCallback.restore(record.key(), record.value());
        }
        // position is only read while iterating over returned records
        offset = consumer.position(topicPartition);
    }
}{code}
doesn't work for me. In my situation, several transaction markers can appear in sequence in one partition. Transaction markers are never returned to the application, so in this case the consumer can't poll any records, the line 'offset = consumer.position(topicPartition)' never gets a chance to execute, and the loop never exits.
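
A minimal, Kafka-free Java simulation of the stall described above. Everything in it is hypothetical: FakeConsumer stands in for a consumer assigned to one partition, a one-record-per-poll limit replaces the poll(100) timeout, and the sketch assumes, as this report does, that the consumer's position moves past transaction markers even though poll() never returns them. A maxPolls bound is added only so the simulation terminates.

```java
import java.util.ArrayList;
import java.util.List;

public class MarkerStallSketch {

    // Hypothetical stand-in for a consumer assigned to one partition.
    // isMarker[i] == true means offset i holds a transaction control marker.
    static final class FakeConsumer {
        private final boolean[] isMarker;
        private long position = 0;

        FakeConsumer(boolean[] isMarker) { this.isMarker = isMarker; }

        // Returns the offsets of up to maxRecords data records.
        // Markers are never returned, but the position still moves past them.
        List<Long> poll(int maxRecords) {
            List<Long> batch = new ArrayList<>();
            while (position < isMarker.length && batch.size() < maxRecords) {
                if (!isMarker[(int) position]) {
                    batch.add(position);
                }
                position++;
            }
            return batch;
        }

        long position() { return position; }
    }

    // Same shape as the KAFKA-6190 loop: offset is refreshed only per returned record.
    // maxPolls exists only to stop the simulation; the real loop has no such bound.
    static long restoreWithPositionInsideLoop(FakeConsumer consumer, long highWatermark, int maxPolls) {
        long offset = 0;
        int polls = 0;
        while (offset < highWatermark && polls < maxPolls) {
            polls++;
            for (long recordOffset : consumer.poll(1)) {
                offset = consumer.position(); // never reached when poll() returns nothing
            }
        }
        return offset; // last offset the loop observed
    }

    public static void main(String[] args) {
        // Offsets: 0 = data, 1-2 = markers, 3 = data, 4-6 = markers; high watermark = 7.
        boolean[] log = {false, true, true, false, true, true, true};
        FakeConsumer consumer = new FakeConsumer(log);
        long offset = restoreWithPositionInsideLoop(consumer, log.length, 100);
        System.out.println("offset=" + offset + ", position=" + consumer.position());
    }
}
```

Running main prints "offset=4, position=7": the consumer has already reached the high watermark, but the loop still sees offset 4 because the final poll returned only markers, and only the artificial maxPolls bound ends it.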

So I propose moving 'offset = consumer.position(topicPartition)' out of the for loop, so that the offset is always updated, even when no records are polled.
{code:java}
while (offset < highWatermark) {
    final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
    for (ConsumerRecord<byte[], byte[]> record : records) {
        if (record.key() != null) {
            stateRestoreCallback.restore(record.key(), record.value());
        }
    }
    // read the position after every poll, even one that returned no records
    offset = consumer.position(topicPartition);
}{code}
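
The effect of the proposed change can be checked with a small Kafka-free simulation. As before, everything here is hypothetical: FakeConsumer models one partition, poll is limited to one record instead of using a timeout, and the sketch assumes the consumer's position moves past transaction markers that poll() never returns. The restored counter stands in for stateRestoreCallback.restore(...).

```java
import java.util.ArrayList;
import java.util.List;

public class MarkerRestoreFixSketch {

    // Hypothetical stand-in for a consumer assigned to one partition.
    // isMarker[i] == true means offset i holds a transaction control marker.
    static final class FakeConsumer {
        private final boolean[] isMarker;
        private long position = 0;

        FakeConsumer(boolean[] isMarker) { this.isMarker = isMarker; }

        // Returns up to maxRecords data offsets; markers are skipped, not returned.
        List<Long> poll(int maxRecords) {
            List<Long> batch = new ArrayList<>();
            while (position < isMarker.length && batch.size() < maxRecords) {
                if (!isMarker[(int) position]) {
                    batch.add(position);
                }
                position++;
            }
            return batch;
        }

        long position() { return position; }
    }

    // Proposed shape: the position is read once per poll, after the for loop,
    // so a poll that only skipped markers still advances offset.
    static int restoreWithPositionAfterLoop(FakeConsumer consumer, long highWatermark) {
        long offset = 0;
        int restored = 0;
        while (offset < highWatermark) {
            for (long recordOffset : consumer.poll(1)) {
                restored++; // stand-in for stateRestoreCallback.restore(key, value)
            }
            offset = consumer.position();
        }
        return restored;
    }

    public static void main(String[] args) {
        // Offsets: 0 = data, 1-2 = markers, 3 = data, 4-6 = markers; high watermark = 7.
        boolean[] log = {false, true, true, false, true, true, true};
        FakeConsumer consumer = new FakeConsumer(log);
        int restored = restoreWithPositionAfterLoop(consumer, log.length);
        System.out.println("restored=" + restored + ", position=" + consumer.position());
    }
}
```

Running main prints "restored=2, position=7". The loop exits after three polls; the third poll returns nothing, but reading the position after it moves offset to the high watermark. Note that the loop still relies on the position eventually reaching highWatermark.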
 



> GlobalStateStore never finishes restoring when consuming transactional messages
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-6782
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6782
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0, 1.0.1
>            Reporter: Lingxiao WANG
>            Priority: Blocker
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)