You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/03/29 11:05:07 UTC

[flink] branch release-1.13 updated: Revert "[FLINK-26708] TimestampsAndWatermarksOperator should not propagate WatermarkStatus"

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new c9b98fd  Revert "[FLINK-26708] TimestampsAndWatermarksOperator should not propagate WatermarkStatus"
c9b98fd is described below

commit c9b98fdb39ae45b91da3c78ca253c5343f54828e
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Mar 29 13:01:54 2022 +0200

    Revert "[FLINK-26708] TimestampsAndWatermarksOperator should not propagate WatermarkStatus"
    
    This reverts commit 1e723c568a76d6b37b08404575878a320f5c7f82.
---
 .../runtime/operators/TimestampsAndWatermarksOperator.java |  4 ----
 .../operators/TimestampsAndWatermarksOperatorTest.java     | 14 --------------
 .../streaming/util/OneInputStreamOperatorTestHarness.java  | 11 -----------
 3 files changed, 29 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
index 757338a..86c2dfe 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
@@ -127,10 +127,6 @@ public class TimestampsAndWatermarksOperator<T> extends AbstractStreamOperator<T
         }
     }
 
-    /** Override the base implementation to completely ignore statuses propagated from upstream. */
-    @Override
-    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {}
-
     @Override
     public void close() throws Exception {
         super.close();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
index b7f3461..555a1a6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
 import org.junit.Test;
@@ -58,19 +57,6 @@ public class TimestampsAndWatermarksOperatorTest {
     }
 
     @Test
-    public void inputStatusesAreNotForwarded() throws Exception {
-        OneInputStreamOperatorTestHarness<Long, Long> testHarness =
-                createTestHarness(
-                        WatermarkStrategy.forGenerator((ctx) -> new PeriodicWatermarkGenerator())
-                                .withTimestampAssigner((ctx) -> new LongExtractor()));
-
-        testHarness.processWatermarkStatus(WatermarkStatus.IDLE);
-        testHarness.setProcessingTime(AUTO_WATERMARK_INTERVAL);
-
-        assertThat(testHarness.getOutput(), empty());
-    }
-
-    @Test
     public void longMaxInputWatermarkIsForwarded() throws Exception {
         OneInputStreamOperatorTestHarness<Long, Long> testHarness =
                 createTestHarness(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 72351df..962f3e5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -30,7 +30,6 @@ import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
@@ -215,16 +214,6 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
         processWatermark(new Watermark(watermark));
     }
 
-    public void processWatermarkStatus(WatermarkStatus status) throws Exception {
-        if (inputs.isEmpty()) {
-            getOneInputOperator().processWatermarkStatus(status);
-        } else {
-            checkState(inputs.size() == 1);
-            Input input = inputs.get(0);
-            input.processWatermarkStatus(status);
-        }
-    }
-
     public void processWatermark(Watermark mark) throws Exception {
         currentWatermark = mark.getTimestamp();
         if (inputs.isEmpty()) {