Posted to issues@flink.apache.org by "Jun Qin (Jira)" <ji...@apache.org> on 2020/08/12 13:11:00 UTC

[jira] [Comment Edited] (FLINK-14942) State Processing API: add an option to make deep copy

    [ https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176294#comment-17176294 ] 

Jun Qin edited comment on FLINK-14942 at 8/12/20, 1:10 PM:
-----------------------------------------------------------

I have double-checked: when creating a new savepoint from an existing savepoint, if the state for an operator in the existing savepoint is not touched (i.e., neither removed nor modified), the SavepointMetadata of the new savepoint will contain the same path as the original metadata, which is a relative path in Flink 1.11. When you try to restore a job from the new savepoint and load the state of those untouched operators, it will fail with:
{code:java}
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_33e978864e75b8b6137396c7b1f7711d_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.io.FileNotFoundException: /path/to/new/savepoint/a623a314-9dd1-4d60-bc8f-d56816f55f03 (No such file or directory)
    at java.base/java.io.FileInputStream.open0(Native Method)
    at java.base/java.io.FileInputStream.open(FileInputStream.java:219)
    at java.base/java.io.FileInputStream.<init>(FileInputStream.java:157)
    at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
    at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
    at org.apache.flink.runtime.state.KeyGroupsStateHandle.openInputStream(KeyGroupsStateHandle.java:118)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:124)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
    ... 15 more
{code}
For those untouched operators, a potential solution, I think, is to use the read/write API of the State Processor API library to re-create their state in the new savepoint. [~sjwiesman], what do you think?
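
To make the failure mode concrete, here is a minimal, Flink-free sketch of what seems to be happening. It assumes only that a relative handle is resolved against the directory of the savepoint being restored; the class, method, and file names are hypothetical and are not Flink APIs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class RelativeHandleDemo {

    // Hypothetical stand-in for a relative state handle: only the file name is
    // stored, and it is resolved against the directory the metadata is read from.
    static Path resolve(Path savepointDir, String relativeHandle) {
        return savepointDir.resolve(relativeHandle);
    }

    // Returns {resolvesInOldSavepoint, resolvesInNewSavepoint}.
    static boolean[] simulate() {
        try {
            Path root = Files.createTempDirectory("savepoints");
            Path oldSavepoint = Files.createDirectory(root.resolve("old"));
            Path newSavepoint = Files.createDirectory(root.resolve("new"));

            // State file of an untouched operator; it exists only in the old savepoint.
            String handle = "untouched-operator-state"; // hypothetical file name
            Files.write(oldSavepoint.resolve(handle), new byte[] {1, 2, 3});

            // The new metadata reuses the same relative handle (shallow copy),
            // so the lookup in the new directory finds nothing.
            return new boolean[] {
                Files.exists(resolve(oldSavepoint, handle)),
                Files.exists(resolve(newSavepoint, handle))
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = simulate();
        System.out.println("resolves in old savepoint: " + result[0]);
        System.out.println("resolves in new savepoint: " + result[1]);
    }
}
```

The same relative name resolves in the old directory but not in the new one, which matches the FileNotFoundException under /path/to/new/savepoint/ in the trace above.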



> State Processing API: add an option to make deep copy
> -----------------------------------------------------
>
>                 Key: FLINK-14942
>                 URL: https://issues.apache.org/jira/browse/FLINK-14942
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / State Processor
>            Reporter: Jun Qin
>            Assignee: Jun Qin
>            Priority: Major
>              Labels: usability
>             Fix For: 1.12.0
>
>
> Currently, when a new savepoint is created based on a source savepoint, the new savepoint contains references to the source savepoint. Here is what the [State Processing API doc|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html] says:
> bq. Note: When basing a new savepoint on existing state, the state processor api makes a shallow copy of the pointers to the existing operators. This means that both savepoints share state and one cannot be deleted without corrupting the other!
> This JIRA is to request an option to make a deep copy (instead of a shallow copy) so that the new savepoint is self-contained.
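
As a rough illustration of what "self-contained" means at the file level, the sketch below copies every referenced state file into the new savepoint directory, so the source can be deleted afterwards. It is a sketch under stated assumptions, not the State Processor API's implementation: the class, the `deepCopy` helper, and the file names are hypothetical, and a real savepoint involves more than plain file copies.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.List;

public class DeepCopySketch {

    // Hypothetical helper: copy each referenced state file into the target
    // directory so that relative handles resolve there as well.
    static void deepCopy(Path sourceDir, Path targetDir, List<String> handles) {
        try {
            Files.createDirectories(targetDir);
            for (String handle : handles) {
                Files.copy(sourceDir.resolve(handle), targetDir.resolve(handle),
                        StandardCopyOption.REPLACE_EXISTING);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true if the target still has its state after the source is deleted.
    static boolean survivesSourceDeletion() {
        try {
            Path root = Files.createTempDirectory("savepoints");
            Path source = Files.createDirectory(root.resolve("source"));
            Path target = root.resolve("target");
            String handle = "operator-state"; // hypothetical file name
            Files.write(source.resolve(handle), new byte[] {42});

            deepCopy(source, target, Arrays.asList(handle));

            // Delete the source savepoint entirely; the copy must not depend on it.
            Files.delete(source.resolve(handle));
            Files.delete(source);
            return Files.exists(target.resolve(handle));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("target survives source deletion: " + survivesSourceDeletion());
    }
}
```

With a shallow copy, deleting the source would leave the new savepoint pointing at missing files; with the copy step, the two savepoints can be deleted independently.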


