Posted to issues@flink.apache.org by "Tzu-Li (Gordon) Tai (Jira)" <ji...@apache.org> on 2020/10/23 14:37:00 UTC
[jira] [Assigned] (FLINK-19748) StateFun's UnboundedFeedbackLogger
should call startNewKeyGroup for all assigned key groups
[ https://issues.apache.org/jira/browse/FLINK-19748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tzu-Li (Gordon) Tai reassigned FLINK-19748:
-------------------------------------------
Assignee: Tzu-Li (Gordon) Tai
> StateFun's UnboundedFeedbackLogger should call startNewKeyGroup for all assigned key groups
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-19748
> URL: https://issues.apache.org/jira/browse/FLINK-19748
> Project: Flink
> Issue Type: Bug
> Components: Stateful Functions
> Affects Versions: statefun-2.0.0, statefun-2.1.0, statefun-2.2.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.12.0, 1.11.3, statefun-2.2.1
>
>
> Currently, on commit, the {{UnboundedFeedbackLogger}} calls {{startNewKeyGroup}} on the raw keyed stream only for key groups that actually have logged messages:
> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java#L102
> This means that any key group without logged messages is skipped.
> This does not conform to the expected usage of Flink's {{KeyedStateCheckpointOutputStream}}, which expects {{startNewKeyGroup}} to be invoked for ALL key groups within the range.
> The reason is that, underneath, calling {{startNewKeyGroup}} also records the starting stream offset for the key group.
> However, when iterating through a raw keyed stream, the key group offsets iterator {{KeyGroupRangeOffsets#KeyGroupOffsetsIterator}} does not take into account that some key groups were never written and therefore have no offsets defined, so the stream seeks to incorrect positions.
> Ultimately, if some key groups were skipped while writing to the raw keyed stream, the following error will be thrown on restore:
> {code}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: java.io.IOException: position out of bounds
> at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
> ... 9 more
> Caused by: java.io.IOException: position out of bounds
> at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
> ... 10 more
> {code}
> h2. *Possible solutions*
> There are two possible solutions: fix this either in StateFun or in Flink.
> - In StateFun, ensure that the feedback logger starts a new key group for every key group in range:
> {code}
> for (int keyGroupId : rawKeyedStream.getKeyGroupList()) {
>     rawKeyedStream.startNewKeyGroup(keyGroupId);
>     // write to stream if there are logged messages for this key group
> }
> {code}
> - Alternatively, change the {{KeyGroupRangeOffsets#KeyGroupOffsetsIterator}} in Flink to skip key groups that do not have a defined offset (i.e. key groups for which {{startNewKeyGroup}} was never called).
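
The difference between the current behavior and the StateFun-side fix can be illustrated with a small standalone model. This is only a sketch: FakeRawKeyedStream is a hypothetical stand-in and not Flink's KeyedStateCheckpointOutputStream, and the -1 "no offset" marker, the helper method names, and the sample messages are assumptions made for illustration. It records an offset per key group the way the description above says startNewKeyGroup does, then reports which key groups in range were left without one.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeyGroupOffsetsSketch {

    /** Hypothetical stand-in for a raw keyed checkpoint stream; not Flink's class. */
    static final class FakeRawKeyedStream {
        static final long NO_OFFSET = -1L; // assumption: marker for "never started"

        private final int firstKeyGroup;
        private final long[] offsets;
        private final ByteArrayOutputStream out = new ByteArrayOutputStream();

        FakeRawKeyedStream(int firstKeyGroup, int lastKeyGroup) {
            this.firstKeyGroup = firstKeyGroup;
            this.offsets = new long[lastKeyGroup - firstKeyGroup + 1];
            Arrays.fill(offsets, NO_OFFSET);
        }

        List<Integer> getKeyGroupList() {
            List<Integer> ids = new ArrayList<>();
            for (int i = 0; i < offsets.length; i++) {
                ids.add(firstKeyGroup + i);
            }
            return ids;
        }

        /** Records the current stream position as the start of the key group. */
        void startNewKeyGroup(int keyGroupId) {
            offsets[keyGroupId - firstKeyGroup] = out.size();
        }

        void write(String message) {
            byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
            out.write(bytes, 0, bytes.length);
        }

        /** Key groups in range for which startNewKeyGroup was never called. */
        List<Integer> keyGroupsWithoutOffset() {
            List<Integer> missing = new ArrayList<>();
            for (int i = 0; i < offsets.length; i++) {
                if (offsets[i] == NO_OFFSET) {
                    missing.add(firstKeyGroup + i);
                }
            }
            return missing;
        }
    }

    /** Models the reported behavior: only key groups with logged messages are started. */
    static void commitSkippingEmpty(FakeRawKeyedStream stream, Map<Integer, List<String>> logged) {
        for (Map.Entry<Integer, List<String>> entry : logged.entrySet()) {
            stream.startNewKeyGroup(entry.getKey());
            for (String message : entry.getValue()) {
                stream.write(message);
            }
        }
    }

    /** Models the proposed fix: every key group in range is started, even if empty. */
    static void commitAllKeyGroups(FakeRawKeyedStream stream, Map<Integer, List<String>> logged) {
        for (int keyGroupId : stream.getKeyGroupList()) {
            stream.startNewKeyGroup(keyGroupId);
            List<String> messages = logged.get(keyGroupId);
            if (messages != null) {
                for (String message : messages) {
                    stream.write(message);
                }
            }
        }
    }

    public static void main(String[] args) {
        // Only key groups 1 and 3 have logged messages; the assigned range is 0..3.
        Map<Integer, List<String>> logged = new TreeMap<>();
        logged.put(1, Arrays.asList("a"));
        logged.put(3, Arrays.asList("b", "c"));

        FakeRawKeyedStream skipping = new FakeRawKeyedStream(0, 3);
        commitSkippingEmpty(skipping, logged);
        System.out.println("skipping empty: no offset for " + skipping.keyGroupsWithoutOffset());

        FakeRawKeyedStream complete = new FakeRawKeyedStream(0, 3);
        commitAllKeyGroups(complete, logged);
        System.out.println("all key groups: no offset for " + complete.keyGroupsWithoutOffset());
    }
}
```

In this model, the skipping variant leaves key groups 0 and 2 without an offset, while the variant that follows the proposed loop leaves none. A reader that assumes every key group in range has a valid offset would then try to seek to an undefined position for the skipped groups, which is the kind of failure the stack trace above points to.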