Posted to dev@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2020/12/23 23:54:00 UTC

[jira] [Resolved] (KAFKA-10722) Timestamped store is used even if not desired

     [ https://issues.apache.org/jira/browse/KAFKA-10722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax resolved KAFKA-10722.
-------------------------------------
    Fix Version/s: 2.8.0
       Resolution: Fixed

Added you to the list of contributors and assigned the ticket to you. You can now also self-assign tickets.

Thanks for reporting the issue and for helping to improve the JavaDocs.

> Timestamped store is used even if not desired
> ---------------------------------------------
>
>                 Key: KAFKA-10722
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10722
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.1, 2.6.0
>            Reporter: fml2
>            Assignee: fml2
>            Priority: Major
>             Fix For: 2.8.0
>
>
> I have a stream which I then group and aggregate (this results in a KTable). When aggregating, I explicitly specify that the result table should be materialized using a regular (non-timestamped) store.
> After that, the KTable is filtered and converted to a stream. This stream is processed by a processor that accesses the store.
> The problem is that even though I specify a non-timestamped store, a timestamped one is used. This leads to a ClassCastException in the processor: it iterates over the store and expects the items to be of type "KeyValue", but they are of type "ValueAndTimestamp".
> Here is the code (schematically).
> First, I define the topology:
> {code:java}
> KTable table = ...aggregate(
>   initializer, // initializer for the KTable row
>   aggregator, // aggregator
>   Materialized.as(Stores.persistentKeyValueStore("MyStore")) // <-- Non-Timestamped!
>     .withKeySerde(...).withValueSerde(...));
> table.toStream().process(theProcessorSupplier, "MyStore"); // connect "MyStore" so the processor may access it
> {code}
> In the class for the processor:
> {code:java}
> public void init(ProcessorContext context) {
>    var store = context.getStateStore("MyStore"); // Returns a TimestampedKeyValueStore!
> }
> {code}
> A timestamped store is returned even though I explicitly specified a non-timestamped one!
>  
> I tried to find the cause of this behaviour and think I have found it. It lies in this line: [https://github.com/apache/kafka/blob/cfc813537e955c267106eea989f6aec4879e14d7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java#L241]
> There, TimestampedKeyValueStoreMaterializer is used regardless of whether the materialization supplier is timestamped or not.
> I think this is a bug.
>  
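The mismatch described in the report can be illustrated without a Kafka dependency. The sketch below is a toy model, not Kafka code: `StoredValue` is a hypothetical stand-in for Kafka's `ValueAndTimestamp`, and a plain `Map` stands in for the state store. Code that casts each stored value directly to the aggregate type fails with a ClassCastException, as in the report, while code that unwraps the timestamped wrapper first succeeds.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model (no Kafka dependency) of the store/value mismatch in KAFKA-10722.
class TimestampedStoreModel {

    // Hypothetical stand-in for org.apache.kafka.streams.state.ValueAndTimestamp.
    static final class StoredValue<V> {
        final V value;
        final long timestamp;

        StoredValue(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // What the processor in the report effectively does:
    // treat every stored value as the plain aggregate type.
    static long sumAssumingPlainValues(Map<String, Object> store) {
        long sum = 0;
        for (Object v : store.values()) {
            sum += (Long) v; // throws ClassCastException if v is a StoredValue
        }
        return sum;
    }

    // Defensive variant: unwrap the timestamped wrapper before using the value.
    static long sumUnwrapping(Map<String, Object> store) {
        long sum = 0;
        for (Object v : store.values()) {
            Object plain = (v instanceof StoredValue) ? ((StoredValue<?>) v).value : v;
            sum += (Long) plain;
        }
        return sum;
    }

    // A "store" whose values are held wrapped, as a timestamped store holds them.
    static Map<String, Object> timestampedStore() {
        Map<String, Object> store = new LinkedHashMap<>();
        store.put("a", new StoredValue<>(3L, 1000L));
        store.put("b", new StoredValue<>(4L, 2000L));
        return store;
    }

    public static void main(String[] args) {
        Map<String, Object> store = timestampedStore();
        System.out.println(sumUnwrapping(store)); // prints 7
        try {
            sumAssumingPlainValues(store);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as in the report");
        }
    }
}
```

Against the real Kafka Streams API, a processor would, under the same assumption that the store it receives is timestamped, read it as a `TimestampedKeyValueStore` and unwrap each value with `ValueAndTimestamp#value()` (or the null-safe static `ValueAndTimestamp.getNullableValue(...)`). This workaround is a suggestion, not something stated in the ticket.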



--
This message was sent by Atlassian Jira
(v8.3.4#803005)