Posted to issues@flink.apache.org by "lincoln lee (Jira)" <ji...@apache.org> on 2022/07/29 06:43:00 UTC
[jira] [Commented] (FLINK-27849) Harden correctness for non-deterministic updates present in the changelog pipeline
[ https://issues.apache.org/jira/browse/FLINK-27849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17572773#comment-17572773 ]
lincoln lee commented on FLINK-27849:
-------------------------------------
Thanks to [~jark], [~godfrey] and [~jinsong] for all your contributions and valuable suggestions during the long offline discussion. I have written a short doc describing the final solution: https://docs.google.com/document/d/1uFBYqyXJxuNhhB37ydGniQMF2JkS3DtGZMU4oBTJ03U
Before the 1.16 release, we also need to prepare formal user documentation (FLINK-28738).
> Harden correctness for non-deterministic updates present in the changelog pipeline
> ----------------------------------------------------------------------------------
>
> Key: FLINK-27849
> URL: https://issues.apache.org/jira/browse/FLINK-27849
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Reporter: lincoln lee
> Assignee: lincoln lee
> Priority: Major
> Fix For: 1.16.0
>
>
> Updates (i.e., messages that are not only RowKind.INSERT) commonly exist in a streaming pipeline, and wrong results or errors may occur when non-deterministic functions or operations are applied to them.
> This is a long-lived issue that has existed since Flink SQL first became available in streaming mode. Although some efforts have been made, it has still not been fully eliminated.
> We should detect all non-deterministic operations in changelog pipelines, raise an error to inform users of the risk, and also add a mechanism that can handle such issues if a user is willing to pay some cost (probably by introducing state).
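A minimal Python simulation of this failure mode follows. It is not Flink code. `fake_now`, `project_with_now` and `MultisetSink` are hypothetical stand-ins, and the row kinds use the short strings `+I`, `-U`, `+U`, `-D`. A projection re-evaluates a non-deterministic function (a counter standing in for now()) for every message. As a result, the UPDATE_BEFORE retraction carries a different value than the INSERT it is meant to cancel, and the downstream sink cannot remove the stale row.

```python
from collections import Counter
from itertools import count

# Hypothetical stand-in for a non-deterministic function such as now():
# every call returns a different value (a counter keeps the output reproducible).
_clock = count(1000)


def fake_now():
    return next(_clock)


def project_with_now(kind, row):
    """Appends fake_now() to the row, re-evaluating it for every message."""
    return kind, row + (fake_now(),)


class MultisetSink:
    """Retract sink: a retraction must match a previously emitted row exactly."""

    def __init__(self):
        self.rows = Counter()
        self.errors = []  # retractions that matched nothing

    def apply(self, kind, row):
        if kind in ("+I", "+U"):
            self.rows[row] += 1
        elif self.rows[row] > 0:  # "-U" or "-D" matching an emitted row
            self.rows[row] -= 1
            if self.rows[row] == 0:
                del self.rows[row]
        else:
            self.errors.append(row)


# Changelog for one key whose value is updated from 10 to 20.
changelog = [("+I", ("user1", 10)), ("-U", ("user1", 10)), ("+U", ("user1", 20))]

sink = MultisetSink()
for kind, row in changelog:
    sink.apply(*project_with_now(kind, row))

print(sorted(sink.rows))  # -> [('user1', 10, 1000), ('user1', 20, 1002)]
print(sink.errors)        # -> [('user1', 10, 1001)]
```

The sink ends up holding both the stale row and the new one, which is the kind of wrong result described above.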
> Non-deterministic operations include:
> * built-in temporal functions (now, current_timestamp, ...), UUID, RAND, ...
> * user-defined non-deterministic functions (i.e., functions that override isDeterministic to return false)
> * a lookup join on a lookup source whose data may change over time
> * a CDC source with metadata fields (described in FLINK-28242)
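The lookup join case from the list can be simulated the same way. In this hypothetical sketch (plain Python, not Flink's lookup join API), `dim_table` plays the role of an external lookup source. It is updated between the INSERT and the UPDATE_BEFORE for the same row. A stateless lookup join therefore enriches the two messages differently, and the retraction again fails to match.

```python
from collections import Counter

# Hypothetical external dimension table that changes while the job runs.
dim_table = {"user1": "Berlin"}


def lookup_join(kind, row):
    """Stateless lookup join: queries the dimension table for every message."""
    user_id, amount = row
    return kind, (user_id, amount, dim_table.get(user_id))


emitted = Counter()  # rows currently held by a downstream retract sink
unmatched = []       # retractions that matched no emitted row


def sink(kind, row):
    if kind in ("+I", "+U"):
        emitted[row] += 1
    elif emitted[row] > 0:
        emitted[row] -= 1
        if emitted[row] == 0:
            del emitted[row]
    else:
        unmatched.append(row)


sink(*lookup_join("+I", ("user1", 10)))
dim_table["user1"] = "Paris"  # the lookup source changes over time
sink(*lookup_join("-U", ("user1", 10)))
sink(*lookup_join("+U", ("user1", 20)))

print(sorted(emitted))  # -> [('user1', 10, 'Berlin'), ('user1', 20, 'Paris')]
print(unmatched)        # -> [('user1', 10, 'Paris')]
```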
>
>
> ====== Solution ======
> We will introduce a physical plan checker to validate whether there are any non-deterministic updates that may cause wrong results. We will also introduce a physical plan rewriter to eliminate the non-determinism generated by lookup join nodes, which we believe are commonly used in SQL and hard for users to resolve themselves.
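The checker/rewriter idea can be illustrated on a toy plan tree. This is a deliberately simplified sketch, not the planner's implementation. `PlanNode`, `check_and_rewrite` and `NonDeterministicUpdateError` are hypothetical, and each node is reduced to two flags. The walk is bottom-up. A non-deterministic node whose input may carry updates raises an error, with one exception: a lookup join is rewritten into a materialized variant instead.

```python
from dataclasses import dataclass, field


class NonDeterministicUpdateError(Exception):
    """Raised when a non-deterministic node would process update messages."""


@dataclass
class PlanNode:
    kind: str                       # e.g. "source", "calc", "lookup_join", "sink"
    produces_updates: bool = False  # e.g. a CDC source or a group aggregation
    deterministic: bool = True      # False for now(), UUID, RAND, a changing lookup source, ...
    inputs: list = field(default_factory=list)


def check_and_rewrite(node):
    """Returns (node, may_emit_updates) after validating and rewriting the subtree."""
    input_has_updates = False
    rewritten_inputs = []
    for child in node.inputs:
        rewritten, child_has_updates = check_and_rewrite(child)
        rewritten_inputs.append(rewritten)
        input_has_updates = input_has_updates or child_has_updates
    node.inputs = rewritten_inputs

    if input_has_updates and not node.deterministic:
        if node.kind == "lookup_join":
            # Rewrite instead of failing: a materialized lookup join keeps the
            # results it emitted, so retractions can be replayed exactly.
            node.kind = "materialized_lookup_join"
            node.deterministic = True
        else:
            raise NonDeterministicUpdateError(
                f"non-deterministic '{node.kind}' node processes updates"
            )
    # In this simplified model, updates flowing in are propagated downstream.
    return node, input_has_updates or node.produces_updates


# A source emitting updates feeds a lookup join on a changing lookup source.
plan = PlanNode("sink", inputs=[
    PlanNode("lookup_join", deterministic=False, inputs=[
        PlanNode("source", produces_updates=True),
    ]),
])
rewritten, _ = check_and_rewrite(plan)
print(rewritten.inputs[0].kind)  # -> materialized_lookup_join
```

A non-deterministic node on an insert-only path passes unchanged in this model, since only update messages are at risk.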
> The implementation will likely consist of four main parts:
> # [preparatory work] Add an internal postOptimize method for physical DAG processing
> # Introduce a `StreamNonDeterministicPlanResolver` that validates whether there are any non-deterministic updates that may cause wrong results, and rewrites lookup join nodes with materialization (to eliminate the non-determinism generated by lookup join nodes)
> # Implement a new stateful lookup join operator (sync mode only) to eliminate the non-determinism
> # [optimization] Make SinkUpsertMaterializer aware of the input upsertKey when it is not empty
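Step 3 can be sketched as follows. This is a hypothetical Python model, not the actual Flink operator, and for simplicity it keys state by the full input row. On `+I`/`+U` it queries the lookup source and stores the result. On `-U`/`-D` it replays the stored result instead of re-querying, so retractions stay consistent even after the lookup source changes. The price is the state it must keep, which is the cost trade-off mentioned in the description.

```python
class MaterializedLookupJoin:
    """Lookup join that keeps the dimension values it emitted, per input row."""

    def __init__(self, lookup):
        self.lookup = lookup  # callable: join key -> current dimension value
        self.state = {}       # input row -> dimension values emitted for it

    def process(self, kind, row):
        join_key = row[0]
        if kind in ("+I", "+U"):
            # Accumulate message: query the (possibly changing) lookup source
            # and remember the result in state.
            dim_value = self.lookup(join_key)
            self.state.setdefault(row, []).append(dim_value)
        else:
            # Retraction ("-U" / "-D"): replay the stored value instead of
            # re-querying, so it matches what was emitted before.
            emitted = self.state.get(row)
            if not emitted:
                raise ValueError(f"no materialized result for retracted row {row}")
            dim_value = emitted.pop()
            if not emitted:
                del self.state[row]
        return kind, row + (dim_value,)


dim_table = {"user1": "Berlin"}
join = MaterializedLookupJoin(dim_table.get)
print(join.process("+I", ("user1", 10)))  # -> ('+I', ('user1', 10, 'Berlin'))
dim_table["user1"] = "Paris"              # the lookup source changes
print(join.process("-U", ("user1", 10)))  # -> ('-U', ('user1', 10, 'Berlin'))
print(join.process("+U", ("user1", 20)))  # -> ('+U', ('user1', 20, 'Paris'))
```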
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)