You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/03/28 08:19:47 UTC

[flink] branch release-1.15 updated: [FLINK-26708] TimestampsAndWatermarksOperator should not propagate WatermarkStatus

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new f858421  [FLINK-26708] TimestampsAndWatermarksOperator should not propagate WatermarkStatus
f858421 is described below

commit f858421d67a27c155accdd7da9117c29541f8939
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Mar 17 15:25:32 2022 +0100

    [FLINK-26708] TimestampsAndWatermarksOperator should not propagate WatermarkStatus
---
 .../runtime/operators/TimestampsAndWatermarksOperator.java |  4 ++++
 .../operators/TimestampsAndWatermarksOperatorTest.java     | 14 ++++++++++++++
 .../streaming/util/OneInputStreamOperatorTestHarness.java  | 11 +++++++++++
 3 files changed, 29 insertions(+)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
index a10c92e..570cb6b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
@@ -126,6 +126,10 @@ public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T
         }
     }
 
+    /** Override the base implementation to completely ignore statuses propagated from upstream. */
+    @Override
+    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {}
+
     @Override
     public void finish() throws Exception {
         super.finish();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
index 555a1a6..b7f3461 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
 import org.junit.Test;
@@ -57,6 +58,19 @@ public class TimestampsAndWatermarksOperatorTest {
     }
 
     @Test
+    public void inputStatusesAreNotForwarded() throws Exception {
+        OneInputStreamOperatorTestHarness<Long, Long> testHarness =
+                createTestHarness(
+                        WatermarkStrategy.forGenerator((ctx) -> new PeriodicWatermarkGenerator())
+                                .withTimestampAssigner((ctx) -> new LongExtractor()));
+
+        testHarness.processWatermarkStatus(WatermarkStatus.IDLE);
+        testHarness.setProcessingTime(AUTO_WATERMARK_INTERVAL);
+
+        assertThat(testHarness.getOutput(), empty());
+    }
+
+    @Test
     public void longMaxInputWatermarkIsForwarded() throws Exception {
         OneInputStreamOperatorTestHarness<Long, Long> testHarness =
                 createTestHarness(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index a078a08..a65667d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
@@ -225,6 +226,16 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
         processWatermark(new Watermark(watermark));
     }
 
+    public void processWatermarkStatus(WatermarkStatus status) throws Exception {
+        if (inputs.isEmpty()) {
+            getOneInputOperator().processWatermarkStatus(status);
+        } else {
+            checkState(inputs.size() == 1);
+            Input input = inputs.get(0);
+            input.processWatermarkStatus(status);
+        }
+    }
+
     public void processWatermark(Watermark mark) throws Exception {
         currentWatermark = mark.getTimestamp();
         if (inputs.isEmpty()) {