Posted to jira@kafka.apache.org by "Tim te Beek (Jira)" <ji...@apache.org> on 2022/08/05 09:07:00 UTC

[jira] [Comment Edited] (KAFKA-13813) left/outer joins can wait indefinitely for emitted record with spurious record fix

    [ https://issues.apache.org/jira/browse/KAFKA-13813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17575268#comment-17575268 ] 

Tim te Beek edited comment on KAFKA-13813 at 8/5/22 9:06 AM:
-------------------------------------------------------------

I guess we're running into this issue with a simplified use case:
{code:java}
@Bean
KStream<String, Request> handle(StreamsBuilder builder) {
  KStream<String, Request> unfiltered = builder.stream("unfiltered");
  KStream<String, Request> completed = builder.stream("completed");
  unfiltered
    .leftJoin(completed,
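      // Lambda parameters must not reuse the name of the local variable 'completed'.
      (collectedValue, completedValue) -> completedValue == null ? collectedValue : null,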
      JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(26)).after(Duration.ZERO),
      StreamJoined.<String, Request, Request>as("collectedstore").withName("collectedname"))
    .filter((key, request) -> request != null)
    .to("collected");
  return unfiltered;
}{code}
In this case we're using no grace period and an after time difference of ZERO.
I would expect that there's then never another record that could still trigger the join emit, and thus no need to hold back the result to avoid spurious/eager emission.
What we're seeing is that the join result is never emitted unless there's a record to join with.

Am I correct in thinking that we're indeed running up against the limits of the new join semantics?
And could the new join semantics be amended to account for the no-grace zero-after use case to join the left hand side with null immediately?
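
To make the observed behavior concrete, here is a toy model in plain Java. It is not Kafka Streams code: the class LeftJoinEmitSketch and its methods are hypothetical, and the closing rule (an unmatched left record is emitted only once stream time is strictly greater than its timestamp plus after plus grace) is an assumption based on the description in this ticket. It only illustrates that stream time advances when a record is processed, so the last unmatched record stays buffered until something else arrives.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model only, NOT Kafka Streams code. All names here are hypothetical.
public class LeftJoinEmitSketch {
    private static final class Rec {
        final String key;
        final long ts;
        Rec(String key, long ts) { this.key = key; this.ts = ts; }
    }

    private final long beforeMs, afterMs, graceMs;
    private long streamTime = Long.MIN_VALUE;            // advances only when a record is processed
    private final List<Rec> unmatchedLefts = new ArrayList<>();
    private final List<Rec> rights = new ArrayList<>();

    public LeftJoinEmitSketch(long beforeMs, long afterMs, long graceMs) {
        this.beforeMs = beforeMs; this.afterMs = afterMs; this.graceMs = graceMs;
    }

    public List<String> onLeft(String key, long ts) {
        streamTime = Math.max(streamTime, ts);
        List<String> out = new ArrayList<>();
        boolean matched = false;
        for (Rec r : rights) {
            if (r.key.equals(key) && inWindow(ts, r.ts)) { out.add(key + ":joined"); matched = true; }
        }
        if (!matched) unmatchedLefts.add(new Rec(key, ts)); // buffered, not emitted eagerly
        flushClosed(out);
        return out;
    }

    public List<String> onRight(String key, long ts) {
        streamTime = Math.max(streamTime, ts);
        List<String> out = new ArrayList<>();
        rights.add(new Rec(key, ts));
        for (Iterator<Rec> it = unmatchedLefts.iterator(); it.hasNext();) {
            Rec l = it.next();
            if (l.key.equals(key) && inWindow(l.ts, ts)) { out.add(key + ":joined"); it.remove(); }
        }
        flushClosed(out);
        return out;
    }

    // A right record matches a left record if it lies in [leftTs - before, leftTs + after].
    private boolean inWindow(long leftTs, long rightTs) {
        return rightTs >= leftTs - beforeMs && rightTs <= leftTs + afterMs;
    }

    // Assumed rule: emit left-with-null only once stream time has passed the window close.
    private void flushClosed(List<String> out) {
        for (Iterator<Rec> it = unmatchedLefts.iterator(); it.hasNext();) {
            Rec l = it.next();
            if (streamTime > l.ts + afterMs + graceMs) { out.add(l.key + ":null"); it.remove(); }
        }
    }

    public static void main(String[] args) {
        long before = 26L * 60 * 60 * 1000;               // 26 hours, as in the snippet above
        LeftJoinEmitSketch join = new LeftJoinEmitSketch(before, 0, 0);
        System.out.println(join.onLeft("a", 1000));       // prints []
        System.out.println(join.onLeft("b", 2000));       // prints [a:null]
        // "b" now stays buffered until some later record advances stream time.
    }
}
```

In this model the first record produces no output even though after and grace are both zero, and "a" is only emitted with null once "b" moves stream time forward. That matches what we observe: without further input the last left-hand record is never emitted.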


> left/outer joins can wait indefinitely for emitted record with spurious record fix
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-13813
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13813
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Clive Cox
>            Priority: Major
>
> With the fix in https://issues.apache.org/jira/browse/KAFKA-10847, records will be emitted after the grace period, but only when a new record is processed. This means it's possible to wait an arbitrarily long time for a record to be emitted.
> This also means one cannot recreate the previous semantics of emitting records immediately, and emission is not even guaranteed once the grace period has passed.


