Posted to issues@flink.apache.org by "Hengyu Dai (Jira)" <ji...@apache.org> on 2022/03/01 04:07:00 UTC

[jira] [Commented] (FLINK-26408) retract a non-existent record in RetractableTopNFunction

    [ https://issues.apache.org/jira/browse/FLINK-26408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17499315#comment-17499315 ] 

Hengyu Dai commented on FLINK-26408:
------------------------------------

Hi [~lzljs3620320], I see you have looked into this exception before. Could you help with this issue?

cc [~jark] 

> retract a non-existent record in RetractableTopNFunction 
> ---------------------------------------------------------
>
>                 Key: FLINK-26408
>                 URL: https://issues.apache.org/jira/browse/FLINK-26408
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner, Table SQL / Runtime
>    Affects Versions: 1.12.2
>            Reporter: Hengyu Dai
>            Priority: Major
>
> RetractableTopNFunction throws a RuntimeException when both of the following hold:
>  # the sorted map {color:#0747a6}ValueState<SortedMap<RowData, Long>> treeMap{color} is not empty, and
>  # the sorted map does not contain the current sort key.
>  
> Consider the following Flink SQL job:
>  
> {code:sql}
> -- table_a(a_key, a_time, a_jk), table_b(b_key, b_time, b_jk)
> select
> a_key,a_time,a_jk,b_key,b_time,b_jk
> from
> (
>     select
>     a_key,a_time,a_jk,b_key,b_time,b_jk,
>     row_number() over(partition by a_key order by a_time desc) as rn
>     from
>     (
>         select a_key, a_time, a_jk
>         from (
>             select * , row_number() over(partition by a_key order by a_time desc) as rn
>             from table_a
>         ) tmp1
>         where rn = 1
>     ) t1
>     left join
>     (
>         select b_key, b_time, b_jk
>         from (
>             select * , row_number() over(partition by b_key order by b_time desc) as rn
>             from table_b
>         ) tmp2
>         where rn = 1
>     ) t2
>     on t1.a_jk = t2.b_jk
> ) t3
> where rn = 1{code}
> The JobGraph looks like this:
> {noformat}
> Source table_a --> Rank_a --+
>                             +--> Join --> Final Rank
> Source table_b --> Rank_b --+
> {noformat}
> 
> Suppose we have the following input:
> 
> ||ts||SourceA (a_key, a_time, a_jk)||SourceB (b_key, b_time, b_jk)||RankA (a_key, a_time, a_jk)||RankB (b_key, b_time, b_jk)||Join (a_key, b_key, a_time, a_jk)||Final Rank (a_key, b_key, a_time)||
> |t1| |+(b1,1,jk1)| |+(b1,1,jk1)| | |
> |t2| |+(b2,2,jk2)| |+(b2,2,jk2)| | |
> |t3|+(a1,3,jk1)| |+(a1,3,jk1)| |+(a1,b1,3,jk1)|+(a1,b1,3)|
> |t4|+(a1,4,jk1)| |-(a1,3,jk1) \\ +(a1,4,jk1)| |-(a1,b1,3,jk1) \\ +(a1,b1,4,jk1)|-(a1,b1,3) \\ +(a1,b1,4)|
> |t5|+(a1,5,jk2)| |-(a1,4,jk1) \\ +(a1,5,jk2)| |-(a1,b1,4,jk1) \\ +(a1,b2,5,jk2)|-(a1,b1,4) \\ +(a1,b2,5)|
> Assume:
>  
>  # t4 and t5 are almost simultaneous, and the Join operator emits 4 messages for them. Because the hash key changes (from jk1 to jk2), +(a1,b2,5,jk2) may be produced by a different Join task than the other 3 messages, and it may arrive at Final Rank earlier than them.
>  # Due to network congestion, high machine load, etc., the messages produced at t4 and t5 take a while to reach Final Rank. By the time Final Rank receives them, its state has expired because of the state TTL.
> Now, if +(a1,b2,5,jk2) arrives at Final Rank first, sort key 5 is put into the sortedMap of partition key a1. When -(a1,b1,3,jk1) arrives afterwards, Final Rank finds that the sortedMap is not empty and does not contain sort key 3, so both conditions for the RuntimeException are met.
> We hit this exception in our production environment (Flink version 1.12.2). It is very serious: once it happens, the job cannot recover automatically because the state is polluted.

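To make the failure condition concrete, here is a minimal Java sketch. It is a hypothetical, simplified model of Final Rank's per-key state, not Flink's actual RetractableTopNFunction. The class FinalRankModel and its methods accumulate, retract and replay are illustrative names. The RowData sort key is reduced to a long (a_time). Treating an empty map as expired state, so that the retraction is ignored, is an assumption. The sketch replays Final Rank's input for key a1 from t4 and t5 against empty (TTL-expired) state twice: once in the order the Join emitted it, and once with +(a1,b2,5) arriving first.

```java
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

/**
 * Hypothetical, simplified model of Final Rank's state for one partition key
 * (a_key), sorted by a_time DESC. Not Flink's RetractableTopNFunction; it only
 * mirrors the two exception conditions described in this issue.
 */
class FinalRankModel {
    // sort key (a_time) -> number of records currently holding that sort key
    private final TreeMap<Long, Long> sortedMap = new TreeMap<>(Comparator.reverseOrder());

    /** Accumulate message, e.g. +(a1,b2,5). */
    void accumulate(long sortKey) {
        sortedMap.merge(sortKey, 1L, Long::sum);
    }

    /** Retract message, e.g. -(a1,b1,3). */
    void retract(long sortKey) {
        Long count = sortedMap.get(sortKey);
        if (count != null) {
            if (count == 1L) {
                sortedMap.remove(sortKey);
            } else {
                sortedMap.put(sortKey, count - 1);
            }
        } else if (sortedMap.isEmpty()) {
            // Assumption: an empty map is treated as expired state, so the retraction is ignored.
        } else {
            // Both conditions hold: the map is not empty and lacks this sort key.
            throw new RuntimeException("Can not retract a non-existent record, sort key " + sortKey);
        }
    }

    /** Replays messages such as "+5" or "-3" against empty (TTL-expired) state. */
    static String replay(List<String> messages) {
        FinalRankModel state = new FinalRankModel();
        for (String msg : messages) {
            long sortKey = Long.parseLong(msg.substring(1));
            try {
                if (msg.charAt(0) == '+') {
                    state.accumulate(sortKey);
                } else {
                    state.retract(sortKey);
                }
            } catch (RuntimeException e) {
                return "failed at " + msg;
            }
        }
        return "ok, state=" + state.sortedMap;
    }

    public static void main(String[] args) {
        // Final Rank input for a1 in the order the Join emitted it at t4 and t5.
        System.out.println(replay(List.of("-3", "+4", "-4", "+5")));
        // +(a1,b2,5) comes from another Join task and overtakes the other three messages.
        System.out.println(replay(List.of("+5", "-3", "+4", "-4")));
    }
}
```

Under these assumptions, the first replay prints "ok, state={5=1}" and the second prints "failed at -3". The second result is the non-existent-record retraction described in the issue: only the arrival order differs between the two runs.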


--
This message was sent by Atlassian Jira
(v8.20.1#820001)