Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2018/04/20 13:36:23 UTC

[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/5885

    [FLINK-8715] Remove usage of StateDescriptor in state handles

    ## What is the purpose of the change
    
    This PR is a WIP and still lacks test coverage.
    It is being opened now to collect feedback on a proposed solution for FLINK-8715.
    
    Before this change, state serializers reconfigured on restore were not properly forwarded to the state handles. The `StateDescriptor` used to serve as the holder for the reconfigured serializer.
    However, since 88ffad27, `StateDescriptor#getSerializer()` has been handing out duplicates of the serializer, so the reconfigured serializer became a completely different copy than the one the state handles were using.
    
    This PR fixes this by explicitly forwarding the serializer to the instantiated state handles after the state is registered at the state backend. It also eliminates the internal use of `StateDescriptor`s in the state handles, so that the behaviour no longer depends on how `StateDescriptor#getSerializer()` is implemented.
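    
    A minimal, self-contained sketch of the problem and of the proposed fix. `SketchSerializer`, `SketchDescriptor` and the two handle classes are hypothetical stand-ins, not Flink's `TypeSerializer`/`StateDescriptor` API; `reconfigure()` only models the in-place reconfiguration that happens during the compatibility check on restore:
    
    ```java
    import java.util.Objects;
    
    /** Hypothetical stand-in for a serializer whose configuration can change on restore. */
    final class SketchSerializer {
        private boolean reconfigured;
    
        SketchSerializer(boolean reconfigured) {
            this.reconfigured = reconfigured;
        }
    
        /** Returns an independent copy, in the spirit of TypeSerializer#duplicate(). */
        SketchSerializer duplicate() {
            return new SketchSerializer(reconfigured);
        }
    
        /** Models the reconfiguration applied during the compatibility check. */
        void reconfigure() {
            reconfigured = true;
        }
    
        boolean isReconfigured() {
            return reconfigured;
        }
    }
    
    /** Hypothetical descriptor that hands out a duplicate on every getSerializer() call. */
    final class SketchDescriptor {
        private final SketchSerializer serializer = new SketchSerializer(false);
    
        SketchSerializer getSerializer() {
            return serializer.duplicate();
        }
    }
    
    /** Old style: the state handle keeps the descriptor and asks it for a serializer. */
    final class DescriptorBackedHandle {
        private final SketchDescriptor desc;
    
        DescriptorBackedHandle(SketchDescriptor desc) {
            this.desc = Objects.requireNonNull(desc);
        }
    
        SketchSerializer serializer() {
            return desc.getSerializer();
        }
    }
    
    /** New style: the backend forwards the serializer it actually reconfigured. */
    final class SerializerBackedHandle {
        private final SketchSerializer serializer;
    
        SerializerBackedHandle(SketchSerializer serializer) {
            this.serializer = Objects.requireNonNull(serializer);
        }
    
        SketchSerializer serializer() {
            return serializer;
        }
    }
    
    final class ForwardingSketch {
        /** Simulates registration on restore, then builds one handle of each kind. */
        static boolean[] register() {
            SketchDescriptor desc = new SketchDescriptor();
            SketchSerializer registered = desc.getSerializer();
            registered.reconfigure();
    
            DescriptorBackedHandle oldHandle = new DescriptorBackedHandle(desc);
            SerializerBackedHandle newHandle = new SerializerBackedHandle(registered);
            return new boolean[] {
                oldHandle.serializer().isReconfigured(),  // false: a fresh duplicate
                newHandle.serializer().isReconfigured()   // true: the reconfigured instance
            };
        }
    }
    ```
    
    A handle that re-asks the descriptor gets a fresh duplicate that never saw the reconfiguration, whereas a handle constructed with the forwarded instance uses the serializer the backend actually reconfigured.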
    
    An alternative approach would be an internal `setSerializer` method on `StateDescriptor`, to be called after the state serializers are reconfigured on registration.
    That would ensure that the serializers handed out by the descriptor are always the reconfigured ones, as soon as the descriptor is registered at the backend.
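    
    A sketch of that alternative, assuming a hypothetical internal `setSerializer` (not an existing `StateDescriptor` method) and a toy serializer whose `version` field marks which configuration it carries:
    
    ```java
    /** Minimal serializer stand-in; 'version' marks the configuration it carries. */
    final class VersionedSerializer {
        final int version;
    
        VersionedSerializer(int version) {
            this.version = version;
        }
    
        VersionedSerializer duplicate() {
            return new VersionedSerializer(version);
        }
    }
    
    /** Hypothetical descriptor with an internal setter, as in the alternative proposal. */
    final class SettableDescriptor {
        private VersionedSerializer serializer;
    
        SettableDescriptor(VersionedSerializer initial) {
            this.serializer = initial;
        }
    
        VersionedSerializer getSerializer() {
            return serializer.duplicate();
        }
    
        /** Internal use only: called by the backend after reconfiguration on registration. */
        void setSerializer(VersionedSerializer reconfigured) {
            this.serializer = reconfigured;
        }
    }
    
    final class SetterSketch {
        static int versionSeenAfterRegistration() {
            SettableDescriptor desc = new SettableDescriptor(new VersionedSerializer(1));
            // Backend: take a copy, "reconfigure" it, and write it back into the descriptor.
            VersionedSerializer reconfigured = new VersionedSerializer(desc.getSerializer().version + 1);
            desc.setSerializer(reconfigured);
            // Any later caller now receives a duplicate of the reconfigured serializer.
            return desc.getSerializer().version;
        }
    }
    ```
    
    The catch with this design is ordering: every consumer must call `getSerializer()` only after registration has written the reconfigured serializer back.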
    
    ## Brief change log
    
    - Remove `StateDescriptor`s from heap / RocksDB state handle classes
    - Forward the state serializer and any other necessary information provided by the state descriptor (e.g. default value, user functions, nested serializers, etc.) when instantiating state handles.
    
    ## Verifying this change
    
    This fix still lacks test coverage.
    It has been opened to collect feedback on the approach.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (**yes** / no / don't know)
      - The runtime per-record code paths (performance sensitive): (**yes** / no / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-8715

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5885.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5885
    
----
commit c092dd6518d9e6f47f4cfc797c18bedc8a89cc05
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-04-20T13:15:42Z

    [FLINK-8715] Remove usage of StateDescriptor in state handles

----


---

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184691691
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1116,148 +1115,177 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance(
     	// ------------------------------------------------------------------------
     
     	/**
    -	 * Creates a column family handle for use with a k/v state. When restoring from a snapshot
    -	 * we don't restore the individual k/v states, just the global RocksDB database and the
    -	 * list of column families. When a k/v state is first requested we check here whether we
    -	 * already have a column family for that and return it or create a new one if it doesn't exist.
    +	 * Registers a k/v state information, which includes its state id, type, RocksDB column family handle, and serializers.
     	 *
    -	 * <p>This also checks whether the {@link StateDescriptor} for a state matches the one
    -	 * that we checkpointed, i.e. is already in the map of column families.
    +	 * When restoring from a snapshot, we don’t restore the individual k/v states, just the global RocksDB database and
    +	 * the list of k/v state information. When a k/v state is first requested we check here whether we
    +	 * already have a registered entry for that and return it (after some necessary state compatibility checks)
    +	 * or create a new one if it does not exist.
     	 */
    -	@SuppressWarnings("rawtypes, unchecked")
    -	protected <N, S> ColumnFamilyHandle getColumnFamily(
    -		StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException {
    +	private Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> tryRegisterKvStateInformation(
    --- End diff --
    
    We can also remove the lines that suppress warnings.


---

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184690567
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1116,148 +1115,177 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance(
     	// ------------------------------------------------------------------------
     
     	/**
    -	 * Creates a column family handle for use with a k/v state. When restoring from a snapshot
    -	 * we don't restore the individual k/v states, just the global RocksDB database and the
    -	 * list of column families. When a k/v state is first requested we check here whether we
    -	 * already have a column family for that and return it or create a new one if it doesn't exist.
    +	 * Registers a k/v state information, which includes its state id, type, RocksDB column family handle, and serializers.
     	 *
    -	 * <p>This also checks whether the {@link StateDescriptor} for a state matches the one
    -	 * that we checkpointed, i.e. is already in the map of column families.
    +	 * When restoring from a snapshot, we don’t restore the individual k/v states, just the global RocksDB database and
    +	 * the list of k/v state information. When a k/v state is first requested we check here whether we
    +	 * already have a registered entry for that and return it (after some necessary state compatibility checks)
    +	 * or create a new one if it does not exist.
     	 */
    -	@SuppressWarnings("rawtypes, unchecked")
    -	protected <N, S> ColumnFamilyHandle getColumnFamily(
    -		StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException {
    +	private Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> tryRegisterKvStateInformation(
    --- End diff --
    
    We could rewrite this as
    ```
    	private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tryRegisterKvStateInformation(
    			StateDescriptor<?, S> stateDesc,
    			TypeSerializer<N> namespaceSerializer) throws StateMigrationException, IOException {
    
    		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredInfo =
    			this.kvStateInformation.get(stateDesc.getName());
    
    		if (registeredInfo != null) {
    
    			@SuppressWarnings("unchecked")
    			RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfoSnapshot =
    				restoredKvStateMetaInfos.get(stateDesc.getName());
    
    			Preconditions.checkState(
    				restoredMetaInfoSnapshot != null,
    				"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
    					" but its corresponding restored snapshot cannot be found.");
    
    			RegisteredKeyedBackendStateMetaInfo<N, S> resolveKvStateCompatibility =
    				RegisteredKeyedBackendStateMetaInfo.resolveKvStateCompatibility(
    				restoredMetaInfoSnapshot,
    				namespaceSerializer,
    				stateDesc);
    
    			registeredInfo.f1 = resolveKvStateCompatibility;
    
    			return Tuple2.of(registeredInfo.f0, resolveKvStateCompatibility);
    		} else {
    			String stateName = stateDesc.getName();
    			RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    				stateDesc.getType(),
    				stateName,
    				namespaceSerializer,
    				stateDesc.getSerializer());
    
    			ColumnFamilyHandle columnFamily = createColumnFamily(stateName);
    			registeredInfo = Tuple2.of(columnFamily, newMetaInfo);
    			this.kvStateInformation.put(stateDesc.getName(), registeredInfo);
    			return Tuple2.of(columnFamily, newMetaInfo);
    		}
    	}
    ```
    
    and get rid of all the individual casts.


---

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184086555
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java ---
    @@ -47,21 +47,23 @@
     	/**
     	 * Creates a new key/value state for the given hash map of key/value pairs.
     	 *
    -	 * @param stateDesc
    -	 *             The state identifier for the state. This contains name and can create a default state value.
    +	 * @param valueSerializer
    +	 *             The serializer for the state.
     	 * @param stateTable
     	 *             The state table to use in this kev/value state. May contain initial state.
     	 * @param namespaceSerializer
     	 *             The serializer for the type that indicates the namespace
     	 */
     	public HeapAggregatingState(
    -			AggregatingStateDescriptor<IN, ACC, OUT> stateDesc,
     			StateTable<K, N, ACC> stateTable,
     			TypeSerializer<K> keySerializer,
    -			TypeSerializer<N> namespaceSerializer) {
    +			TypeSerializer<ACC> valueSerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			ACC defaultValue,
    --- End diff --
    
    The Javadoc comments have not been updated to reflect this parameter and the next one.


---

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184696134
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1116,148 +1115,177 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance(
     	// ------------------------------------------------------------------------
     
     	/**
    -	 * Creates a column family handle for use with a k/v state. When restoring from a snapshot
    -	 * we don't restore the individual k/v states, just the global RocksDB database and the
    -	 * list of column families. When a k/v state is first requested we check here whether we
    -	 * already have a column family for that and return it or create a new one if it doesn't exist.
    +	 * Registers a k/v state information, which includes its state id, type, RocksDB column family handle, and serializers.
     	 *
    -	 * <p>This also checks whether the {@link StateDescriptor} for a state matches the one
    -	 * that we checkpointed, i.e. is already in the map of column families.
    +	 * When restoring from a snapshot, we don’t restore the individual k/v states, just the global RocksDB database and
    +	 * the list of k/v state information. When a k/v state is first requested we check here whether we
    +	 * already have a registered entry for that and return it (after some necessary state compatibility checks)
    +	 * or create a new one if it does not exist.
     	 */
    -	@SuppressWarnings("rawtypes, unchecked")
    -	protected <N, S> ColumnFamilyHandle getColumnFamily(
    -		StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException {
    +	private Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> tryRegisterKvStateInformation(
    --- End diff --
    
    This method is rarely invoked, and the tuples are not immutable, so handing out a defensive copy can also have its benefits. I feel this is cleaner than having casts all over the place. There might be other ways to solve the general problem, but this feels better to me than the initial version.
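    
    A sketch of the pattern argued for here, with hypothetical `MutablePair` (modelled on `Tuple2`'s public `f0`/`f1` fields), `SketchMetaInfo` and `SketchKvRegistry` types; strings stand in for column family handles. The registry stores wildcard-typed entries, and the generic method returns a newly built, precisely typed pair instead of the stored one:
    
    ```java
    import java.util.HashMap;
    import java.util.Map;
    
    /** Hypothetical mutable pair, modelled on Flink's Tuple2 (public, non-final f0 and f1). */
    final class MutablePair<A, B> {
        public A f0;
        public B f1;
    
        MutablePair(A f0, B f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }
    
    /** Hypothetical typed meta info; N and S only carry type information here. */
    final class SketchMetaInfo<N, S> {
        final String name;
    
        SketchMetaInfo(String name) {
            this.name = name;
        }
    }
    
    final class SketchKvRegistry {
        // States of different types share one map, so entries are stored with wildcards.
        private final Map<String, MutablePair<String, SketchMetaInfo<?, ?>>> kvStateInformation = new HashMap<>();
    
        <N, S> MutablePair<String, SketchMetaInfo<N, S>> tryRegister(String stateName) {
            // Stands in for the resolved (restored) or newly created meta info.
            SketchMetaInfo<N, S> metaInfo = new SketchMetaInfo<>(stateName);
            MutablePair<String, SketchMetaInfo<?, ?>> registered = kvStateInformation.get(stateName);
            if (registered == null) {
                // "cf-" + name stands in for createColumnFamily(stateName).
                registered = new MutablePair<String, SketchMetaInfo<?, ?>>("cf-" + stateName, metaInfo);
                kvStateInformation.put(stateName, registered);
            } else {
                registered.f1 = metaInfo;
            }
            // Hand out a fresh, precisely typed pair: no casts at the call site, and
            // callers that mutate it cannot corrupt the stored entry.
            return new MutablePair<>(registered.f0, metaInfo);
        }
    
        String storedColumnFamily(String stateName) {
            return kvStateInformation.get(stateName).f0;
        }
    }
    ```
    
    Because callers receive a copy, reassigning `f0` or `f1` on the returned pair leaves the registry untouched; the cost is one small allocation per call, which seems acceptable for a rarely invoked method.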


---

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184084995
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java ---
    @@ -42,33 +42,35 @@
     	/** Map containing the actual key/value pairs. */
     	protected final StateTable<K, N, SV> stateTable;
     
    -	/** This holds the name of the state and can create an initial default value for the state. */
    -	protected final SD stateDesc;
    --- End diff --
    
    I think that, with this field removed, you can also remove `SD` from the generic type parameters of this class, and transitively from its subclasses as well.
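    
    A sketch of what the hierarchy could look like once the field is gone, using hypothetical simplified classes (`SimpleStateTable`, `BaseHeapState`, `SimpleHeapValueState`) rather than Flink's actual `StateTable`/`AbstractHeapState` signatures. The default value that used to come from the descriptor is forwarded through the constructor instead:
    
    ```java
    import java.util.HashMap;
    import java.util.Map;
    
    /** Hypothetical stand-in for a state table: maps (key, namespace) to a state value. */
    final class SimpleStateTable<K, N, SV> {
        private final Map<String, SV> entries = new HashMap<>();
    
        SV get(K key, N namespace) {
            return entries.get(key + "|" + namespace);
        }
    
        void put(K key, N namespace, SV value) {
            entries.put(key + "|" + namespace, value);
        }
    }
    
    /** Base heap state without a descriptor field, and therefore without an SD type parameter. */
    abstract class BaseHeapState<K, N, SV> {
        protected final SimpleStateTable<K, N, SV> stateTable;
        private final SV defaultValue;
    
        BaseHeapState(SimpleStateTable<K, N, SV> stateTable, SV defaultValue) {
            this.stateTable = stateTable;
            this.defaultValue = defaultValue;
        }
    
        /** Stands in for asking the descriptor for a default value: it is now passed in. */
        protected SV getDefaultValue() {
            return defaultValue;
        }
    }
    
    /** Subclasses drop SD transitively and declare only the type parameters they use. */
    final class SimpleHeapValueState<K, N, V> extends BaseHeapState<K, N, V> {
        SimpleHeapValueState(SimpleStateTable<K, N, V> table, V defaultValue) {
            super(table, defaultValue);
        }
    
        V value(K key, N namespace) {
            V current = stateTable.get(key, namespace);
            return current != null ? current : getDefaultValue();
        }
    }
    ```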


---

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184840215
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ---
    @@ -455,4 +455,5 @@ public StreamCompressionDecorator getKeyGroupCompressionDecorator() {
     	@VisibleForTesting
     	public abstract int numStateEntries();
     
    +
    --- End diff --
    
    Will revert 👍 


---

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184093390
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java ---
    @@ -103,17 +104,17 @@ public void add(T value) throws IOException {
     
     	private static final class FoldTransformation<T, ACC> implements StateTransformationFunction<ACC, T> {
     
    -		private final FoldingStateDescriptor<T, ACC> stateDescriptor;
    +		private final HeapFoldingState<?, ?, T, ACC> stateRef;
     		private final FoldFunction<T, ACC> foldFunction;
     
    -		FoldTransformation(FoldingStateDescriptor<T, ACC> stateDesc) {
    -			this.stateDescriptor = Preconditions.checkNotNull(stateDesc);
    -			this.foldFunction = Preconditions.checkNotNull(stateDesc.getFoldFunction());
    +		FoldTransformation(FoldFunction<T, ACC> foldFunction, HeapFoldingState<?, ?, T, ACC> stateRef) {
    +			this.stateRef = Preconditions.checkNotNull(stateRef);
    --- End diff --
    
    Maybe the more honest and simpler way is to make this a non-static inner class instead of passing a reference to `HeapFoldingState.this` to the constructor. It essentially does the same thing.
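    
    A sketch of the suggested shape, with hypothetical `SketchFoldFunction`/`SketchTransformation` interfaces standing in for Flink's `FoldFunction` and `StateTransformationFunction`. As a non-static inner class, the transformation reads the default value through the implicit reference to the enclosing instance, so no `stateRef` constructor argument is needed:
    
    ```java
    import java.util.Objects;
    
    /** Hypothetical stand-ins for FoldFunction and StateTransformationFunction. */
    interface SketchFoldFunction<T, ACC> {
        ACC fold(ACC accumulator, T value);
    }
    
    interface SketchTransformation<S, T> {
        S apply(S previousState, T value);
    }
    
    final class SketchFoldingState<T, ACC> {
        private final ACC defaultValue;
        private final FoldTransformation foldTransformation;
    
        SketchFoldingState(SketchFoldFunction<T, ACC> foldFunction, ACC defaultValue) {
            this.defaultValue = defaultValue;
            this.foldTransformation = new FoldTransformation(foldFunction);
        }
    
        ACC add(ACC current, T value) {
            return foldTransformation.apply(current, value);
        }
    
        /** Non-static: reads defaultValue via the implicit SketchFoldingState.this. */
        private final class FoldTransformation implements SketchTransformation<ACC, T> {
            private final SketchFoldFunction<T, ACC> foldFunction;
    
            FoldTransformation(SketchFoldFunction<T, ACC> foldFunction) {
                this.foldFunction = Objects.requireNonNull(foldFunction);
            }
    
            @Override
            public ACC apply(ACC previousState, T value) {
                // Fall back to the enclosing state's default when nothing is stored yet.
                return foldFunction.fold(previousState != null ? previousState : defaultValue, value);
            }
        }
    }
    ```
    
    Each transformation still holds on to its enclosing state object, exactly as with an explicit `stateRef`, but the dependency is now visible in the class declaration instead of being hidden in a constructor parameter.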


---

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r183309701
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1177,7 +1177,7 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance(
     				throw new StateMigrationException("State migration isn't supported, yet.");
     			} else {
     				stateInfo.f1 = newMetaInfo;
    -				return stateInfo.f0;
    +				return Tuple2.of(stateInfo.f0, newMetaInfo.getStateSerializer());
    --- End diff --
    
    I agree that the meta info should be immutable, and that the compatibility check result should carry the compatible, reconfigured serializer.
    
    However, one issue is that this requires changes to the `CompatibilityResult` interface, which is part of the public API. I would prefer not to touch the API now, as we're approaching the release. It would be possible to bypass this by introducing an internal compatibility result class, but that has two downsides: 1) it would have an almost identical implementation to `CompatibilityResult`, and 2) it would entail touching a lot of our more complex serializers' code, because they use `CompatibilityUtil.resolveCompatibilityResult`.


---

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184085595
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---
    @@ -61,8 +64,7 @@
     	/** The column family of this particular instance of state. */
     	protected ColumnFamilyHandle columnFamily;
     
    -	/** State descriptor from which to create this state instance. */
    -	protected final SD stateDesc;
    --- End diff --
    
    The same as for the heap state applies here: you can remove `SD` from the generic types of this class and its subclasses.


---

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r183996477
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1125,59 +1125,62 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance(
     	 * that we checkpointed, i.e. is already in the map of column families.
     	 */
     	@SuppressWarnings("rawtypes, unchecked")
    -	protected <N, S> ColumnFamilyHandle getColumnFamily(
    +	protected <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> getColumnFamilyAndStateSerializer(
    --- End diff --
    
    I think this method has grown way too complex over time, and the `Tuple2` return type makes it more and more clear that this code mixes up two different concerns and could be untangled a bit. I would suggest separating it into:
    1) Checking whether this is a new state (does the map contain the name string?). This can be an inlined check in the current calling code.
    2) If the map does contain the name, doing the serializer checks and configuration magic and creating the `RegisteredKeyedBackendStateMetaInfo`. This goes into a separate method that is called by the current caller.
    3) Requesting the column family, either by registering a new one or by reusing the existing one. This can use the result from step 1 or re-check. This goes into another separate method called by the current caller.
    x) Optional: a helper method that performs steps 1-3, if we would otherwise duplicate them too much.
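    
    A rough sketch of the proposed split, assuming toy string-based bookkeeping: `SplitRegistration` and its methods are hypothetical, strings stand in for serializers and column family handles, and `IllegalStateException` stands in for `StateMigrationException`:
    
    ```java
    import java.util.HashMap;
    import java.util.Map;
    
    final class SplitRegistration {
        private final Map<String, String> registeredSerializers = new HashMap<>();  // state name -> serializer
        private final Map<String, String> columnFamilies = new HashMap<>();         // state name -> column family
    
        /** Simulates what a restore leaves behind: known states with their column families. */
        void restore(String stateName, String serializer) {
            registeredSerializers.put(stateName, serializer);
            columnFamilies.put(stateName, "cf-" + stateName);
        }
    
        // Step 1: is there already an entry for this state name?
        boolean isRegistered(String stateName) {
            return registeredSerializers.containsKey(stateName);
        }
    
        // Step 2: for an already registered state, check compatibility and settle the serializer.
        String resolveSerializer(String stateName, String newSerializer) {
            String restored = registeredSerializers.get(stateName);
            if (!restored.equals(newSerializer)) {
                throw new IllegalStateException("State migration isn't supported, yet: " + stateName);
            }
            return newSerializer;
        }
    
        // Step 3: reuse the existing column family or create a new one.
        String getOrCreateColumnFamily(String stateName) {
            return columnFamilies.computeIfAbsent(stateName, name -> "cf-" + name);
        }
    
        // Optional helper (step "x"): runs steps 1-3 for callers that need all of them.
        String register(String stateName, String serializer) {
            String resolved = isRegistered(stateName)
                ? resolveSerializer(stateName, serializer)
                : serializer;
            registeredSerializers.put(stateName, resolved);
            return getOrCreateColumnFamily(stateName);
        }
    }
    ```
    
    Each step can then be tested on its own, and the return types stay simple because no single method has to report both the column family and the meta info.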


---

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184830451
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ---
    @@ -455,4 +455,5 @@ public StreamCompressionDecorator getKeyGroupCompressionDecorator() {
     	@VisibleForTesting
     	public abstract int numStateEntries();
     
    +
    --- End diff --
    
    revert this?


---

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r183214737
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1177,7 +1177,7 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance(
     				throw new StateMigrationException("State migration isn't supported, yet.");
     			} else {
     				stateInfo.f1 = newMetaInfo;
    -				return stateInfo.f0;
    +				return Tuple2.of(stateInfo.f0, newMetaInfo.getStateSerializer());
    --- End diff --
    
    Mirroring the result of an offline discussion here:
    
    This is a bit fragile: `newMetaInfo` is mutable, the serializer is altered inside it, and then it is obtained from there again. That makes the code harder to follow for future maintainers and easy to break accidentally.
    
    The meta info should be immutable, and the reconfigured serializer (or the original one, if no reconfiguration is needed) would probably best be part of the compatibility result.
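    
    A sketch of that direction, using hypothetical `ImmutableMetaInfo` and `CompatibilityOutcome` classes rather than Flink's `RegisteredKeyedBackendStateMetaInfo` and `CompatibilityResult`; serializers are modelled as `"Type:config"` strings and the compatibility check is a toy:
    
    ```java
    import java.util.Objects;
    
    /** Hypothetical immutable meta info: final fields, no setters. */
    final class ImmutableMetaInfo {
        private final String name;
        private final String stateSerializer;
    
        ImmutableMetaInfo(String name, String stateSerializer) {
            this.name = Objects.requireNonNull(name);
            this.stateSerializer = Objects.requireNonNull(stateSerializer);
        }
    
        String getName() { return name; }
    
        String getStateSerializer() { return stateSerializer; }
    }
    
    /** Hypothetical immutable compatibility result that carries the serializer to use. */
    final class CompatibilityOutcome {
        private final boolean requiresMigration;
        private final String serializer;  // null when migration is required
    
        private CompatibilityOutcome(boolean requiresMigration, String serializer) {
            this.requiresMigration = requiresMigration;
            this.serializer = serializer;
        }
    
        static CompatibilityOutcome compatible(String serializerToUse) {
            return new CompatibilityOutcome(false, Objects.requireNonNull(serializerToUse));
        }
    
        static CompatibilityOutcome migrationRequired() {
            return new CompatibilityOutcome(true, null);
        }
    
        boolean requiresMigration() { return requiresMigration; }
    
        String getSerializer() { return serializer; }
    }
    
    final class ImmutableRegistration {
        /** Toy check: a different type needs migration; a different config is "reconfigured". */
        static CompatibilityOutcome check(String restored, String candidate) {
            String restoredType = restored.split(":")[0];
            String candidateType = candidate.split(":")[0];
            if (!restoredType.equals(candidateType)) {
                return CompatibilityOutcome.migrationRequired();
            }
            return CompatibilityOutcome.compatible(
                restored.equals(candidate) ? candidate : candidate + "+reconfigured");
        }
    
        /** Builds a fresh meta info from the result instead of mutating an existing one. */
        static ImmutableMetaInfo register(String name, String restored, String candidate) {
            CompatibilityOutcome outcome = check(restored, candidate);
            if (outcome.requiresMigration()) {
                throw new IllegalStateException("State migration isn't supported, yet.");
            }
            return new ImmutableMetaInfo(name, outcome.getSerializer());
        }
    }
    ```
    
    The caller never reads a serializer back out of a mutated object: whatever the check decided is carried by the result and copied into a new meta info.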


---

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184104126
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
    @@ -185,36 +188,44 @@ public HeapKeyedStateBackend(
     				stateName.equals(stateTable.getMetaInfo().getName()),
     				"Incompatible state names. " +
     					"Was [" + stateTable.getMetaInfo().getName() + "], " +
    -					"registered with [" + newMetaInfo.getName() + "].");
    +					"registered with [" + stateName + "].");
     
    -			if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
    +			if (!stateType.equals(StateDescriptor.Type.UNKNOWN)
     					&& !stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
     
     				Preconditions.checkState(
    -					newMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()),
    +					stateType.equals(stateTable.getMetaInfo().getStateType()),
     					"Incompatible state types. " +
     						"Was [" + stateTable.getMetaInfo().getStateType() + "], " +
    -						"registered with [" + newMetaInfo.getStateType() + "].");
    +						"registered with [" + stateType + "].");
     			}
     
     			@SuppressWarnings("unchecked")
     			RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V> restoredMetaInfo =
     				(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V>) restoredKvStateMetaInfos.get(stateName);
     
     			// check compatibility results to determine if state migration is required
    +			TypeSerializer<N> newNamespaceSerializer = namespaceSerializer.duplicate();
     			CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
     					restoredMetaInfo.getNamespaceSerializer(),
     					null,
     					restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
    -					newMetaInfo.getNamespaceSerializer());
    +					newNamespaceSerializer);
     
    +			TypeSerializer<V> newValueSerializer = valueSerializer.duplicate();
    --- End diff --
    
    Similar to my comment on the RocksDB code, this duplicate seems redundant, because the serializer also comes straight from a `StateDescriptor`, which duplicates it before handing it out. One thing to consider when removing this call: it is less obvious here that the serializer was already duplicated, so it might be good to pass the state descriptor as an argument and obtain the serializer directly here, to avoid surprises for people working on this code in the future.


---

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184657676
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---
    @@ -61,8 +64,7 @@
     	/** The column family of this particular instance of state. */
     	protected ColumnFamilyHandle columnFamily;
     
    -	/** State descriptor from which to create this state instance. */
    -	protected final SD stateDesc;
    --- End diff --
    
    Fixed, thanks for catching this.


---

[GitHub] flink issue #5885: [FLINK-8715] Remove usage of StateDescriptor in state han...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5885
  
    Hi @StefanRRichter, I've addressed your comments and also rebased the PR.
    Could you take a final pass over this?


---

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184657584
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
    @@ -185,36 +188,44 @@ public HeapKeyedStateBackend(
     				stateName.equals(stateTable.getMetaInfo().getName()),
     				"Incompatible state names. " +
     					"Was [" + stateTable.getMetaInfo().getName() + "], " +
    -					"registered with [" + newMetaInfo.getName() + "].");
    +					"registered with [" + stateName + "].");
     
    -			if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
    +			if (!stateType.equals(StateDescriptor.Type.UNKNOWN)
     					&& !stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
     
     				Preconditions.checkState(
    -					newMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()),
    +					stateType.equals(stateTable.getMetaInfo().getStateType()),
     					"Incompatible state types. " +
     						"Was [" + stateTable.getMetaInfo().getStateType() + "], " +
    -						"registered with [" + newMetaInfo.getStateType() + "].");
    +						"registered with [" + stateType + "].");
     			}
     
     			@SuppressWarnings("unchecked")
     			RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V> restoredMetaInfo =
     				(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V>) restoredKvStateMetaInfos.get(stateName);
     
     			// check compatibility results to determine if state migration is required
    +			TypeSerializer<N> newNamespaceSerializer = namespaceSerializer.duplicate();
     			CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
     					restoredMetaInfo.getNamespaceSerializer(),
     					null,
     					restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
    -					newMetaInfo.getNamespaceSerializer());
    +					newNamespaceSerializer);
     
    +			TypeSerializer<V> newValueSerializer = valueSerializer.duplicate();
    --- End diff --
    
    I've updated this according to your suggestion:
    the reconfiguration method now takes a state descriptor as an argument, to reduce confusion.


---

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184087357
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java ---
    @@ -49,17 +49,18 @@
     	/**
     	 * Creates a new key/value state for the given hash map of key/value pairs.
     	 *
    -	 * @param stateDesc The state identifier for the state. This contains name
    -	 *                           and can create a default state value.
    +	 * @param valueSerializer The serializer for the state.
     	 * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
     	 */
     	public HeapFoldingState(
    -			FoldingStateDescriptor<T, ACC> stateDesc,
     			StateTable<K, N, ACC> stateTable,
     			TypeSerializer<K> keySerializer,
    -			TypeSerializer<N> namespaceSerializer) {
    -		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
    -		this.foldTransformation = new FoldTransformation<>(stateDesc);
    +			TypeSerializer<ACC> valueSerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			ACC defaultValue,
    --- End diff --
    
    The comments need to be updated.


---

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184840230
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1116,148 +1115,177 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance(
     	// ------------------------------------------------------------------------
     
     	/**
    -	 * Creates a column family handle for use with a k/v state. When restoring from a snapshot
    -	 * we don't restore the individual k/v states, just the global RocksDB database and the
    -	 * list of column families. When a k/v state is first requested we check here whether we
    -	 * already have a column family for that and return it or create a new one if it doesn't exist.
    +	 * Registers a k/v state information, which includes its state id, type, RocksDB column family handle, and serializers.
     	 *
    -	 * <p>This also checks whether the {@link StateDescriptor} for a state matches the one
    -	 * that we checkpointed, i.e. is already in the map of column families.
    +	 * When restoring from a snapshot, we don’t restore the individual k/v states, just the global RocksDB database and
    +	 * the list of k/v state information. When a k/v state is first requested we check here whether we
    +	 * already have a registered entry for that and return it (after some necessary state compatibility checks)
    +	 * or create a new one if it does not exist.
     	 */
    -	@SuppressWarnings("rawtypes, unchecked")
    -	protected <N, S> ColumnFamilyHandle getColumnFamily(
    -		StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException {
    +	private Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> tryRegisterKvStateInformation(
    --- End diff --
    
    Will change this as suggested!


---

[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184657419
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java ---
    @@ -103,17 +104,17 @@ public void add(T value) throws IOException {
     
     	private static final class FoldTransformation<T, ACC> implements StateTransformationFunction<ACC, T> {
     
    -		private final FoldingStateDescriptor<T, ACC> stateDescriptor;
    +		private final HeapFoldingState<?, ?, T, ACC> stateRef;
     		private final FoldFunction<T, ACC> foldFunction;
     
    -		FoldTransformation(FoldingStateDescriptor<T, ACC> stateDesc) {
    -			this.stateDescriptor = Preconditions.checkNotNull(stateDesc);
    -			this.foldFunction = Preconditions.checkNotNull(stateDesc.getFoldFunction());
    +		FoldTransformation(FoldFunction<T, ACC> foldFunction, HeapFoldingState<?, ?, T, ACC> stateRef) {
    +			this.stateRef = Preconditions.checkNotNull(stateRef);
    --- End diff --
    
    Done 👌 
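    After this diff, the `FoldTransformation` receives the `FoldFunction` directly and keeps a reference to its owning state, presumably so it can fetch the default value without going through a `FoldingStateDescriptor`. Below is a minimal, self-contained sketch of that shape.
    
    - The interfaces `FoldFn`, `TransformationFn` and `DefaultValueSource` are hypothetical simplifications, with no checked exceptions and no Flink types.
    - The "fall back to the default value when there is no previous state" behaviour is an assumption about how such a transformation is typically written. It is not a quote of the merged code.

    ```java
    import java.util.Objects;

    // Hypothetical, simplified counterpart of a fold function (no checked exceptions).
    interface FoldFn<T, ACC> {
        ACC fold(ACC accumulator, T value);
    }

    // Hypothetical, simplified counterpart of a state transformation function.
    interface TransformationFn<S, T> {
        S apply(S previousState, T value);
    }

    // Stand-in for the owning state: the transformation only needs its default value,
    // which no longer comes from a descriptor.
    interface DefaultValueSource<ACC> {
        ACC getDefaultValue();
    }

    final class FoldTransformationSketch<T, ACC> implements TransformationFn<ACC, T> {
        private final FoldFn<T, ACC> foldFunction;
        private final DefaultValueSource<ACC> stateRef;

        FoldTransformationSketch(FoldFn<T, ACC> foldFunction, DefaultValueSource<ACC> stateRef) {
            this.foldFunction = Objects.requireNonNull(foldFunction);
            this.stateRef = Objects.requireNonNull(stateRef);
        }

        @Override
        public ACC apply(ACC previousState, T value) {
            // Start from the state's default value when nothing has been folded yet (assumed behaviour).
            ACC accumulator = (previousState != null) ? previousState : stateRef.getDefaultValue();
            return foldFunction.fold(accumulator, value);
        }
    }
    ```

    For example, take `new FoldTransformationSketch<Integer, String>((acc, v) -> acc + v, () -> "start:")`. Applying `1` to an empty state yields `"start:1"`, and applying `2` to that result yields `"start:12"`.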


---

[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184846476
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---
    @@ -190,4 +197,12 @@ protected void writeKeyWithGroupAndNamespace(
     		RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
     		RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
     	}
    +
    +	protected V getDefaultValue() {
    --- End diff --
    
    Since this PR fixes a blocker bug for the 1.5 release, I'll proceed to merge it as is.
    @bowenli86 Perhaps we can open a separate JIRA ticket to keep track of this?


---

[GitHub] flink issue #5885: [FLINK-8715] Remove usage of StateDescriptor in state han...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5885
  
    Had two more suggestions, otherwise this is 👍 for merging.


---

[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184694977
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1116,148 +1115,177 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance(
     	// ------------------------------------------------------------------------
     
     	/**
    -	 * Creates a column family handle for use with a k/v state. When restoring from a snapshot
    -	 * we don't restore the individual k/v states, just the global RocksDB database and the
    -	 * list of column families. When a k/v state is first requested we check here whether we
    -	 * already have a column family for that and return it or create a new one if it doesn't exist.
    +	 * Registers a k/v state information, which includes its state id, type, RocksDB column family handle, and serializers.
     	 *
    -	 * <p>This also checks whether the {@link StateDescriptor} for a state matches the one
    -	 * that we checkpointed, i.e. is already in the map of column families.
    +	 * When restoring from a snapshot, we don’t restore the individual k/v states, just the global RocksDB database and
    +	 * the list of k/v state information. When a k/v state is first requested we check here whether we
    +	 * already have a registered entry for that and return it (after some necessary state compatibility checks)
    +	 * or create a new one if it does not exist.
     	 */
    -	@SuppressWarnings("rawtypes, unchecked")
    -	protected <N, S> ColumnFamilyHandle getColumnFamily(
    -		StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException {
    +	private Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> tryRegisterKvStateInformation(
    --- End diff --
    
    I did not go with this approach because it introduces extra tuples.
    TBH, though, I wasn't sure which approach was better: this one or the one you pointed out.
    
    I would not be against this version.


---

[GitHub] flink issue #5885: [FLINK-8715] Remove usage of StateDescriptor in state han...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5885
  
    +1 to the general approach here
    We should add some tests (and also check why the previous state migration tests did not catch this).


---

[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184101616
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1125,59 +1125,62 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance(
     	 * that we checkpointed, i.e. is already in the map of column families.
     	 */
     	@SuppressWarnings("rawtypes, unchecked")
    -	protected <N, S> ColumnFamilyHandle getColumnFamily(
    +	protected <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> getColumnFamilyAndStateSerializer(
     		StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException {
     
     		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
     			kvStateInformation.get(descriptor.getName());
     
    -		RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    -			descriptor.getType(),
    -			descriptor.getName(),
    -			namespaceSerializer,
    -			descriptor.getSerializer());
    -
     		if (stateInfo != null) {
     			// TODO with eager registration in place, these checks should be moved to restore()
     
     			RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
     				(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(descriptor.getName());
     
     			Preconditions.checkState(
    -				Objects.equals(newMetaInfo.getName(), restoredMetaInfo.getName()),
    +				Objects.equals(descriptor.getName(), restoredMetaInfo.getName()),
     				"Incompatible state names. " +
     					"Was [" + restoredMetaInfo.getName() + "], " +
    -					"registered with [" + newMetaInfo.getName() + "].");
    +					"registered with [" + descriptor.getName() + "].");
     
    -			if (!Objects.equals(newMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)
    +			if (!Objects.equals(descriptor.getType(), StateDescriptor.Type.UNKNOWN)
     				&& !Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) {
     
     				Preconditions.checkState(
    -					newMetaInfo.getStateType() == restoredMetaInfo.getStateType(),
    +					descriptor.getType() == restoredMetaInfo.getStateType(),
     					"Incompatible state types. " +
     						"Was [" + restoredMetaInfo.getStateType() + "], " +
    -						"registered with [" + newMetaInfo.getStateType() + "].");
    +						"registered with [" + descriptor.getType() + "].");
     			}
     
     			// check compatibility results to determine if state migration is required
    +			TypeSerializer<N> newNamespaceSerializer = namespaceSerializer.duplicate();
     			CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
     				restoredMetaInfo.getNamespaceSerializer(),
     				null,
     				restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
    -				newMetaInfo.getNamespaceSerializer());
    +				newNamespaceSerializer);
     
    +			TypeSerializer<S> newStateSerializer = descriptor.getSerializer().duplicate();
    --- End diff --
    
    The `duplicate()` here looks redundant, because the serializer comes from the descriptor, which already returns a duplicate.
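    Suppose the descriptor already hands out a fresh duplicate on every `getSerializer()` call. Then a second `duplicate()` only produces yet another independent copy. More importantly, reconfiguring one copy has no effect on a copy that a state handle obtained separately, which is the bug described in this PR.
    
    The sketch below models this with hypothetical classes (`ToySerializer`, `ToyDescriptor`, `ToyStateHandle`, `DuplicateDemo`), not Flink's. The only behaviour carried over from the thread is that the descriptor duplicates on every call.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-in for a stateful serializer (e.g. one holding Kryo-style
    // class registrations) whose configuration can change on restore.
    final class ToySerializer {
        final List<String> registrations;

        ToySerializer(List<String> registrations) {
            this.registrations = new ArrayList<>(registrations);
        }

        // Returns an independent copy: later changes to one copy do not affect the other.
        ToySerializer duplicate() {
            return new ToySerializer(registrations);
        }

        // Mutates this instance in place, mimicking reconfiguration on restore.
        void reconfigure(List<String> restoredOrder) {
            registrations.clear();
            registrations.addAll(restoredOrder);
        }
    }

    // Hypothetical descriptor that hands out a fresh duplicate on every call,
    // as the PR description says StateDescriptor#getSerializer() does since 88ffad27.
    final class ToyDescriptor {
        private final ToySerializer serializer;

        ToyDescriptor(ToySerializer serializer) {
            this.serializer = serializer;
        }

        ToySerializer getSerializer() {
            return serializer.duplicate();
        }
    }

    // Hypothetical state handle that simply keeps the serializer it was given.
    final class ToyStateHandle {
        final ToySerializer serializer;

        ToyStateHandle(ToySerializer serializer) {
            this.serializer = serializer;
        }
    }

    final class DuplicateDemo {
        // Problematic pattern: one copy is reconfigured, but the handle obtains another copy.
        static ToyStateHandle registerViaDescriptor(ToyDescriptor desc, List<String> restoredOrder) {
            ToySerializer reconfigured = desc.getSerializer(); // already a duplicate
            reconfigured.reconfigure(restoredOrder);
            return new ToyStateHandle(desc.getSerializer()); // a different, non-reconfigured copy
        }

        // Forwarding pattern: the reconfigured instance itself is passed to the handle.
        static ToyStateHandle registerWithForwarding(ToyDescriptor desc, List<String> restoredOrder) {
            ToySerializer reconfigured = desc.getSerializer(); // no extra duplicate() needed
            reconfigured.reconfigure(restoredOrder);
            return new ToyStateHandle(reconfigured);
        }
    }
    ```

    Take a serializer registered as `[A, B]` and a restored order of `[B, A]`:
    
    - `registerViaDescriptor` yields a handle that still uses `[A, B]`.
    - `registerWithForwarding` yields a handle that uses `[B, A]`. This mirrors the forwarding approach described in the PR.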


---

[GitHub] flink issue #5885: [FLINK-8715] Remove usage of StateDescriptor in state han...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5885
  
    Updates:
    
    - 4953ad0 Treat state backend registration meta info as immutable. The new meta info is now instantiated with the reconfigured serializer, instead of being instantiated with the pre-reconfiguration serializer and altered afterwards.
    
    - b7a0ed0 Add a new test `StateBackendTestBase#testStateSerializerReconfiguration` that verifies 1) state serializers are reconfigured, and 2) state handles actually use the reconfigured serializer. This is a common test for all state backends.
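    The "treat meta info as immutable" idea from 4953ad0 can be sketched as follows. Compatibility resolution and reconfiguration happen first, and only then is a new meta info object built from the result. The alternative, constructing the object early and patching its serializer later, is avoided.
    
    `StateMetaInfo` and `MetaInfoRegistry` are hypothetical stand-ins, not `RegisteredKeyedBackendStateMetaInfo`. Plain strings stand in for serializers purely for illustration.

    ```java
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;
    import java.util.function.UnaryOperator;

    // Hypothetical, simplified meta info: all fields are final, so an instance
    // can never be altered after construction.
    final class StateMetaInfo<T> {
        private final String name;
        private final T stateSerializer;

        StateMetaInfo(String name, T stateSerializer) {
            this.name = Objects.requireNonNull(name);
            this.stateSerializer = Objects.requireNonNull(stateSerializer);
        }

        String getName() {
            return name;
        }

        T getStateSerializer() {
            return stateSerializer;
        }
    }

    // Hypothetical registry: reconfiguration happens before the meta info is built,
    // and a re-registration replaces the map entry with a fresh instance.
    final class MetaInfoRegistry {
        private final Map<String, StateMetaInfo<?>> registered = new HashMap<>();

        <T> StateMetaInfo<T> register(String name, T newSerializer, UnaryOperator<T> reconfigure) {
            T reconfigured = reconfigure.apply(newSerializer);
            StateMetaInfo<T> info = new StateMetaInfo<>(name, reconfigured);
            registered.put(name, info);
            return info;
        }

        StateMetaInfo<?> get(String name) {
            return registered.get(name);
        }
    }
    ```

    One consequence of this design: code that already holds an older meta info instance keeps seeing a consistent value, because re-registration replaces the map entry instead of mutating shared state.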
    
    @StefanRRichter @aljoscha the PR is ready for another review. Could you have a look?


---

[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184087656
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java ---
    @@ -49,17 +49,18 @@
     	/**
     	 * Creates a new key/value state for the given hash map of key/value pairs.
     	 *
    -	 * @param stateDesc The state identifier for the state. This contains name
    -	 *                           and can create a default state value.
    +	 * @param valueSerializer The serializer for the state.
     	 * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
     	 */
     	public HeapFoldingState(
    -			FoldingStateDescriptor<T, ACC> stateDesc,
     			StateTable<K, N, ACC> stateTable,
     			TypeSerializer<K> keySerializer,
    -			TypeSerializer<N> namespaceSerializer) {
    -		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
    -		this.foldTransformation = new FoldTransformation<>(stateDesc);
    +			TypeSerializer<ACC> valueSerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			ACC defaultValue,
    --- End diff --
    
    I think you need to double-check this in every state class; none of them look updated yet.


---

[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5885


---

[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184087107
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java ---
    @@ -42,33 +42,35 @@
     	/** Map containing the actual key/value pairs. */
     	protected final StateTable<K, N, SV> stateTable;
     
    -	/** This holds the name of the state and can create an initial default value for the state. */
    -	protected final SD stateDesc;
    -
     	/** The current namespace, which the access methods will refer to. */
     	protected N currentNamespace;
     
     	protected final TypeSerializer<K> keySerializer;
     
    +	protected final TypeSerializer<SV> valueSerializer;
    +
     	protected final TypeSerializer<N> namespaceSerializer;
     
    +	private final SV defaultValue;
    +
     	/**
     	 * Creates a new key/value state for the given hash map of key/value pairs.
     	 *
    -	 * @param stateDesc The state identifier for the state. This contains name
    -	 *                           and can create a default state value.
    +	 * @param valueSerializer The serializer for the state.
     	 * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
     	 */
     	protected AbstractHeapState(
    -			SD stateDesc,
     			StateTable<K, N, SV> stateTable,
     			TypeSerializer<K> keySerializer,
    -			TypeSerializer<N> namespaceSerializer) {
    +			TypeSerializer<SV> valueSerializer,
    +			TypeSerializer<N> namespaceSerializer,
    --- End diff --
    
    The comments need to be updated.


---

[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184657398
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java ---
    @@ -49,17 +49,18 @@
     	/**
     	 * Creates a new key/value state for the given hash map of key/value pairs.
     	 *
    -	 * @param stateDesc The state identifier for the state. This contains name
    -	 *                           and can create a default state value.
    +	 * @param valueSerializer The serializer for the state.
     	 * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
     	 */
     	public HeapFoldingState(
    -			FoldingStateDescriptor<T, ACC> stateDesc,
     			StateTable<K, N, ACC> stateTable,
     			TypeSerializer<K> keySerializer,
    -			TypeSerializer<N> namespaceSerializer) {
    -		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
    -		this.foldTransformation = new FoldTransformation<>(stateDesc);
    +			TypeSerializer<ACC> valueSerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			ACC defaultValue,
    --- End diff --
    
    Javadocs for all state classes are now updated.


---

[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184841123
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---
    @@ -190,4 +197,12 @@ protected void writeKeyWithGroupAndNamespace(
     		RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
     		RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
     	}
    +
    +	protected V getDefaultValue() {
    --- End diff --
    
    Not too sure about this one.
    
    That would require introducing two methods in `InternalKvState`:
    1. A getter method that returns the default value.
    2. A default method that actually performs the serialization-based copy of the default value (what the current `getDefaultValue` method does).
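    Here is a rough sketch of that two-method split. It uses a hypothetical `KvStateWithDefault` interface rather than Flink's actual `InternalKvState`.
    
    The sketch also assumes the interface can reach some copy operation for values. A `UnaryOperator` returned by `valueCopier()` stands in for the value serializer's copy, so the sketch needs one more method than the two listed above.

    ```java
    import java.util.Objects;
    import java.util.function.UnaryOperator;

    // Hypothetical interface (not Flink's InternalKvState) showing the two-method split.
    interface KvStateWithDefault<V> {
        // 1. Plain getter for the configured default value; may return null.
        V getRawDefaultValue();

        // Stand-in for the value serializer's copy operation (assumed to be reachable).
        UnaryOperator<V> valueCopier();

        // 2. Shared default method: returns a defensive copy, so callers cannot
        // mutate the single default instance held by the state.
        default V getDefaultValue() {
            V raw = getRawDefaultValue();
            return raw == null ? null : valueCopier().apply(raw);
        }
    }

    // Hypothetical implementation: each state class only supplies the raw value and the copier.
    final class SimpleKvState<V> implements KvStateWithDefault<V> {
        private final V defaultValue;
        private final UnaryOperator<V> copier;

        SimpleKvState(V defaultValue, UnaryOperator<V> copier) {
            this.defaultValue = defaultValue;
            this.copier = Objects.requireNonNull(copier);
        }

        @Override
        public V getRawDefaultValue() {
            return defaultValue;
        }

        @Override
        public UnaryOperator<V> valueCopier() {
            return copier;
        }
    }
    ```

    The default method removes the per-class duplication while keeping the copy semantics. Mutating the returned value, for example by adding to a default list, never alters the default held by the state.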


---

[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184098081
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
    @@ -185,36 +188,44 @@ public HeapKeyedStateBackend(
     				stateName.equals(stateTable.getMetaInfo().getName()),
     				"Incompatible state names. " +
     					"Was [" + stateTable.getMetaInfo().getName() + "], " +
    -					"registered with [" + newMetaInfo.getName() + "].");
    +					"registered with [" + stateName + "].");
     
    -			if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
    +			if (!stateType.equals(StateDescriptor.Type.UNKNOWN)
     					&& !stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
     
     				Preconditions.checkState(
    -					newMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()),
    +					stateType.equals(stateTable.getMetaInfo().getStateType()),
     					"Incompatible state types. " +
     						"Was [" + stateTable.getMetaInfo().getStateType() + "], " +
    -						"registered with [" + newMetaInfo.getStateType() + "].");
    +						"registered with [" + stateType + "].");
     			}
     
     			@SuppressWarnings("unchecked")
     			RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V> restoredMetaInfo =
     				(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V>) restoredKvStateMetaInfos.get(stateName);
     
     			// check compatibility results to determine if state migration is required
    +			TypeSerializer<N> newNamespaceSerializer = namespaceSerializer.duplicate();
    --- End diff --
    
    Just curious: why do we need to duplicate the serializer here, but not in all the other places where `resolveCompatibilityResult()` is called? Or, put differently: should `resolveCompatibilityResult()` always duplicate internally, never duplicate at all, or is this exactly as intended?
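    One possible answer to the last question, under some assumptions: the resolution helper could always reconfigure a private duplicate and return it. Call sites would then never need to duplicate defensively, and the caller's serializer would never be mutated.
    
    `ConfigurableSerializer` and `CompatibilityHelper.resolveAndReconfigure` are hypothetical; neither is `CompatibilityUtil.resolveCompatibilityResult()`. The sketch only illustrates the "duplicate internally, always" option.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stateful serializer whose configuration can be changed in place.
    final class ConfigurableSerializer {
        private final List<String> registrations;

        ConfigurableSerializer(List<String> registrations) {
            this.registrations = new ArrayList<>(registrations);
        }

        ConfigurableSerializer duplicate() {
            return new ConfigurableSerializer(registrations);
        }

        void reconfigure(List<String> restoredConfig) {
            registrations.clear();
            registrations.addAll(restoredConfig);
        }

        List<String> registrations() {
            return new ArrayList<>(registrations);
        }
    }

    // Hypothetical helper: it always works on its own duplicate, so no call site
    // has to remember to duplicate and the caller's instance is never mutated.
    final class CompatibilityHelper {
        static ConfigurableSerializer resolveAndReconfigure(
                ConfigurableSerializer newSerializer, List<String> restoredConfig) {
            ConfigurableSerializer copy = newSerializer.duplicate();
            copy.reconfigure(restoredConfig);
            return copy;
        }
    }
    ```

    The cost is one extra copy per call, even when the caller already passes a private duplicate. That is the same kind of redundancy pointed out for the descriptor-provided serializer in this PR.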


---

[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184675164
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1116,148 +1115,177 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance(
     	// ------------------------------------------------------------------------
     
     	/**
    -	 * Creates a column family handle for use with a k/v state. When restoring from a snapshot
    -	 * we don't restore the individual k/v states, just the global RocksDB database and the
    -	 * list of column families. When a k/v state is first requested we check here whether we
    -	 * already have a column family for that and return it or create a new one if it doesn't exist.
    +	 * Registers a k/v state information, which includes its state id, type, RocksDB column family handle, and serializers.
     	 *
    -	 * <p>This also checks whether the {@link StateDescriptor} for a state matches the one
    -	 * that we checkpointed, i.e. is already in the map of column families.
    +	 * When restoring from a snapshot, we don’t restore the individual k/v states, just the global RocksDB database and
    +	 * the list of k/v state information. When a k/v state is first requested we check here whether we
    +	 * already have a registered entry for that and return it (after some necessary state compatibility checks)
    +	 * or create a new one if it does not exist.
     	 */
    -	@SuppressWarnings("rawtypes, unchecked")
    -	protected <N, S> ColumnFamilyHandle getColumnFamily(
    -		StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException {
    +	private Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> tryRegisterKvStateInformation(
    +			StateDescriptor<?, ?> stateDesc,
    +			TypeSerializer<?> namespaceSerializer) throws StateMigrationException, IOException {
     
     		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
    -			kvStateInformation.get(descriptor.getName());
    -
    -		RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    -			descriptor.getType(),
    -			descriptor.getName(),
    -			namespaceSerializer,
    -			descriptor.getSerializer());
    +			kvStateInformation.get(stateDesc.getName());
     
    +		RegisteredKeyedBackendStateMetaInfo<?, ?> newMetaInfo;
     		if (stateInfo != null) {
    -			// TODO with eager registration in place, these checks should be moved to restore()
     
    -			RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
    -				(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(descriptor.getName());
    +			@SuppressWarnings("unchecked")
    +			RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfoSnapshot =
    +				restoredKvStateMetaInfos.get(stateDesc.getName());
     
     			Preconditions.checkState(
    -				Objects.equals(newMetaInfo.getName(), restoredMetaInfo.getName()),
    -				"Incompatible state names. " +
    -					"Was [" + restoredMetaInfo.getName() + "], " +
    -					"registered with [" + newMetaInfo.getName() + "].");
    -
    -			if (!Objects.equals(newMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)
    -				&& !Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) {
    -
    -				Preconditions.checkState(
    -					newMetaInfo.getStateType() == restoredMetaInfo.getStateType(),
    -					"Incompatible state types. " +
    -						"Was [" + restoredMetaInfo.getStateType() + "], " +
    -						"registered with [" + newMetaInfo.getStateType() + "].");
    -			}
    +				restoredMetaInfoSnapshot != null,
    +				"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
    +					" but its corresponding restored snapshot cannot be found.");
     
    -			// check compatibility results to determine if state migration is required
    -			CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
    -				restoredMetaInfo.getNamespaceSerializer(),
    -				null,
    -				restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
    -				newMetaInfo.getNamespaceSerializer());
    +			newMetaInfo = RegisteredKeyedBackendStateMetaInfo.resolveKvStateCompatibility(
    +				restoredMetaInfoSnapshot,
    +				namespaceSerializer,
    +				stateDesc);
     
    -			CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
    -				restoredMetaInfo.getStateSerializer(),
    -				UnloadableDummyTypeSerializer.class,
    -				restoredMetaInfo.getStateSerializerConfigSnapshot(),
    -				newMetaInfo.getStateSerializer());
    +			stateInfo.f1 = newMetaInfo;
    +		} else {
    +			String stateName = stateDesc.getName();
     
    -			if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
    -				// TODO state migration currently isn't possible.
    -				throw new StateMigrationException("State migration isn't supported, yet.");
    -			} else {
    -				stateInfo.f1 = newMetaInfo;
    -				return stateInfo.f0;
    -			}
    +			newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    +				stateDesc.getType(),
    +				stateName,
    +				namespaceSerializer,
    +				stateDesc.getSerializer());
    +
    +			ColumnFamilyHandle columnFamily = createColumnFamily(stateName);
    +
    +			stateInfo = Tuple2.of(columnFamily, newMetaInfo);
    +			kvStateInformation.put(stateDesc.getName(), stateInfo);
     		}
     
    -		byte[] nameBytes = descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
    +		return stateInfo;
    +	}
    +
    +	/**
    +	 * Creates a column family handle for use with a k/v state.
    +	 */
    +	private ColumnFamilyHandle createColumnFamily(String stateName) throws IOException, StateMigrationException {
    --- End diff --
    
    This method no longer throws `StateMigrationException`, so it can be removed from the `throws` clause.
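    The register-or-resolve flow of `tryRegisterKvStateInformation()` in this diff boils down to two cases:
    
    - If an entry for the state name already exists (it was restored), look up its restored meta info snapshot and fail fast if it is missing. Then replace the entry's meta info with the resolved one.
    - Otherwise, create a new column family and register a fresh entry.
    
    In the simplified sketch below, `KvStateRegistrySketch` is hypothetical. Strings stand in for column family handles, serializers and snapshots, and compatibility resolution is reduced to string concatenation.

    ```java
    import java.util.HashMap;
    import java.util.Map;

    final class KvStateRegistrySketch {
        static final class Entry {
            final String columnFamily; // stand-in for a ColumnFamilyHandle
            String metaInfo;           // stand-in for the registered meta info; replaced on resolve

            Entry(String columnFamily, String metaInfo) {
                this.columnFamily = columnFamily;
                this.metaInfo = metaInfo;
            }
        }

        private final Map<String, Entry> kvStateInformation = new HashMap<>();
        private final Map<String, String> restoredSnapshots = new HashMap<>();
        private int createdColumnFamilies = 0;

        // Simulates a restore: the column family exists and its meta info snapshot is known.
        void restore(String stateName, String snapshot) {
            kvStateInformation.put(stateName, new Entry("cf-" + stateName, snapshot));
            restoredSnapshots.put(stateName, snapshot);
        }

        Entry tryRegister(String stateName, String newSerializer) {
            Entry entry = kvStateInformation.get(stateName);
            if (entry != null) {
                String snapshot = restoredSnapshots.get(stateName);
                if (snapshot == null) {
                    throw new IllegalStateException("Restored entry without a corresponding snapshot: " + stateName);
                }
                // Simplified "compatibility resolution": derive new meta info from snapshot + new serializer.
                entry.metaInfo = newSerializer + " reconfigured from " + snapshot;
            } else {
                entry = new Entry(createColumnFamily(stateName), newSerializer);
                kvStateInformation.put(stateName, entry);
            }
            return entry;
        }

        // Nothing here can fail with a migration problem, so no checked exception is declared.
        private String createColumnFamily(String stateName) {
            createdColumnFamilies++;
            return "cf-" + stateName;
        }

        int createdColumnFamilies() {
            return createdColumnFamilies;
        }
    }
    ```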


---

[GitHub] flink issue #5885: [FLINK-8715] Remove usage of StateDescriptor in state han...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5885
  
    Thanks for the reviews @bowenli86 @StefanRRichter! I've merged this.


---

[GitHub] flink issue #5885: [FLINK-8715] Remove usage of StateDescriptor in state han...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5885
  
    A note about previous tests that should have been failing because of this issue:
    The tests `StateBackendTestBase#testKryoRestoreResilienceWithDifferentRegistrationOrder()` and `StateBackendTestBase#testPojoRestoreResilienceWithDifferentRegistrationOrder()` were supposed to, to some extent, test serializer reconfiguration across backend state restores.
    
    f553d9b adds an extension to the Kryo test that verifies that the state handles actually use the correctly reconfigured serializer. With that extension in place, the test fails without this PR.
    However, that still doesn't explain why, especially in the RocksDB case, modifications to the state did not cause errors while it was using the non-reconfigured serializer (with outdated Kryo registration mappings).
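    A simplified model shows why an outdated registration mapping is a problem. It assumes, as with Kryo's default class registration, that each registered class gets an integer ID in registration order and that only that ID is written with the data.
    
    `RegistrationTable` and `reconfiguredTo` are hypothetical. They only approximate what reconfiguring a registration-based serializer on restore is meant to achieve.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical model of registration-order-based class IDs.
    final class RegistrationTable {
        private final List<String> classes;

        RegistrationTable(List<String> classes) {
            this.classes = new ArrayList<>(classes);
        }

        // ID written alongside serialized data: the position in registration order.
        int idOf(String className) {
            int id = classes.indexOf(className);
            if (id < 0) {
                throw new IllegalArgumentException("Unregistered class: " + className);
            }
            return id;
        }

        // Class that a previously written ID resolves to under this table.
        String classOf(int id) {
            return classes.get(id);
        }

        // Reconfiguration: adopt the restored order first, then append newly registered classes,
        // so IDs written before the checkpoint still resolve to the same classes.
        RegistrationTable reconfiguredTo(List<String> restoredOrder) {
            List<String> merged = new ArrayList<>(restoredOrder);
            for (String c : classes) {
                if (!merged.contains(c)) {
                    merged.add(c);
                }
            }
            return new RegistrationTable(merged);
        }
    }
    ```

    Suppose the checkpoint was written with `[Pojo, Other]` and the restored job registers `[Other, Pojo, Extra]`. The ID written for `Other` (1) resolves to `Pojo` under the new, non-reconfigured mapping. After reconfiguration the table becomes `[Pojo, Other, Extra]`, and the ID resolves back to `Other`.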


---

[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r183997653
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
    @@ -169,13 +169,16 @@ public HeapKeyedStateBackend(
     			TypeSerializer<N> namespaceSerializer,
     			TypeSerializer<V> valueSerializer) throws StateMigrationException {
     
    -		final RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo =
    -				new RegisteredKeyedBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer);
    -
     		@SuppressWarnings("unchecked")
     		StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateName);
     
     		if (stateTable == null) {
    +			RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    --- End diff --
    
    The mixing of concerns here is not yet as bad as in the RocksDB backend code, but you might want to start separating it a bit more here as well.


---

[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184831125
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---
    @@ -190,4 +197,12 @@ protected void writeKeyWithGroupAndNamespace(
     		RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
     		RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
     	}
    +
    +	protected V getDefaultValue() {
    --- End diff --
    
    This method is duplicated across several implementation classes. We could move it to `InternalKvState` as a [default method](https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html).


---