You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/09/30 08:43:07 UTC
[flink] branch master updated: [FLINK-28453][state-processor] Deduplicate code
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 178cc4e56fb [FLINK-28453][state-processor] Deduplicate code
178cc4e56fb is described below
commit 178cc4e56fb94fe0aa68aeaee780b6227625768d
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Sep 28 13:11:35 2022 +0200
[FLINK-28453][state-processor] Deduplicate code
---
.../apache/flink/state/api/SavepointReader.java | 66 +++++++++++-----------
1 file changed, 32 insertions(+), 34 deletions(-)
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java
index 73cdf334bed..8550341eaba 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java
@@ -155,15 +155,7 @@ public class SavepointReader {
*/
public <T> DataStream<T> readListState(String uid, String name, TypeInformation<T> typeInfo)
throws IOException {
- OperatorState operatorState = metadata.getOperatorState(uid);
- ListStateDescriptor<T> descriptor = new ListStateDescriptor<>(name, typeInfo);
- ListStateInputFormat<T> inputFormat =
- new ListStateInputFormat<>(
- operatorState,
- MutableConfig.of(env.getConfiguration()),
- stateBackend,
- descriptor);
- return SourceBuilder.fromFormat(env, inputFormat, typeInfo);
+ return readListState(uid, typeInfo, new ListStateDescriptor<T>(name, typeInfo));
}
/**
@@ -182,9 +174,14 @@ public class SavepointReader {
public <T> DataStream<T> readListState(
String uid, String name, TypeInformation<T> typeInfo, TypeSerializer<T> serializer)
throws IOException {
+ return readListState(uid, typeInfo, new ListStateDescriptor<T>(name, serializer));
+ }
+
+ private <T> DataStream<T> readListState(
+ String uid, TypeInformation<T> typeInfo, ListStateDescriptor<T> descriptor)
+ throws IOException {
OperatorState operatorState = metadata.getOperatorState(uid);
- ListStateDescriptor<T> descriptor = new ListStateDescriptor<>(name, serializer);
ListStateInputFormat<T> inputFormat =
new ListStateInputFormat<>(
operatorState,
@@ -206,15 +203,7 @@ public class SavepointReader {
*/
public <T> DataStream<T> readUnionState(String uid, String name, TypeInformation<T> typeInfo)
throws IOException {
- OperatorState operatorState = metadata.getOperatorState(uid);
- ListStateDescriptor<T> descriptor = new ListStateDescriptor<>(name, typeInfo);
- UnionStateInputFormat<T> inputFormat =
- new UnionStateInputFormat<>(
- operatorState,
- MutableConfig.of(env.getConfiguration()),
- stateBackend,
- descriptor);
- return SourceBuilder.fromFormat(env, inputFormat, typeInfo);
+ return readUnionState(uid, typeInfo, new ListStateDescriptor<>(name, typeInfo));
}
/**
@@ -233,9 +222,14 @@ public class SavepointReader {
public <T> DataStream<T> readUnionState(
String uid, String name, TypeInformation<T> typeInfo, TypeSerializer<T> serializer)
throws IOException {
+ return readUnionState(uid, typeInfo, new ListStateDescriptor<>(name, serializer));
+ }
+
+ private <T> DataStream<T> readUnionState(
+ String uid, TypeInformation<T> typeInfo, ListStateDescriptor<T> descriptor)
+ throws IOException {
OperatorState operatorState = metadata.getOperatorState(uid);
- ListStateDescriptor<T> descriptor = new ListStateDescriptor<>(name, serializer);
UnionStateInputFormat<T> inputFormat =
new UnionStateInputFormat<>(
operatorState,
@@ -263,18 +257,11 @@ public class SavepointReader {
TypeInformation<K> keyTypeInfo,
TypeInformation<V> valueTypeInfo)
throws IOException {
-
- OperatorState operatorState = metadata.getOperatorState(uid);
- MapStateDescriptor<K, V> descriptor =
- new MapStateDescriptor<>(name, keyTypeInfo, valueTypeInfo);
- BroadcastStateInputFormat<K, V> inputFormat =
- new BroadcastStateInputFormat<>(
- operatorState,
- MutableConfig.of(env.getConfiguration()),
- stateBackend,
- descriptor);
- return SourceBuilder.fromFormat(
- env, inputFormat, new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo));
+ return readBroadcastState(
+ uid,
+ keyTypeInfo,
+ valueTypeInfo,
+ new MapStateDescriptor<>(name, keyTypeInfo, valueTypeInfo));
}
/**
@@ -301,10 +288,21 @@ public class SavepointReader {
TypeSerializer<K> keySerializer,
TypeSerializer<V> valueSerializer)
throws IOException {
+ return readBroadcastState(
+ uid,
+ keyTypeInfo,
+ valueTypeInfo,
+ new MapStateDescriptor<>(name, keySerializer, valueSerializer));
+ }
+
+ private <K, V> DataStream<Tuple2<K, V>> readBroadcastState(
+ String uid,
+ TypeInformation<K> keyTypeInfo,
+ TypeInformation<V> valueTypeInfo,
+ MapStateDescriptor<K, V> descriptor)
+ throws IOException {
OperatorState operatorState = metadata.getOperatorState(uid);
- MapStateDescriptor<K, V> descriptor =
- new MapStateDescriptor<>(name, keySerializer, valueSerializer);
BroadcastStateInputFormat<K, V> inputFormat =
new BroadcastStateInputFormat<>(
operatorState,