Posted to jira@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2020/11/14 00:58:00 UTC

[jira] [Commented] (KAFKA-10722) Timestamped store is used even if not desired

    [ https://issues.apache.org/jira/browse/KAFKA-10722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17231896#comment-17231896 ] 

Matthias J. Sax commented on KAFKA-10722:
-----------------------------------------

[~fml2], what you describe is not a bug; it is by design. Note that, technically, the `aggregate()` operator requires a timestamped key-value store. We allow you to pass in a plain key-value store only for backward compatibility: before we introduced timestamped key-value stores, users might have written code like yours, and we needed to make sure not to break their code when they upgrade.

If we had designed the API "from scratch", your code would not be valid, and we would enforce that you pass a timestamped key-value store into `aggregate()`.
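A minimal JDK-only sketch of the backward-compatibility point above, under a simplified store model. `ValueAndTs`, `TimestampedStore`, `PlainStore`, and `PlainToTimestampedAdapter` are hypothetical names, not Kafka Streams classes. The plain store is wrapped so that the aggregation always works against a timestamped interface; the adapter drops the timestamp on write and reports -1 on read, which is an assumption of this sketch rather than a description of Kafka's internals.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical JDK-only model of the backward-compatibility idea; NOT Kafka Streams classes.
public class TimestampedAdapterSketch {

    // Stand-in for a value-plus-timestamp pair.
    static final class ValueAndTs<V> {
        final V value;
        final long timestamp;
        ValueAndTs(V value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    // What the aggregation needs: every stored value carries a timestamp.
    interface TimestampedStore<K, V> {
        void put(K key, ValueAndTs<V> valueAndTs);
        ValueAndTs<V> get(K key);
    }

    // A legacy store without timestamps, as older user code might supply.
    static final class PlainStore<K, V> {
        final Map<K, V> data = new HashMap<>();
    }

    // Adapter: drops the timestamp on write; on read reports -1 ("unknown", an assumption of this sketch).
    static final class PlainToTimestampedAdapter<K, V> implements TimestampedStore<K, V> {
        private final PlainStore<K, V> inner;
        PlainToTimestampedAdapter(PlainStore<K, V> inner) { this.inner = inner; }

        @Override
        public void put(K key, ValueAndTs<V> v) { inner.data.put(key, v == null ? null : v.value); }

        @Override
        public ValueAndTs<V> get(K key) {
            V v = inner.data.get(key);
            return v == null ? null : new ValueAndTs<>(v, -1L);
        }
    }

    // A plain store is accepted only for compatibility and is wrapped,
    // so the operator always works against the timestamped interface.
    static <K, V> TimestampedStore<K, V> materialize(PlainStore<K, V> legacy) {
        return new PlainToTimestampedAdapter<>(legacy);
    }

    // Simplified aggregation: counts records per key, always via the timestamped interface.
    static void count(TimestampedStore<String, Long> store, String key, long recordTs) {
        ValueAndTs<Long> old = store.get(key);
        long next = (old == null ? 0L : old.value) + 1;
        store.put(key, new ValueAndTs<>(next, recordTs));
    }

    public static void main(String[] args) {
        PlainStore<String, Long> legacy = new PlainStore<>();
        TimestampedStore<String, Long> view = materialize(legacy);
        count(view, "a", 100L);
        count(view, "a", 200L);
        ValueAndTs<Long> read = view.get("a");
        System.out.println(read.value + " @ " + read.timestamp); // 2 @ -1
        System.out.println(legacy.data.get("a"));                // 2
    }
}
```

In this simplified model, anything that reads through the wrapped view gets timestamped values even though a plain store was supplied, which mirrors the behaviour reported in the issue.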

Does this make sense?

> Timestamped store is used even if not desired
> ---------------------------------------------
>
>                 Key: KAFKA-10722
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10722
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.1, 2.6.0
>            Reporter: fml2
>            Priority: Major
>
> I have a stream which I group and aggregate (this results in a KTable). When aggregating, I explicitly specify that the result table should be materialized using a plain (non-timestamped) store.
> After that, the KTable is filtered and converted to a stream. This stream is processed by a processor that accesses the store.
> The problem is that even though I specify a non-timestamped store, a timestamped one is used, which leads to a ClassCastException in the processor (it iterates over the store and expects the items to be of type "KeyValue", but they are of type "ValueAndTimestamp").
> Here is the code (schematically).
> First, I define the topology:
> {code:java}
> KTable table = ...aggregate(
>   initializer, // initializer for the KTable row
>   aggregator, // aggregator
>   Materialized.as(Stores.persistentKeyValueStore("MyStore")) // <-- Non-Timestamped!
>     .withKeySerde(...).withValueSerde(...));
> table.toStream().process(theProcessor, "MyStore"); // pass the store name so the processor can access "MyStore"
> {code}
> In the class for the processor:
> {code:java}
> public void init(ProcessorContext context) {
>    var store = context.getStateStore("MyStore"); // Returns a TimestampedKeyValueStore!
> }
> {code}
> A timestamped store is returned even though I explicitly requested a non-timestamped one!
>
> I tried to find the cause of this behaviour and think I've found it. It lies in this line: [https://github.com/apache/kafka/blob/cfc813537e955c267106eea989f6aec4879e14d7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java#L241]
> There, TimestampedKeyValueStoreMaterializer is used regardless of whether the materialization store supplier is timestamped or not.
> I think this is a bug.
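The ClassCastException described in the issue follows from this design. In Kafka Streams, `TimestampedKeyValueStore<K, V>` extends `KeyValueStore<K, ValueAndTimestamp<V>>`, so a processor reading such a store should treat each value as a `ValueAndTimestamp<V>` and unwrap it with `value()`. The JDK-only sketch below models that failure mode; `KvStore`, `TimestampedKvStore`, and `ValueAndTs` are hypothetical stand-ins, not the Kafka API. Because of type erasure, the unchecked cast to the plain view succeeds, and the exception only surfaces when a value is used as a `Long`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical JDK-only model; these types mimic, but are not, Kafka Streams types.
public class ProcessorReadSketch {

    static final class ValueAndTs<V> {
        final V value;
        final long timestamp;
        ValueAndTs(V value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    // Plain key-value view with values of type V.
    interface KvStore<K, V> {
        List<Map.Entry<K, V>> all();
    }

    // Timestamped view: a key-value store whose values are ValueAndTs<V>.
    interface TimestampedKvStore<K, V> extends KvStore<K, ValueAndTs<V>> {}

    static final class InMemoryTimestampedStore<K, V> implements TimestampedKvStore<K, V> {
        final Map<K, ValueAndTs<V>> data = new LinkedHashMap<>();

        @Override
        public List<Map.Entry<K, ValueAndTs<V>>> all() { return new ArrayList<>(data.entrySet()); }
    }

    // What the processor in the issue effectively did: assume plain Long values.
    // The unchecked cast succeeds (erasure); the failure appears when a value is used.
    @SuppressWarnings("unchecked")
    static long sumAssumingPlain(Object store) {
        KvStore<String, Long> plain = (KvStore<String, Long>) store;
        long sum = 0;
        for (Map.Entry<String, Long> e : plain.all()) {
            sum += e.getValue(); // throws ClassCastException: a ValueAndTs is not a Long
        }
        return sum;
    }

    // Treat the store as timestamped and unwrap each value before using it.
    @SuppressWarnings("unchecked")
    static long sumUnwrapping(Object store) {
        TimestampedKvStore<String, Long> ts = (TimestampedKvStore<String, Long>) store;
        long sum = 0;
        for (Map.Entry<String, ValueAndTs<Long>> e : ts.all()) {
            sum += e.getValue().value;
        }
        return sum;
    }

    public static void main(String[] args) {
        InMemoryTimestampedStore<String, Long> store = new InMemoryTimestampedStore<>();
        store.data.put("a", new ValueAndTs<>(2L, 100L));
        store.data.put("b", new ValueAndTs<>(3L, 200L));
        System.out.println(sumUnwrapping(store)); // 5
        try {
            sumAssumingPlain(store);
        } catch (ClassCastException expected) {
            System.out.println("ClassCastException, as in the issue");
        }
    }
}
```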



--
This message was sent by Atlassian Jira
(v8.3.4#803005)