Posted to issues@flink.apache.org by "lincoln lee (Jira)" <ji...@apache.org> on 2022/07/29 06:43:00 UTC

[jira] [Commented] (FLINK-27849) Harden correctness for non-deterministic updates present in the changelog pipeline

    [ https://issues.apache.org/jira/browse/FLINK-27849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17572773#comment-17572773 ] 

lincoln lee commented on FLINK-27849:
-------------------------------------

Thanks to [~jark], [~godfrey], and [~jinsong] for all your contributions and valuable suggestions during the long offline discussion. I have written a short doc describing the final solution: https://docs.google.com/document/d/1uFBYqyXJxuNhhB37ydGniQMF2JkS3DtGZMU4oBTJ03U

Before releasing 1.16, we also need to prepare formal user documentation (FLINK-28738).

> Harden correctness for non-deterministic updates present in the changelog pipeline
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-27849
>                 URL: https://issues.apache.org/jira/browse/FLINK-27849
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>             Fix For: 1.16.0
>
>
> Updates (that is, messages other than RowKind.INSERT) are common in a streaming pipeline, and wrong results or errors may occur when non-deterministic functions or operations are applied to them.
> This is a long-standing issue that has existed since Flink SQL first became available for streaming, and it has still not been fully eliminated despite some earlier efforts.
> We should detect all non-deterministic operations in changelog pipelines, raise an error that tells users about the risk, and also add a mechanism that can handle such an issue if a user is willing to pay some cost (probably by introducing state).
> Non-deterministic operations include built-in temporal functions (now, current_timestamp...), UUID, RAND...
> or user-defined non-deterministic functions (those that override isDeterministic to return false)
> or a lookup join on a lookup source whose data may change over time
> or a CDC source with metadata fields (described in FLINK-28242)
>  
>  
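To make the failure mode described above concrete, here is a minimal, self-contained sketch in plain Java (no Flink dependency). Everything in it is hypothetical and for illustration only: the `Change` class, the `project` and `materialize` helpers, and the counter that stands in for a function such as now() or UUID(). It assumes a downstream consumer that retracts by matching the full row, which is a simplification of how a real sink or stateful operator behaves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical illustration only; none of these names are Flink APIs.
final class RetractionDemo {
    /** Stand-in for a changelog message: a row kind ("+I", "-U", "+U", "-D") plus a row. */
    static final class Change {
        final String kind;
        final String row;
        Change(String kind, String row) { this.kind = kind; this.row = row; }
    }

    /** Appends a computed column to every message, including retractions. */
    static List<Change> project(List<Change> input, Function<String, String> fn) {
        List<Change> out = new ArrayList<>();
        for (Change c : input) {
            out.add(new Change(c.kind, c.row + "," + fn.apply(c.row)));
        }
        return out;
    }

    /** Materializes a changelog by full-row matching (assumed consumer behavior). */
    static List<String> materialize(List<Change> changes) {
        List<String> state = new ArrayList<>();
        for (Change c : changes) {
            if (c.kind.equals("+I") || c.kind.equals("+U")) {
                state.add(c.row);
            } else {
                state.remove(c.row); // silently does nothing if no equal row exists
            }
        }
        return state;
    }

    /** An insert of (k1,10) followed by an update to (k1,20). */
    static List<Change> sampleChangelog() {
        List<Change> in = new ArrayList<>();
        in.add(new Change("+I", "k1,10"));
        in.add(new Change("-U", "k1,10"));
        in.add(new Change("+U", "k1,20"));
        return in;
    }

    static List<String> deterministicResult() {
        // Same input row always yields the same extra column.
        return materialize(project(sampleChangelog(), row -> "h" + row.length()));
    }

    static List<String> nonDeterministicResult() {
        // A counter stands in for now()/UUID(): a different value on every call.
        AtomicInteger calls = new AtomicInteger();
        return materialize(project(sampleChangelog(), row -> "t" + calls.getAndIncrement()));
    }

    public static void main(String[] args) {
        System.out.println(deterministicResult());    // [k1,20,h5]
        System.out.println(nonDeterministicResult()); // [k1,10,t0, k1,20,t2]
    }
}
```

In the non-deterministic run the -U message carries a freshly computed value, so it no longer equals the row that was inserted earlier and the stale row is never removed.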
> ====== Solution  ======
> We will introduce a physical plan checker to validate whether there are any non-deterministic updates that may cause wrong results, and also a physical plan rewriter to eliminate the non-determinism generated by lookup join nodes (which we think are commonly used in SQL and hard for users to solve themselves).
> In terms of implementation steps, the main changes fall into 4 parts:
>  # [preparatory work] Add an internal postOptimize method for physical DAG processing
>  # Introduce a `StreamNonDeterministicPlanResolver` to validate whether there are any non-deterministic updates that may cause wrong results, and to rewrite lookup join nodes with materialization (to eliminate the non-determinism generated by lookup join nodes)
>  # Implement a new lookup join operator (sync mode only) with state to eliminate the non-determinism
>  # [optimization] Make SinkUpsertMaterializer aware of the input upsertKey if it is not empty
>  
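The idea behind step 3 above (a lookup join that keeps state) can be sketched roughly as follows. This is not the actual Flink operator: the class name, the `process` method, and the in-memory maps are all assumptions made for illustration, and the sketch ignores duplicate left rows, state TTL, and async mode. It only shows why remembering the lookup result taken at insert time lets a later retraction reproduce exactly the row that was emitted before, even if the lookup source has changed in the meantime.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not the Flink lookup join implementation.
final class MaterializedLookupJoin {
    private final Map<String, String> dimTable;               // external lookup source, may change over time
    private final Map<String, String> state = new HashMap<>(); // left key -> lookup result taken at insert time

    MaterializedLookupJoin(Map<String, String> dimTable) {
        this.dimTable = dimTable;
    }

    /** Joins one changelog message; retractions reuse the remembered lookup result. */
    String process(String kind, String key) {
        String looked;
        if (kind.equals("+I") || kind.equals("+U")) {
            looked = dimTable.get(key);  // fresh lookup for new data
            state.put(key, looked);      // remember what was emitted
        } else if (state.containsKey(key)) {
            looked = state.remove(key);  // replay the earlier result for -U / -D
        } else {
            looked = dimTable.get(key);  // nothing remembered: fall back to a lookup
        }
        return kind + "(" + key + "," + looked + ")";
    }

    /** Stateless variant for contrast: always looks up the current value. */
    static String statelessJoin(String kind, String key, Map<String, String> dimTable) {
        return kind + "(" + key + "," + dimTable.get(key) + ")";
    }

    public static void main(String[] args) {
        Map<String, String> dim = new HashMap<>();
        dim.put("p1", "apple");
        MaterializedLookupJoin join = new MaterializedLookupJoin(dim);

        System.out.println(join.process("+I", "p1"));       // +I(p1,apple)
        dim.put("p1", "pear");                               // lookup source changes
        System.out.println(statelessJoin("-D", "p1", dim)); // -D(p1,pear): does not match the insert
        System.out.println(join.process("-D", "p1"));       // -D(p1,apple): matches the insert
    }
}
```

The trade-off is the one the description mentions: correctness is bought with extra state, whose size grows with the number of live left-side rows.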



--
This message was sent by Atlassian Jira
(v8.20.10#820010)