Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/05/06 15:15:04 UTC

[jira] [Commented] (FLINK-6425) Integrate serializer reconfiguration into state restore flow to activate serializer upgrades

    [ https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999461#comment-15999461 ] 

ASF GitHub Bot commented on FLINK-6425:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/3834

    [FLINK-6425] [runtime] Activate serializer upgrades in state backends

    This is a follow-up PR that finalizes serializer upgrades, and is based on #3804 (therefore, only the 2nd and 3rd commits, ed82173 and e77096a, are relevant).
    
    This PR includes the following changes:
    1. Write configuration snapshots of serializers along with checkpoints (this changes serialization format of checkpoints).
    2. On restore, confront configuration snapshots with newly registered serializers using the new `TypeSerializer#getMigrationStrategy(TypeSerializerConfigSnapshot)` method.
    3. The serializer upgrade is complete if the confrontation determines that no migration is needed. The confrontation reconfigures the new serializer where necessary. If the serializer cannot be reconfigured to avoid state migration, the job simply fails (as we currently do not have the actual state migration feature).
    
    Note that the confrontation of config snapshots is currently only performed in the `RocksDBKeyedStateBackend`, which is the only place where it is currently needed because that backend deserializes state lazily. After we have eager state migration in place, the confrontation should happen for all state backends on restore.
    
    ## Tests
    - Serialization compatibility of the new checkpoint format is covered with existing tests.
    - Added a test that makes sure `InvalidClassException` is also caught when deserializing old serializers in the checkpoint (which occurs if the old serializer implementation was changed and results in a new serialVersionUID).
    - Added tests for Java serialization failure resilience when reading the new checkpoints, in `SerializerProxiesTest`.
    - Added end-to-end snapshot + restore tests which require reconfiguration of the `KryoSerializer` and `PojoSerializer` in cases where the registration order of Kryo classes / Pojo types was changed.
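
To illustrate why a changed registration order calls for reconfiguration, here is a minimal sketch. It is not Flink code: `OrderedRegistrySerializer` and its simplified `snapshotConfiguration()` / `reconfigure(...)` signatures are hypothetical, and it assumes class IDs are handed out in registration order. The new serializer adopts the old order from the snapshot so that IDs already written into state keep their meaning.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical serializer that tags each registered class with an ID taken from registration order. */
class OrderedRegistrySerializer {
    private final Map<String, Integer> ids = new LinkedHashMap<>();

    OrderedRegistrySerializer(List<String> registrationOrder) {
        assign(registrationOrder);
    }

    /** Assigns IDs 0, 1, 2, ... following the given order. */
    private void assign(List<String> order) {
        ids.clear();
        for (String name : order) {
            ids.putIfAbsent(name, ids.size());
        }
    }

    /** Simplified configuration snapshot: the registration order currently in effect. */
    List<String> snapshotConfiguration() {
        return new ArrayList<>(ids.keySet());
    }

    /**
     * Adopts the previous registration order so that old IDs stay valid.
     * Newly registered classes are appended after the old ones.
     * Returns false if a previously registered class is missing, in which
     * case reconfiguration alone cannot avoid a state migration.
     */
    boolean reconfigure(List<String> previousOrder) {
        if (!ids.keySet().containsAll(previousOrder)) {
            return false;
        }
        List<String> merged = new ArrayList<>(previousOrder);
        for (String name : ids.keySet()) {
            if (!previousOrder.contains(name)) {
                merged.add(name);
            }
        }
        assign(merged);
        return true;
    }

    int idOf(String className) {
        return ids.get(className);
    }

    public static void main(String[] args) {
        List<String> snapshot =
                new OrderedRegistrySerializer(Arrays.asList("A", "B")).snapshotConfiguration();
        OrderedRegistrySerializer restored =
                new OrderedRegistrySerializer(Arrays.asList("B", "A", "C"));
        System.out.println("reconfigured: " + restored.reconfigure(snapshot));
        System.out.println("id of A after reconfiguration: " + restored.idOf("A"));
    }
}
```

In this sketch a `false` return corresponds to the case where the job would currently have to fail, since state migration is not yet available.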
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-6425

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3834.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3834
    
----
commit 538a7acecce0d72e36e3726c0df2b6b96be35fc3
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Date:   2017-05-01T13:32:10Z

    [FLINK-6190] [core] Migratable TypeSerializers
    
    This commit introduces the user-facing APIs for migratable
    TypeSerializers. The new user-facing APIs are:
    
    - new class: TypeSerializerConfigSnapshot
    - new class: ForwardCompatibleSerializationFormatConfig
    - new method: TypeSerializer#snapshotConfiguration()
    - new method: TypeSerializer#reconfigure(TypeSerializerConfigSnapshot)
    - new enum: ReconfigureResult

commit ed82173fe97c6e9fb0784696bc4c49f10cc4e556
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Date:   2017-05-02T11:35:18Z

    [hotfix] [core] Catch InvalidClassException in TypeSerializerSerializationProxy
    
    Previously, the TypeSerializerSerializationProxy only used the dummy
    ClassNotFoundDummyTypeSerializer as a placeholder in the case where the
    user uses a completely new serializer and deletes the old one.
    
    There is also the case where the user changes the original serializer's
    implementation, which results in an InvalidClassException when trying to
    deserialize the serializer. We should also use the
    ClassNotFoundDummyTypeSerializer as a temporary placeholder in this
    case.
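
The fallback described in this commit message can be sketched with plain Java serialization. This is an illustration under assumptions, not the actual proxy code: `OldSerializer`, `PlaceholderSerializer`, and `SerializerProxySketch` are hypothetical names, and the `tamper` helper exists only to provoke the two failure modes by editing the serialized bytes (renaming the class, or changing the recorded serialVersionUID).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for a user serializer written into a checkpoint. */
class OldSerializer implements Serializable {
    private static final long serialVersionUID = 1L;
}

/** Hypothetical placeholder, playing the role of the dummy serializer. */
class PlaceholderSerializer implements Serializable {
    private static final long serialVersionUID = 1L;
}

class SerializerProxySketch {

    static byte[] write(Object serializer) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(serializer);
        }
        return bytes.toByteArray();
    }

    /** Reads the old serializer, or falls back to a placeholder if its class is missing or changed. */
    static Object readOrPlaceholder(byte[] data) throws IOException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (ClassNotFoundException | InvalidClassException e) {
            // Class deleted, or serialVersionUID no longer matches: use the placeholder.
            return new PlaceholderSerializer();
        }
    }

    /**
     * Demo-only helper. Flips one bit either in the last character of the class
     * name (so the class cannot be found) or in the 8-byte serialVersionUID
     * that follows the name in the stream (so the class is rejected as invalid).
     */
    static byte[] tamper(byte[] data, boolean renameClass) {
        byte[] name = OldSerializer.class.getName().getBytes(StandardCharsets.UTF_8);
        byte[] copy = data.clone();
        for (int i = 0; i + name.length + 8 <= copy.length; i++) {
            boolean match = true;
            for (int j = 0; j < name.length && match; j++) {
                match = copy[i + j] == name[j];
            }
            if (match) {
                int target = renameClass ? i + name.length - 1 : i + name.length + 7;
                copy[target] ^= 1;
                return copy;
            }
        }
        throw new IllegalStateException("class name not found in stream");
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = write(new OldSerializer());
        System.out.println(readOrPlaceholder(bytes).getClass().getSimpleName());
        System.out.println(readOrPlaceholder(tamper(bytes, true)).getClass().getSimpleName());
        System.out.println(readOrPlaceholder(tamper(bytes, false)).getClass().getSimpleName());
    }
}
```

Other `IOException`s are deliberately left to propagate in this sketch, so only the two cases named above are masked by the placeholder.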

commit e77096af29b4cbea26113928fe93218c075e4035
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Date:   2017-05-06T12:40:58Z

    [FLINK-6425] [runtime] Activate serializer upgrades in state backends
    
    This commit fully activates state serializer upgrades by changing the
    following:
    - Include serializer configuration snapshots in checkpoints
    - On restore, use configuration snapshots to confront new serializers to
      perform the upgrade

----


> Integrate serializer reconfiguration into state restore flow to activate serializer upgrades
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6425
>                 URL: https://issues.apache.org/jira/browse/FLINK-6425
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> With FLINK-6191, {{TypeSerializer}} will be reconfigurable.
> From the state backends' point of view, serializer reconfiguration doubles as a mechanism to determine how serializer upgrades should be handled.
> The general idea is that state checkpoints should contain the following as the state's metainfo:
> - the previous serializer
> - snapshot of the previous serializer's configuration
> The upgrade flow is as follows:
> 1. On restore, try to deserialize the previous serializer. Deserialization may fail if a) the serializer no longer exists in the classpath, or b) the serializer class is no longer valid (i.e., the implementation changed and resulted in a different serialVersionUID). In this case, use a dummy serializer as a placeholder. This dummy serializer is currently the {{ClassNotFoundProxySerializer}} in the code.
> 2. Deserialize the configuration snapshot of the previous serializer. The configuration snapshot must be successfully deserialized, otherwise the state restore fails.
> 3. When we get the new registered serializer for the state (could be a completely new serializer, the same serializer with different implementations, or the exact same serializer untouched; either way they are seen as a new serializer), we use the configuration snapshot of the old serializer to reconfigure the new serializer.
> This completes the upgrade of the old serializer. However, depending on the result of the upgrade, state conversion may need to take place (for now, if state conversion is required, we just fail the job as this functionality isn't available yet). The results could be:
> - Compatible: restore success + serializer upgraded.
> - Compatible, but serialization schema changed: serializer upgraded but requires state conversion, without the requirement that the old serializer needs to be present.
> - Incompatible: serializer upgraded, but state conversion is required and the old serializer must be present (i.e., it cannot be the dummy {{ClassNotFoundProxySerializer}}).
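
The three results listed above can be read as a small decision table. The following sketch spells it out; the enum and method names (`UpgradeOutcome`, `RestoreAction`, `decide`) are hypothetical and are not claimed to match the enum introduced in the code. The `conversionSupported` flag models the current situation, in which state conversion is not yet available and the job fails instead.

```java
/** Hypothetical names for the three upgrade results described in the issue. */
enum UpgradeOutcome { COMPATIBLE, COMPATIBLE_SCHEMA_CHANGED, INCOMPATIBLE }

/** Hypothetical actions the restore flow could take. */
enum RestoreAction { RESTORE_DIRECTLY, CONVERT_STATE, FAIL_JOB }

class UpgradeFlowSketch {

    /**
     * @param outcome                   result of reconfiguring the new serializer
     * @param oldSerializerIsPlaceholder true if the old serializer could not be
     *                                   deserialized and a dummy stands in for it
     * @param conversionSupported        whether state conversion is implemented
     */
    static RestoreAction decide(
            UpgradeOutcome outcome,
            boolean oldSerializerIsPlaceholder,
            boolean conversionSupported) {
        switch (outcome) {
            case COMPATIBLE:
                // Nothing to convert: the upgraded serializer reads the old state.
                return RestoreAction.RESTORE_DIRECTLY;
            case COMPATIBLE_SCHEMA_CHANGED:
                // Conversion is needed, but the old serializer is not required for it.
                return conversionSupported ? RestoreAction.CONVERT_STATE : RestoreAction.FAIL_JOB;
            case INCOMPATIBLE:
                // Conversion needs the real old serializer; a placeholder is not enough.
                if (oldSerializerIsPlaceholder) {
                    return RestoreAction.FAIL_JOB;
                }
                return conversionSupported ? RestoreAction.CONVERT_STATE : RestoreAction.FAIL_JOB;
            default:
                throw new IllegalArgumentException("unknown outcome: " + outcome);
        }
    }

    public static void main(String[] args) {
        // Conversion is not available yet, so only COMPATIBLE restores successfully.
        for (UpgradeOutcome outcome : UpgradeOutcome.values()) {
            System.out.println(outcome + " -> " + decide(outcome, false, false));
        }
    }
}
```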



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)