Posted to issues@flink.apache.org by azagrebin <gi...@git.apache.org> on 2018/06/15 14:16:13 UTC

[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...

GitHub user azagrebin opened a pull request:

    https://github.com/apache/flink/pull/6173

    [FLINK-9571] Refactor StateBinder to internal backend specific state factories

    ## What is the purpose of the change
    
    Remove StateBinder and StateDescriptor.bind method from State user API.
    Introduce AbstractKeyedStateBackend.createState factory method.
    The factory method dispatches StateDescriptor.class to backend specific state implementation.
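
The dispatch described above can be sketched roughly as follows. This is a minimal illustration, not the code from the pull request: `Descriptor`, `ValueDescriptor`, `ListDescriptor`, `MapDescriptor`, `BackendState` and `BackendStateFactory` are hypothetical stand-ins for Flink's descriptor and internal state types, and the registry is assumed to be a plain map keyed by descriptor class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for Flink's StateDescriptor hierarchy (not the real API).
abstract class Descriptor {
    final String name;
    Descriptor(String name) { this.name = name; }
}
final class ValueDescriptor extends Descriptor { ValueDescriptor(String n) { super(n); } }
final class ListDescriptor extends Descriptor { ListDescriptor(String n) { super(n); } }
final class MapDescriptor extends Descriptor { MapDescriptor(String n) { super(n); } }

// Hypothetical stand-in for a backend-specific state object.
interface BackendState { String describe(); }

// Hypothetical stand-in for a backend-specific factory.
interface BackendStateFactory { BackendState create(Descriptor descriptor); }

class FactoryDispatchSketch {
    // One factory per supported descriptor class; MapDescriptor is left out on purpose.
    private static final Map<Class<? extends Descriptor>, BackendStateFactory> FACTORIES = new HashMap<>();
    static {
        FACTORIES.put(ValueDescriptor.class, d -> () -> "value state for " + d.name);
        FACTORIES.put(ListDescriptor.class, d -> () -> "list state for " + d.name);
    }

    // Single entry point: picks the factory by the descriptor's runtime class.
    static BackendState createState(Descriptor descriptor) {
        BackendStateFactory factory = FACTORIES.get(descriptor.getClass());
        if (factory == null) {
            throw new UnsupportedOperationException(String.format(
                "State %s is not supported by %s", descriptor.getClass(), FactoryDispatchSketch.class));
        }
        return factory.create(descriptor);
    }

    public static void main(String[] args) {
        System.out.println(createState(new ValueDescriptor("count")).describe());
        System.out.println(createState(new ListDescriptor("events")).describe());
    }
}
```

In this sketch the descriptor no longer needs to know how to bind itself to a backend; each backend would own its own registry, and an unsupported descriptor class fails at the lookup.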
    
    ## Brief change log
    
      - Deprecate StateBinder and StateDescriptor.bind in state descriptors
      - Add abstract AbstractKeyedStateBackend.createState
      - Implement createState in heap, RocksDB and queryable state client backends
    
    ## Verifying this change
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/azagrebin/flink FLINK-9571

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6173.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6173
    
----
commit de5085f75331488fc6b0f72ec06c973c61eaef79
Author: Andrey Zagrebin <az...@...>
Date:   2018-06-13T08:24:10Z

    [FLINK-9571] Refactor StateBinder to internal backend specific state factories

----


---

[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6173#discussion_r195903191
  
    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java ---
    @@ -244,7 +271,14 @@ public ExecutionConfig setExecutionConfig(ExecutionConfig config) {
     		return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply(
     				stateResponse -> {
     					try {
    -						return stateDescriptor.bind(new ImmutableStateBinder(stateResponse.getContent()));
    +						if (!STATE_FACTORIES.containsKey(stateDescriptor.getClass())) {
    +							String message = String.format("State %s is not supported by %s",
    +								stateDescriptor.getClass(), this.getClass());
    +							throw new FlinkRuntimeException(message);
    +						}
    +						return STATE_FACTORIES
    +							.get(stateDescriptor.getClass())
    --- End diff --
    
    Maybe we can merge the `containsKey()` and `get()` calls into a single `get()`; that way we don't need to query `STATE_FACTORIES` twice.
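
A minimal sketch of the single-lookup variant suggested here. The registry contents and the class name `SingleLookupSketch` are hypothetical; the point is only that, for a map that never stores `null` values, a `null` result from `get()` already means "no entry", so a separate `containsKey()` call adds nothing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class SingleLookupSketch {
    // Hypothetical registry keyed by class; it never contains null values.
    private static final Map<Class<?>, Function<Object, String>> FACTORIES = new HashMap<>();
    static {
        FACTORIES.put(Integer.class, d -> "int-state:" + d);
        FACTORIES.put(String.class, d -> "string-state:" + d);
    }

    static String createState(Object descriptor) {
        // One lookup instead of containsKey() followed by get().
        Function<Object, String> factory = FACTORIES.get(descriptor.getClass());
        if (factory == null) {
            throw new IllegalStateException(
                String.format("State %s is not supported", descriptor.getClass()));
        }
        return factory.apply(descriptor);
    }

    public static void main(String[] args) {
        System.out.println(createState(7));
        System.out.println(createState("a"));
    }
}
```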


---

[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6173#discussion_r195903433
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
    @@ -203,91 +216,16 @@ private boolean hasRegisteredState() {
     	}
     
     	@Override
    -	public <N, V> InternalValueState<K, N, V> createValueState(
    -			TypeSerializer<N> namespaceSerializer,
    -			ValueStateDescriptor<V> stateDesc) throws Exception {
    -
    -		StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
    -		return new HeapValueState<>(
    -				stateTable,
    -				keySerializer,
    -				stateTable.getStateSerializer(),
    -				stateTable.getNamespaceSerializer(),
    -				stateDesc.getDefaultValue());
    -	}
    -
    -	@Override
    -	public <N, T> InternalListState<K, N, T> createListState(
    -			TypeSerializer<N> namespaceSerializer,
    -			ListStateDescriptor<T> stateDesc) throws Exception {
    -
    -		StateTable<K, N, List<T>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
    -		return new HeapListState<>(
    -				stateTable,
    -				keySerializer,
    -				stateTable.getStateSerializer(),
    -				stateTable.getNamespaceSerializer(),
    -				stateDesc.getDefaultValue());
    -	}
    -
    -	@Override
    -	public <N, T> InternalReducingState<K, N, T> createReducingState(
    -			TypeSerializer<N> namespaceSerializer,
    -			ReducingStateDescriptor<T> stateDesc) throws Exception {
    -
    -		StateTable<K, N, T> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
    -		return new HeapReducingState<>(
    -				stateTable,
    -				keySerializer,
    -				stateTable.getStateSerializer(),
    -				stateTable.getNamespaceSerializer(),
    -				stateDesc.getDefaultValue(),
    -				stateDesc.getReduceFunction());
    -	}
    -
    -	@Override
    -	public <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
    -			TypeSerializer<N> namespaceSerializer,
    -			AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
    -
    -		StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
    -		return new HeapAggregatingState<>(
    -				stateTable,
    -				keySerializer,
    -				stateTable.getStateSerializer(),
    -				stateTable.getNamespaceSerializer(),
    -				stateDesc.getDefaultValue(),
    -				stateDesc.getAggregateFunction());
    -	}
    -
    -	@Override
    -	public <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
    -			TypeSerializer<N> namespaceSerializer,
    -			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
    -
    -		StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
    -		return new HeapFoldingState<>(
    -				stateTable,
    -				keySerializer,
    -				stateTable.getStateSerializer(),
    -				stateTable.getNamespaceSerializer(),
    -				stateDesc.getDefaultValue(),
    -				stateDesc.getFoldFunction());
    -	}
    -
    -	@Override
    -	protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
    -			TypeSerializer<N> namespaceSerializer,
    -			MapStateDescriptor<UK, UV> stateDesc) throws Exception {
    -
    -		StateTable<K, N, Map<UK, UV>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
    -
    -		return new HeapMapState<>(
    -				stateTable,
    -				keySerializer,
    -				stateTable.getStateSerializer(),
    -				stateTable.getNamespaceSerializer(),
    -				stateDesc.getDefaultValue());
    +	public <N, SV, S extends State, IS extends S> IS createState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		if (!STATE_FACTORIES.containsKey(stateDesc.getClass())) {
    +			String message = String.format("State %s is not supported by %s",
    +				stateDesc.getClass(), this.getClass());
    +			throw new FlinkRuntimeException(message);
    +		}
    +		StateTable<K, N, SV> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
    +		return STATE_FACTORIES.get(stateDesc.getClass()).createState(stateDesc, stateTable, keySerializer);
    --- End diff --
    
    Same as above: maybe the `get()` and `containsKey()` calls could be merged into a single `get()`.


---

[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6173#discussion_r196042694
  
    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java ---
    @@ -67,6 +79,21 @@
     
     	private static final Logger LOG = LoggerFactory.getLogger(QueryableStateClient.class);
     
    +	private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
    +		org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap
    --- End diff --
    
    We have unmodifiable map alternatives in the JDK. If there is no good reason why we need Guava here, I would suggest solving this without this import. Even if Guava is needed, I would use an import rather than the fully qualified class name.
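
One JDK-only way to get such a map is `Collections.unmodifiableMap` wrapped around a privately built `HashMap`. The sketch below assumes a registry with placeholder string values; `UnmodifiableRegistrySketch` and its entries are hypothetical and are not taken from the pull request.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class UnmodifiableRegistrySketch {
    // Read-only view; the backing HashMap is never exposed, so it cannot change later.
    static final Map<Class<?>, String> REGISTRY = buildRegistry();

    private static Map<Class<?>, String> buildRegistry() {
        Map<Class<?>, String> registry = new HashMap<>();
        registry.put(Integer.class, "value");   // placeholder entries
        registry.put(List.class, "list");
        return Collections.unmodifiableMap(registry);
    }

    public static void main(String[] args) {
        System.out.println(REGISTRY.get(Integer.class));
        try {
            REGISTRY.put(String.class, "other");
        } catch (UnsupportedOperationException e) {
            System.out.println("registry is read-only");
        }
    }
}
```

On Java 9 and later, `Map.of(...)` or `Map.ofEntries(...)` would be a shorter alternative; the wrapper shown here also works on Java 8.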


---

[GitHub] flink issue #6173: [FLINK-9571] Refactor StateBinder to internal backend spe...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6173
  
    I had a few comments inline. After they are addressed, I think this is good to merge.


---

[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6173#discussion_r195903383
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
    @@ -110,7 +123,7 @@
     	/**
     	 * Map of state names to their corresponding restored state meta info.
     	 *
    -	 * <p>
    +	 * <p></p>
    --- End diff --
    
    I think the `</p>` should be reverted.


---

[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6173#discussion_r196043061
  
    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java ---
    @@ -244,7 +271,14 @@ public ExecutionConfig setExecutionConfig(ExecutionConfig config) {
     		return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply(
     				stateResponse -> {
     					try {
    -						return stateDescriptor.bind(new ImmutableStateBinder(stateResponse.getContent()));
    +						StateFactory stateFactory = STATE_FACTORIES
    +							.get(stateDescriptor.getClass());
    +						if (stateFactory == null) {
    +							String message = String.format("State %s is not supported by %s",
    +								stateDescriptor.getClass(), this.getClass());
    +							throw new FlinkRuntimeException(message);
    +						}
    +						return stateFactory.createState(stateDescriptor, stateResponse.getContent());
     					} catch (Exception e) {
    --- End diff --
    
    Maybe it would make sense to narrow the scope of the `try`/`catch` block, because right now it will catch and wrap our own exception from line 279.
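
A rough sketch of the narrower scope being suggested, under the assumption that only the actual state creation needs wrapping. `TryScopeSketch`, `Decoder` and the `"utf8"`/`"broken"` entries are hypothetical; the lookup and its "unsupported" check sit outside the `try`, so that exception reaches the caller unwrapped.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class TryScopeSketch {
    // Hypothetical factory-like interface whose work may fail with a checked exception.
    interface Decoder { String decode(byte[] content) throws Exception; }

    private static final Map<String, Decoder> DECODERS = new HashMap<>();
    static {
        DECODERS.put("utf8", bytes -> new String(bytes, StandardCharsets.UTF_8));
        DECODERS.put("broken", bytes -> { throw new IOException("corrupt payload"); });
    }

    static String decode(String kind, byte[] content) {
        Decoder decoder = DECODERS.get(kind);
        if (decoder == null) {
            // Thrown before the try block, so it is not caught and re-wrapped below.
            throw new IllegalArgumentException("Unsupported kind: " + kind);
        }
        try {
            // Only the call that can actually fail is guarded.
            return decoder.decode(content);
        } catch (Exception e) {
            throw new RuntimeException("Decoding failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(decode("utf8", "hi".getBytes(StandardCharsets.UTF_8)));
    }
}
```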


---

[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6173#discussion_r195903638
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1303,103 +1316,18 @@ private ColumnFamilyHandle createColumnFamily(String stateName) throws IOExcepti
     	}
     
     	@Override
    -	protected <N, T> InternalValueState<K, N, T> createValueState(
    -		TypeSerializer<N> namespaceSerializer,
    -		ValueStateDescriptor<T> stateDesc) throws Exception {
    -
    -		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult =
    -				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
    -
    -		return new RocksDBValueState<>(
    -				registerResult.f0,
    -				registerResult.f1.getNamespaceSerializer(),
    -				registerResult.f1.getStateSerializer(),
    -				stateDesc.getDefaultValue(),
    -				this);
    -	}
    -
    -	@Override
    -	protected <N, T> InternalListState<K, N, T> createListState(
    -		TypeSerializer<N> namespaceSerializer,
    -		ListStateDescriptor<T> stateDesc) throws Exception {
    -
    -		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, List<T>>> registerResult =
    -				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
    -
    -		return new RocksDBListState<>(
    -				registerResult.f0,
    -				registerResult.f1.getNamespaceSerializer(),
    -				registerResult.f1.getStateSerializer(),
    -				stateDesc.getDefaultValue(),
    -				stateDesc.getElementSerializer(),
    -				this);
    -	}
    -
    -	@Override
    -	protected <N, T> InternalReducingState<K, N, T> createReducingState(
    -		TypeSerializer<N> namespaceSerializer,
    -		ReducingStateDescriptor<T> stateDesc) throws Exception {
    -
    -		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult =
    -				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
    -
    -		return new RocksDBReducingState<>(
    -				registerResult.f0,
    -				registerResult.f1.getNamespaceSerializer(),
    -				registerResult.f1.getStateSerializer(),
    -				stateDesc.getDefaultValue(),
    -				stateDesc.getReduceFunction(),
    -				this);
    -	}
    -
    -	@Override
    -	protected <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
    -		TypeSerializer<N> namespaceSerializer,
    -		AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
    -
    -		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult =
    -				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
    -
    -		return new RocksDBAggregatingState<>(
    -				registerResult.f0,
    -				registerResult.f1.getNamespaceSerializer(),
    -				registerResult.f1.getStateSerializer(),
    -				stateDesc.getDefaultValue(),
    -				stateDesc.getAggregateFunction(),
    -				this);
    -	}
    -
    -	@Override
    -	protected <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
    +	public <N, SV, S extends State, IS extends S> IS createState(
     		TypeSerializer<N> namespaceSerializer,
    -		FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
    -
    -		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult =
    -				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
    -
    -		return new RocksDBFoldingState<>(
    -				registerResult.f0,
    -				registerResult.f1.getNamespaceSerializer(),
    -				registerResult.f1.getStateSerializer(),
    -				stateDesc.getDefaultValue(),
    -				stateDesc.getFoldFunction(),
    -				this);
    -	}
    -
    -	@Override
    -	protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
    -		TypeSerializer<N> namespaceSerializer,
    -		MapStateDescriptor<UK, UV> stateDesc) throws Exception {
    -
    -		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, Map<UK, UV>>> registerResult =
    -				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
    -
    -		return new RocksDBMapState<>(
    -				registerResult.f0,
    -				registerResult.f1.getNamespaceSerializer(),
    -				registerResult.f1.getStateSerializer(),
    -				stateDesc.getDefaultValue(),
    -				this);
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		if (!STATE_FACTORIES.containsKey(stateDesc.getClass())) {
    +			String message = String.format("State %s is not supported by %s",
    +				stateDesc.getClass(), this.getClass());
    +			throw new UnsupportedOperationException(message);
    --- End diff --
    
    The exception type is a bit inconsistent: other places throw `FlinkRuntimeException`, so it may be better to make this consistent.


---

[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6173#discussion_r196046444
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ---
    @@ -392,14 +268,14 @@ public KeyGroupRange getKeyGroupRange() {
     			kvStateRegistry.registerKvState(keyGroupRange, name, kvState);
     		}
     
    -		return state;
    +		return (S) kvState;
    --- End diff --
    
    For the future, we should think about a way to bind the state type of the state descriptor to the internal state type as well, so that the factory can produce a proper generic type that matches the state descriptor but is also an `InternalKvState`. For now, I see no simple solution, but we might want to keep this in mind and discuss it with @aljoscha.


---

[GitHub] flink issue #6173: [FLINK-9571] Refactor StateBinder to internal backend spe...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on the issue:

    https://github.com/apache/flink/pull/6173
  
    cc @StefanRRichter @aljoscha 


---

[GitHub] flink issue #6173: [FLINK-9571] Refactor StateBinder to internal backend spe...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6173
  
    LGTM 👍 Will merge.


---

[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6173#discussion_r195903619
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1303,103 +1316,18 @@ private ColumnFamilyHandle createColumnFamily(String stateName) throws IOExcepti
     	}
     
     	@Override
    -	protected <N, T> InternalValueState<K, N, T> createValueState(
    -		TypeSerializer<N> namespaceSerializer,
    -		ValueStateDescriptor<T> stateDesc) throws Exception {
    -
    -		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult =
    -				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
    -
    -		return new RocksDBValueState<>(
    -				registerResult.f0,
    -				registerResult.f1.getNamespaceSerializer(),
    -				registerResult.f1.getStateSerializer(),
    -				stateDesc.getDefaultValue(),
    -				this);
    -	}
    -
    -	@Override
    -	protected <N, T> InternalListState<K, N, T> createListState(
    -		TypeSerializer<N> namespaceSerializer,
    -		ListStateDescriptor<T> stateDesc) throws Exception {
    -
    -		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, List<T>>> registerResult =
    -				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
    -
    -		return new RocksDBListState<>(
    -				registerResult.f0,
    -				registerResult.f1.getNamespaceSerializer(),
    -				registerResult.f1.getStateSerializer(),
    -				stateDesc.getDefaultValue(),
    -				stateDesc.getElementSerializer(),
    -				this);
    -	}
    -
    -	@Override
    -	protected <N, T> InternalReducingState<K, N, T> createReducingState(
    -		TypeSerializer<N> namespaceSerializer,
    -		ReducingStateDescriptor<T> stateDesc) throws Exception {
    -
    -		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult =
    -				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
    -
    -		return new RocksDBReducingState<>(
    -				registerResult.f0,
    -				registerResult.f1.getNamespaceSerializer(),
    -				registerResult.f1.getStateSerializer(),
    -				stateDesc.getDefaultValue(),
    -				stateDesc.getReduceFunction(),
    -				this);
    -	}
    -
    -	@Override
    -	protected <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
    -		TypeSerializer<N> namespaceSerializer,
    -		AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
    -
    -		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult =
    -				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
    -
    -		return new RocksDBAggregatingState<>(
    -				registerResult.f0,
    -				registerResult.f1.getNamespaceSerializer(),
    -				registerResult.f1.getStateSerializer(),
    -				stateDesc.getDefaultValue(),
    -				stateDesc.getAggregateFunction(),
    -				this);
    -	}
    -
    -	@Override
    -	protected <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
    +	public <N, SV, S extends State, IS extends S> IS createState(
     		TypeSerializer<N> namespaceSerializer,
    -		FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
    -
    -		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult =
    -				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
    -
    -		return new RocksDBFoldingState<>(
    -				registerResult.f0,
    -				registerResult.f1.getNamespaceSerializer(),
    -				registerResult.f1.getStateSerializer(),
    -				stateDesc.getDefaultValue(),
    -				stateDesc.getFoldFunction(),
    -				this);
    -	}
    -
    -	@Override
    -	protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
    -		TypeSerializer<N> namespaceSerializer,
    -		MapStateDescriptor<UK, UV> stateDesc) throws Exception {
    -
    -		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, Map<UK, UV>>> registerResult =
    -				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
    -
    -		return new RocksDBMapState<>(
    -				registerResult.f0,
    -				registerResult.f1.getNamespaceSerializer(),
    -				registerResult.f1.getStateSerializer(),
    -				stateDesc.getDefaultValue(),
    -				this);
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		if (!STATE_FACTORIES.containsKey(stateDesc.getClass())) {
    +			String message = String.format("State %s is not supported by %s",
    +				stateDesc.getClass(), this.getClass());
    +			throw new UnsupportedOperationException(message);
    +		}
    +		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult =
    +			tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
    +		return STATE_FACTORIES.get(stateDesc.getClass()).createState(
    --- End diff --
    
    Same as above: the `containsKey()` and `get()` calls could be merged into a single `get()`.


---

[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6173


---

[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6173#discussion_r195903979
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java ---
    @@ -23,7 +23,10 @@
     /**
      * The {@code StateBinder} is used by {@link StateDescriptor} instances to create actual
      * {@link State} objects.
    + *
    + * @deprecated refactored to StateFactory in flink-runtime
      */
    +@Deprecated
     @Internal
     public interface StateBinder {
    --- End diff --
    
    Since it is internal, how about just removing it?


---

[GitHub] flink issue #6173: [FLINK-9571] Refactor StateBinder to internal backend spe...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on the issue:

    https://github.com/apache/flink/pull/6173
  
    Thanks @sihuazhou! I have added changes to address the comments.


---