You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@flink.apache.org by dw...@apache.org on 2022/01/21 13:02:21 UTC

[flink] branch master updated: [FLINK-25683][table] Pass MAX_WATERMARK in InputConversionOperator

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9aa879f  [FLINK-25683][table] Pass MAX_WATERMARK in InputConversionOperator
9aa879f is described below

commit 9aa879f50c32de862831e82613f1cf1bc4d760f9
Author: Yao Zhang <xz...@126.com>
AuthorDate: Thu Jan 20 18:28:33 2022 +0800

    [FLINK-25683][table] Pass MAX_WATERMARK in InputConversionOperator
    
    MAX_WATERMARK, which is emitted automatically, receives special handling,
    e.g. in BATCH runtime mode, where it flushes the remaining records at the
    end of processing. Therefore, we should not discard it when converting to
    a Table pipeline.
    
    This closes #18405
---
 .../runtime/stream/sql/DataStreamJavaITCase.java   | 36 ++++++++++++++++++++++
 .../operators/source/InputConversionOperator.java  |  2 +-
 .../source/InputConversionOperatorTest.java        | 14 +++++++++
 3 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
index aef4ebc..e8cbdf5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -31,6 +32,8 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -577,6 +580,39 @@ public class DataStreamJavaITCase extends AbstractTestBase {
     }
 
     @Test
+    public void testTableStreamConversionBatch() throws Exception {
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH); // needs MAX_WATERMARK to flush sum()
+
+        DataStreamSource<Row> streamSource =
+                env.fromElements(
+                        Row.of("Alice"),
+                        Row.of("alice"),
+                        Row.of("lily"),
+                        Row.of("Bob"),
+                        Row.of("lily"),
+                        Row.of("lily"));
+        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
+        Table sourceTable = tableEnvironment.fromDataStream(streamSource).as("word");
+        tableEnvironment.createTemporaryView("tmp_table", sourceTable);
+        Table resultTable = tableEnvironment.sqlQuery("select UPPER(word) as word from tmp_table");
+        SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream =
+                tableEnvironment
+                        .toDataStream(resultTable)
+                        .map(row -> (String) row.getField("word"))
+                        .returns(TypeInformation.of(String.class))
+                        .map(s -> new Tuple2<>(s, 1))
+                        .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
+                        .keyBy(tuple -> tuple.f0)
+                        .sum(1);
+
+        testResult(
+                resultStream,
+                new Tuple2<>("ALICE", 2),
+                new Tuple2<>("BOB", 1),
+                new Tuple2<>("LILY", 3));
+    }
+
+    @Test
     public void testComplexUnifiedPipelineStreaming() {
         final Table resultTable = getComplexUnifiedPipeline(env);
 
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/source/InputConversionOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/source/InputConversionOperator.java
index 41bca4f..94b7fea 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/source/InputConversionOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/source/InputConversionOperator.java
@@ -78,7 +78,7 @@ public final class InputConversionOperator<E> extends TableStreamOperator<RowDat
 
     @Override
     public void processWatermark(Watermark mark) throws Exception {
-        if (propagateWatermark) {
+        if (propagateWatermark || Watermark.MAX_WATERMARK.equals(mark)) {
             super.processWatermark(mark);
         }
     }
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/source/InputConversionOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/source/InputConversionOperatorTest.java
index 8ffc6f4..0d1e24f 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/source/InputConversionOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/source/InputConversionOperatorTest.java
@@ -98,6 +98,20 @@ public class InputConversionOperatorTest {
         operator.processWatermark(new Watermark(1000));
     }
 
+    @Test(expected = NullPointerException.class)
+    public void testReceiveMaxWatermark() throws Exception {
+        final InputConversionOperator<Row> operator =
+                new InputConversionOperator<>(
+                        createConverter(DataTypes.ROW(DataTypes.FIELD("f", DataTypes.INT()))),
+                        false,
+                        false,
+                        false,
+                        true);
+
+        // MAX_WATERMARK is forwarded regardless of propagateWatermark, hence the NPE
+        operator.processWatermark(Watermark.MAX_WATERMARK);
+    }
+
     private static DynamicTableSource.DataStructureConverter createConverter(DataType dataType) {
         final DataStructureConverter<Object, Object> converter =
                 DataStructureConverters.getConverter(dataType);