You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/01/21 13:02:21 UTC
[flink] branch master updated: [FLINK-25683][table] Pass MAX_WATERMARK in InputConversionOperator
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9aa879f [FLINK-25683][table] Pass MAX_WATERMARK in InputConversionOperator
9aa879f is described below
commit 9aa879f50c32de862831e82613f1cf1bc4d760f9
Author: Yao Zhang <xz...@126.com>
AuthorDate: Thu Jan 20 18:28:33 2022 +0800
[FLINK-25683][table] Pass MAX_WATERMARK in InputConversionOperator
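The automatically emitted MAX_WATERMARK receives special handling, e.g. in
BATCH runtime mode, where it flushes the remaining records at the end of
processing. Therefore, InputConversionOperator must not discard it when
converting a DataStream to a Table pipeline: MAX_WATERMARK is now forwarded
even if watermark propagation is disabled.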
This closes #18405
---
.../runtime/stream/sql/DataStreamJavaITCase.java | 36 ++++++++++++++++++++++
.../operators/source/InputConversionOperator.java | 2 +-
.../source/InputConversionOperatorTest.java | 14 +++++++++
3 files changed, 51 insertions(+), 1 deletion(-)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
index aef4ebc..e8cbdf5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -31,6 +32,8 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -577,6 +580,39 @@ public class DataStreamJavaITCase extends AbstractTestBase {
}
@Test
+ public void testTableStreamConversionBatch() throws Exception {
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+ DataStreamSource<Row> streamSource =
+ env.fromElements(
+ Row.of("Alice"),
+ Row.of("alice"),
+ Row.of("lily"),
+ Row.of("Bob"),
+ Row.of("lily"),
+ Row.of("lily"));
+ StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
+ Table sourceTable = tableEnvironment.fromDataStream(streamSource).as("word");
+ tableEnvironment.createTemporaryView("tmp_table", sourceTable);
+ Table resultTable = tableEnvironment.sqlQuery("select UPPER(word) as word from tmp_table");
+ SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream =
+ tableEnvironment
+ .toDataStream(resultTable)
+ .map(row -> (String) row.getField("word"))
+ .returns(TypeInformation.of(String.class))
+ .map(s -> new Tuple2<>(s, 1))
+ .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
+ .keyBy(tuple -> tuple.f0)
+ .sum(1);
+
+ testResult(
+ resultStream,
+ new Tuple2<>("ALICE", 2),
+ new Tuple2<>("BOB", 1),
+ new Tuple2<>("LILY", 3));
+ }
+
+ @Test
public void testComplexUnifiedPipelineStreaming() {
final Table resultTable = getComplexUnifiedPipeline(env);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/source/InputConversionOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/source/InputConversionOperator.java
index 41bca4f..94b7fea 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/source/InputConversionOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/source/InputConversionOperator.java
@@ -78,7 +78,7 @@ public final class InputConversionOperator<E> extends TableStreamOperator<RowDat
@Override
public void processWatermark(Watermark mark) throws Exception {
- if (propagateWatermark) {
+ if (propagateWatermark || Watermark.MAX_WATERMARK.equals(mark)) {
super.processWatermark(mark);
}
}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/source/InputConversionOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/source/InputConversionOperatorTest.java
index 8ffc6f4..0d1e24f 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/source/InputConversionOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/source/InputConversionOperatorTest.java
@@ -98,6 +98,20 @@ public class InputConversionOperatorTest {
operator.processWatermark(new Watermark(1000));
}
+ @Test(expected = NullPointerException.class)
+ public void testReceiveMaxWatermark() throws Exception {
+ final InputConversionOperator<Row> operator =
+ new InputConversionOperator<>(
+ createConverter(DataTypes.ROW(DataTypes.FIELD("f", DataTypes.INT()))),
+ false,
+ false,
+ false,
+ true);
+
+ // would throw an exception because it always emits Watermark.MAX_WATERMARK
+ operator.processWatermark(Watermark.MAX_WATERMARK);
+ }
+
private static DynamicTableSource.DataStructureConverter createConverter(DataType dataType) {
final DataStructureConverter<Object, Object> converter =
DataStructureConverters.getConverter(dataType);