Posted to issues@flink.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/10/29 12:10:00 UTC

[jira] [Updated] (FLINK-24704) Exception occurs when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

     [ https://issues.apache.org/jira/browse/FLINK-24704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-24704:
-----------------------------------
    Labels: pull-request-available  (was: )

> Exception occurs when the input record loses monotonicity on the sort key field of UpdatableTopNFunction
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-24704
>                 URL: https://issues.apache.org/jira/browse/FLINK-24704
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.14.0
>            Reporter: lincoln lee
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> An IllegalArgumentException occurs when the sort key of an input retract record is lower than the old sort key, because this breaks the monotonicity of the sort key field that the SQL semantics guarantee. The most likely cause is that an upstream stateful operator has a state TTL shorter than the lifespan of the stream records, so the stale record has already been cleared by state TTL.
> A case that reproduces the problem:
> {code:title=RankHarnessTest.java|borderStyle=solid}
> val sql =
>  """
>  |SELECT word, cnt, rank_num
>  |FROM (
>  | SELECT word, cnt,
>  | ROW_NUMBER() OVER (PARTITION BY type ORDER BY cnt DESC) as rank_num
>  | FROM (
>  | select word, type, sum(cnt) filter (where cnt > 0) cnt from T group by word, type
>  | )
>  | )
>  |WHERE rank_num <= 6
>  """.stripMargin
> {code}
> When the aggregated result column `cnt` becomes lower for a key, the downstream retract rank operator fails with the following exception:
>  
> {quote}java.lang.IllegalArgumentException
>  at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
>  at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.emitRecordsWithRowNumber(UpdatableTopNFunction.java:399)
>  at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElementWithRowNumber(UpdatableTopNFunction.java:274)
>  at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:167)
>  at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:69)
>  at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>  at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:209)
> {quote}
> Here we should align with RetractableTopNFunction: continue processing by default (even though the result may be incorrect), and allow this to be configured to fail over once FLINK-24666 is addressed.
>  
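The two behaviours discussed above (fail on a monotonicity violation, or tolerate it and keep processing) can be illustrated with a small standalone sketch. This is not Flink code and not the actual fix: the class `MonotonicityCheckSketch`, the `Mode` enum and the `update` method are hypothetical names chosen for illustration. It assumes a single `long` sort key per row key and an `ORDER BY ... DESC` ranking, where the sort key of a row is expected never to decrease.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Flink.
class MonotonicityCheckSketch {

    // FAIL mirrors the strict precondition; LENIENT keeps processing.
    enum Mode { FAIL, LENIENT }

    private final Map<String, Long> lastSortKey = new HashMap<>();
    private final Mode mode;
    private long violations = 0;

    MonotonicityCheckSketch(Mode mode) {
        this.mode = mode;
    }

    /**
     * Records a new sort key for the given row key.
     * Returns true if the update kept monotonicity, false if it was
     * a tolerated violation (the new sort key is lower than the old one).
     */
    boolean update(String rowKey, long sortKey) {
        Long old = lastSortKey.get(rowKey);
        boolean violated = old != null && sortKey < old;
        if (violated && mode == Mode.FAIL) {
            throw new IllegalArgumentException(
                    "sort key of '" + rowKey + "' dropped from " + old + " to " + sortKey);
        }
        if (violated) {
            // Lenient mode: count the violation and continue; results may be incorrect.
            violations++;
        }
        lastSortKey.put(rowKey, sortKey);
        return !violated;
    }

    long violations() {
        return violations;
    }

    public static void main(String[] args) {
        MonotonicityCheckSketch lenient = new MonotonicityCheckSketch(Mode.LENIENT);
        lenient.update("word-a", 5L);
        lenient.update("word-a", 3L); // tolerated, counted as a violation
        System.out.println("violations: " + lenient.violations());

        MonotonicityCheckSketch strict = new MonotonicityCheckSketch(Mode.FAIL);
        strict.update("word-a", 5L);
        try {
            strict.update("word-a", 3L);
        } catch (IllegalArgumentException e) {
            System.out.println("strict mode failed: " + e.getMessage());
        }
    }
}
```

In the lenient mode the violation is only counted, which corresponds to "continue processing with a possibly incorrect result"; how the real operator would report or configure this is left open in the issue.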



--
This message was sent by Atlassian Jira
(v8.3.4#803005)