Posted to dev@kafka.apache.org by "Matthias J. Sax (JIRA)" <ji...@apache.org> on 2017/10/23 20:29:00 UTC

[jira] [Created] (KAFKA-6108) Synchronizing on commits and StandbyTasks can be improved

Matthias J. Sax created KAFKA-6108:
--------------------------------------

             Summary: Synchronizing on commits and StandbyTasks can be improved
                 Key: KAFKA-6108
                 URL: https://issues.apache.org/jira/browse/KAFKA-6108
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 1.0.0
            Reporter: Matthias J. Sax


In Kafka Streams, we use an optimization that reuses a source topic as the changelog topic (and thus avoids unnecessary data duplication) if the topic is read directly as a {{KTable}}. To guarantee that {{StandbyTasks}} provide a correct state, we need to synchronize the read progress of the {{StandbyTasks}} with the processing progress of the main {{StreamTask}}. Otherwise, the {{StandbyTasks}} might restore state from records the main task has not processed yet, i.e., too far into the future. For this, we limit the restore offsets of the {{StandbyTasks}} to be no larger than the committed offsets of the {{StreamTask}}.
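The sketch below models the offset cap. It assumes that the limit for each changelog partition is simply the main {{StreamTask}}'s latest committed offset, and that offsets stand in for full records. A Kafka committed offset is the offset of the next record to consume, so "no larger than the committed offset" is modeled as a strict {{offset < committed}} check. The class and method names are hypothetical. Two further rules are assumptions of this sketch only: a limit never moves backwards, and nothing is restorable before the first commit.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the StandbyTask restore cap; not Kafka Streams code.
final class RestoreOffsetLimit {
    // Per changelog partition: the main StreamTask's latest committed offset.
    private final Map<String, Long> committedOffsets = new HashMap<>();

    // Called whenever the main task commits. Assumption: the limit only moves forward.
    void onCommit(String partition, long committedOffset) {
        committedOffsets.merge(partition, committedOffset, Long::max);
    }

    // A committed offset names the *next* record to consume, so only records
    // strictly below it have been processed by the main task.
    // Assumption: before the first commit, nothing is restorable.
    boolean mayRestore(String partition, long recordOffset) {
        Long limit = committedOffsets.get(partition);
        return limit != null && recordOffset < limit;
    }

    // Returns the prefix of an ordered batch that a standby may apply now.
    List<Long> restorable(String partition, List<Long> offsets) {
        List<Long> allowed = new ArrayList<>();
        for (long offset : offsets) {
            if (!mayRestore(partition, offset)) {
                break; // offsets are ordered, so every later record is also beyond the limit
            }
            allowed.add(offset);
        }
        return allowed;
    }
}
```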

Furthermore, we buffer in memory all records returned by the restore consumer whose offsets lie beyond the allowed restore offsets.
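The in-memory buffering can be pictured as a per-partition queue. Records at or beyond the limit are held back, and each time the limit advances, the queue releases the prefix that has become restorable. This is a hypothetical model, not the Kafka Streams implementation: offsets stand in for records, and {{StandbyRecordBuffer}} is an illustrative name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical per-partition buffer for records beyond the allowed restore offset.
final class StandbyRecordBuffer {
    private final Deque<Long> pending = new ArrayDeque<>(); // offsets in arrival order
    private long limit; // exclusive upper bound on restorable offsets

    StandbyRecordBuffer(long initialLimit) {
        this.limit = initialLimit;
    }

    // Accepts a polled batch and returns what may be restored right away;
    // everything at or beyond the limit stays buffered in memory.
    List<Long> add(List<Long> batch) {
        pending.addAll(batch);
        return drain();
    }

    // Raises the limit (e.g. after the main task commits) and returns the
    // buffered offsets that have become restorable.
    List<Long> advanceLimit(long newLimit) {
        limit = Math.max(limit, newLimit);
        return drain();
    }

    // After every drain, anything still pending is beyond the limit.
    boolean hasDataBeyondLimit() {
        return !pending.isEmpty();
    }

    private List<Long> drain() {
        List<Long> ready = new ArrayList<>();
        while (!pending.isEmpty() && pending.peekFirst() < limit) {
            ready.add(pending.pollFirst());
        }
        return ready;
    }
}
```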

To achieve both goals, we regularly update the maximum allowed restore offsets (this is done inside each task). In addition, {{StreamThread}} uses a flag {{processStandbyRecords}} so that it does not call {{poll()}} on the restore consumer while the in-memory buffer already holds data beyond the maximum allowed restore offsets.
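The flag-based guard in {{StreamThread}} can be modeled as a check made before each poll. If the buffer already holds data beyond the allowed max restore offsets, polling again would only add more records that cannot be restored yet, so the poll is skipped. This is a hypothetical sketch, not the actual {{processStandbyRecords}} logic. The class name, the {{Supplier}} standing in for the restore consumer's {{poll()}}, and the rule used to recompute the flag are all assumptions.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical model of the StreamThread-side poll guard; not Kafka Streams code.
final class StandbyPollGuard {
    // Stand-in for the processStandbyRecords flag: true while the in-memory
    // buffer holds data beyond the allowed max restore offsets.
    private boolean bufferBlocked = false;
    private int polls = 0;

    // Recomputes the flag from the highest buffered offset and the current
    // (exclusive) limit of each partition. Assumption: a missing limit means 0.
    void refresh(Map<String, Long> maxBufferedOffsets, Map<String, Long> limits) {
        bufferBlocked = false;
        for (Map.Entry<String, Long> e : maxBufferedOffsets.entrySet()) {
            long limit = limits.getOrDefault(e.getKey(), 0L);
            if (e.getValue() >= limit) {
                bufferBlocked = true;
                return;
            }
        }
    }

    // Polls only when the buffer is not already ahead of the limits.
    List<Long> maybePoll(Supplier<List<Long>> restoreConsumerPoll) {
        if (bufferBlocked) {
            return List.of(); // skip poll(): new data could not be restored anyway
        }
        polls++;
        return restoreConsumerPoll.get();
    }

    int pollCount() {
        return polls;
    }
}
```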

We should consider:
 - unifying both places in the code and moving the whole logic into a single place (the suggestion is {{StreamThread}}, since a task does not need to know about this optimization)
 - feeding the task only the data it is allowed to restore (instead of everything)
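The following is one possible shape for the proposed refactoring, sketched under assumptions. {{StreamThread}} owns both the limits and the buffer. The task exposes only a hypothetical {{restore(partition, offsets)}} method that applies whatever it receives, with no limit checks of its own. All names are illustrative and are not taken from Kafka Streams, and offsets again stand in for full records.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical task interface: the task restores whatever it is given and
// knows nothing about the source-topic-as-changelog optimization.
interface RestoringTask {
    void restore(String partition, List<Long> offsets);
}

// Hypothetical thread-side coordinator that owns all limit and buffer logic.
final class StandbyFeeder {
    private final RestoringTask task;
    private final Map<String, Long> limits = new HashMap<>();        // exclusive, per partition
    private final Map<String, Deque<Long>> buffers = new HashMap<>(); // records not yet fed

    StandbyFeeder(RestoringTask task) {
        this.task = task;
    }

    // Called with each batch returned by the restore consumer.
    void onPolled(String partition, List<Long> offsets) {
        buffers.computeIfAbsent(partition, p -> new ArrayDeque<>()).addAll(offsets);
        feed(partition);
    }

    // Called after the main StreamTask commits; the limit only moves forward.
    void onCommit(String partition, long committedOffset) {
        limits.merge(partition, committedOffset, Long::max);
        feed(partition);
    }

    // Single place that decides whether polling the restore consumer makes sense.
    boolean shouldPoll() {
        for (Deque<Long> buffer : buffers.values()) {
            if (!buffer.isEmpty()) {
                return false; // feed() leaves only records beyond the limit buffered
            }
        }
        return true;
    }

    // Hands the task only the records it is allowed to restore.
    // Assumption: with no commit yet, the limit is 0 and nothing is fed.
    private void feed(String partition) {
        Deque<Long> buffer = buffers.get(partition);
        if (buffer == null) {
            return;
        }
        long limit = limits.getOrDefault(partition, 0L);
        List<Long> allowed = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peekFirst() < limit) {
            allowed.add(buffer.pollFirst());
        }
        if (!allowed.isEmpty()) {
            task.restore(partition, allowed);
        }
    }
}
```

With this split, the task needs no offset bookkeeping of its own. The thread also decides whether to poll from the same state it uses to filter records, so the two mechanisms described above cannot drift apart.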


