You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by xyq <xi...@163.com> on 2020/04/21 06:37:14 UTC

flink checkpoint savepoint问题

hello 我是用flink1.10的ddl的双流窗口join,但是当我新增字段或修改字段类型,把程序重启后,无论是从savepoint处还是checkpoint处重启都是失败,最后只能删除掉checkpoint或savepoint才能使用,但是这样会丢些数据,请问该怎么处理,非常感谢?


报错如下:
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_33601e3dd532edccff92bfce124910c6_(1/3) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
... 9 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 common frames omitted
Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.
at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:180)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:167)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
... 15 common frames omitted

Re: flink checkpoint savepoint问题

Posted by Yun Tang <my...@live.com>.
Hi
原因是因为新增字段或者修改字段类型后,新的serializer无法(反)序列化原先存储的数据,对于这种有字段增改需求的场景,目前Flink社区主要借助于Pojo或者avro来实现 [1],建议对相关的state schema做重新规划,以满足这种有后续升级需求的场景。

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html

祝好
唐云

________________________________
From: xyq <xi...@163.com>
Sent: Tuesday, April 21, 2020 14:37
To: user-zh <us...@flink.apache.org>
Subject: flink checkpoint savepoint问题

hello 我是用flink1.10的ddl的双流窗口join,但是当我新增字段或修改字段类型,把程序重启后,无论是从savepoint处还是checkpoint处重启都是失败,最后只能删除掉checkpoint或savepoint才能使用,但是这样会丢些数据,请问该怎么处理,非常感谢?


报错如下:
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_33601e3dd532edccff92bfce124910c6_(1/3) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
... 9 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 common frames omitted
Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.
at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:180)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:167)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
... 15 common frames omitted