Posted to issues@flink.apache.org by aljoscha <gi...@git.apache.org> on 2018/07/13 08:09:33 UTC

[GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/6325

    [FLINK-9376] Allow upgrading to incompatible state serializers (state schema evolution)

    This is WIP so people can have a look.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink jira-9376-state-upgrade

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6325.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6325
    
----
commit 5fc4a36a144c3f8f22be7e21a4e542d3042d10b1
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-06-13T11:43:53Z

    [FLINK-9377] [core] (part 1) Extend TypeSerializerConfigSnapshot as a factory for restoring serializers
    
    This commit is the first step towards removing serializers from
    checkpointed state meta info and making Flink checkpoints Java
    serialization free.
    
    Instead of writing serializers in checkpoints, and trying to read that
    to obtain a restore serializer at restore time, we aim to only write the
    config snapshot as the single source of truth and use it as a factory to
    create a restore serializer.
    
    This commit adds the method and signatures to the
    TypeSerializerConfigSnapshot interface. Use of the method, as well as
    properly implementing the method for all serializers, will be
    implemented in follow-up commits.
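
The factory idea described in this commit message can be illustrated with a minimal, self-contained sketch. Everything in it (SketchSerializer, SketchConfigSnapshot, CharsetStringSerializer, CharsetConfigSnapshot) is a hypothetical stand-in and not Flink's actual TypeSerializer or TypeSerializerConfigSnapshot API. It only shows how a snapshot that stores the serializer's parameters can recreate a serializer able to read previously written bytes.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Entry point of the sketch; all types below are hypothetical stand-ins.
final class ConfigSnapshotFactorySketch {

    // Writes a value, keeps only the config snapshot, then reads the value
    // back with a serializer recreated from that snapshot.
    static String roundTrip(String value) {
        SketchSerializer<String> original = new CharsetStringSerializer(StandardCharsets.UTF_16BE);
        byte[] stored = original.serialize(value);
        SketchConfigSnapshot<String> snapshot = original.snapshotConfiguration();

        // At restore time the original serializer instance is assumed to be gone.
        SketchSerializer<String> restored = snapshot.restoreSerializer();
        return restored.deserialize(stored);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("state"));
    }
}

interface SketchSerializer<T> {
    byte[] serialize(T value);
    T deserialize(byte[] bytes);
    SketchConfigSnapshot<T> snapshotConfiguration();
}

interface SketchConfigSnapshot<T> {
    // The snapshot acts as a factory for the serializer that reads old data.
    SketchSerializer<T> restoreSerializer();
}

final class CharsetStringSerializer implements SketchSerializer<String> {
    private final Charset charset;

    CharsetStringSerializer(Charset charset) {
        this.charset = charset;
    }

    @Override
    public byte[] serialize(String value) {
        return value.getBytes(charset);
    }

    @Override
    public String deserialize(byte[] bytes) {
        return new String(bytes, charset);
    }

    @Override
    public SketchConfigSnapshot<String> snapshotConfiguration() {
        // Only the parameter (the charset name) is captured, not the serializer object.
        return new CharsetConfigSnapshot(charset.name());
    }
}

final class CharsetConfigSnapshot implements SketchConfigSnapshot<String> {
    private final String charsetName;

    CharsetConfigSnapshot(String charsetName) {
        this.charsetName = charsetName;
    }

    @Override
    public SketchSerializer<String> restoreSerializer() {
        return new CharsetStringSerializer(Charset.forName(charsetName));
    }
}
```

Because only the charset name is persisted in this sketch, nothing depends on Java-serializing the serializer object itself, which is the property the commit message is aiming for.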

commit 661eb6d34da450ed096a77f166a4cc62ce3efdba
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-06-14T09:52:06Z

    [FLINK-9377] [core] (part 2) Remove fallback deserializer option from CompatibilityResult
    
    Now that the config snapshot is used as a factory for the restore
    serializer, it should be guaranteed that a restore serializer is always
    available. This removes the need for the user to provide a "fallback"
    convert serializer in the case where a migration is required.

commit c91d045c5eb6e355981e4edaa6d1a0d48e5d4a5e
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-06-14T14:41:45Z

    [FLINK-9377] [core] (part 3) Deprecate TypeSerializerSerializationUtil
    
    This commit deprecates all utility methods and classes related to
    serializing serializers. All methods that will still be in use, i.e.
    writing config snapshots, are now moved to a separate new
    TypeSerializerConfigSnapshotSerializationUtil class.

commit e09f91469fb6c86f5d2f05b78a9db3d9af8cce87
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-06-18T14:24:08Z

    [FLINK-9377] [core] (part 4) Introduce BackwardsCompatibleConfigSnapshot
    
    The BackwardsCompatibleConfigSnapshot is a wrapper, dummy config
    snapshot which wraps an actual config snapshot, as well as a
    pre-existing serializer instance.
    
    In previous versions, the config snapshot wasn't a serializer factory
    but simply a container for serializer parameters, so older serializers
    don't necessarily have config snapshots that are capable of creating a
    correct corresponding restore serializer.
    
    In this case, since previous serializers still have serializers written
    in the checkpoint, the backwards compatible solution would be to wrap
    the written serializer and the config snapshot within the
    BackwardsCompatibleConfigSnapshot dummy. When attempting to restore the
    serializer, the wrapped serializer instance is returned instead of
    actually calling the restoreSerializer method of the wrapped config
    snapshot.
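
A rough sketch of the wrapping behaviour described above, under the assumption that a legacy snapshot cannot build a serializer on its own. The types SketchSerializer, SketchConfigSnapshot, LegacySnapshot and WrappingSnapshot are invented for illustration and are not the classes added by this commit.

```java
final class BackwardsCompatibleSnapshotSketch {

    // Hypothetical stand-ins, not Flink's real interfaces.
    interface SketchSerializer<T> {
        String describe();
    }

    interface SketchConfigSnapshot<T> {
        SketchSerializer<T> restoreSerializer();
    }

    // An old-style snapshot: only a parameter container, unable to build a serializer.
    static final class LegacySnapshot<T> implements SketchConfigSnapshot<T> {
        @Override
        public SketchSerializer<T> restoreSerializer() {
            throw new UnsupportedOperationException("legacy snapshot cannot restore a serializer");
        }
    }

    // Wraps the legacy snapshot together with the serializer found in the old checkpoint.
    static final class WrappingSnapshot<T> implements SketchConfigSnapshot<T> {
        private final SketchConfigSnapshot<T> wrappedSnapshot;
        private final SketchSerializer<T> wrappedSerializer;

        WrappingSnapshot(SketchConfigSnapshot<T> wrappedSnapshot, SketchSerializer<T> wrappedSerializer) {
            this.wrappedSnapshot = wrappedSnapshot;
            this.wrappedSerializer = wrappedSerializer;
        }

        @Override
        public SketchSerializer<T> restoreSerializer() {
            // Deliberately bypasses wrappedSnapshot.restoreSerializer().
            return wrappedSerializer;
        }

        SketchConfigSnapshot<T> getWrappedSnapshot() {
            return wrappedSnapshot;
        }
    }

    static String restoreDescription() {
        SketchSerializer<String> fromCheckpoint = () -> "serializer read from the old checkpoint";
        SketchConfigSnapshot<String> wrapper =
            new WrappingSnapshot<>(new LegacySnapshot<>(), fromCheckpoint);
        return wrapper.restoreSerializer().describe();
    }

    public static void main(String[] args) {
        System.out.println(restoreDescription());
    }
}
```

Calling restoreSerializer() on the wrapper never reaches the legacy snapshot, so the UnsupportedOperationException in this sketch is not triggered.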

commit da84665a9b101a803f7446210afc34bbd4a71703
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-07-02T03:45:20Z

    [FLINK-9377] [core] (part 5) Remove serializers from checkpoint state meta infos
    
    This commit officially removes the behaviour of writing serializers in
    the state meta info of keyed state, operator state, and timers state.
    This affects the serialization formats of the
    KeyedBackendSerializationProxy, OperatorBackendSerializationProxy, and
    InternalTimerServiceSerializationProxy, and therefore their versions are
    all incremented.

commit ee583fdcfc920413f9ddd6b70bcb96e324124015
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-07-10T08:04:41Z

    [FLINK-9377] (wip) Extend CompatibilityResult

commit 39d84472518d776c2c0aa575ec75eab8411cc512
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-07-10T09:49:25Z

    [FLINK-9377] (wip) Move responsibility of compatibility checks to config snapshot

commit 53242ec674cc35f2abc881e48e1a07098b2b913c
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-06-18T14:24:35Z

    [FLINK-9377] [core] (part 6) Properly implement restoreSerializer for simple composite serializer config snapshots

commit 89c8b1ab30fa8d92eb9cb9813bdd98ebfc4fdccc
Author: Aljoscha Krettek <al...@...>
Date:   2018-07-11T13:43:46Z

    [FLINK-9376] Move resolveKvStateCompatibility functionality into state backends
    
    The heap backend and the RocksDB backend have different requirements:
    for the heap backend we don't have to check serializer compatibility
    since we always get state conversion for free because we read with the
    old serializer and write the next snapshot with the new serializer. For
    Rocks we need more fine-grained control, therefore we have to do the
    checks (and conversions in a future commit) in the backend itself.
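
To make the difference concrete: a backend that stores serialized bytes has to rewrite every stored value when the format changes, whereas a backend that holds deserialized objects converts implicitly on its next snapshot. The sketch below shows only the eager rewrite, using a plain Map as a stand-in for the byte store. The two formats (decimal text as the old one, 8-byte big-endian as the new one) and all method names are assumptions made for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class EagerMigrationSketch {

    // Assumed old format: the number written as decimal text.
    static long readOld(byte[] bytes) {
        return Long.parseLong(new String(bytes, StandardCharsets.UTF_8));
    }

    // Assumed new format: 8 bytes, big-endian.
    static byte[] writeNew(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long readNew(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    // Rewrites every stored value: read with the old format, write with the new one.
    static void migrate(Map<String, byte[]> store) {
        store.replaceAll((key, oldBytes) -> writeNew(readOld(oldBytes)));
    }

    // Builds a small store in the old format, migrates it, and reads one value back.
    static long migrateAndRead(String key) {
        Map<String, byte[]> store = new HashMap<>();
        store.put("a", "42".getBytes(StandardCharsets.UTF_8));
        store.put("b", "-7".getBytes(StandardCharsets.UTF_8));
        migrate(store);
        return readNew(store.get(key));
    }

    public static void main(String[] args) {
        System.out.println(migrateAndRead("a"));
        System.out.println(migrateAndRead("b"));
    }
}
```

Keys are left untouched in this sketch; only the values are re-encoded.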

commit 30453b1d7d9c8c7c1d35fee190a4a85584603c3c
Author: Aljoscha Krettek <al...@...>
Date:   2018-07-12T14:42:26Z

    [FLINK-9376] Update version in StateTableByKeyGroupReaders

commit 865f529907ee144e6f7a637f6efd1585d9ea8586
Author: Aljoscha Krettek <al...@...>
Date:   2018-07-11T16:29:30Z

    [FLINK-9376] Migrate RocksDB state when necessary

----


---

[GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6325#discussion_r202325526
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1312,6 +1283,128 @@ private void copyStateDataHandleData(
     		return Tuple2.of(stateInfo.f0, newMetaInfo);
     	}
     
    +	private <N, S extends State, SV> RegisteredKeyedBackendStateMetaInfo<N, SV> migrateStateIfNecessary(
    +			StateDescriptor<S, SV> stateDesc,
    +			TypeSerializer<N> namespaceSerializer,
    +			Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo) throws Exception {
    +
    +		@SuppressWarnings("unchecked")
    +		RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV> restoredMetaInfoSnapshot =
    +			(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV>) restoredKvStateMetaInfos.get(
    +				stateDesc.getName());
    +
    +		Preconditions.checkState(
    +			restoredMetaInfoSnapshot != null,
    +			"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
    +				" but its corresponding restored snapshot cannot be found.");
    +
    +		StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, stateDesc);
    +
    +		TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();
    +
    +		RegisteredKeyedBackendStateMetaInfo<N, SV> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    +			stateDesc.getType(),
    +			stateDesc.getName(),
    +			namespaceSerializer,
    +			stateSerializer);
    +
    +		// check compatibility results to determine if state migration is required
    +		TypeSerializerSchemaCompatibility<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
    +			restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
    +			namespaceSerializer);
    +
    +		TypeSerializerSchemaCompatibility<SV> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
    +			restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
    +			stateSerializer);
    +
    +		if (namespaceCompatibility.isIncompatible()) {
    +			throw new UnsupportedOperationException(
    +				"Changing the namespace TypeSerializer in an incompatible way is currently not supported.");
    +		}
    +
    +		if (stateCompatibility.isIncompatible()) {
    --- End diff --
    
    refactoring 👌 


---

[GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6325#discussion_r202293217
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1312,6 +1283,128 @@ private void copyStateDataHandleData(
     		return Tuple2.of(stateInfo.f0, newMetaInfo);
     	}
     
    +	private <N, S extends State, SV> RegisteredKeyedBackendStateMetaInfo<N, SV> migrateStateIfNecessary(
    +			StateDescriptor<S, SV> stateDesc,
    +			TypeSerializer<N> namespaceSerializer,
    +			Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo) throws Exception {
    +
    +		@SuppressWarnings("unchecked")
    +		RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV> restoredMetaInfoSnapshot =
    +			(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV>) restoredKvStateMetaInfos.get(
    +				stateDesc.getName());
    +
    +		Preconditions.checkState(
    +			restoredMetaInfoSnapshot != null,
    +			"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
    +				" but its corresponding restored snapshot cannot be found.");
    +
    +		StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, stateDesc);
    +
    +		TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();
    +
    +		RegisteredKeyedBackendStateMetaInfo<N, SV> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    +			stateDesc.getType(),
    +			stateDesc.getName(),
    +			namespaceSerializer,
    +			stateSerializer);
    +
    +		// check compatibility results to determine if state migration is required
    +		TypeSerializerSchemaCompatibility<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
    +			restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
    +			namespaceSerializer);
    +
    +		TypeSerializerSchemaCompatibility<SV> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
    +			restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
    +			stateSerializer);
    +
    +		if (namespaceCompatibility.isIncompatible()) {
    +			throw new UnsupportedOperationException(
    +				"Changing the namespace TypeSerializer in an incompatible way is currently not supported.");
    +		}
    +
    +		if (stateCompatibility.isIncompatible()) {
    +			if (stateDesc.getType().equals(StateDescriptor.Type.MAP)) {
    +				throw new UnsupportedOperationException(
    +					"Changing the TypeSerializers of a MapState in an incompatible way is currently not supported.");
    +			}
    +
    +			LOG.info(
    +				"Performing state migration for state {} because the state serializer changed in an incompatible way.",
    +				stateDesc);
    +
    +			// we need to get an actual state instance because migration is different
    +			// for different state types. For example, ListState needs to deal with
    +			// individual elements
    +			StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
    +			if (stateFactory == null) {
    +				String message = String.format("State %s is not supported by %s",
    +					stateDesc.getClass(), this.getClass());
    +				throw new FlinkRuntimeException(message);
    +			}
    +
    +			State state = stateFactory.createState(
    +				stateDesc,
    +				Tuple2.of(stateInfo.f0, newMetaInfo),
    +				RocksDBKeyedStateBackend.this);
    +
    +			if (!(state instanceof AbstractRocksDBState)) {
    +				throw new FlinkRuntimeException(
    +					"State should be an AbstractRocksDBState but is " + state);
    +			}
    +
    +			AbstractRocksDBState rocksDBState = (AbstractRocksDBState<?, N, ?, S>) state;
    +
    +			Snapshot rocksDBSnapshot = null;
    +			RocksIteratorWrapper iterator = null;
    +
    +			try (ReadOptions readOptions = new ReadOptions();) {
    --- End diff --
    
    I would suggest trying this:
    ```
    			Snapshot rocksDBSnapshot = db.getSnapshot();
    			try (
    				ReadOptions readOptions = new ReadOptions().setSnapshot(rocksDBSnapshot);
    				RocksIteratorWrapper iterator = getRocksIterator(db, stateInfo.f0, readOptions)) {
    
    				iterator.seekToFirst();
    				(...)
    			} finally {
    				db.releaseSnapshot(rocksDBSnapshot);
    				rocksDBSnapshot.close();
    			}
    ```
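
The cleanup order that this suggestion relies on can be checked without RocksDB. In the sketch below, Tracked is a hypothetical AutoCloseable stand-in for the native-backed objects (snapshot, read options, iterator), and the log entries are invented for illustration. It shows that try-with-resources closes its resources in reverse declaration order, and that the finally block runs only after that.

```java
import java.util.ArrayList;
import java.util.List;

final class SnapshotCleanupSketch {

    // Hypothetical stand-in for a native-backed resource; it records when it is closed.
    static final class Tracked implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Tracked(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add("close " + name);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Acquired outside the try header, so it must be cleaned up in finally.
        Tracked snapshot = new Tracked("snapshot", log);
        try (
            Tracked readOptions = new Tracked("readOptions", log);
            Tracked iterator = new Tracked("iterator", log)) {

            log.add("iterate");
        } finally {
            // Runs after the iterator and the read options have been closed.
            log.add("release snapshot");
            snapshot.close();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The iterator is closed before the read options, and both are closed before the snapshot is released, so nothing still reading from the snapshot outlives it.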


---

[GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6325#discussion_r202293477
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1312,6 +1283,128 @@ private void copyStateDataHandleData(
     		return Tuple2.of(stateInfo.f0, newMetaInfo);
     	}
     
    +	private <N, S extends State, SV> RegisteredKeyedBackendStateMetaInfo<N, SV> migrateStateIfNecessary(
    +			StateDescriptor<S, SV> stateDesc,
    +			TypeSerializer<N> namespaceSerializer,
    +			Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo) throws Exception {
    +
    +		@SuppressWarnings("unchecked")
    +		RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV> restoredMetaInfoSnapshot =
    +			(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV>) restoredKvStateMetaInfos.get(
    +				stateDesc.getName());
    +
    +		Preconditions.checkState(
    +			restoredMetaInfoSnapshot != null,
    +			"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
    +				" but its corresponding restored snapshot cannot be found.");
    +
    +		StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, stateDesc);
    +
    +		TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();
    +
    +		RegisteredKeyedBackendStateMetaInfo<N, SV> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    +			stateDesc.getType(),
    +			stateDesc.getName(),
    +			namespaceSerializer,
    +			stateSerializer);
    +
    +		// check compatibility results to determine if state migration is required
    +		TypeSerializerSchemaCompatibility<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
    +			restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
    +			namespaceSerializer);
    +
    +		TypeSerializerSchemaCompatibility<SV> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
    +			restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
    +			stateSerializer);
    +
    +		if (namespaceCompatibility.isIncompatible()) {
    +			throw new UnsupportedOperationException(
    +				"Changing the namespace TypeSerializer in an incompatible way is currently not supported.");
    +		}
    +
    +		if (stateCompatibility.isIncompatible()) {
    +			if (stateDesc.getType().equals(StateDescriptor.Type.MAP)) {
    +				throw new UnsupportedOperationException(
    +					"Changing the TypeSerializers of a MapState in an incompatible way is currently not supported.");
    +			}
    +
    +			LOG.info(
    +				"Performing state migration for state {} because the state serializer changed in an incompatible way.",
    +				stateDesc);
    +
    +			// we need to get an actual state instance because migration is different
    +			// for different state types. For example, ListState needs to deal with
    +			// individual elements
    +			StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
    +			if (stateFactory == null) {
    +				String message = String.format("State %s is not supported by %s",
    +					stateDesc.getClass(), this.getClass());
    +				throw new FlinkRuntimeException(message);
    +			}
    +
    +			State state = stateFactory.createState(
    +				stateDesc,
    +				Tuple2.of(stateInfo.f0, newMetaInfo),
    +				RocksDBKeyedStateBackend.this);
    +
    +			if (!(state instanceof AbstractRocksDBState)) {
    +				throw new FlinkRuntimeException(
    +					"State should be an AbstractRocksDBState but is " + state);
    +			}
    +
    +			AbstractRocksDBState rocksDBState = (AbstractRocksDBState<?, N, ?, S>) state;
    +
    +			Snapshot rocksDBSnapshot = null;
    +			RocksIteratorWrapper iterator = null;
    +
    +			try (ReadOptions readOptions = new ReadOptions();) {
    +				// TODO: can I do this with try-with-resource or do I always have to call
    +				// db.releaseSnapshot()
    +				// I think I can't use try-with-resource anyways since I have to set the snapshot
    +				// on the ReadOptions
    +
    +				rocksDBSnapshot = db.getSnapshot();
    +				readOptions.setSnapshot(rocksDBSnapshot);
    +
    +				iterator = getRocksIterator(db, stateInfo.f0, readOptions);
    +				iterator.seekToFirst();
    +
    +				while (iterator.isValid()) {
    +
    +					byte[] serializedValue = iterator.value();
    +
    +					byte[] migratedSerializedValue = rocksDBState.migrateSerializedValue(
    +						serializedValue,
    +						restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot().restoreSerializer(),
    +						stateDesc.getSerializer());
    +
    +					db.put(stateInfo.f0, iterator.key(), migratedSerializedValue);
    +
    +					iterator.next();
    +				}
    +			} finally {
    +				if (iterator != null) {
    +					iterator.close();
    +				}
    +				if (rocksDBSnapshot != null) {
    +					db.releaseSnapshot(rocksDBSnapshot);
    +					// TODO: do I need to call close() or is calling db.releaseSnapshot() enough
    +					rocksDBSnapshot.close();
    --- End diff --
    
    It is backed by a native object, so always call `close` if the method is there.


---

[GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6325#discussion_r202293686
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1312,6 +1283,128 @@ private void copyStateDataHandleData(
     		return Tuple2.of(stateInfo.f0, newMetaInfo);
     	}
     
    +	private <N, S extends State, SV> RegisteredKeyedBackendStateMetaInfo<N, SV> migrateStateIfNecessary(
    +			StateDescriptor<S, SV> stateDesc,
    +			TypeSerializer<N> namespaceSerializer,
    +			Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo) throws Exception {
    +
    +		@SuppressWarnings("unchecked")
    +		RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV> restoredMetaInfoSnapshot =
    +			(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV>) restoredKvStateMetaInfos.get(
    +				stateDesc.getName());
    +
    +		Preconditions.checkState(
    +			restoredMetaInfoSnapshot != null,
    +			"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
    +				" but its corresponding restored snapshot cannot be found.");
    +
    +		StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, stateDesc);
    +
    +		TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();
    +
    +		RegisteredKeyedBackendStateMetaInfo<N, SV> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    +			stateDesc.getType(),
    +			stateDesc.getName(),
    +			namespaceSerializer,
    +			stateSerializer);
    +
    +		// check compatibility results to determine if state migration is required
    +		TypeSerializerSchemaCompatibility<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
    +			restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
    +			namespaceSerializer);
    +
    +		TypeSerializerSchemaCompatibility<SV> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
    +			restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
    +			stateSerializer);
    +
    +		if (namespaceCompatibility.isIncompatible()) {
    +			throw new UnsupportedOperationException(
    +				"Changing the namespace TypeSerializer in an incompatible way is currently not supported.");
    +		}
    +
    +		if (stateCompatibility.isIncompatible()) {
    +			if (stateDesc.getType().equals(StateDescriptor.Type.MAP)) {
    +				throw new UnsupportedOperationException(
    +					"Changing the TypeSerializers of a MapState in an incompatible way is currently not supported.");
    +			}
    +
    +			LOG.info(
    +				"Performing state migration for state {} because the state serializer changed in an incompatible way.",
    +				stateDesc);
    +
    +			// we need to get an actual state instance because migration is different
    +			// for different state types. For example, ListState needs to deal with
    +			// individual elements
    +			StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
    +			if (stateFactory == null) {
    +				String message = String.format("State %s is not supported by %s",
    +					stateDesc.getClass(), this.getClass());
    +				throw new FlinkRuntimeException(message);
    +			}
    +
    +			State state = stateFactory.createState(
    +				stateDesc,
    +				Tuple2.of(stateInfo.f0, newMetaInfo),
    +				RocksDBKeyedStateBackend.this);
    +
    +			if (!(state instanceof AbstractRocksDBState)) {
    +				throw new FlinkRuntimeException(
    +					"State should be an AbstractRocksDBState but is " + state);
    +			}
    +
    +			AbstractRocksDBState rocksDBState = (AbstractRocksDBState<?, N, ?, S>) state;
    --- End diff --
    
    Avoid using raw types for the reference.
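
As a general Java illustration of this point (using java.util.List rather than AbstractRocksDBState, and with invented method names), a wildcard-parameterized reference keeps the compiler's type checks that a raw reference silently gives up:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RawTypeSketch {

    // Uses a wildcard-parameterized reference instead of a raw List.
    static int sizeOf(Object candidate) {
        if (!(candidate instanceof List)) {
            throw new IllegalArgumentException("expected a List but got " + candidate);
        }
        // List<?> needs no unchecked cast, and the compiler rejects list.add("x") on it.
        List<?> list = (List<?>) candidate;
        return list.size();
    }

    @SuppressWarnings({"rawtypes", "unchecked"})
    static boolean rawReferenceAllowsHeapPollution() {
        List<Integer> numbers = new ArrayList<>();
        List raw = numbers;      // raw reference: generic checks are switched off
        raw.add("not a number"); // compiles, and corrupts the List<Integer>
        Object first = numbers.get(0);
        return first instanceof String;
    }

    public static void main(String[] args) {
        System.out.println(sizeOf(Arrays.asList("a", "b")));
        System.out.println(rawReferenceAllowsHeapPollution());
    }
}
```

In the diff above, declaring the variable with the same wildcard parameters as the cast would presumably address the comment, though the exact type arguments depend on how migrateSerializedValue is declared.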


---

[GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6325#discussion_r202289229
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1312,6 +1283,128 @@ private void copyStateDataHandleData(
     		return Tuple2.of(stateInfo.f0, newMetaInfo);
     	}
     
    +	private <N, S extends State, SV> RegisteredKeyedBackendStateMetaInfo<N, SV> migrateStateIfNecessary(
    +			StateDescriptor<S, SV> stateDesc,
    +			TypeSerializer<N> namespaceSerializer,
    +			Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo) throws Exception {
    +
    +		@SuppressWarnings("unchecked")
    +		RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV> restoredMetaInfoSnapshot =
    +			(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV>) restoredKvStateMetaInfos.get(
    +				stateDesc.getName());
    +
    +		Preconditions.checkState(
    +			restoredMetaInfoSnapshot != null,
    +			"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
    +				" but its corresponding restored snapshot cannot be found.");
    +
    +		StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, stateDesc);
    +
    +		TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();
    +
    +		RegisteredKeyedBackendStateMetaInfo<N, SV> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    +			stateDesc.getType(),
    +			stateDesc.getName(),
    +			namespaceSerializer,
    +			stateSerializer);
    +
    +		// check compatibility results to determine if state migration is required
    +		TypeSerializerSchemaCompatibility<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
    +			restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
    +			namespaceSerializer);
    +
    +		TypeSerializerSchemaCompatibility<SV> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
    +			restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
    +			stateSerializer);
    +
    +		if (namespaceCompatibility.isIncompatible()) {
    +			throw new UnsupportedOperationException(
    +				"Changing the namespace TypeSerializer in an incompatible way is currently not supported.");
    +		}
    +
    +		if (stateCompatibility.isIncompatible()) {
    --- End diff --
    
    The handling of this branch could go into its own private method to break down this big monolithic method a bit.


---

[GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6325#discussion_r202295131
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1298,6 +1283,128 @@ private void copyStateDataHandleData(
     		return Tuple2.of(stateInfo.f0, newMetaInfo);
     	}
     
    +	private <N, S extends State, SV> RegisteredKeyedBackendStateMetaInfo<N, SV> migrateStateIfNecessary(
    +			StateDescriptor<S, SV> stateDesc,
    +			TypeSerializer<N> namespaceSerializer,
    +			Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo) throws Exception {
    +
    +		@SuppressWarnings("unchecked")
    +		RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV> restoredMetaInfoSnapshot =
    +			(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV>) restoredKvStateMetaInfos.get(
    +				stateDesc.getName());
    +
    +		Preconditions.checkState(
    +			restoredMetaInfoSnapshot != null,
    +			"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
    +				" but its corresponding restored snapshot cannot be found.");
    +
    +		StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, stateDesc);
    +
    +		TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();
    +
    +		RegisteredKeyedBackendStateMetaInfo<N, SV> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    +			stateDesc.getType(),
    +			stateDesc.getName(),
    +			namespaceSerializer,
    +			stateSerializer);
    +
    +		// check compatibility results to determine if state migration is required
    +		TypeSerializerSchemaCompatibility<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
    +			restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
    +			namespaceSerializer);
    +
    +		TypeSerializerSchemaCompatibility<SV> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
    +			restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
    +			stateSerializer);
    +
    +		if (namespaceCompatibility.isIncompatible()) {
    +			throw new UnsupportedOperationException(
    +				"Changing the namespace TypeSerializer in an incompatible way is currently not supported.");
    +		}
    +
    +		if (stateCompatibility.isIncompatible()) {
    +			if (stateDesc.getType().equals(StateDescriptor.Type.MAP)) {
    +				throw new UnsupportedOperationException(
    +					"Changing the TypeSerializers of a MapState in an incompatible way is currently not supported.");
    +			}
    +
    +			LOG.info(
    +				"Performing state migration for state {} because the state serializer changed in an incompatible way.",
    +				stateDesc);
    +
    +			// we need to get an actual state instance because migration is different
    +			// for different state types. For example, ListState needs to deal with
    +			// individual elements
    +			StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
    +			if (stateFactory == null) {
    +				String message = String.format("State %s is not supported by %s",
    +					stateDesc.getClass(), this.getClass());
    +				throw new FlinkRuntimeException(message);
    +			}
    +
    +			State state = stateFactory.createState(
    +				stateDesc,
    +				Tuple2.of(stateInfo.f0, newMetaInfo),
    +				RocksDBKeyedStateBackend.this);
    +
    +			if (!(state instanceof AbstractRocksDBState)) {
    +				throw new FlinkRuntimeException(
    +					"State should be an AbstractRocksDBState but is " + state);
    +			}
    +
    +			AbstractRocksDBState rocksDBState = (AbstractRocksDBState<?, N, ?, S>) state;
    +
    +			Snapshot rocksDBSnapshot = null;
    +			RocksIteratorWrapper iterator = null;
    +
    +			try (ReadOptions readOptions = new ReadOptions();) {
    +				// TODO: can I do this with try-with-resource or do I always have to call
    +				// db.releaseSnapshot()
    +				// I think I can't use try-with-resource anyways since I have to set the snapshot
    +				// on the ReadOptions
    +
    +				rocksDBSnapshot = db.getSnapshot();
    --- End diff --
    
    Not sure if we even need a snapshot iterator in this case right now.


---

[GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6325#discussion_r202285840
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java ---
    @@ -99,7 +99,8 @@
     		// TODO this method actually should not have a default implementation;
     		// TODO this placeholder should be removed as soon as all subclasses have a proper implementation in place, and
     		// TODO the method is properly integrated in state backends' restore procedures
    -		throw new UnsupportedOperationException();
    +//		throw new UnsupportedOperationException();
    --- End diff --
    
    Remove this line.


---