You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/09/30 08:43:07 UTC

[flink] branch master updated: [FLINK-28453][state-processor] Deduplicate code

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 178cc4e56fb [FLINK-28453][state-processor] Deduplicate code
178cc4e56fb is described below

commit 178cc4e56fb94fe0aa68aeaee780b6227625768d
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Sep 28 13:11:35 2022 +0200

    [FLINK-28453][state-processor] Deduplicate code
---
 .../apache/flink/state/api/SavepointReader.java    | 66 +++++++++++-----------
 1 file changed, 32 insertions(+), 34 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java
index 73cdf334bed..8550341eaba 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java
@@ -155,15 +155,7 @@ public class SavepointReader {
      */
     public <T> DataStream<T> readListState(String uid, String name, TypeInformation<T> typeInfo)
             throws IOException {
-        OperatorState operatorState = metadata.getOperatorState(uid);
-        ListStateDescriptor<T> descriptor = new ListStateDescriptor<>(name, typeInfo);
-        ListStateInputFormat<T> inputFormat =
-                new ListStateInputFormat<>(
-                        operatorState,
-                        MutableConfig.of(env.getConfiguration()),
-                        stateBackend,
-                        descriptor);
-        return SourceBuilder.fromFormat(env, inputFormat, typeInfo);
+        return readListState(uid, typeInfo, new ListStateDescriptor<T>(name, typeInfo));
     }
 
     /**
@@ -182,9 +174,14 @@ public class SavepointReader {
     public <T> DataStream<T> readListState(
             String uid, String name, TypeInformation<T> typeInfo, TypeSerializer<T> serializer)
             throws IOException {
+        return readListState(uid, typeInfo, new ListStateDescriptor<T>(name, serializer));
+    }
+
+    private <T> DataStream<T> readListState(
+            String uid, TypeInformation<T> typeInfo, ListStateDescriptor<T> descriptor)
+            throws IOException {
 
         OperatorState operatorState = metadata.getOperatorState(uid);
-        ListStateDescriptor<T> descriptor = new ListStateDescriptor<>(name, serializer);
         ListStateInputFormat<T> inputFormat =
                 new ListStateInputFormat<>(
                         operatorState,
@@ -206,15 +203,7 @@ public class SavepointReader {
      */
     public <T> DataStream<T> readUnionState(String uid, String name, TypeInformation<T> typeInfo)
             throws IOException {
-        OperatorState operatorState = metadata.getOperatorState(uid);
-        ListStateDescriptor<T> descriptor = new ListStateDescriptor<>(name, typeInfo);
-        UnionStateInputFormat<T> inputFormat =
-                new UnionStateInputFormat<>(
-                        operatorState,
-                        MutableConfig.of(env.getConfiguration()),
-                        stateBackend,
-                        descriptor);
-        return SourceBuilder.fromFormat(env, inputFormat, typeInfo);
+        return readUnionState(uid, typeInfo, new ListStateDescriptor<>(name, typeInfo));
     }
 
     /**
@@ -233,9 +222,14 @@ public class SavepointReader {
     public <T> DataStream<T> readUnionState(
             String uid, String name, TypeInformation<T> typeInfo, TypeSerializer<T> serializer)
             throws IOException {
+        return readUnionState(uid, typeInfo, new ListStateDescriptor<>(name, serializer));
+    }
+
+    private <T> DataStream<T> readUnionState(
+            String uid, TypeInformation<T> typeInfo, ListStateDescriptor<T> descriptor)
+            throws IOException {
 
         OperatorState operatorState = metadata.getOperatorState(uid);
-        ListStateDescriptor<T> descriptor = new ListStateDescriptor<>(name, serializer);
         UnionStateInputFormat<T> inputFormat =
                 new UnionStateInputFormat<>(
                         operatorState,
@@ -263,18 +257,11 @@ public class SavepointReader {
             TypeInformation<K> keyTypeInfo,
             TypeInformation<V> valueTypeInfo)
             throws IOException {
-
-        OperatorState operatorState = metadata.getOperatorState(uid);
-        MapStateDescriptor<K, V> descriptor =
-                new MapStateDescriptor<>(name, keyTypeInfo, valueTypeInfo);
-        BroadcastStateInputFormat<K, V> inputFormat =
-                new BroadcastStateInputFormat<>(
-                        operatorState,
-                        MutableConfig.of(env.getConfiguration()),
-                        stateBackend,
-                        descriptor);
-        return SourceBuilder.fromFormat(
-                env, inputFormat, new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo));
+        return readBroadcastState(
+                uid,
+                keyTypeInfo,
+                valueTypeInfo,
+                new MapStateDescriptor<>(name, keyTypeInfo, valueTypeInfo));
     }
 
     /**
@@ -301,10 +288,21 @@ public class SavepointReader {
             TypeSerializer<K> keySerializer,
             TypeSerializer<V> valueSerializer)
             throws IOException {
+        return readBroadcastState(
+                uid,
+                keyTypeInfo,
+                valueTypeInfo,
+                new MapStateDescriptor<>(name, keySerializer, valueSerializer));
+    }
+
+    private <K, V> DataStream<Tuple2<K, V>> readBroadcastState(
+            String uid,
+            TypeInformation<K> keyTypeInfo,
+            TypeInformation<V> valueTypeInfo,
+            MapStateDescriptor<K, V> descriptor)
+            throws IOException {
 
         OperatorState operatorState = metadata.getOperatorState(uid);
-        MapStateDescriptor<K, V> descriptor =
-                new MapStateDescriptor<>(name, keySerializer, valueSerializer);
         BroadcastStateInputFormat<K, V> inputFormat =
                 new BroadcastStateInputFormat<>(
                         operatorState,