You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sj...@apache.org on 2021/02/11 19:46:41 UTC

[flink] branch master updated (cf451aa -> ae5bea3)

This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from cf451aa  [FLINK-21274] Block main thread when running the TaskManagerRunner
     new d8d446a  [FLINK-21315][state-processor-api]return DataSource<T> such that users can set operator names
     new ae5bea3  [FLINK-21315][state-processor-api]set an operator name when collecting existing operator states.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/flink/state/api/ExistingSavepoint.java  | 18 +++++++++---------
 .../java/org/apache/flink/state/api/WindowReader.java  | 14 +++++++-------
 .../org/apache/flink/state/api/WritableSavepoint.java  |  5 ++++-
 3 files changed, 20 insertions(+), 17 deletions(-)


[flink] 01/02: [FLINK-21315][state-processor-api]return DataSource such that users can set operator names

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d8d446af2bdca4c044c87069c88c07190c6b5e62
Author: Jun Qin <11...@users.noreply.github.com>
AuthorDate: Tue Feb 9 11:32:40 2021 +0100

    [FLINK-21315][state-processor-api]return DataSource<T> such that users can set operator names
---
 .../org/apache/flink/state/api/ExistingSavepoint.java  | 18 +++++++++---------
 .../java/org/apache/flink/state/api/WindowReader.java  | 14 +++++++-------
 2 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java
index 1212201..c46bd41 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/ExistingSavepoint.java
@@ -24,9 +24,9 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -102,7 +102,7 @@ public class ExistingSavepoint extends WritableSavepoint<ExistingSavepoint> {
      * @return A {@code DataSet} representing the elements in state.
      * @throws IOException If the savepoint path is invalid or the uid does not exist.
      */
-    public <T> DataSet<T> readListState(String uid, String name, TypeInformation<T> typeInfo)
+    public <T> DataSource<T> readListState(String uid, String name, TypeInformation<T> typeInfo)
             throws IOException {
         OperatorState operatorState = metadata.getOperatorState(uid);
         ListStateDescriptor<T> descriptor = new ListStateDescriptor<>(name, typeInfo);
@@ -123,7 +123,7 @@ public class ExistingSavepoint extends WritableSavepoint<ExistingSavepoint> {
      * @return A {@code DataSet} representing the elements in state.
      * @throws IOException If the savepoint path is invalid or the uid does not exist.
      */
-    public <T> DataSet<T> readListState(
+    public <T> DataSource<T> readListState(
             String uid, String name, TypeInformation<T> typeInfo, TypeSerializer<T> serializer)
             throws IOException {
 
@@ -143,7 +143,7 @@ public class ExistingSavepoint extends WritableSavepoint<ExistingSavepoint> {
      * @return A {@code DataSet} representing the elements in state.
      * @throws IOException If the savepoint path is invalid or the uid does not exist.
      */
-    public <T> DataSet<T> readUnionState(String uid, String name, TypeInformation<T> typeInfo)
+    public <T> DataSource<T> readUnionState(String uid, String name, TypeInformation<T> typeInfo)
             throws IOException {
         OperatorState operatorState = metadata.getOperatorState(uid);
         ListStateDescriptor<T> descriptor = new ListStateDescriptor<>(name, typeInfo);
@@ -165,7 +165,7 @@ public class ExistingSavepoint extends WritableSavepoint<ExistingSavepoint> {
      * @return A {@code DataSet} representing the elements in state.
      * @throws IOException If the savepoint path is invalid or the uid does not exist.
      */
-    public <T> DataSet<T> readUnionState(
+    public <T> DataSource<T> readUnionState(
             String uid, String name, TypeInformation<T> typeInfo, TypeSerializer<T> serializer)
             throws IOException {
 
@@ -188,7 +188,7 @@ public class ExistingSavepoint extends WritableSavepoint<ExistingSavepoint> {
      * @return A {@code DataSet} of key-value pairs from state.
      * @throws IOException If the savepoint does not contain the specified uid.
      */
-    public <K, V> DataSet<Tuple2<K, V>> readBroadcastState(
+    public <K, V> DataSource<Tuple2<K, V>> readBroadcastState(
             String uid,
             String name,
             TypeInformation<K> keyTypeInfo,
@@ -219,7 +219,7 @@ public class ExistingSavepoint extends WritableSavepoint<ExistingSavepoint> {
      * @return A {@code DataSet} of key-value pairs from state.
      * @throws IOException If the savepoint path is invalid or the uid does not exist.
      */
-    public <K, V> DataSet<Tuple2<K, V>> readBroadcastState(
+    public <K, V> DataSource<Tuple2<K, V>> readBroadcastState(
             String uid,
             String name,
             TypeInformation<K> keyTypeInfo,
@@ -246,7 +246,7 @@ public class ExistingSavepoint extends WritableSavepoint<ExistingSavepoint> {
      * @return A {@code DataSet} of objects read from keyed state.
      * @throws IOException If the savepoint does not contain operator state with the given uid.
      */
-    public <K, OUT> DataSet<OUT> readKeyedState(
+    public <K, OUT> DataSource<OUT> readKeyedState(
             String uid, KeyedStateReaderFunction<K, OUT> function) throws IOException {
 
         TypeInformation<K> keyTypeInfo;
@@ -296,7 +296,7 @@ public class ExistingSavepoint extends WritableSavepoint<ExistingSavepoint> {
      * @return A {@code DataSet} of objects read from keyed state.
      * @throws IOException If the savepoint does not contain operator state with the given uid.
      */
-    public <K, OUT> DataSet<OUT> readKeyedState(
+    public <K, OUT> DataSource<OUT> readKeyedState(
             String uid,
             KeyedStateReaderFunction<K, OUT> function,
             TypeInformation<K> keyTypeInfo,
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowReader.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowReader.java
index 1b0c062..c8d3679 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowReader.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowReader.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.state.api.functions.WindowReaderFunction;
 import org.apache.flink.state.api.input.KeyedStateInputFormat;
@@ -97,7 +97,7 @@ public class WindowReader<W extends Window> {
      * @return A {@code DataSet} of objects read from keyed state.
      * @throws IOException If savepoint does not contain the specified uid.
      */
-    public <T, K> DataSet<T> reduce(
+    public <T, K> DataSource<T> reduce(
             String uid,
             ReduceFunction<T> function,
             TypeInformation<K> keyType,
@@ -122,7 +122,7 @@ public class WindowReader<W extends Window> {
      * @return A {@code DataSet} of objects read from keyed state.
      * @throws IOException If savepoint does not contain the specified uid.
      */
-    public <K, T, OUT> DataSet<OUT> reduce(
+    public <K, T, OUT> DataSource<OUT> reduce(
             String uid,
             ReduceFunction<T> function,
             WindowReaderFunction<T, OUT, K, W> readerFunction,
@@ -153,7 +153,7 @@ public class WindowReader<W extends Window> {
      * @return A {@code DataSet} of objects read from keyed state.
      * @throws IOException If savepoint does not contain the specified uid.
      */
-    public <K, T, ACC, R> DataSet<R> aggregate(
+    public <K, T, ACC, R> DataSource<R> aggregate(
             String uid,
             AggregateFunction<T, ACC, R> aggregateFunction,
             TypeInformation<K> keyType,
@@ -182,7 +182,7 @@ public class WindowReader<W extends Window> {
      * @return A {@code DataSet} of objects read from keyed state.
      * @throws IOException If savepoint does not contain the specified uid.
      */
-    public <K, T, ACC, R, OUT> DataSet<OUT> aggregate(
+    public <K, T, ACC, R, OUT> DataSource<OUT> aggregate(
             String uid,
             AggregateFunction<T, ACC, R> aggregateFunction,
             WindowReaderFunction<R, OUT, K, W> readerFunction,
@@ -213,7 +213,7 @@ public class WindowReader<W extends Window> {
      * @return A {@code DataSet} of objects read from keyed state.
      * @throws IOException If the savepoint does not contain the specified uid.
      */
-    public <K, T, OUT> DataSet<OUT> process(
+    public <K, T, OUT> DataSource<OUT> process(
             String uid,
             WindowReaderFunction<T, OUT, K, W> readerFunction,
             TypeInformation<K> keyType,
@@ -227,7 +227,7 @@ public class WindowReader<W extends Window> {
         return readWindowOperator(uid, outputType, operator);
     }
 
-    private <K, T, OUT> DataSet<OUT> readWindowOperator(
+    private <K, T, OUT> DataSource<OUT> readWindowOperator(
             String uid,
             TypeInformation<OUT> outputType,
             WindowReaderOperator<?, K, T, W, OUT> operator)


[flink] 02/02: [FLINK-21315][state-processor-api]set an operator name when collecting existing operator states.

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ae5bea39491860ccdff3316877ef28e64f466f64
Author: Jun Qin <11...@users.noreply.github.com>
AuthorDate: Tue Feb 9 11:38:01 2021 +0100

    [FLINK-21315][state-processor-api]set an operator name when collecting existing operator states.
    
    This closes #14907
---
 .../src/main/java/org/apache/flink/state/api/WritableSavepoint.java  | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
index f428a27..324e4c1 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
@@ -105,7 +105,10 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> {
             finalOperatorStates = newOperatorStates;
         } else {
             DataSet<OperatorState> existingOperatorStates =
-                    newOperatorStates.getExecutionEnvironment().fromCollection(existingOperators);
+                    newOperatorStates
+                            .getExecutionEnvironment()
+                            .fromCollection(existingOperators)
+                            .name("existingOperatorStates");
 
             existingOperatorStates
                     .flatMap(new StatePathExtractor())