You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/01/04 09:08:18 UTC
[flink] branch master updated: [FLINK-25074][datastream] Simplify name of WindowOperator
This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f352dca [FLINK-25074][datastream] Simplify name of WindowOperator
f352dca is described below
commit f352dca3724deb8a42b36102bc298b58158df90d
Author: 龙三 <we...@alibaba-inc.com>
AuthorDate: Tue Nov 23 17:14:23 2021 +0800
[FLINK-25074][datastream] Simplify name of WindowOperator
This closes #18056.
---
.../api/datastream/AllWindowedStream.java | 22 +++++----
.../streaming/api/datastream/WindowedStream.java | 37 +++++++++------
.../operators/windowing/WindowOperatorBuilder.java | 6 ++-
.../apache/flink/streaming/api/DataStreamTest.java | 52 ++++++++++++++++++++++
4 files changed, 94 insertions(+), 23 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index de9299c..9303218 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -259,7 +259,8 @@ public class AllWindowedStream<T, W extends Window> {
String callLocation = Utils.getCallLocationName();
String udfName = "AllWindowedStream." + callLocation;
- String opName;
+ String opName = windowAssigner.getClass().getSimpleName();
+ String opDescription;
KeySelector<T, Byte> keySel = input.getKeySelector();
OneInputStreamOperator<T, R> operator;
@@ -276,7 +277,7 @@ public class AllWindowedStream<T, W extends Window> {
ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
- opName =
+ opDescription =
"TriggerWindow("
+ windowAssigner
+ ", "
@@ -313,7 +314,7 @@ public class AllWindowedStream<T, W extends Window> {
input.getType()
.createSerializer(getExecutionEnvironment().getConfig()));
- opName =
+ opDescription =
"TriggerWindow("
+ windowAssigner
+ ", "
@@ -339,7 +340,9 @@ public class AllWindowedStream<T, W extends Window> {
lateDataOutputTag);
}
- return input.transform(opName, resultType, operator).forceNonParallel();
+ return input.transform(opName, resultType, operator)
+ .setDescription(opDescription)
+ .forceNonParallel();
}
/**
@@ -808,7 +811,8 @@ public class AllWindowedStream<T, W extends Window> {
final String callLocation = Utils.getCallLocationName();
final String udfName = "AllWindowedStream." + callLocation;
- final String opName;
+ final String opName = windowAssigner.getClass().getSimpleName();
+ final String opDescription;
final KeySelector<T, Byte> keySel = input.getKeySelector();
OneInputStreamOperator<T, R> operator;
@@ -825,7 +829,7 @@ public class AllWindowedStream<T, W extends Window> {
ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
- opName =
+ opDescription =
"TriggerWindow("
+ windowAssigner
+ ", "
@@ -862,7 +866,7 @@ public class AllWindowedStream<T, W extends Window> {
accumulatorType.createSerializer(
getExecutionEnvironment().getConfig()));
- opName =
+ opDescription =
"TriggerWindow("
+ windowAssigner
+ ", "
@@ -888,7 +892,9 @@ public class AllWindowedStream<T, W extends Window> {
lateDataOutputTag);
}
- return input.transform(opName, resultType, operator).forceNonParallel();
+ return input.transform(opName, resultType, operator)
+ .setDescription(opDescription)
+ .forceNonParallel();
}
// ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 0e63d66..efec571 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -211,10 +211,11 @@ public class WindowedStream<T, K, W extends Window> {
function = input.getExecutionEnvironment().clean(function);
reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
- final String opName = builder.generateOperatorName(reduceFunction, function);
+ final String opName = builder.generateOperatorName();
+ final String opDescription = builder.generateOperatorDescription(reduceFunction, function);
OneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function);
- return input.transform(opName, resultType, operator);
+ return input.transform(opName, resultType, operator).setDescription(opDescription);
}
/**
@@ -258,10 +259,11 @@ public class WindowedStream<T, K, W extends Window> {
function = input.getExecutionEnvironment().clean(function);
reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
- final String opName = builder.generateOperatorName(reduceFunction, function);
+ final String opName = builder.generateOperatorName();
+ final String opDescription = builder.generateOperatorDescription(reduceFunction, function);
OneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function);
- return input.transform(opName, resultType, operator);
+ return input.transform(opName, resultType, operator).setDescription(opDescription);
}
// ------------------------------------------------------------------------
@@ -405,12 +407,14 @@ public class WindowedStream<T, K, W extends Window> {
windowFunction = input.getExecutionEnvironment().clean(windowFunction);
aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction);
- final String opName = builder.generateOperatorName(aggregateFunction, windowFunction);
+ final String opName = builder.generateOperatorName();
+ final String opDescription =
+ builder.generateOperatorDescription(aggregateFunction, windowFunction);
OneInputStreamOperator<T, R> operator =
builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
- return input.transform(opName, resultType, operator);
+ return input.transform(opName, resultType, operator).setDescription(opDescription);
}
/**
@@ -514,12 +518,14 @@ public class WindowedStream<T, K, W extends Window> {
windowFunction = input.getExecutionEnvironment().clean(windowFunction);
aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction);
- final String opName = builder.generateOperatorName(aggregateFunction, windowFunction);
+ final String opName = builder.generateOperatorName();
+ final String opDescription =
+ builder.generateOperatorDescription(aggregateFunction, windowFunction);
OneInputStreamOperator<T, R> operator =
builder.aggregate(aggregateFunction, windowFunction, accumulatorType);
- return input.transform(opName, resultType, operator);
+ return input.transform(opName, resultType, operator).setDescription(opDescription);
}
// ------------------------------------------------------------------------
@@ -559,10 +565,11 @@ public class WindowedStream<T, K, W extends Window> {
WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
function = input.getExecutionEnvironment().clean(function);
- final String opName = builder.generateOperatorName(function, null);
+ final String opName = builder.generateOperatorName();
+ final String opDescription = builder.generateOperatorDescription(function, null);
OneInputStreamOperator<T, R> operator = builder.apply(function);
- return input.transform(opName, resultType, operator);
+ return input.transform(opName, resultType, operator).setDescription(opDescription);
}
/**
@@ -601,11 +608,12 @@ public class WindowedStream<T, K, W extends Window> {
ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
function = input.getExecutionEnvironment().clean(function);
- final String opName = builder.generateOperatorName(function, null);
+ final String opName = builder.generateOperatorName();
+ final String opDesc = builder.generateOperatorDescription(function, null);
OneInputStreamOperator<T, R> operator = builder.process(function);
- return input.transform(opName, resultType, operator);
+ return input.transform(opName, resultType, operator).setDescription(opDesc);
}
/**
@@ -651,11 +659,12 @@ public class WindowedStream<T, K, W extends Window> {
function = input.getExecutionEnvironment().clean(function);
reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
- final String opName = builder.generateOperatorName(reduceFunction, function);
+ final String opName = builder.generateOperatorName();
+ final String opDesc = builder.generateOperatorDescription(reduceFunction, function);
OneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function);
- return input.transform(opName, resultType, operator);
+ return input.transform(opName, resultType, operator).setDescription(opDesc);
}
// ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java
index 9fb1506..f9db7a3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java
@@ -326,7 +326,11 @@ public class WindowOperatorBuilder<T, K, W extends Window> {
}
}
- public String generateOperatorName(Function function1, @Nullable Function function2) {
+ public String generateOperatorName() {
+ return windowAssigner.getClass().getSimpleName();
+ }
+
+ public String generateOperatorDescription(Function function1, @Nullable Function function2) {
return "Window("
+ windowAssigner
+ ", "
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 144d836..ea61b18 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -69,6 +69,8 @@ import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
@@ -1104,6 +1106,56 @@ public class DataStreamTest extends TestLogger {
});
}
+ /** Verifies that the window operator's name and description differ. */
+ @Test
+ public void testWindowOperatorDescription() {
+ // global window
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Long> dataStream1 =
+ env.generateSequence(0, 0)
+ .windowAll(GlobalWindows.create())
+ .trigger(PurgingTrigger.of(CountTrigger.of(10)))
+ .reduce(
+ new ReduceFunction<Long>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Long reduce(Long value1, Long value2) throws Exception {
+ return null;
+ }
+ });
+ // name is simplified
+ assertEquals("GlobalWindows", dataStream1.getTransformation().getName());
+ // description contains the details of the function, e.g.:
+ // TriggerWindow(GlobalWindows(), ReducingStateDescriptor{name=window-contents,
+ // defaultValue=null,
+ // serializer=org.apache.flink.api.common.typeutils.base.LongSerializer@6af9fcb2},
+ // PurgingTrigger(CountTrigger(10)), AllWindowedStream.reduce(AllWindowedStream.java:229))
+ assertTrue(dataStream1.getTransformation().getDescription().contains("PurgingTrigger"));
+
+ // keyed window
+ DataStream<Long> dataStream2 =
+ env.generateSequence(0, 0)
+ .keyBy(value -> value)
+ .window(TumblingEventTimeWindows.of(Time.milliseconds(1000)))
+ .trigger(PurgingTrigger.of(CountTrigger.of(10)))
+ .reduce(
+ new ReduceFunction<Long>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Long reduce(Long value1, Long value2) throws Exception {
+ return null;
+ }
+ });
+ // name is simplified
+ assertEquals("TumblingEventTimeWindows", dataStream2.getTransformation().getName());
+ // description contains the details of the function, e.g.:
+ // Window(TumblingEventTimeWindows(1000), PurgingTrigger, ReduceFunction$36,
+ // PassThroughWindowFunction)
+ assertTrue(dataStream2.getTransformation().getDescription().contains("PurgingTrigger"));
+ }
+
/**
* Tests {@link SingleOutputStreamOperator#setDescription(String)} functionality.
*