You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/01/24 16:02:59 UTC
[flink] branch release-1.13 updated: [FLINK-25683][table] Pass MAX_WATERMARK in InputConversionOperator
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 16feba6 [FLINK-25683][table] Pass MAX_WATERMARK in InputConversionOperator
16feba6 is described below
commit 16feba605c87645a658947805be50f6c410fde67
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Jan 21 14:11:20 2022 +0100
[FLINK-25683][table] Pass MAX_WATERMARK in InputConversionOperator
MAX_WATERMARK emitted automatically has a special handling e.g. in BATCH
runtime mode. It flushes remaining records at the end of processing.
Therefore we should not discard those when converting to a Table
pipeline.
This closes #18405
---
.../runtime/operators/source/InputConversionOperator.java | 2 +-
.../operators/source/InputConversionOperatorTest.java | 14 ++++++++++++++
2 files changed, 15 insertions(+), 1 deletion(-)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/source/InputConversionOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/source/InputConversionOperator.java
index 41bca4f..94b7fea 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/source/InputConversionOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/source/InputConversionOperator.java
@@ -78,7 +78,7 @@ public final class InputConversionOperator<E> extends TableStreamOperator<RowDat
@Override
public void processWatermark(Watermark mark) throws Exception {
- if (propagateWatermark) {
+ if (propagateWatermark || Watermark.MAX_WATERMARK.equals(mark)) {
super.processWatermark(mark);
}
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/source/InputConversionOperatorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/source/InputConversionOperatorTest.java
index 8ffc6f4..0d1e24f 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/source/InputConversionOperatorTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/source/InputConversionOperatorTest.java
@@ -98,6 +98,20 @@ public class InputConversionOperatorTest {
operator.processWatermark(new Watermark(1000));
}
+ @Test(expected = NullPointerException.class)
+ public void testReceiveMaxWatermark() throws Exception {
+ final InputConversionOperator<Row> operator =
+ new InputConversionOperator<>(
+ createConverter(DataTypes.ROW(DataTypes.FIELD("f", DataTypes.INT()))),
+ false,
+ false,
+ false,
+ true);
+
+ // would throw an exception because it always emits Watermark.MAX_WATERMARK
+ operator.processWatermark(Watermark.MAX_WATERMARK);
+ }
+
private static DynamicTableSource.DataStructureConverter createConverter(DataType dataType) {
final DataStructureConverter<Object, Object> converter =
DataStructureConverters.getConverter(dataType);