You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/01/24 16:02:59 UTC

[flink] branch release-1.13 updated: [FLINK-25683][table] Pass MAX_WATERMARK in InputConversionOperator

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 16feba6  [FLINK-25683][table] Pass MAX_WATERMARK in InputConversionOperator
16feba6 is described below

commit 16feba605c87645a658947805be50f6c410fde67
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Jan 21 14:11:20 2022 +0100

    [FLINK-25683][table] Pass MAX_WATERMARK in InputConversionOperator
    
    An automatically emitted MAX_WATERMARK has special handling, e.g. in
    BATCH runtime mode, where it flushes the remaining records at the end
    of processing. Therefore we should not discard such watermarks when
    converting to a Table pipeline.
    
    This closes #18405
---
 .../runtime/operators/source/InputConversionOperator.java  |  2 +-
 .../operators/source/InputConversionOperatorTest.java      | 14 ++++++++++++++
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/source/InputConversionOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/source/InputConversionOperator.java
index 41bca4f..94b7fea 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/source/InputConversionOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/source/InputConversionOperator.java
@@ -78,7 +78,7 @@ public final class InputConversionOperator<E> extends TableStreamOperator<RowDat
 
     @Override
     public void processWatermark(Watermark mark) throws Exception {
-        if (propagateWatermark) {
+        if (propagateWatermark || Watermark.MAX_WATERMARK.equals(mark)) {
             super.processWatermark(mark);
         }
     }
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/source/InputConversionOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/source/InputConversionOperatorTest.java
index 8ffc6f4..0d1e24f 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/source/InputConversionOperatorTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/source/InputConversionOperatorTest.java
@@ -98,6 +98,20 @@ public class InputConversionOperatorTest {
         operator.processWatermark(new Watermark(1000));
     }
 
+    @Test(expected = NullPointerException.class)
+    public void testReceiveMaxWatermark() throws Exception {
+        final InputConversionOperator<Row> operator =
+                new InputConversionOperator<>(
+                        createConverter(DataTypes.ROW(DataTypes.FIELD("f", DataTypes.INT()))),
+                        false,
+                        false,
+                        false,
+                        true);
+
+        // would throw an exception because it always emits Watermark.MAX_WATERMARK
+        operator.processWatermark(Watermark.MAX_WATERMARK);
+    }
+
     private static DynamicTableSource.DataStructureConverter createConverter(DataType dataType) {
         final DataStructureConverter<Object, Object> converter =
                 DataStructureConverters.getConverter(dataType);