Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/02 08:29:00 UTC

[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

    [ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460706#comment-16460706 ] 

ASF GitHub Bot commented on FLINK-9169:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/5945

    [FLINK-9169] [runtime] Allow KeyedBackendSerializationProxy to specify whether serializer presence is required

    ## What is the purpose of the change
    
    Previously, in both the RocksDB and heap state backends, restoring from a savepoint failed if the previous state serializer could not be read.
    
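    This is necessary for the heap backend, which needs the previous serializer to deserialize the state objects in the snapshot, but not for RocksDB, which keeps state in serialized form: there the restore can still proceed with the new serializer, provided that the new serializer is compatible.
    This PR relaxes the requirement of serializer presence at restore time for RocksDB.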
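A minimal sketch of the relaxed rule, as a simplified model rather than Flink code: `SerializerLike`, `RestoredMetaInfo` and `chooseSerializer` are hypothetical names, and compatibility is reduced to comparing a format identifier that is assumed to be stored alongside (and readable independently of) the serializer. With `requireSerializerPresence = true` (heap-like) a missing previous serializer aborts the restore; with `false` (RocksDB-like) the restore continues with the new serializer if it is compatible.

```java
import java.util.Objects;

/** Hypothetical model of the restore rule; none of these types are Flink API. */
public class RestorePolicySketch {

    /** Stand-in for a state serializer; only exposes a format identifier. */
    interface SerializerLike {
        String formatId();
    }

    /** What a restored snapshot provides; the previous serializer may be missing. */
    static final class RestoredMetaInfo {
        final SerializerLike previousSerializer; // null if it could not be deserialized
        final String previousFormatId;           // assumed to survive even if the serializer does not

        RestoredMetaInfo(SerializerLike previousSerializer, String previousFormatId) {
            this.previousSerializer = previousSerializer;
            this.previousFormatId = previousFormatId;
        }
    }

    /**
     * Decides whether restore may proceed and returns the serializer used from now on.
     * requireSerializerPresence = true models the heap backend, false models RocksDB.
     */
    static SerializerLike chooseSerializer(
            RestoredMetaInfo restored, SerializerLike newSerializer, boolean requireSerializerPresence) {
        if (restored.previousSerializer == null && requireSerializerPresence) {
            throw new IllegalStateException(
                    "Previous state serializer could not be read, but it is required to restore this backend.");
        }
        // Simplified compatibility check: the new serializer must use the same format.
        if (!Objects.equals(newSerializer.formatId(), restored.previousFormatId)) {
            throw new IllegalStateException("New serializer is not compatible with the restored state.");
        }
        return newSerializer;
    }

    public static void main(String[] args) {
        SerializerLike newSerializer = () -> "v1";
        RestoredMetaInfo missing = new RestoredMetaInfo(null, "v1");

        // RocksDB-like: proceeds with the compatible new serializer.
        System.out.println(chooseSerializer(missing, newSerializer, false).formatId());

        // Heap-like: fails because the previous serializer is absent.
        try {
            chooseSerializer(missing, newSerializer, true);
        } catch (IllegalStateException e) {
            System.out.println("heap-like restore failed: " + e.getMessage());
        }
    }
}
```

The point of the flag is that the same restore code can serve both backends; only the strictness of the presence check differs.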
    
    ## Brief change log
    
    - `KeyedBackendSerializationProxy` now accepts a flag that specifies whether serializer presence is strictly required when restoring the keyed backend. For heap backends this flag is `true`; for RocksDB it is `false`.
    - `TypeSerializerSerializationUtil.tryReadSerializer(...)` now always returns an `UnloadableDummyTypeSerializer` if `useDummyPlaceholder` is `true` and any exception occurs while reading the serializer. It returns `null` only if `useDummyPlaceholder` is `false`. This flag corresponds to the serializer presence flag mentioned above.
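The `tryReadSerializer(...)` behaviour described above can be modelled with plain Java serialization. This is a sketch, not Flink's implementation: `TryReadSketch`, `tryRead` and `UnloadablePlaceholder` are hypothetical stand-ins for `tryReadSerializer` and `UnloadableDummyTypeSerializer`, and "any exception" is modelled as catching `Exception`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical model of "read the serializer, or fall back to a placeholder"; not Flink code. */
public class TryReadSketch {

    /** Hypothetical marker returned when the stored serializer cannot be read. */
    static final class UnloadablePlaceholder {
        final Exception cause; // kept so a later, stricter check can report why reading failed

        UnloadablePlaceholder(Exception cause) {
            this.cause = cause;
        }
    }

    /**
     * Reads a Java-serialized object. On any exception, returns a placeholder if
     * useDummyPlaceholder is true, otherwise null.
     */
    static Object tryRead(byte[] bytes, boolean useDummyPlaceholder) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (Exception e) {
            // Corrupt data, ClassNotFoundException, etc. are all handled the same way.
            return useDummyPlaceholder ? new UnloadablePlaceholder(e) : null;
        }
    }

    /** Helper that produces Java-serialized bytes for the demo. */
    static byte[] writeJava(Serializable obj) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        byte[] readable = writeJava("serializer stand-in");
        byte[] unreadable = {1, 2, 3, 4}; // not a valid Java serialization stream

        System.out.println(tryRead(readable, false));
        System.out.println(tryRead(unreadable, false));
        System.out.println(tryRead(unreadable, true) instanceof UnloadablePlaceholder);
    }
}
```

Returning a non-null placeholder means downstream null checks pass, and a backend that really needs the old serializer can still detect the placeholder and fail with its own message.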
    
    ## Verifying this change
    
    A few existing tests already cover this issue:
    - `MemoryStateBackendTest.testKeyedStateRestoreFailsIfSerializerDeserializationFails`
    - `SerializationProxiesTest.testKeyedBackendSerializationProxyRoundtripWithSerializerSerializationFailures`
    
    What may still be missing is an end-to-end test that verifies RocksDB can restore correctly even if a previous serializer class is no longer on the classpath (i.e. it has been replaced by a new, compatible serializer of a different class).
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-9169

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5945.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5945
    
----
commit 0eb2918a7551094b712173e332df7f2663ecf0cc
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-05-02T05:37:22Z

    [FLINK-9169] [runtime] Allow KeyedBackendSerializationProxy to specify whether serializer presence is required

----


> NPE when restoring from old savepoint and state serializer could not be deserialized
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-9169
>                 URL: https://issues.apache.org/jira/browse/FLINK-9169
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.4.2
>            Reporter: Till Rohrmann
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Major
>             Fix For: 1.5.0
>
>
> A user reported observing the following exception when restoring a Flink job from a 1.3 savepoint with Flink 1.4:
> {code}
> 2018-04-02 21:44:18,146 INFO  org.apache.flink.runtime.taskmanager.Task                     - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d656fa6) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>         at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
>         at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
>         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216)
>         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
>         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
>         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
>         ... 6 more
> {code}
> Looking at {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create a {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} whose {{stateSerializer}} can be {{null}}. That by itself is not the problem. However, {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} creates a {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}}, and that constructor null-checks the state serializer. This then fails with an uninformative NPE.
> I think the same will happen when resuming from a 1.4 savepoint with Flink 1.5.
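
To illustrate why the reported failure was hard to act on: a constructor that null-checks without a message yields a bare `NullPointerException`, whereas a check that carries a message names the missing piece. The sketch below uses the JDK's `Objects.requireNonNull(obj, messageSupplier)`; `MetaInfoNullCheckSketch` and `MetaInfo` are hypothetical stand-ins for `RegisteredKeyedBackendStateMetaInfo`. This is not the change made in the PR, which avoids the `null` on the RocksDB path via the dummy placeholder; it only shows what a descriptive failure could look like.

```java
import java.util.Objects;

/** Hypothetical illustration of a descriptive null check; not Flink code. */
public class MetaInfoNullCheckSketch {

    /** Hypothetical stand-in for registered keyed state meta info. */
    static final class MetaInfo {
        final String stateName;
        final Object stateSerializer;

        MetaInfo(String stateName, Object stateSerializer) {
            this.stateName = Objects.requireNonNull(stateName, "stateName");
            // The Supplier overload builds the message only when the check fails,
            // and the message names the affected state instead of giving a bare NPE.
            this.stateSerializer = Objects.requireNonNull(
                    stateSerializer,
                    () -> "The serializer for state '" + stateName + "' could not be restored "
                            + "(its class may no longer be on the classpath).");
        }
    }

    public static void main(String[] args) {
        try {
            new MetaInfo("my-state", null);
        } catch (NullPointerException e) {
            System.out.println("Restore failed: " + e.getMessage());
        }
    }
}
```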



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)