Posted to issues@spark.apache.org by "Alex Balikov (Jira)" <ji...@apache.org> on 2022/10/26 17:47:00 UTC

[jira] [Created] (SPARK-40925) Fix late record filtering to support chaining of stateful operators

Alex Balikov created SPARK-40925:
------------------------------------

             Summary: Fix late record filtering to support chaining of stateful operators
                 Key: SPARK-40925
                 URL: https://issues.apache.org/jira/browse/SPARK-40925
             Project: Spark
          Issue Type: Improvement
          Components: Structured Streaming
    Affects Versions: 3.4.0
            Reporter: Alex Balikov
             Fix For: 3.4.0


This is a follow-up ticket to https://issues.apache.org/jira/browse/SPARK-40821.

Here we propose fixing the late record filtering in stateful operators to allow chaining of stateful operators which do not produce delayed records (unlike time-interval joins or, potentially, flatMapGroupsWithState) - e.g. a time-equality streaming join followed by aggregations, or chained window aggregations.

There are 2 issues which need to be addressed:
 # Stateful operators filter out late input records based on the current watermark. When chaining window aggregations, for example, the records produced by the first window aggregation are by semantics behind the current watermark (the watermark closes all past windows and emits the corresponding aggregates), so these records will by definition appear late relative to the current watermark in the second stateful operator. The proposed fix is to use the previous batch's watermark for late record filtering and the current batch's watermark for state eviction - effectively, each stateful operator should be initialized with two watermark values instead of one.
 # The second issue with chaining window aggregations is that the records produced by the first aggregator do not have an explicit event time column and thus cannot be fed directly into a subsequent stateful operator which needs that column. This is partially handled by [https://github.com/apache/spark/pull/38288], which lets the user explicitly introduce a new event time column by extracting the event time from the window column, but this is slightly cumbersome. We propose changing the window function to handle the window column transparently - e.g.

input
  .withWatermark("eventTime", "1 seconds")
  .groupBy(window($"eventTime", "5 seconds") as 'window)
  .agg(count("*") as 'count)
  .groupBy(window($"window", "10 seconds"))
  .agg(count("*") as 'count, sum("count") as 'sum)
  .select($"count".as[Long], $"sum".as[Long])
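The two-watermark idea from issue 1 above can be sketched as follows. This is only an illustrative sketch, not actual Spark internals; the names below (Watermarks, StatefulOpSketch, isLate, shouldEvict) are hypothetical:

```scala
// Hypothetical sketch of initializing a stateful operator with two watermark
// values: the previous batch's watermark and the current batch's watermark.
case class Watermarks(previousBatchMs: Long, currentBatchMs: Long)

class StatefulOpSketch(wm: Watermarks) {
  // Late-record filtering uses the PREVIOUS batch's watermark, so records
  // emitted by an upstream stateful operator right at the old watermark
  // boundary are still accepted here instead of being dropped as late.
  def isLate(eventTimeMs: Long): Boolean = eventTimeMs < wm.previousBatchMs

  // State eviction still advances with the CURRENT batch's watermark,
  // so windows closed by this batch's watermark are emitted and cleaned up.
  def shouldEvict(stateExpiryMs: Long): Boolean = stateExpiryMs < wm.currentBatchMs
}
```

With a single watermark, any record between the old and new watermark would be dropped as late; splitting the two roles is what lets one stateful operator consume the output of another.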

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org