Posted to jira@kafka.apache.org by "Jessica Johann (Jira)" <ji...@apache.org> on 2021/03/26 08:46:00 UTC
[jira] [Updated] (KAFKA-12564) KTable#filter-method called twice after aggregation
[ https://issues.apache.org/jira/browse/KAFKA-12564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jessica Johann updated KAFKA-12564:
-----------------------------------
Description:
Libraries from build.sbt:
{{"org.apache.kafka" % "kafka_2.13" % "2.7.0",}}
{{"org.apache.kafka" % "kafka-streams" % "2.7.0",}}
{{"org.apache.kafka" % "kafka-clients" % "2.7.0",}}
{{"org.apache.kafka" % "kafka-streams-scala_2.13" % "2.7.0",}}
h4. Input
Feed the stream "issue_stream" with:
{{(1->"A")}}
{{(1->"B")}}
h4. Topology
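{{// imports and builder added so the snippet compiles on its own (kafka-streams-scala 2.7.0)}}
{{import org.apache.kafka.streams.kstream.Printed}}
{{import org.apache.kafka.streams.scala.ImplicitConversions._}}
{{import org.apache.kafka.streams.scala.Serdes._}}
{{import org.apache.kafka.streams.scala.StreamsBuilder}}
{{import org.apache.kafka.streams.scala.kstream.KStream}}
{{import org.apache.kafka.streams.scala.kstream.KTable}}
{{val builder = new StreamsBuilder()}}
{{// #1 source stream}}
{{val issueStream: KStream[Int, String] = builder.stream[Int, String]("issue_stream")}}
{{// #2 aggregate per key, starting from "EMPTY"}}
{{val aggTable: KTable[Int, String] =}}
{{  issueStream}}
{{    .groupBy((k, v) => k)}}
{{    .aggregate[String]("EMPTY")((k, v, agg) => s"$agg+$v")}}
{{// #3 print every update of the aggregated table}}
{{aggTable}}
{{  .toStream}}
{{  .print(Printed.toSysOut)}}
{{// #4 pass-through filter that logs every predicate call}}
{{aggTable.filter((k, v) => {}}
{{  println(s"filter($k, $v) at ${System.nanoTime()}")}}
{{  true}}
{{})}}
{{  .toStream}}
{{  .print(Printed.toSysOut)}}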
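A plain fold can compute the values that the initializer and aggregator in #2 should produce for the input above. This is a sketch in plain Scala, independent of Kafka Streams. `ExpectedAggregates` and `expectedAggregates` are hypothetical names chosen for illustration. The output shows the initial value "EMPTY" followed by one aggregate per input record.

```scala
object ExpectedAggregates {
  // Same initializer ("EMPTY") and aggregator (s"$agg+$v") as in #2,
  // applied in order to the values arriving for key 1.
  def expectedAggregates(initial: String, values: Seq[String]): Seq[String] =
    values.scanLeft(initial)((agg, v) => s"$agg+$v")

  def main(args: Array[String]): Unit = {
    // Prints List(EMPTY, EMPTY+A, EMPTY+A+B)
    println(expectedAggregates("EMPTY", Seq("A", "B")))
  }
}
```

The last two elements are the values that #3 prints. The first element is the initializer result, which also appears in the unexpected second filter call for the first tuple.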
h4. First tuple: (1->"A")
#3 prints the aggregated value ("EMPTY" + "+A"), as expected:
{{[KTABLE-TOSTREAM-0000000044]: 1, EMPTY+A}}
#4: the filter method is called twice.
The first call receives the expected aggregate:
{{filter(1, EMPTY+A) at 211379567071847}}
The second call receives the initial (empty) aggregate:
{{filter(1, EMPTY) at 211379567120375}}
The output nevertheless contains only the correct tuple:
{{[KTABLE-TOSTREAM-0000000047]: 1, EMPTY+A}}
h4. Second tuple: (1->"B")
#3 prints the aggregated value ("EMPTY" + "+A" + "+B"), as expected:
{{[KTABLE-TOSTREAM-0000000044]: 1, EMPTY+A+B}}
#4: again, filter(...) is called a second, unexpected time, this time with the previous aggregate (the value before "B" was added).
First call:
{{filter(1, EMPTY+A+B) at 211379567498482}}
Second call:
{{filter(1, EMPTY+A) at 211379567524475}}
However, the output contains only one tuple, as expected:
{{[KTABLE-TOSTREAM-0000000047]: 1, EMPTY+A+B}}
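One way to read the observations above is through a simple model. In this model, each table update travels downstream as a pair of (new aggregate, old aggregate), and the filter evaluates its predicate on both halves. The sketch below is a simplified, self-contained model in plain Scala, not Kafka Streams source code. `ChangeModel`, `Change`, `aggregate`, `filter` and `run` are hypothetical names. The model rests on two assumptions:
* old values are forwarded to the filter;
* the old value forwarded for a new key is the initializer result.

Under those assumptions, it reproduces the reported call sequence, while the stream output still contains one value per record.

```scala
import scala.collection.mutable.ListBuffer

// Simplified model, NOT the Kafka Streams implementation.
object ChangeModel {
  // Hypothetical stand-in for a table update carrying new and old values.
  final case class Change[V](newValue: Option[V], oldValue: Option[V])

  // Same initializer and aggregator as in the report.
  val initializer: String = "EMPTY"
  def aggregator(v: String, agg: String): String = s"$agg+$v"

  // Records every value the predicate is called with, in call order.
  val predicateCalls: ListBuffer[String] = ListBuffer.empty[String]

  // Pass-through predicate like #4: logs the call and keeps everything.
  def predicate(k: Int, v: String): Boolean = {
    predicateCalls += v
    true
  }

  // Assumption: for a new key the "old" value is the initializer result.
  def aggregate(store: Map[Int, String], k: Int, v: String): (Map[Int, String], Change[String]) = {
    val oldAgg = store.getOrElse(k, initializer)
    val newAgg = aggregator(v, oldAgg)
    (store.updated(k, newAgg), Change(Some(newAgg), Some(oldAgg)))
  }

  // Assumption: the filter forwards old values, so it evaluates the
  // predicate on the new value first and then on the old value.
  def filter(k: Int, change: Change[String]): Change[String] =
    Change(change.newValue.filter(predicate(k, _)), change.oldValue.filter(predicate(k, _)))

  // Models toStream: only the new value of each change reaches the output
  // (values rejected by the predicate are simply dropped in this model).
  def run(input: Seq[(Int, String)]): Seq[String] = {
    var store = Map.empty[Int, String]
    input.flatMap { case (k, v) =>
      val (next, change) = aggregate(store, k, v)
      store = next
      filter(k, change).newValue
    }
  }

  def main(args: Array[String]): Unit = {
    val out = run(Seq(1 -> "A", 1 -> "B"))
    println(s"predicate calls: ${predicateCalls.toList}")
    println(s"stream output:   $out")
  }
}
```

If this model matches the real behaviour, the second call is the predicate being applied to the old value of the update. That would explain why it never produces a second output record.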
> KTable#filter-method called twice after aggregation
> ---------------------------------------------------
>
> Key: KAFKA-12564
> URL: https://issues.apache.org/jira/browse/KAFKA-12564
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.7.0
> Reporter: Jessica Johann
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.3.4#803005)