You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/03/28 08:19:47 UTC
[flink] branch release-1.15 updated: [FLINK-26708] TimestampsAndWatermarksOperator should not propagate WatermarkStatus
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new f858421 [FLINK-26708] TimestampsAndWatermarksOperator should not propagate WatermarkStatus
f858421 is described below
commit f858421d67a27c155accdd7da9117c29541f8939
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Mar 17 15:25:32 2022 +0100
[FLINK-26708] TimestampsAndWatermarksOperator should not propagate WatermarkStatus
---
.../runtime/operators/TimestampsAndWatermarksOperator.java | 4 ++++
.../operators/TimestampsAndWatermarksOperatorTest.java | 14 ++++++++++++++
.../streaming/util/OneInputStreamOperatorTestHarness.java | 11 +++++++++++
3 files changed, 29 insertions(+)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
index a10c92e..570cb6b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
@@ -126,6 +126,10 @@ public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T
}
}
+ /** Override the base implementation to completely ignore statuses propagated from upstream. */
+ @Override
+ public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {}
+
@Override
public void finish() throws Exception {
super.finish();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
index 555a1a6..b7f3461 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Test;
@@ -57,6 +58,19 @@ public class TimestampsAndWatermarksOperatorTest {
}
@Test
+ public void inputStatusesAreNotForwarded() throws Exception {
+ OneInputStreamOperatorTestHarness<Long, Long> testHarness =
+ createTestHarness(
+ WatermarkStrategy.forGenerator((ctx) -> new PeriodicWatermarkGenerator())
+ .withTimestampAssigner((ctx) -> new LongExtractor()));
+
+ testHarness.processWatermarkStatus(WatermarkStatus.IDLE);
+ testHarness.setProcessingTime(AUTO_WATERMARK_INTERVAL);
+
+ assertThat(testHarness.getOutput(), empty());
+ }
+
+ @Test
public void longMaxInputWatermarkIsForwarded() throws Exception {
OneInputStreamOperatorTestHarness<Long, Long> testHarness =
createTestHarness(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index a078a08..a65667d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
@@ -225,6 +226,16 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
processWatermark(new Watermark(watermark));
}
+ public void processWatermarkStatus(WatermarkStatus status) throws Exception {
+ if (inputs.isEmpty()) {
+ getOneInputOperator().processWatermarkStatus(status);
+ } else {
+ checkState(inputs.size() == 1);
+ Input input = inputs.get(0);
+ input.processWatermarkStatus(status);
+ }
+ }
+
public void processWatermark(Watermark mark) throws Exception {
currentWatermark = mark.getTimestamp();
if (inputs.isEmpty()) {