You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/01/04 09:08:18 UTC

[flink] branch master updated: [FLINK-25074][datastream] Simplify name of WindowOperator

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f352dca  [FLINK-25074][datastream] Simplify name of WindowOperator
f352dca is described below

commit f352dca3724deb8a42b36102bc298b58158df90d
Author: 龙三 <we...@alibaba-inc.com>
AuthorDate: Tue Nov 23 17:14:23 2021 +0800

    [FLINK-25074][datastream] Simplify name of WindowOperator
    
    This closes #18056.
---
 .../api/datastream/AllWindowedStream.java          | 22 +++++----
 .../streaming/api/datastream/WindowedStream.java   | 37 +++++++++------
 .../operators/windowing/WindowOperatorBuilder.java |  6 ++-
 .../apache/flink/streaming/api/DataStreamTest.java | 52 ++++++++++++++++++++++
 4 files changed, 94 insertions(+), 23 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index de9299c..9303218 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -259,7 +259,8 @@ public class AllWindowedStream<T, W extends Window> {
         String callLocation = Utils.getCallLocationName();
         String udfName = "AllWindowedStream." + callLocation;
 
-        String opName;
+        String opName = windowAssigner.getClass().getSimpleName();
+        String opDescription;
         KeySelector<T, Byte> keySel = input.getKeySelector();
 
         OneInputStreamOperator<T, R> operator;
@@ -276,7 +277,7 @@ public class AllWindowedStream<T, W extends Window> {
             ListStateDescriptor<StreamRecord<T>> stateDesc =
                     new ListStateDescriptor<>("window-contents", streamRecordSerializer);
 
-            opName =
+            opDescription =
                     "TriggerWindow("
                             + windowAssigner
                             + ", "
@@ -313,7 +314,7 @@ public class AllWindowedStream<T, W extends Window> {
                             input.getType()
                                     .createSerializer(getExecutionEnvironment().getConfig()));
 
-            opName =
+            opDescription =
                     "TriggerWindow("
                             + windowAssigner
                             + ", "
@@ -339,7 +340,9 @@ public class AllWindowedStream<T, W extends Window> {
                             lateDataOutputTag);
         }
 
-        return input.transform(opName, resultType, operator).forceNonParallel();
+        return input.transform(opName, resultType, operator)
+                .setDescription(opDescription)
+                .forceNonParallel();
     }
 
     /**
@@ -808,7 +811,8 @@ public class AllWindowedStream<T, W extends Window> {
         final String callLocation = Utils.getCallLocationName();
         final String udfName = "AllWindowedStream." + callLocation;
 
-        final String opName;
+        final String opName = windowAssigner.getClass().getSimpleName();
+        final String opDescription;
         final KeySelector<T, Byte> keySel = input.getKeySelector();
 
         OneInputStreamOperator<T, R> operator;
@@ -825,7 +829,7 @@ public class AllWindowedStream<T, W extends Window> {
             ListStateDescriptor<StreamRecord<T>> stateDesc =
                     new ListStateDescriptor<>("window-contents", streamRecordSerializer);
 
-            opName =
+            opDescription =
                     "TriggerWindow("
                             + windowAssigner
                             + ", "
@@ -862,7 +866,7 @@ public class AllWindowedStream<T, W extends Window> {
                             accumulatorType.createSerializer(
                                     getExecutionEnvironment().getConfig()));
 
-            opName =
+            opDescription =
                     "TriggerWindow("
                             + windowAssigner
                             + ", "
@@ -888,7 +892,9 @@ public class AllWindowedStream<T, W extends Window> {
                             lateDataOutputTag);
         }
 
-        return input.transform(opName, resultType, operator).forceNonParallel();
+        return input.transform(opName, resultType, operator)
+                .setDescription(opDescription)
+                .forceNonParallel();
     }
 
     // ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 0e63d66..efec571 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -211,10 +211,11 @@ public class WindowedStream<T, K, W extends Window> {
         function = input.getExecutionEnvironment().clean(function);
         reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
 
-        final String opName = builder.generateOperatorName(reduceFunction, function);
+        final String opName = builder.generateOperatorName();
+        final String opDescription = builder.generateOperatorDescription(reduceFunction, function);
 
         OneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function);
-        return input.transform(opName, resultType, operator);
+        return input.transform(opName, resultType, operator).setDescription(opDescription);
     }
 
     /**
@@ -258,10 +259,11 @@ public class WindowedStream<T, K, W extends Window> {
         function = input.getExecutionEnvironment().clean(function);
         reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
 
-        final String opName = builder.generateOperatorName(reduceFunction, function);
+        final String opName = builder.generateOperatorName();
+        final String opDescription = builder.generateOperatorDescription(reduceFunction, function);
         OneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function);
 
-        return input.transform(opName, resultType, operator);
+        return input.transform(opName, resultType, operator).setDescription(opDescription);
     }
 
     // ------------------------------------------------------------------------
@@ -405,12 +407,14 @@ public class WindowedStream<T, K, W extends Window> {
         windowFunction = input.getExecutionEnvironment().clean(windowFunction);
         aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction);
 
-        final String opName = builder.generateOperatorName(aggregateFunction, windowFunction);
+        final String opName = builder.generateOperatorName();
+        final String opDescription =
+                builder.generateOperatorDescription(aggregateFunction, windowFunction);
 
         OneInputStreamOperator<T, R> operator =
                 builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
 
-        return input.transform(opName, resultType, operator);
+        return input.transform(opName, resultType, operator).setDescription(opDescription);
     }
 
     /**
@@ -514,12 +518,14 @@ public class WindowedStream<T, K, W extends Window> {
         windowFunction = input.getExecutionEnvironment().clean(windowFunction);
         aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction);
 
-        final String opName = builder.generateOperatorName(aggregateFunction, windowFunction);
+        final String opName = builder.generateOperatorName();
+        final String opDescription =
+                builder.generateOperatorDescription(aggregateFunction, windowFunction);
 
         OneInputStreamOperator<T, R> operator =
                 builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
 
-        return input.transform(opName, resultType, operator);
+        return input.transform(opName, resultType, operator).setDescription(opDescription);
     }
 
     // ------------------------------------------------------------------------
@@ -559,10 +565,11 @@ public class WindowedStream<T, K, W extends Window> {
             WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
         function = input.getExecutionEnvironment().clean(function);
 
-        final String opName = builder.generateOperatorName(function, null);
+        final String opName = builder.generateOperatorName();
+        final String opDescription = builder.generateOperatorDescription(function, null);
         OneInputStreamOperator<T, R> operator = builder.apply(function);
 
-        return input.transform(opName, resultType, operator);
+        return input.transform(opName, resultType, operator).setDescription(opDescription);
     }
 
     /**
@@ -601,11 +608,12 @@ public class WindowedStream<T, K, W extends Window> {
             ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
         function = input.getExecutionEnvironment().clean(function);
 
-        final String opName = builder.generateOperatorName(function, null);
+        final String opName = builder.generateOperatorName();
+        final String opDesc = builder.generateOperatorDescription(function, null);
 
         OneInputStreamOperator<T, R> operator = builder.process(function);
 
-        return input.transform(opName, resultType, operator);
+        return input.transform(opName, resultType, operator).setDescription(opDesc);
     }
 
     /**
@@ -651,11 +659,12 @@ public class WindowedStream<T, K, W extends Window> {
         function = input.getExecutionEnvironment().clean(function);
         reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
 
-        final String opName = builder.generateOperatorName(reduceFunction, function);
+        final String opName = builder.generateOperatorName();
+        final String opDesc = builder.generateOperatorDescription(reduceFunction, function);
 
         OneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function);
 
-        return input.transform(opName, resultType, operator);
+        return input.transform(opName, resultType, operator).setDescription(opDesc);
     }
 
     // ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java
index 9fb1506..f9db7a3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java
@@ -326,7 +326,11 @@ public class WindowOperatorBuilder<T, K, W extends Window> {
         }
     }
 
-    public String generateOperatorName(Function function1, @Nullable Function function2) {
+    public String generateOperatorName() {
+        return windowAssigner.getClass().getSimpleName();
+    }
+
+    public String generateOperatorDescription(Function function1, @Nullable Function function2) {
         return "Window("
                 + windowAssigner
                 + ", "
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 144d836..ea61b18 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -69,6 +69,8 @@ import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
@@ -1104,6 +1106,56 @@ public class DataStreamTest extends TestLogger {
                 });
     }
 
+    /** Tests that verifies window operator has different name and description. */
+    @Test
+    public void testWindowOperatorDescription() {
+        // global window
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<Long> dataStream1 =
+                env.generateSequence(0, 0)
+                        .windowAll(GlobalWindows.create())
+                        .trigger(PurgingTrigger.of(CountTrigger.of(10)))
+                        .reduce(
+                                new ReduceFunction<Long>() {
+                                    private static final long serialVersionUID = 1L;
+
+                                    @Override
+                                    public Long reduce(Long value1, Long value2) throws Exception {
+                                        return null;
+                                    }
+                                });
+        // name is simplified
+        assertEquals("GlobalWindows", dataStream1.getTransformation().getName());
+        // description contains detail of function:
+        // TriggerWindow(GlobalWindows(), ReducingStateDescriptor{name=window-contents,
+        // defaultValue=null,
+        // serializer=org.apache.flink.api.common.typeutils.base.LongSerializer@6af9fcb2},
+        // PurgingTrigger(CountTrigger(10)), AllWindowedStream.reduce(AllWindowedStream.java:229))
+        assertTrue(dataStream1.getTransformation().getDescription().contains("PurgingTrigger"));
+
+        // keyed window
+        DataStream<Long> dataStream2 =
+                env.generateSequence(0, 0)
+                        .keyBy(value -> value)
+                        .window(TumblingEventTimeWindows.of(Time.milliseconds(1000)))
+                        .trigger(PurgingTrigger.of(CountTrigger.of(10)))
+                        .reduce(
+                                new ReduceFunction<Long>() {
+                                    private static final long serialVersionUID = 1L;
+
+                                    @Override
+                                    public Long reduce(Long value1, Long value2) throws Exception {
+                                        return null;
+                                    }
+                                });
+        // name is simplified
+        assertEquals("TumblingEventTimeWindows", dataStream2.getTransformation().getName());
+        // description contains detail of function:
+        // Window(TumblingEventTimeWindows(1000), PurgingTrigger, ReduceFunction$36,
+        // PassThroughWindowFunction)
+        assertTrue(dataStream2.getTransformation().getDescription().contains("PurgingTrigger"));
+    }
+
     /**
      * Tests {@link SingleOutputStreamOperator#setDescription(String)} functionality.
      *