Posted to issues@flink.apache.org by azagrebin <gi...@git.apache.org> on 2018/06/15 14:16:13 UTC
[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...
GitHub user azagrebin opened a pull request:
https://github.com/apache/flink/pull/6173
[FLINK-9571] Refactor StateBinder to internal backend specific state factories
## What is the purpose of the change
Remove StateBinder and StateDescriptor.bind method from State user API.
Introduce AbstractKeyedStateBackend.createState factory method.
The factory method dispatches StateDescriptor.class to backend specific state implementation.
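A minimal sketch of the class-keyed dispatch described above, in simplified Java. All types here (`StateDescriptor`, `ValueStateDescriptor`, `ListStateDescriptor`, `StateFactory`, `KeyedBackend`, and the strings standing in for state objects) are hypothetical simplifications, not Flink's actual classes or signatures. The sketch only shows how each backend can own a map from descriptor class to a backend-specific factory instead of relying on `StateDescriptor.bind`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model: not Flink's real classes or signatures.
final class BackendDispatchSketch {

    static abstract class StateDescriptor {
        final String name;
        StateDescriptor(String name) { this.name = name; }
    }

    static final class ValueStateDescriptor extends StateDescriptor {
        ValueStateDescriptor(String name) { super(name); }
    }

    static final class ListStateDescriptor extends StateDescriptor {
        ListStateDescriptor(String name) { super(name); }
    }

    // Backend-specific factory; a String stands in for the created state object.
    interface StateFactory {
        String createState(StateDescriptor desc);
    }

    static final class KeyedBackend {
        private final String name;
        private final Map<Class<? extends StateDescriptor>, StateFactory> factories;

        KeyedBackend(String name, Map<Class<? extends StateDescriptor>, StateFactory> factories) {
            this.name = name;
            this.factories = Collections.unmodifiableMap(new HashMap<>(factories));
        }

        // Dispatches on the descriptor's runtime class with a single map lookup.
        String createState(StateDescriptor desc) {
            StateFactory factory = factories.get(desc.getClass());
            if (factory == null) {
                // Stand-in for the FlinkRuntimeException used in the PR.
                throw new IllegalStateException(String.format(
                    "State %s is not supported by %s", desc.getClass().getSimpleName(), name));
            }
            return factory.createState(desc);
        }
    }

    static KeyedBackend heapBackend() {
        Map<Class<? extends StateDescriptor>, StateFactory> f = new HashMap<>();
        f.put(ValueStateDescriptor.class, d -> "HeapValueState[" + d.name + "]");
        f.put(ListStateDescriptor.class, d -> "HeapListState[" + d.name + "]");
        return new KeyedBackend("heap", f);
    }

    static KeyedBackend rocksDbBackend() {
        Map<Class<? extends StateDescriptor>, StateFactory> f = new HashMap<>();
        f.put(ValueStateDescriptor.class, d -> "RocksDBValueState[" + d.name + "]");
        // ListStateDescriptor deliberately not registered, to show the failure path.
        return new KeyedBackend("rocksdb", f);
    }
}
```

With this layout, the same value-state descriptor yields a different implementation per backend, and a backend without a registered factory fails fast with the "not supported" message.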
## Brief change log
- Deprecate StateBinder and StateDescriptor.bind in state descriptors
- Add abstract AbstractKeyedStateBackend.createState
- Implement createState in heap, RocksDB and queryable state client backends
## Verifying this change
This change is a trivial rework / code cleanup without any test coverage.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/azagrebin/flink FLINK-9571
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6173.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6173
----
commit de5085f75331488fc6b0f72ec06c973c61eaef79
Author: Andrey Zagrebin <az...@...>
Date: 2018-06-13T08:24:10Z
[FLINK-9571] Refactor StateBinder to internal backend specific state factories
----
---
[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6173#discussion_r195903191
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java ---
@@ -244,7 +271,14 @@ public ExecutionConfig setExecutionConfig(ExecutionConfig config) {
return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply(
stateResponse -> {
try {
- return stateDescriptor.bind(new ImmutableStateBinder(stateResponse.getContent()));
+ if (!STATE_FACTORIES.containsKey(stateDescriptor.getClass())) {
+ String message = String.format("State %s is not supported by %s",
+ stateDescriptor.getClass(), this.getClass());
+ throw new FlinkRuntimeException(message);
+ }
+ return STATE_FACTORIES
+ .get(stateDescriptor.getClass())
--- End diff --
Maybe we can merge the `containsKey()` and the `get()` into a single `get()`; this way we don't need to query `STATE_FACTORIES` twice.
---
[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6173#discussion_r195903433
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
@@ -203,91 +216,16 @@ private boolean hasRegisteredState() {
}
@Override
- public <N, V> InternalValueState<K, N, V> createValueState(
- TypeSerializer<N> namespaceSerializer,
- ValueStateDescriptor<V> stateDesc) throws Exception {
-
- StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapValueState<>(
- stateTable,
- keySerializer,
- stateTable.getStateSerializer(),
- stateTable.getNamespaceSerializer(),
- stateDesc.getDefaultValue());
- }
-
- @Override
- public <N, T> InternalListState<K, N, T> createListState(
- TypeSerializer<N> namespaceSerializer,
- ListStateDescriptor<T> stateDesc) throws Exception {
-
- StateTable<K, N, List<T>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapListState<>(
- stateTable,
- keySerializer,
- stateTable.getStateSerializer(),
- stateTable.getNamespaceSerializer(),
- stateDesc.getDefaultValue());
- }
-
- @Override
- public <N, T> InternalReducingState<K, N, T> createReducingState(
- TypeSerializer<N> namespaceSerializer,
- ReducingStateDescriptor<T> stateDesc) throws Exception {
-
- StateTable<K, N, T> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapReducingState<>(
- stateTable,
- keySerializer,
- stateTable.getStateSerializer(),
- stateTable.getNamespaceSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getReduceFunction());
- }
-
- @Override
- public <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
- TypeSerializer<N> namespaceSerializer,
- AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
-
- StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapAggregatingState<>(
- stateTable,
- keySerializer,
- stateTable.getStateSerializer(),
- stateTable.getNamespaceSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getAggregateFunction());
- }
-
- @Override
- public <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
- TypeSerializer<N> namespaceSerializer,
- FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-
- StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
- return new HeapFoldingState<>(
- stateTable,
- keySerializer,
- stateTable.getStateSerializer(),
- stateTable.getNamespaceSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getFoldFunction());
- }
-
- @Override
- protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
- TypeSerializer<N> namespaceSerializer,
- MapStateDescriptor<UK, UV> stateDesc) throws Exception {
-
- StateTable<K, N, Map<UK, UV>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
-
- return new HeapMapState<>(
- stateTable,
- keySerializer,
- stateTable.getStateSerializer(),
- stateTable.getNamespaceSerializer(),
- stateDesc.getDefaultValue());
+ public <N, SV, S extends State, IS extends S> IS createState(
+ TypeSerializer<N> namespaceSerializer,
+ StateDescriptor<S, SV> stateDesc) throws Exception {
+ if (!STATE_FACTORIES.containsKey(stateDesc.getClass())) {
+ String message = String.format("State %s is not supported by %s",
+ stateDesc.getClass(), this.getClass());
+ throw new FlinkRuntimeException(message);
+ }
+ StateTable<K, N, SV> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
+ return STATE_FACTORIES.get(stateDesc.getClass()).createState(stateDesc, stateTable, keySerializer);
--- End diff --
Same as above: maybe the `containsKey()` and `get()` calls could be merged into a single `get()`.
---
[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6173#discussion_r196042694
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java ---
@@ -67,6 +79,21 @@
private static final Logger LOG = LoggerFactory.getLogger(QueryableStateClient.class);
+ private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+ org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap
--- End diff --
The JDK already offers unmodifiable map alternatives. Unless there is a good reason to need Guava here, I would suggest solving this without that import. Even if Guava is needed, I would use an import rather than the fully qualified class name.
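Following the suggestion above, a minimal JDK-only version of a class-keyed factory map could look like this. It assumes a Java 8 build target (hence no `Map.of`, which only exists since Java 9); the descriptor classes and the `StateFactory` interface are hypothetical simplifications, not the PR's actual types.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins; only the map construction mirrors the PR's STATE_FACTORIES.
final class JdkUnmodifiableMapSketch {

    static final class ValueStateDescriptor {}
    static final class ListStateDescriptor {}

    interface StateFactory {
        String createState(Object descriptor);
    }

    // JDK-only replacement for a Guava ImmutableMap, usable on Java 8.
    static final Map<Class<?>, StateFactory> STATE_FACTORIES = createFactories();

    private static Map<Class<?>, StateFactory> createFactories() {
        Map<Class<?>, StateFactory> factories = new HashMap<>();
        factories.put(ValueStateDescriptor.class, d -> "value");
        factories.put(ListStateDescriptor.class, d -> "list");
        // Read-only view; the mutable HashMap never escapes this method.
        return Collections.unmodifiableMap(factories);
    }
}
```

`Collections.unmodifiableMap` rejects writes with `UnsupportedOperationException`. On Java 9+, `Map.of` (up to 10 entries) or `Map.ofEntries` would build an immutable map in a single expression.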
---
[GitHub] flink issue #6173: [FLINK-9571] Refactor StateBinder to internal backend spe...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/6173
I had a few comments inline. After they are addressed, I think this is good to merge.
---
[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6173#discussion_r195903383
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
@@ -110,7 +123,7 @@
/**
* Map of state names to their corresponding restored state meta info.
*
- * <p>
+ * <p></p>
--- End diff --
I think the added `</p>` should be reverted.
---
[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6173#discussion_r196043061
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java ---
@@ -244,7 +271,14 @@ public ExecutionConfig setExecutionConfig(ExecutionConfig config) {
return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply(
stateResponse -> {
try {
- return stateDescriptor.bind(new ImmutableStateBinder(stateResponse.getContent()));
+ StateFactory stateFactory = STATE_FACTORIES
+ .get(stateDescriptor.getClass());
+ if (stateFactory == null) {
+ String message = String.format("State %s is not supported by %s",
+ stateDescriptor.getClass(), this.getClass());
+ throw new FlinkRuntimeException(message);
+ }
+ return stateFactory.createState(stateDescriptor, stateResponse.getContent());
} catch (Exception e) {
--- End diff --
Maybe it would make sense to narrow the scope of the `try`-`catch` block, because right now it will catch and wrap our own exception from line 279.
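One way to narrow the `try`-`catch` scope as suggested: do the factory lookup and the "not supported" check outside the `try`, and guard only the call that can actually fail while deserializing. This is a simplified sketch; `StateFactory`, the descriptor classes, and the `RuntimeException` wrapper are hypothetical stand-ins for the client's real types.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class NarrowTryCatchSketch {

    // Hypothetical descriptor classes.
    static final class ValueStateDescriptor {}
    static final class ListStateDescriptor {}

    // Hypothetical factory that may fail while decoding the response bytes.
    interface StateFactory {
        String createState(byte[] content) throws Exception;
    }

    static final Map<Class<?>, StateFactory> STATE_FACTORIES;

    static {
        Map<Class<?>, StateFactory> factories = new HashMap<>();
        factories.put(ValueStateDescriptor.class, content -> {
            if (content.length == 0) {
                throw new IOException("empty response");
            }
            return "state of " + content.length + " bytes";
        });
        STATE_FACTORIES = Collections.unmodifiableMap(factories);
    }

    static CompletableFuture<String> getState(
            Class<?> descriptorClass, CompletableFuture<byte[]> response) {
        return response.thenApply(content -> {
            // Lookup and "not supported" check stay outside the try block,
            // so this exception is not caught and re-wrapped below.
            StateFactory factory = STATE_FACTORIES.get(descriptorClass);
            if (factory == null) {
                throw new UnsupportedOperationException(
                    "State " + descriptorClass.getSimpleName() + " is not supported");
            }
            try {
                // Only the call that can throw a checked exception is guarded.
                return factory.createState(content);
            } catch (Exception e) {
                throw new RuntimeException("Could not create state from response", e);
            }
        });
    }
}
```

Called via `join()`, an unsupported descriptor then surfaces as a `CompletionException` whose cause is the `UnsupportedOperationException` itself, rather than a generic wrapper around it.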
---
[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6173#discussion_r195903638
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -1303,103 +1316,18 @@ private ColumnFamilyHandle createColumnFamily(String stateName) throws IOExcepti
}
@Override
- protected <N, T> InternalValueState<K, N, T> createValueState(
- TypeSerializer<N> namespaceSerializer,
- ValueStateDescriptor<T> stateDesc) throws Exception {
-
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult =
- tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
- return new RocksDBValueState<>(
- registerResult.f0,
- registerResult.f1.getNamespaceSerializer(),
- registerResult.f1.getStateSerializer(),
- stateDesc.getDefaultValue(),
- this);
- }
-
- @Override
- protected <N, T> InternalListState<K, N, T> createListState(
- TypeSerializer<N> namespaceSerializer,
- ListStateDescriptor<T> stateDesc) throws Exception {
-
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, List<T>>> registerResult =
- tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
- return new RocksDBListState<>(
- registerResult.f0,
- registerResult.f1.getNamespaceSerializer(),
- registerResult.f1.getStateSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getElementSerializer(),
- this);
- }
-
- @Override
- protected <N, T> InternalReducingState<K, N, T> createReducingState(
- TypeSerializer<N> namespaceSerializer,
- ReducingStateDescriptor<T> stateDesc) throws Exception {
-
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult =
- tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
- return new RocksDBReducingState<>(
- registerResult.f0,
- registerResult.f1.getNamespaceSerializer(),
- registerResult.f1.getStateSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getReduceFunction(),
- this);
- }
-
- @Override
- protected <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
- TypeSerializer<N> namespaceSerializer,
- AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
-
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult =
- tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
- return new RocksDBAggregatingState<>(
- registerResult.f0,
- registerResult.f1.getNamespaceSerializer(),
- registerResult.f1.getStateSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getAggregateFunction(),
- this);
- }
-
- @Override
- protected <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
+ public <N, SV, S extends State, IS extends S> IS createState(
TypeSerializer<N> namespaceSerializer,
- FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult =
- tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
- return new RocksDBFoldingState<>(
- registerResult.f0,
- registerResult.f1.getNamespaceSerializer(),
- registerResult.f1.getStateSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getFoldFunction(),
- this);
- }
-
- @Override
- protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
- TypeSerializer<N> namespaceSerializer,
- MapStateDescriptor<UK, UV> stateDesc) throws Exception {
-
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, Map<UK, UV>>> registerResult =
- tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
- return new RocksDBMapState<>(
- registerResult.f0,
- registerResult.f1.getNamespaceSerializer(),
- registerResult.f1.getStateSerializer(),
- stateDesc.getDefaultValue(),
- this);
+ StateDescriptor<S, SV> stateDesc) throws Exception {
+ if (!STATE_FACTORIES.containsKey(stateDesc.getClass())) {
+ String message = String.format("State %s is not supported by %s",
+ stateDesc.getClass(), this.getClass());
+ throw new UnsupportedOperationException(message);
--- End diff --
The exception type is a bit inconsistent: other places throw `FlinkRuntimeException`, so it may be better to make this consistent.
---
[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6173#discussion_r196046444
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ---
@@ -392,14 +268,14 @@ public KeyGroupRange getKeyGroupRange() {
kvStateRegistry.registerKvState(keyGroupRange, name, kvState);
}
- return state;
+ return (S) kvState;
--- End diff --
In the future, we should think about a way to also bind the state type of the state descriptor to the internal state type, so that the factory can produce a proper generic type that matches the state descriptor and is also an `InternalKvState`. For now, I see no simple solution, but we might want to keep this in mind and discuss it with @aljoscha.
---
[GitHub] flink issue #6173: [FLINK-9571] Refactor StateBinder to internal backend spe...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on the issue:
https://github.com/apache/flink/pull/6173
cc @StefanRRichter @aljoscha
---
[GitHub] flink issue #6173: [FLINK-9571] Refactor StateBinder to internal backend spe...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/6173
LGTM 👍 Will merge.
---
[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6173#discussion_r195903619
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -1303,103 +1316,18 @@ private ColumnFamilyHandle createColumnFamily(String stateName) throws IOExcepti
}
@Override
- protected <N, T> InternalValueState<K, N, T> createValueState(
- TypeSerializer<N> namespaceSerializer,
- ValueStateDescriptor<T> stateDesc) throws Exception {
-
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult =
- tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
- return new RocksDBValueState<>(
- registerResult.f0,
- registerResult.f1.getNamespaceSerializer(),
- registerResult.f1.getStateSerializer(),
- stateDesc.getDefaultValue(),
- this);
- }
-
- @Override
- protected <N, T> InternalListState<K, N, T> createListState(
- TypeSerializer<N> namespaceSerializer,
- ListStateDescriptor<T> stateDesc) throws Exception {
-
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, List<T>>> registerResult =
- tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
- return new RocksDBListState<>(
- registerResult.f0,
- registerResult.f1.getNamespaceSerializer(),
- registerResult.f1.getStateSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getElementSerializer(),
- this);
- }
-
- @Override
- protected <N, T> InternalReducingState<K, N, T> createReducingState(
- TypeSerializer<N> namespaceSerializer,
- ReducingStateDescriptor<T> stateDesc) throws Exception {
-
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult =
- tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
- return new RocksDBReducingState<>(
- registerResult.f0,
- registerResult.f1.getNamespaceSerializer(),
- registerResult.f1.getStateSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getReduceFunction(),
- this);
- }
-
- @Override
- protected <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
- TypeSerializer<N> namespaceSerializer,
- AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
-
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult =
- tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
- return new RocksDBAggregatingState<>(
- registerResult.f0,
- registerResult.f1.getNamespaceSerializer(),
- registerResult.f1.getStateSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getAggregateFunction(),
- this);
- }
-
- @Override
- protected <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
+ public <N, SV, S extends State, IS extends S> IS createState(
TypeSerializer<N> namespaceSerializer,
- FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult =
- tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
- return new RocksDBFoldingState<>(
- registerResult.f0,
- registerResult.f1.getNamespaceSerializer(),
- registerResult.f1.getStateSerializer(),
- stateDesc.getDefaultValue(),
- stateDesc.getFoldFunction(),
- this);
- }
-
- @Override
- protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
- TypeSerializer<N> namespaceSerializer,
- MapStateDescriptor<UK, UV> stateDesc) throws Exception {
-
- Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, Map<UK, UV>>> registerResult =
- tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
-
- return new RocksDBMapState<>(
- registerResult.f0,
- registerResult.f1.getNamespaceSerializer(),
- registerResult.f1.getStateSerializer(),
- stateDesc.getDefaultValue(),
- this);
+ StateDescriptor<S, SV> stateDesc) throws Exception {
+ if (!STATE_FACTORIES.containsKey(stateDesc.getClass())) {
+ String message = String.format("State %s is not supported by %s",
+ stateDesc.getClass(), this.getClass());
+ throw new UnsupportedOperationException(message);
+ }
+ Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult =
+ tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
+ return STATE_FACTORIES.get(stateDesc.getClass()).createState(
--- End diff --
Same as above: the `containsKey()` and `get()` calls could be merged into a single `get()`.
---
[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/6173
---
[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6173#discussion_r195903979
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java ---
@@ -23,7 +23,10 @@
/**
* The {@code StateBinder} is used by {@link StateDescriptor} instances to create actual
* {@link State} objects.
+ *
+ * @deprecated refactored to StateFactory in flink-runtime
*/
+@Deprecated
@Internal
public interface StateBinder {
--- End diff --
Since it is internal, how about just removing it?
---
[GitHub] flink issue #6173: [FLINK-9571] Refactor StateBinder to internal backend spe...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on the issue:
https://github.com/apache/flink/pull/6173
Thanks @sihuazhou! I have added changes to address the comments.
---