Posted to issues@flink.apache.org by "lincoln lee (Jira)" <ji...@apache.org> on 2022/05/16 07:50:00 UTC
[jira] [Updated] (FLINK-24666) Add job level lenient option to all stateful stream operators
[ https://issues.apache.org/jira/browse/FLINK-24666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lincoln lee updated FLINK-24666:
--------------------------------
Summary: Add job level lenient option to all stateful stream operators (was: The lenient option was not exposed to users on RetractableTopNFunction)
> Add job level lenient option to all stateful stream operators
> -------------------------------------------------------------
>
> Key: FLINK-24666
> URL: https://issues.apache.org/jira/browse/FLINK-24666
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Affects Versions: 1.14.0, 1.13.3
> Reporter: lincoln lee
> Priority: Major
> Fix For: 1.16.0
>
>
> Currently, the lenient option is not exposed to users of RetractableTopNFunction:
> {quote}// flag to skip records with non-exist error instead to fail, true by default.
> private final boolean lenient = true;
> {quote}
> So there is no way to raise an exception when records are unexpectedly cleared by state TTL. Commonly this happens because the TTL is too short (the case handled at Line 190) or because of an inconsistency between the two internal states (dataState and treeMap) elsewhere.
> {quote}List<RowData> inputs = dataState.get(key);
> if (inputs == null) {
>     // Skip the data if it's state is cleared because of state ttl.
>     if (lenient) {
>         LOG.warn(STATE_CLEARED_WARN_MSG);
>     } else {
>         throw new RuntimeException(STATE_CLEARED_WARN_MSG);
>     }
> }
> {quote}
> We should expose it to users (the default value can remain true to stay consistent with previous versions).
>
> Separately, the inconsistency between the following two states should be resolved completely in another issue (this differs from the Line 190 case, which the operator cannot control by itself).
> {quote}// a map state stores mapping from sort key to records list
> private transient MapState<RowData, List<RowData>> dataState;
> // a sorted map stores mapping from sort key to records count
> private transient ValueState<SortedMap<RowData, Long>> treeMap;
> {quote}
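The proposal in the description can be sketched roughly as follows. This is a minimal illustration, not the Flink implementation: the class LenientLookupSketch, its methods, and the message text are hypothetical, and plain Java maps stand in for MapState and ValueState. It only shows the lenient flag being supplied by the caller instead of hard-coded, and what happens when one state loses an entry that the other still holds.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical stand-in for a stateful operator; not RetractableTopNFunction itself.
class LenientLookupSketch {
    // Placeholder text; the real constant in Flink may be worded differently.
    static final String STATE_CLEARED_WARN_MSG = "state cleared (placeholder message)";

    // Plain maps stand in for Flink's MapState and ValueState.
    final Map<String, List<String>> dataState = new HashMap<>();
    final SortedMap<String, Long> treeMap = new TreeMap<>();

    // Collected warnings make the lenient path observable without a logger.
    final List<String> warnings = new ArrayList<>();

    // Supplied by the caller instead of being fixed to true.
    private final boolean lenient;

    LenientLookupSketch(boolean lenient) {
        this.lenient = lenient;
    }

    // Adds a record to both states so they start out consistent.
    void add(String sortKey, String record) {
        dataState.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(record);
        treeMap.merge(sortKey, 1L, Long::sum);
    }

    // Simulates TTL expiry removing an entry from only one of the two states.
    void expireDataState(String sortKey) {
        dataState.remove(sortKey);
    }

    // Returns the records for a sort key; a missing entry is skipped or rejected
    // depending on the lenient flag.
    List<String> lookup(String sortKey) {
        List<String> inputs = dataState.get(sortKey);
        if (inputs == null) {
            if (lenient) {
                warnings.add(STATE_CLEARED_WARN_MSG);
                return new ArrayList<>();
            } else {
                throw new RuntimeException(STATE_CLEARED_WARN_MSG);
            }
        }
        return inputs;
    }
}
```

With lenient set to true the missing entry only produces a warning and an empty result; with false the same lookup fails fast, which is the behavior users currently cannot opt into.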
--
This message was sent by Atlassian Jira
(v8.20.7#820007)