You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/03/29 11:05:07 UTC
[flink] branch release-1.13 updated: Revert "[FLINK-26708] TimestampsAndWatermarksOperator should not propagate WatermarkStatus"
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new c9b98fd Revert "[FLINK-26708] TimestampsAndWatermarksOperator should not propagate WatermarkStatus"
c9b98fd is described below
commit c9b98fdb39ae45b91da3c78ca253c5343f54828e
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Mar 29 13:01:54 2022 +0200
Revert "[FLINK-26708] TimestampsAndWatermarksOperator should not propagate WatermarkStatus"
This reverts commit 1e723c568a76d6b37b08404575878a320f5c7f82.
---
.../runtime/operators/TimestampsAndWatermarksOperator.java | 4 ----
.../operators/TimestampsAndWatermarksOperatorTest.java | 14 --------------
.../streaming/util/OneInputStreamOperatorTestHarness.java | 11 -----------
3 files changed, 29 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
index 757338a..86c2dfe 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
@@ -127,10 +127,6 @@ public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T
}
}
- /** Override the base implementation to completely ignore statuses propagated from upstream. */
- @Override
- public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {}
-
@Override
public void close() throws Exception {
super.close();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
index b7f3461..555a1a6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Test;
@@ -58,19 +57,6 @@ public class TimestampsAndWatermarksOperatorTest {
}
@Test
- public void inputStatusesAreNotForwarded() throws Exception {
- OneInputStreamOperatorTestHarness<Long, Long> testHarness =
- createTestHarness(
- WatermarkStrategy.forGenerator((ctx) -> new PeriodicWatermarkGenerator())
- .withTimestampAssigner((ctx) -> new LongExtractor()));
-
- testHarness.processWatermarkStatus(WatermarkStatus.IDLE);
- testHarness.setProcessingTime(AUTO_WATERMARK_INTERVAL);
-
- assertThat(testHarness.getOutput(), empty());
- }
-
- @Test
public void longMaxInputWatermarkIsForwarded() throws Exception {
OneInputStreamOperatorTestHarness<Long, Long> testHarness =
createTestHarness(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 72351df..962f3e5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -30,7 +30,6 @@ import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
@@ -215,16 +214,6 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
processWatermark(new Watermark(watermark));
}
- public void processWatermarkStatus(WatermarkStatus status) throws Exception {
- if (inputs.isEmpty()) {
- getOneInputOperator().processWatermarkStatus(status);
- } else {
- checkState(inputs.size() == 1);
- Input input = inputs.get(0);
- input.processWatermarkStatus(status);
- }
- }
-
public void processWatermark(Watermark mark) throws Exception {
currentWatermark = mark.getTimestamp();
if (inputs.isEmpty()) {