Posted to jira@kafka.apache.org by "Kun Song (JIRA)" <ji...@apache.org> on 2019/05/06 11:15:00 UTC
[jira] [Commented] (KAFKA-7895) KTable suppress operator emitting more than one record for the same key per window
[ https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833708#comment-16833708 ]
Kun Song commented on KAFKA-7895:
---------------------------------
Hi [~vvcephei], [~mjsax], any progress on that?
> KTable suppress operator emitting more than one record for the same key per window
> ----------------------------------------------------------------------------------
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.1.0, 2.2.0, 2.1.1
> Reporter: prasanthi
> Assignee: John Roesler
> Priority: Blocker
> Fix For: 2.1.2, 2.2.1
>
>
> Hi, we are using Kafka Streams to compute aggregated counts per vendor (the key) within a specified window.
> Here's how we configured the suppress operator to emit one final record per key/window.
> {code:java}
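> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>     // one-minute tumbling windows with a 5 ms grace period
>     .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMillis(5L)))
>     .count(Materialized.with(Serdes.Integer(), Serdes.Long()))
>     // hold back results and emit only the final count once each window closes
>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));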
> {code}
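Here is what "until the window closes" means for these settings. A result is meant to be held back until stream time passes the window end plus the grace period. The following is a minimal sketch, not Kafka Streams code: `WindowCloseSketch`, `windowStart`, `windowEnd` and `isClosed` are hypothetical helpers. It assumes non-negative timestamps and treats a window as closed once stream time is at least window end + grace. It computes the epoch-aligned one-minute window that contains a record and checks when that window closes with the 5 ms grace.

```java
import java.time.Duration;

// Hypothetical helper (not part of Kafka Streams) illustrating when a tumbling
// window configured like the one in the report is considered closed.
public class WindowCloseSketch {

    // Start of the tumbling window containing a record timestamp.
    // Tumbling windows are aligned to the epoch, so start = ts - (ts % size).
    // Assumes timestampMs >= 0.
    static long windowStart(long timestampMs, Duration size) {
        long sizeMs = size.toMillis();
        return timestampMs - (timestampMs % sizeMs);
    }

    // End (exclusive) of the window containing timestampMs.
    static long windowEnd(long timestampMs, Duration size) {
        return windowStart(timestampMs, size) + size.toMillis();
    }

    // Assumption: the window counts as closed once stream time (the highest
    // record timestamp observed so far) reaches windowEnd + grace.
    static boolean isClosed(long streamTimeMs, long windowEndMs, Duration grace) {
        return streamTimeMs >= windowEndMs + grace.toMillis();
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(1);
        Duration grace = Duration.ofMillis(5L);
        long ts = 1549067070000L; // a record falling inside the window seen in the log
        long end = windowEnd(ts, size);
        System.out.println(windowStart(ts, size) + "/" + end);    // 1549067040000/1549067100000
        System.out.println(isClosed(1549067100004L, end, grace)); // false
        System.out.println(isClosed(1549067100005L, end, grace)); // true
    }
}
```

Under that assumption, the window ending at 1549067100000 closes once stream time reaches 1549067100005. From then on, each key in that window would be expected to produce exactly one suppressed record.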
> But we are getting more than one record for the same key/window as shown below.
> {code:java}
> [KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1039
> [KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1162
> [KTABLE-TOSTREAM-0000000010]: [9@1549067040000/1549067100000], 6584
> [KTABLE-TOSTREAM-0000000010]: [88@1549067040000/1549067100000], 107
> [KTABLE-TOSTREAM-0000000010]: [108@1549067040000/1549067100000], 315
> [KTABLE-TOSTREAM-0000000010]: [119@1549067040000/1549067100000], 119
> [KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 746
> [KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 809
> {code}
> Could you please take a look?
> Thanks
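Duplicates like the ones in the reported output can be found mechanically. The following is a minimal sketch, assuming the lines follow the printed format shown (`[processor]: [key@start/end], value`, presumably from `KStream#print`). It reports every key/window pair that was emitted more than once. `SuppressDuplicateCheck` and `duplicates` are hypothetical names, not part of Kafka.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical checker (not part of Kafka): flags key/window pairs that appear
// more than once in printed KTable-to-stream output.
public class SuppressDuplicateCheck {

    // Matches e.g. "[KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1039"
    private static final Pattern LINE =
        Pattern.compile("\\[[^\\]]+\\]: \\[(\\d+)@(\\d+)/(\\d+)\\], (\\d+)");

    // Returns "key@start/end" -> number of emissions, keeping only pairs emitted more than once.
    static Map<String, Integer> duplicates(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line.trim());
            if (!m.matches()) {
                continue; // skip lines not in the expected format
            }
            String keyWindow = m.group(1) + "@" + m.group(2) + "/" + m.group(3);
            counts.merge(keyWindow, 1, Integer::sum);
        }
        counts.values().removeIf(n -> n < 2); // keep only violations
        return counts;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList(
            "[KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1039",
            "[KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1162",
            "[KTABLE-TOSTREAM-0000000010]: [9@1549067040000/1549067100000], 6584",
            "[KTABLE-TOSTREAM-0000000010]: [88@1549067040000/1549067100000], 107",
            "[KTABLE-TOSTREAM-0000000010]: [108@1549067040000/1549067100000], 315",
            "[KTABLE-TOSTREAM-0000000010]: [119@1549067040000/1549067100000], 119",
            "[KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 746",
            "[KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 809");
        // {131@1549067040000/1549067100000=2, 154@1549067040000/1549067100000=2}
        System.out.println(duplicates(log));
    }
}
```

Run against the eight reported lines, it flags keys 131 and 154 in window 1549067040000/1549067100000, each emitted twice. If suppression worked as configured, the result would be empty.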
>
>
> Added by John:
> Acceptance Criteria:
> * add suppress to system tests, such that it's exercised with crash/shutdown recovery, rebalance, etc.
> ** [https://github.com/apache/kafka/pull/6278]
> * make sure that there's some system test coverage with caching disabled.
> ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
> * test with tighter time bounds, using windows of, say, 30 seconds and system time, without adding any extra time for verification
> ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)