Posted to jira@kafka.apache.org by "Andrew Klopper (JIRA)" <ji...@apache.org> on 2019/03/21 11:39:00 UTC

[jira] [Comment Edited] (KAFKA-7895) KTable suppress operator emitting more than one record for the same key per window

    [ https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798017#comment-16798017 ] 

Andrew Klopper edited comment on KAFKA-7895 at 3/21/19 11:38 AM:
-----------------------------------------------------------------

Hi

I am still seeing this behaviour using the 2.2.0 Maven artifacts from [https://repository.apache.org/content/groups/staging]

With the topology below, restarting my streams application invariably re-emits windows that were already emitted for the same key, and sometimes the re-emitted windows carry earlier timestamps and aggregated data consistent with the latter part of the window having been lost (i.e., state appears to reset to an earlier version):
{code:java}
return sourceStream
    .groupByKey()
    .windowedBy(TimeWindows.of(rollupInterval).grace(config.getGracePeriodDuration()))
    .aggregate(
        () -> rollupFactory.createRollup(windowDuration),
        aggregator,
        Materialized.<String, R, WindowStore<Bytes, byte[]>>as(outputTopic + "_state")
            .withValueSerde(rollupSerde)
    )
    .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())
        .withName(outputTopic + "_suppress_state"))
    .toStream((stringWindowed, rollup) -> stringWindowed.key());
{code}
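For clarity, my understanding of untilWindowCloses is that it should emit a key's final result exactly once, when stream time passes the window end plus the grace period. A quick sketch of that arithmetic (the interval and grace values below are placeholders, not my actual configuration):
{code:java}
import java.time.Duration;
import java.time.Instant;

public class WindowCloseSketch {
    public static void main(String[] args) {
        // Placeholder values standing in for rollupInterval and the configured grace period.
        Duration rollupInterval = Duration.ofMinutes(5);
        Duration grace = Duration.ofSeconds(30);

        Instant windowStart = Instant.parse("2019-03-21T11:00:00Z");
        Instant windowEnd = windowStart.plus(rollupInterval); // exclusive end of the window
        Instant windowClose = windowEnd.plus(grace);          // suppress should emit at/after this stream time

        // Expectation: one final record per key once observed stream time reaches
        // windowClose; a second emission for the same key/window after a restart
        // suggests the suppression buffer has lost track of what it already emitted.
        System.out.println("window closes at " + windowClose);
    }
}
{code}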
I have tried disabling caching using:
{code:java}
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
{code}
but this does not appear to make a difference.
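For what it's worth, outside of the full application I have been sanity-checking the single-emission expectation with a small TopologyTestDriver harness along these lines (topic names, timestamps, and serdes are placeholders, and this of course cannot exercise the restart path where I see the problem):
{code:java}
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class SuppressProbe {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("in", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMillis(5)))
            .count()
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream((windowedKey, count) -> windowedKey.key())
            .to("out", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-probe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            ConsumerRecordFactory<String, String> factory =
                new ConsumerRecordFactory<>("in", new StringSerializer(), new StringSerializer());
            driver.pipeInput(factory.create("in", "k", "v1", 0L));
            driver.pipeInput(factory.create("in", "k", "v2", 30_000L));
            // A later record for another key advances stream time past
            // window end (60s) + grace (5ms), which should close k's window.
            driver.pipeInput(factory.create("in", "k2", "v3", 70_000L));

            // Expect exactly one final record for k's window: k -> 2
            ProducerRecord<String, Long> out;
            while ((out = driver.readOutput("out", new StringDeserializer(), new LongDeserializer())) != null) {
                System.out.println(out.key() + " -> " + out.value());
            }
        }
    }
}
{code}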

There also seems to be no difference in behaviour between 2.2.0 and 2.1.1.

Regards

Andrew



> KTable suppress operator emitting more than one record for the same key per window
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-7895
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7895
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.0, 2.1.1
>            Reporter: prasanthi
>            Assignee: John Roesler
>            Priority: Major
>             Fix For: 2.2.0, 2.1.2
>
>
> Hi, we are using Kafka Streams to get the aggregated counts per vendor (key) within a specified window.
> Here's how we configured the suppress operator to emit one final record per key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>      .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>      .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>      .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
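> For reference, a self-contained version of the snippet above, with the imports and an assumed source stream spelled out (the topic name and serdes are illustrative, not from our actual application):
> {code:java}
> import static java.time.Duration.ofMillis;
> import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
>
> import java.time.Duration;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.kstream.*;
>
> StreamsBuilder builder = new StreamsBuilder();
> // Assumed source: vendor id as key, event payload as value.
> KGroupedStream<Integer, String> groupedStream = builder
>     .stream("vendor-events", Consumed.with(Serdes.Integer(), Serdes.String()))
>     .groupByKey();
>
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>     .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>     .count(Materialized.with(Serdes.Integer(), Serdes.Long()))
>     .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}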
> But we are getting more than one record for the same key/window as shown below.
> {code:java}
> [KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1039
> [KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1162
> [KTABLE-TOSTREAM-0000000010]: [9@1549067040000/1549067100000], 6584
> [KTABLE-TOSTREAM-0000000010]: [88@1549067040000/1549067100000], 107
> [KTABLE-TOSTREAM-0000000010]: [108@1549067040000/1549067100000], 315
> [KTABLE-TOSTREAM-0000000010]: [119@1549067040000/1549067100000], 119
> [KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 746
> [KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 809
> {code}
> Could you please take a look?
> Thanks
>
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown recovery, rebalance, etc.
>  ** [https://github.com/apache/kafka/pull/6278]
>  * make sure that there's some system test coverage with caching disabled.
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
>  * test with tighter time bounds (windows of, say, 30 seconds) and use system time without adding any extra time for verification
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944


