Posted to user@flink.apache.org by Charles Tan <ct...@gmail.com> on 2020/12/22 02:57:02 UTC

Flink 1.11 State Compatibility Issue Table API

Hello,

I'm investigating a state compatibility issue that we hit when trying to
upgrade from Flink 1.9 to Flink 1.11. We are seeing "FlinkException: Could
not restore keyed state backend for WindowOperator", caused by
"StateMigrationException: The new key serializer must be compatible" (stack
trace provided below). Our application uses the legacy planner with the
Table API. I noticed that FLINK-16998 [1] landed in the Flink 1.11 release,
and I believe it is the cause of our state issues. We assume the restore
fails because Flink doesn't support state migration for keys [2]. Is this
assumption correct? Also, is there any workaround that would let us recover
our state? I noticed that the State Processor API supports reading and
writing window state in Flink 1.12 [3]. Could upgrading directly to Flink
1.12 and manipulating our savepoints with the State Processor API be a
viable option?

[1] https://issues.apache.org/jira/browse/FLINK-16998
[2]
http://mail-archives.apache.org/mod_mbox/flink-dev/201906.mbox/%3CCAJnSTVxpzx2RVb8zKGz2pz=SD5Tmr-EsL8jQwhdO5U_MuY9uDw@mail.gmail.com%3E
[3]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html#window-state

Here is the stack trace:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_131]
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_c6b609a3afd99a8def9da162cdd0e7db_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:329) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:535) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    ... 9 more
Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.
    at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:180) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:167) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:535) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144) ~[flink-dist_2.11-1.11.2.jar:1.11.2]


Thanks,
Charles

Re: Flink 1.11 State Compatibility Issue Table API

Posted by Arvid Heise <ar...@ververica.com>.
Hi Charles,

I fear that your investigation is correct. I'd also assume that you have to
use the State Processor API.
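
To illustrate why a changed key serializer is a harder problem than a changed
value serializer, here is a toy sketch in plain Java. It is not Flink code:
the class KeyBytesDemo, both byte layouts, and the extra header byte are
hypothetical stand-ins. The only assumption taken from the thread is that the
serialized form of the key differs between the two versions. In a store that
addresses entries by the serialized key bytes, an entry written under the old
layout is not found when the same logical key is serialized with the new one.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Toy model only: none of these names or byte layouts are Flink's.
final class KeyBytesDemo {

    // Hypothetical "old" layout: one null flag per field, then the fields.
    static byte[] serializeOld(String user, int shard) {
        return write(false, user, shard);
    }

    // Hypothetical "new" layout: the same data with one extra leading byte.
    static byte[] serializeNew(String user, int shard) {
        return write(true, user, shard);
    }

    private static byte[] write(boolean withHeader, String user, int shard) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            if (withHeader) {
                out.writeByte(0); // assumed extra header byte
            }
            out.writeBoolean(false); // field 0 is not null
            out.writeBoolean(false); // field 1 is not null
            out.writeUTF(user);
            out.writeInt(shard);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // ByteBuffer compares by content, so it can serve as a byte-array map key.
    static ByteBuffer asKey(byte[] serialized) {
        return ByteBuffer.wrap(serialized);
    }

    public static void main(String[] args) {
        // Stand-in for a store that addresses entries by serialized key bytes.
        Map<ByteBuffer, Long> store = new HashMap<>();
        store.put(asKey(serializeOld("alice", 7)), 42L);

        // Same logical key, looked up with each layout.
        System.out.println("old bytes: " + store.get(asKey(serializeOld("alice", 7))));
        System.out.println("new bytes: " + store.get(asKey(serializeNew("alice", 7))));
    }
}
```

In this toy model the second lookup misses. Making the old entries reachable
again would mean rewriting every entry under its new key bytes, which is the
kind of offline rewrite a savepoint-processing tool would have to perform.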

Do I assume correctly that your colleague Günther also enquired about the
same issue [1]? If so, let's keep the discussion there.

[1]
https://lists.apache.org/thread.html/r650a77a1426be7b2ad93dd0950c21c06de336be4ad42bf8ad7610baf%40%3Cdev.flink.apache.org%3E



-- 

Arvid Heise | Senior Java Developer
