Posted to issues@flink.apache.org by "Tzu-Li (Gordon) Tai (Jira)" <ji...@apache.org> on 2020/10/23 14:37:00 UTC

[jira] [Assigned] (FLINK-19748) StateFun's UnboundedFeedbackLogger should call startNewKeyGroup for all assigned key groups

     [ https://issues.apache.org/jira/browse/FLINK-19748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai reassigned FLINK-19748:
-------------------------------------------

    Assignee: Tzu-Li (Gordon) Tai

> StateFun's UnboundedFeedbackLogger should call startNewKeyGroup for all assigned key groups
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-19748
>                 URL: https://issues.apache.org/jira/browse/FLINK-19748
>             Project: Flink
>          Issue Type: Bug
>          Components: Stateful Functions
>    Affects Versions: statefun-2.0.0, statefun-2.1.0, statefun-2.2.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.3, statefun-2.2.1
>
>
> Currently, on commit the {{UnboundedFeedbackLogger}} only calls {{startNewKeyGroup}} on the raw keyed stream for key groups that actually have logged messages:
> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java#L102
> This means that any key group without logged messages is skipped.
> This does not conform to the expected usage of Flink's {{KeyedStateCheckpointOutputStream}}, which expects {{startNewKeyGroup}} to be invoked for ALL key groups within its key group range.
> This is because, under the hood, calling {{startNewKeyGroup}} also records the stream offset at which that key group starts.
> However, when iterating through a raw keyed stream on restore, the key group offsets iterator {{KeyGroupRangeOffsets#KeyGroupOffsetsIterator}} does not account for key groups that were never started and therefore have no offset defined, so the stream seeks to incorrect positions.
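To make the offset bookkeeping concrete, here is a minimal Java sketch. It does not use Flink's classes: {{RawKeyedStreamModel}}, {{writeOnlyNonEmpty}} and the {{-1}} "offset not set" sentinel are hypothetical stand-ins, assuming that a key group which is never started keeps an unset offset. It shows how the buggy pattern (start only key groups that have logged messages) leaves holes in the offsets table.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of offset bookkeeping in a raw keyed-state stream.
// Not Flink's KeyedStateCheckpointOutputStream; names and the -1 sentinel are assumptions.
final class OffsetBookkeepingSketch {

    // Assumed marker for "startNewKeyGroup was never called for this key group".
    static final long NO_OFFSET_SET = -1L;

    static final class RawKeyedStreamModel {
        private final int firstKeyGroup;
        private final long[] offsets;
        private final ByteArrayOutputStream out = new ByteArrayOutputStream();

        RawKeyedStreamModel(int firstKeyGroup, int numKeyGroups) {
            this.firstKeyGroup = firstKeyGroup;
            this.offsets = new long[numKeyGroups];
            Arrays.fill(offsets, NO_OFFSET_SET);
        }

        // Records the current stream position as the start offset of keyGroupId.
        void startNewKeyGroup(int keyGroupId) {
            offsets[keyGroupId - firstKeyGroup] = out.size();
        }

        void write(byte[] bytes) {
            out.write(bytes, 0, bytes.length);
        }

        long[] offsets() {
            return offsets.clone();
        }
    }

    // Buggy pattern: only key groups that have logged messages are started.
    static long[] writeOnlyNonEmpty(int firstKeyGroup, int numKeyGroups,
                                    Map<Integer, List<byte[]>> logged) {
        RawKeyedStreamModel stream = new RawKeyedStreamModel(firstKeyGroup, numKeyGroups);
        for (Map.Entry<Integer, List<byte[]>> entry : logged.entrySet()) {
            stream.startNewKeyGroup(entry.getKey());
            for (byte[] message : entry.getValue()) {
                stream.write(message);
            }
        }
        return stream.offsets();
    }

    public static void main(String[] args) {
        // Key groups 0..3; only 0 and 2 have logged messages (TreeMap fixes iteration order).
        Map<Integer, List<byte[]>> logged = new TreeMap<>(Map.of(
                0, List.of(new byte[] {1, 2}),
                2, List.of(new byte[] {3})));
        System.out.println(Arrays.toString(writeOnlyNonEmpty(0, 4, logged)));
    }
}
```

Running {{main}} prints {{[0, -1, 2, -1]}}: key groups 1 and 3 have no recorded offset.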
> Ultimately, if some key groups were skipped while writing to the raw keyed stream, the following error will be thrown on restore:
> {code}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: java.io.IOException: position out of bounds
> 	at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
> 	... 9 more
> Caused by: java.io.IOException: position out of bounds
> 	at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
> 	... 10 more
> {code}
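The restore-side effect can be sketched the same way. {{InMemoryStream}} below is a hypothetical stand-in whose {{seek}} bounds check is modeled on the "position out of bounds" message in the trace above (an assumption, not the source of Flink's {{ByteStreamStateHandle}}), and {{firstBytePerKeyGroup}} naively seeks to every entry of an offsets table like the one the buggy writer produces.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical restore-side model; not Flink's StreamTaskStateInitializerImpl or state handles.
final class RestoreSeekSketch {

    // In-memory stream whose seek rejects positions outside [0, length].
    static final class InMemoryStream {
        private final byte[] data;
        private int index;

        InMemoryStream(byte[] data) {
            this.data = data;
        }

        void seek(long pos) throws IOException {
            if (pos < 0 || pos > data.length) {
                throw new IOException("position out of bounds");
            }
            index = (int) pos;
        }

        // Returns the next byte as 0..255, or -1 at end of stream.
        int read() {
            return index < data.length ? data[index++] & 0xFF : -1;
        }
    }

    // Seeks to every recorded offset, trusting that each key group was started on write.
    static List<Integer> firstBytePerKeyGroup(byte[] data, long[] offsets) throws IOException {
        InMemoryStream in = new InMemoryStream(data);
        List<Integer> firstBytes = new ArrayList<>();
        for (long offset : offsets) {
            in.seek(offset);
            firstBytes.add(in.read());
        }
        return firstBytes;
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3};
        long[] offsets = {0, -1, 2, -1}; // key groups 1 and 3 were never started
        try {
            firstBytePerKeyGroup(data, offsets);
            System.out.println("restored");
        } catch (IOException e) {
            System.out.println("restore failed: " + e.getMessage());
        }
    }
}
```

With offsets {{[0, -1, 2, -1]}} the first seek succeeds and the second fails, so {{main}} prints {{restore failed: position out of bounds}}.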
> h2. *Possible solutions*
> There are two possible solutions, fixing it either in StateFun or in Flink:
> - Fix it in StateFun by ensuring that the feedback logger starts a new key group for every key group in its range:
> {code}
> for (int keyGroupId : rawKeyedStream.getKeyGroupList()) {
>     rawKeyedStream.startNewKeyGroup(keyGroupId);
>     // write to stream if there are logged messages for this key group
> }
> {code}
> - Alternatively, fix it in Flink by changing {{KeyGroupRangeOffsets#KeyGroupOffsetsIterator}} to skip key groups that have no defined offset (i.e. key groups for which {{startNewKeyGroup}} was never called).
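Both options can be illustrated with the same simplified model (hypothetical names, not StateFun or Flink API; the actual StateFun-side fix is the {{getKeyGroupList()}} loop quoted above). {{writeAllKeyGroups}} records an offset for every key group, including empty ones, so every entry points inside the stream; {{restorableKeyGroups}} sketches the Flink-side idea of skipping entries that still hold the assumed {{-1}} sentinel.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.IntStream;

// Hypothetical sketch of the two fixes; the -1 sentinel is an assumption.
final class KeyGroupFixSketch {

    // Assumed marker for a key group whose offset was never recorded.
    static final long NO_OFFSET_SET = -1L;

    // Option 1 (StateFun side): start every key group in the range, even empty ones.
    static long[] writeAllKeyGroups(int firstKeyGroup, int numKeyGroups,
                                    Map<Integer, List<byte[]>> logged) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long[] offsets = new long[numKeyGroups];
        for (int keyGroup = firstKeyGroup; keyGroup < firstKeyGroup + numKeyGroups; keyGroup++) {
            // Stand-in for startNewKeyGroup(keyGroup): record where this key group begins.
            offsets[keyGroup - firstKeyGroup] = out.size();
            // Write only if there are logged messages for this key group.
            for (byte[] message : logged.getOrDefault(keyGroup, List.of())) {
                out.write(message, 0, message.length);
            }
        }
        return offsets;
    }

    // Option 2 (Flink side): the restore iterator skips key groups without an offset.
    static int[] restorableKeyGroups(int firstKeyGroup, long[] offsets) {
        return IntStream.range(0, offsets.length)
                .filter(i -> offsets[i] != NO_OFFSET_SET)
                .map(i -> firstKeyGroup + i)
                .toArray();
    }

    public static void main(String[] args) {
        Map<Integer, List<byte[]>> logged = new TreeMap<>(Map.of(
                0, List.of(new byte[] {1, 2}),
                2, List.of(new byte[] {3})));
        System.out.println(Arrays.toString(writeAllKeyGroups(0, 4, logged)));
        System.out.println(Arrays.toString(restorableKeyGroups(0, new long[] {0, -1, 2, -1})));
    }
}
```

{{main}} prints {{[0, 2, 2, 3]}} (key group 1 is empty but still has a valid offset) and then {{[0, 2]}}. Option 1 keeps the fix local to StateFun, while option 2 would make Flink tolerant of any writer that skips key groups.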



--
This message was sent by Atlassian Jira
(v8.3.4#803005)