Posted to jira@kafka.apache.org by "Andrew Klopper (JIRA)" <ji...@apache.org> on 2019/03/21 11:39:00 UTC
[jira] [Comment Edited] (KAFKA-7895) KTable suppress operator emitting more than one record for the same key per window
[ https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798017#comment-16798017 ]
Andrew Klopper edited comment on KAFKA-7895 at 3/21/19 11:38 AM:
-----------------------------------------------------------------
Hi
I am still seeing this behaviour using the 2.2.0 Maven artifacts from [https://repository.apache.org/content/groups/staging]
With the topology below, restarting my Streams application invariably re-emits windows that were already emitted for the same key. Sometimes the re-emitted windows also have earlier timestamps, and their aggregated data is consistent with the latter part of the window having been lost (i.e., the state seems to be reset to an earlier version):
{code:java}
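return sourceStream
    .groupByKey()
    // Tumbling windows of rollupInterval; records up to the grace period late are still aggregated
    .windowedBy(TimeWindows.of(rollupInterval).grace(config.getGracePeriodDuration()))
    .aggregate(
        () -> rollupFactory.createRollup(windowDuration),
        aggregator,
        Materialized.<String, R, WindowStore<Bytes, byte[]>>as(outputTopic + "_state")
            .withValueSerde(rollupSerde)
    )
    // Expected to emit only the final result per key and window, once the window closes
    .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())
        .withName(outputTopic + "_suppress_state"))
    // Drops the window from the key, so results for successive windows share one output key
    .toStream((stringWindowed, rollup) -> stringWindowed.key());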
{code}
I have tried disabling caching using:
{code:java}
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
{code}
but this does not appear to make a difference.
There also seems to be no difference in behaviour between 2.2.0 and 2.1.1.
Regards
Andrew
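For reference, both reports rely on the contract of `Suppressed.untilWindowCloses(...)`: roughly, exactly one final result per key and window, emitted once stream time reaches the window end plus the grace period. The toy Java model here sketches that contract for tumbling windows, using a simple count as the aggregate and the 1-minute window with 5 ms grace from the issue description. It is an illustrative assumption, not Kafka Streams' implementation: `SuppressModel` and its methods are hypothetical, and everything is kept in memory, so it does not model state stores, changelogs, or the restart path where the re-emissions above are observed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of "emit only the final result per key and window".
// NOT the Kafka Streams implementation: no persistence, restarts, or rebalances.
public class SuppressModel {
    private final long windowSizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;
    // "key@windowStart" -> running count, buffered until the window closes
    private final Map<String, Long> buffer = new LinkedHashMap<>();
    // Final results, formatted as "key@windowStart/windowEnd=count"
    final List<String> emitted = new ArrayList<>();

    public SuppressModel(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    public void process(String key, long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        // Tumbling windows are aligned to multiples of the window size.
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        long windowEnd = windowStart + windowSizeMs;
        // Records whose window has already closed are dropped as late.
        if (windowEnd + graceMs > streamTime) {
            buffer.merge(key + "@" + windowStart, 1L, Long::sum);
        }
        evictClosedWindows();
    }

    private void evictClosedWindows() {
        Iterator<Map.Entry<String, Long>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            String k = e.getKey();
            long windowStart = Long.parseLong(k.substring(k.lastIndexOf('@') + 1));
            long windowEnd = windowStart + windowSizeMs;
            // A window closes once stream time reaches window end + grace;
            // its result is emitted once and then removed from the buffer.
            if (windowEnd + graceMs <= streamTime) {
                emitted.add(k + "/" + windowEnd + "=" + e.getValue());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        // 1-minute windows with 5 ms grace, as in the issue description.
        SuppressModel model = new SuppressModel(60_000L, 5L);
        long start = 1549067040000L; // window [1549067040000, 1549067100000)
        model.process("131", start + 1_000);
        model.process("131", start + 30_000);
        model.process("154", start + 59_999);
        model.process("131", start + 60_004); // next window; stream time still below end + grace
        System.out.println(model.emitted); // []
        model.process("9", start + 60_005); // stream time reaches end + grace
        System.out.println(model.emitted);
        // [131@1549067040000/1549067100000=2, 154@1549067040000/1549067100000=1]
    }
}
```

In this model the first window stays buffered while stream time is at window end + 4 ms, and each key gets exactly one line for that window once stream time reaches window end + 5 ms.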
> KTable suppress operator emitting more than one record for the same key per window
> ----------------------------------------------------------------------------------
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.1.0, 2.1.1
> Reporter: prasanthi
> Assignee: John Roesler
> Priority: Major
> Fix For: 2.2.0, 2.1.2
>
>
> Hi, we are using Kafka Streams to get aggregated counts per vendor (key) within a specified window.
> Here's how we configured the suppress operator to emit one final record per key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
> .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
> .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
> .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown below.
> {code:java}
> [KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1039
> [KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1162
> [KTABLE-TOSTREAM-0000000010]: [9@1549067040000/1549067100000], 6584
> [KTABLE-TOSTREAM-0000000010]: [88@1549067040000/1549067100000], 107
> [KTABLE-TOSTREAM-0000000010]: [108@1549067040000/1549067100000], 315
> [KTABLE-TOSTREAM-0000000010]: [119@1549067040000/1549067100000], 119
> [KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 746
> [KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 809
> {code}
> Could you please take a look?
> Thanks
>
>
> Added by John:
> Acceptance Criteria:
> * add suppress to system tests, such that it's exercised with crash/shutdown recovery, rebalance, etc.
> ** [https://github.com/apache/kafka/pull/6278]
> * make sure that there's some system test coverage with caching disabled.
> ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
> * test with tighter time bounds: windows of, say, 30 seconds, using system time without adding any extra time for verification
> ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944
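The property these acceptance criteria exercise is that suppression never emits more than one record per key and window. A minimal Java sketch of that check, run over the `KTABLE-TOSTREAM` lines printed in the description, could look like this. The line format it parses is inferred from that printed output, and `DuplicateWindowCheck` is a hypothetical helper, not part of Kafka's system tests.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: flags key/window pairs that were emitted more than once.
public class DuplicateWindowCheck {
    // Matches "[<node>]: [<key>@<start>/<end>], <value>" as printed in the issue.
    private static final Pattern LINE =
        Pattern.compile("\\[[^\\]]+\\]: \\[([^@\\]]+)@(\\d+)/(\\d+)\\], (\\S+)");

    /** Returns "key@start/end" for every key/window that appears more than once. */
    public static List<String> duplicates(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line.trim());
            if (m.matches()) {
                String keyWindow = m.group(1) + "@" + m.group(2) + "/" + m.group(3);
                counts.merge(keyWindow, 1, Integer::sum);
            }
        }
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() > 1) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The output lines reported in the issue description.
        List<String> output = List.of(
            "[KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1039",
            "[KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1162",
            "[KTABLE-TOSTREAM-0000000010]: [9@1549067040000/1549067100000], 6584",
            "[KTABLE-TOSTREAM-0000000010]: [88@1549067040000/1549067100000], 107",
            "[KTABLE-TOSTREAM-0000000010]: [108@1549067040000/1549067100000], 315",
            "[KTABLE-TOSTREAM-0000000010]: [119@1549067040000/1549067100000], 119",
            "[KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 746",
            "[KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 809");
        System.out.println(duplicates(output));
        // [131@1549067040000/1549067100000, 154@1549067040000/1549067100000]
    }
}
```

On the reported output this flags keys 131 and 154 for the window 1549067040000/1549067100000, which is the violation described in the issue.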
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)