You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Tzu-Li (Gordon) Tai (Jira)" <ji...@apache.org> on 2020/10/21 06:41:00 UTC

[jira] [Created] (FLINK-19748) StateFun's UnboundedFeedbackLogger should call startNewKeyGroup for all assigned key groups

Tzu-Li (Gordon) Tai created FLINK-19748:
-------------------------------------------

             Summary: StateFun's UnboundedFeedbackLogger should call startNewKeyGroup for all assigned key groups
                 Key: FLINK-19748
                 URL: https://issues.apache.org/jira/browse/FLINK-19748
             Project: Flink
          Issue Type: Bug
          Components: Stateful Functions
    Affects Versions: statefun-2.2.0, statefun-2.1.0, statefun-2.0.0
            Reporter: Tzu-Li (Gordon) Tai


Currently, on commit the {{UnboundedFeedbackLogger}} only calls {{startNewKeyGroup}} on the raw keyed stream for key groups that actually have logged messages:
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java#L102

This means that it might skip some key groups, if a key group doesn't have any logged messages.

This doesn't conform with the expected usage of Flink's {{KeyedStateCheckpointOutputStream}}, where it expects that for ALL key groups within the range, {{startNewKeyGroup}} needs to be invoked.
The reason for this is that underneath, calling {{startNewKeyGroup}} would also record the starting stream offset position for the key group.
However, when iterating through a raw keyed stream, the key group offsets iterator {{KeyGroupRangeOffsets#KeyGroupOffsetsIterator}} doesn't take into account that some key groups weren't written and therefore do not have offsets defined, and the streams will be seeked to incorrect positions.

Ultimately, if some key groups were skipped while writing to the raw keyed stream, the following error will be thrown on restore:
{code}
java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: java.io.IOException: position out of bounds
	at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
	... 9 more
Caused by: java.io.IOException: position out of bounds
	at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
	... 10 more
{code}

**Possible solutions**

There are 2 possible solutions, either by fixing in StateFun or in Flink:

- This can be fixed in StateFun by ensuring that the feedback logger starts a new key group for all key groups in range, by doing:
{code}
for (int keyGroupId : rawKeyedStream.getKeyGroupList()) {
    rawKeyedStream.startNewKeyGroup(keyGroupId);
    // write to stream if there are logged messages for this key group
}
{code}

- Or, alternatively, we change the {{KeyGroupRangeOffsets#KeyGroupOffsetsIterator}} in Flink to skip key groups that don't have a defined offset (i.e. {{startNewKeyGroup}} wasn't called for these key groups).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)