Posted to jira@kafka.apache.org by "Tommy Becker (JIRA)" <ji...@apache.org> on 2018/07/05 23:05:00 UTC

[jira] [Comment Edited] (KAFKA-4113) Allow KTable bootstrap

    [ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16534259#comment-16534259 ] 

Tommy Becker edited comment on KAFKA-4113 at 7/5/18 11:04 PM:
--------------------------------------------------------------

Coming from Samza, I find it very surprising that there is no way to do this. If I have 2 topics with existing data, 1 stream and 1 table, and write a KafkaStreams application to join them, it seems very likely that the initial records in the stream (possibly quite many) will not be joined properly, because the corresponding messages in the table's backing topic have not yet been read.

The timestamp semantics make sense in that I suppose there are some use cases where you'd consider the value that was current in the table at the time of some incoming message "better" than the latest value (though I suspect they are a minority). But in reality, the table's backing topic is almost certainly log-compacted, which means you can't achieve these semantics regardless, as the older values are now gone. Worse, the new values have newer timestamps, which perpetuates the problem we're talking about.


was (Author: twbecker):
Coming from Samza, I find it very surprising that there is no way to do this. If I have 2 topics, 1 stream and 1 table with existing data, and write a KafkaStreams application to join them, it seems very likely that the initial records in the stream (possibly quite many) will not be joined properly, because the corresponding messages in the table's backing topic have not yet been read.

The timestamp semantics make sense in that I suppose there are some use cases where you'd consider the value that was current in the table at the time of some incoming message "better" than the latest value (though I suspect they are a minority). But in reality, the table's backing topic is almost certainly log-compacted, which means you can't achieve these semantics regardless, as the older values are now gone. Worse, the new values have newer timestamps, which perpetuates the problem we're talking about.

> Allow KTable bootstrap
> ----------------------
>
>                 Key: KAFKA-4113
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4113
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>            Priority: Major
>
> On the mailing list, there are multiple requests about the possibility of "fully populating" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the data. Only after this topic has been read completely and the KTable is ready should the application start processing. This implies that, on startup, the current partition end offsets must be fetched and stored, and stream processing can start once the KTable has been populated up to those offsets.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, i.e., if a KTable receives no update for a certain
> time, we consider it populated
> 3) a timestamp cut-off point, i.e., all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option 3)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // proposed API: populate the table without reading any other topics until a record with timestamp 1000 is seen
> {noformat}
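
For illustration only, the "main idea" in the description (capture the end offsets at startup, then hold back stream processing until the table has been read up to them) could be sketched roughly as below. This is not an existing Kafka Streams API: BootstrapTracker and its methods are hypothetical names, and partitions are simplified to plain integers. In a real application the captured values would presumably come from KafkaConsumer#endOffsets, and progress would be compared using KafkaConsumer#position, which also copes with the offset gaps left by log compaction.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Kafka Streams): tracks whether a table's
 * backing topic has been read up to the end offsets captured at startup.
 */
final class BootstrapTracker {
    // partition -> end offset captured at startup (offset of the next record to be written)
    private final Map<Integer, Long> targetEndOffsets;
    // partition -> next offset that still has to be read
    private final Map<Integer, Long> positions = new HashMap<>();

    BootstrapTracker(Map<Integer, Long> endOffsetsAtStartup) {
        this.targetEndOffsets = new HashMap<>(endOffsetsAtStartup);
        for (Integer partition : targetEndOffsets.keySet()) {
            positions.put(partition, 0L);
        }
    }

    /** Note that the record at the given offset was applied to the table. */
    void recordApplied(int partition, long offset) {
        // Keep the highest position seen so far for this partition.
        positions.merge(partition, offset + 1, Math::max);
    }

    /** True once every partition has reached the end offset captured at startup. */
    boolean isBootstrapped() {
        for (Map.Entry<Integer, Long> target : targetEndOffsets.entrySet()) {
            long position = positions.getOrDefault(target.getKey(), 0L);
            if (position < target.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

Under this sketch, the application would keep feeding table records to recordApplied and only begin consuming the stream topic once isBootstrapped() returns true. Records written to the table topic after startup do not extend the bootstrap phase, because the targets are fixed when the tracker is created.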



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)