Posted to issues@flink.apache.org by "Yun Tang (Jira)" <ji...@apache.org> on 2022/12/02 09:14:00 UTC

[jira] [Resolved] (FLINK-29430) Sanity check in InternalKeyContextImpl#setCurrentKeyGroupIndex

     [ https://issues.apache.org/jira/browse/FLINK-29430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang resolved FLINK-29430.
------------------------------
    Resolution: Fixed

merged in master: 1af9446248677b9540ed5d53bd2b42f3b724f7b5

> Sanity check in InternalKeyContextImpl#setCurrentKeyGroupIndex
> --------------------------------------------------------------
>
>                 Key: FLINK-29430
>                 URL: https://issues.apache.org/jira/browse/FLINK-29430
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.15.3
>            Reporter: Zakelly Lan
>            Assignee: Zakelly Lan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0
>
>
> Currently the HeapStateBackend checks whether the current key group index is valid, while the RocksDBStateBackend does not. When the HeapStateBackend is used with a non-deterministic shuffle key, the following exception is thrown:
>  
> {code:java}
> java.lang.IllegalArgumentException: Key group 84 is not in KeyGroupRange{startKeyGroup=32, endKeyGroup=63}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
>     at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37)
>     at org.apache.flink.runtime.state.heap.StateTable.getMapForKeyGroup(StateTable.java:305)
>     at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:261)
>     at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:143)
>     at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:72)
>     at com.alibaba.ververica.flink.state.benchmark.wordcount.WordCount$MixedFlatMapper.flatMap(WordCount.java:169)
>     at com.alibaba.ververica.flink.state.benchmark.wordcount.WordCount$MixedFlatMapper.flatMap(WordCount.java:160)
>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:135)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:526)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:811)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:760)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:954)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:933)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
>     at java.lang.Thread.run(Thread.java:750)
>  {code}
> However, the RocksDBStateBackend runs without an exception. A wrong key group index causes state correctness problems, so it is better to check the index in _InternalKeyContextImpl#setCurrentKeyGroupIndex_ and throw an exception immediately.
>  
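The key group range in the trace, and the kind of eager check proposed above, can be illustrated with a small self-contained sketch. It assumes maxParallelism = 128 and parallelism = 4. Neither value is stated in the report; they are chosen because they reproduce KeyGroupRange{startKeyGroup=32, endKeyGroup=63} for subtask index 1. The range arithmetic is intended to mirror Flink's KeyGroupRangeAssignment. The names KeyGroupCheckSketch, rangeForOperatorIndex, operatorIndexForKeyGroup and CheckedKeyContext are hypothetical. This is not the code merged in the commit above.

```java
// Hypothetical sketch: illustrates an eager key-group check; not Flink's InternalKeyContextImpl.
public class KeyGroupCheckSketch {

    /** Inclusive range of key groups owned by one parallel operator instance. */
    static final class KeyGroupRange {
        final int start;
        final int end;

        KeyGroupRange(int start, int end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(int keyGroup) {
            return keyGroup >= start && keyGroup <= end;
        }

        @Override
        public String toString() {
            return "KeyGroupRange{startKeyGroup=" + start + ", endKeyGroup=" + end + "}";
        }
    }

    /** Range owned by a subtask; mirrors KeyGroupRangeAssignment's formula (assumed). */
    static KeyGroupRange rangeForOperatorIndex(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new KeyGroupRange(start, end);
    }

    /** Subtask index that owns a given key group. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Hypothetical key context that rejects out-of-range key groups as soon as they are set. */
    static final class CheckedKeyContext {
        private final KeyGroupRange range;
        private int currentKeyGroupIndex = -1;

        CheckedKeyContext(KeyGroupRange range) {
            this.range = range;
        }

        void setCurrentKeyGroupIndex(int keyGroupIndex) {
            // Fail fast, regardless of which state backend sits underneath.
            if (!range.contains(keyGroupIndex)) {
                throw new IllegalArgumentException(
                        "Key group " + keyGroupIndex + " is not in " + range + ".");
            }
            currentKeyGroupIndex = keyGroupIndex;
        }

        int getCurrentKeyGroupIndex() {
            return currentKeyGroupIndex;
        }
    }

    public static void main(String[] args) {
        KeyGroupRange range = rangeForOperatorIndex(128, 4, 1);
        System.out.println(range); // prints KeyGroupRange{startKeyGroup=32, endKeyGroup=63}
        System.out.println(operatorIndexForKeyGroup(128, 4, 84)); // prints 2

        CheckedKeyContext ctx = new CheckedKeyContext(range);
        ctx.setCurrentKeyGroupIndex(40); // inside [32, 63], accepted
        try {
            ctx.setCurrentKeyGroupIndex(84); // belongs to subtask 2, rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under these assumptions, key group 84 belongs to subtask 2 (range 64 to 95), so a record routed to subtask 1 must have produced a different key group downstream than upstream, which is the non-deterministic hashCode/equals case the message describes. Placing the check in the key context rather than in the heap StateTable would make the failure visible with either backend.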



--
This message was sent by Atlassian Jira
(v8.20.10#820010)