Posted to jira@kafka.apache.org by "Tim te Beek (Jira)" <ji...@apache.org> on 2022/08/05 09:07:00 UTC
[jira] [Comment Edited] (KAFKA-13813) left/outer joins can wait indefinitely for emitted record with spurious record fix
[ https://issues.apache.org/jira/browse/KAFKA-13813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17575268#comment-17575268 ]
Tim te Beek edited comment on KAFKA-13813 at 8/5/22 9:06 AM:
-------------------------------------------------------------
I guess we're running into this issue with a simplified use case:
{code:java}
@Bean
KStream<String, Request> handle(StreamsBuilder builder) {
    KStream<String, Request> unfiltered = builder.stream("unfiltered");
    KStream<String, Request> completed = builder.stream("completed");
    unfiltered
        .leftJoin(completed,
            // Keep the left-hand value only when no "completed" record matched.
            // (Lambda parameters renamed so they do not clash with the local variable "completed".)
            (collectedRequest, completedRequest) -> completedRequest == null ? collectedRequest : null,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(26)).after(Duration.ZERO),
            StreamJoined.<String, Request, Request>as("collectedstore").withName("collectedname"))
        // Drop the null results produced for records that did find a match.
        .filter((key, request) -> request != null)
        .to("collected");
    return unfiltered;
}
{code}
In this case we're not using a grace period, and the after time difference is ZERO.
I would expect that there's then never another record to trigger the join emit, and thus no need to block spurious/eager emission.
What we're seeing is that the join is never triggered unless there's a record to join with.
Am I correct in thinking that we're indeed running up against the limits of the new join semantics?
And could the new join semantics be amended to account for the no-grace zero-after use case to join the left hand side with null immediately?
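To make the behaviour described above concrete, here is a minimal sketch in plain Java, with no Kafka dependency. It is a simplified model and not the actual Kafka Streams implementation: the class StreamTimeLeftJoinModel, its methods, and the exact close condition (timestamp + after + grace < stream time) are assumptions chosen for illustration. It only shows that when stream time advances solely on incoming records, an unmatched left-hand record cannot be emitted until some later record arrives, even with no grace and an after of ZERO.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical toy model of stream-time-driven left-join emission (not Kafka code). */
class StreamTimeLeftJoinModel {

    /** An unmatched left-hand record waiting for its window to close. */
    private static final class Pending {
        final String key;
        final long timestamp;

        Pending(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    private final long afterMs;   // how long after the left record a match may still arrive
    private final long graceMs;   // extra allowance for out-of-order records
    private long streamTime = Long.MIN_VALUE; // highest record timestamp seen so far
    private final List<Pending> pending = new ArrayList<>();

    StreamTimeLeftJoinModel(long afterMs, long graceMs) {
        this.afterMs = afterMs;
        this.graceMs = graceMs;
    }

    /**
     * Processes an unmatched left-hand record and returns the keys whose
     * windows closed as a result, i.e. the "left + null" results emitted now.
     */
    List<String> onLeftRecord(String key, long timestamp) {
        // Stream time only moves forward when a record is processed.
        streamTime = Math.max(streamTime, timestamp);
        pending.add(new Pending(key, timestamp));

        List<String> emitted = new ArrayList<>();
        Iterator<Pending> it = pending.iterator();
        while (it.hasNext()) {
            Pending p = it.next();
            // Assumed close condition: the window is closed once stream time has passed it.
            if (p.timestamp + afterMs + graceMs < streamTime) {
                emitted.add(p.key);
                it.remove();
            }
        }
        return emitted;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        StreamTimeLeftJoinModel model = new StreamTimeLeftJoinModel(0L, 0L);
        System.out.println(model.onLeftRecord("a", 100L)); // prints []
        System.out.println(model.onLeftRecord("b", 101L)); // prints [a]
        System.out.println(model.pendingCount());          // prints 1
    }
}
```

In this model record "a" is only emitted once "b" arrives, and "b" itself stays pending until yet another record advances stream time, which matches the symptom we observe.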
> left/outer joins can wait indefinitely for emitted record with spurious record fix
> ----------------------------------------------------------------------------------
>
> Key: KAFKA-13813
> URL: https://issues.apache.org/jira/browse/KAFKA-13813
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Clive Cox
> Priority: Major
>
> With the fix in https://issues.apache.org/jira/browse/KAFKA-10847, records will be emitted after the grace period, but only when a new record is processed. This means it's possible to wait an arbitrarily long time for a record to be emitted.
> This also means one cannot recreate the previous semantics of emitting records immediately, and emission after the grace period is not guaranteed either.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)