You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/05/12 12:08:41 UTC

[flink] branch master updated: [FLINK-21469][runtime] Implement advanceToEndOfEventTime for MultipleInputStreamTask

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 62f91de  [FLINK-21469][runtime] Implement advanceToEndOfEventTime for MultipleInputStreamTask
62f91de is described below

commit 62f91de993eb1c74ef92a42608e688f5cb711724
Author: Yuan Mei <yu...@gmail.com>
AuthorDate: Mon May 10 13:46:43 2021 +0800

    [FLINK-21469][runtime] Implement advanceToEndOfEventTime for MultipleInputStreamTask
    
    For stop-with-savepoint, StreamTask#advanceToEndOfEventTime() is called (in source tasks)
    to advance to the max watermark. This PR implements advanceToEndOfEventTime for
    MultipleInputStreamTask chained sources by emitting Watermark.MAX_WATERMARK on the
    output of every chained source.
---
 .../runtime/tasks/MultipleInputStreamTask.java     | 10 ++++
 .../streaming/runtime/tasks/OperatorChain.java     |  6 ++
 .../runtime/tasks/MultipleInputStreamTaskTest.java | 70 +++++++++++++---------
 3 files changed, 58 insertions(+), 28 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
index 1337632..20daeaf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
@@ -31,6 +31,8 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessorFactory;
 import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
 import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
@@ -39,6 +41,7 @@ import org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil;
 import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import javax.annotation.Nullable;
 
@@ -264,4 +267,11 @@ public class MultipleInputStreamTask<OUT>
         }
         super.abortCheckpointOnBarrier(checkpointId, cause);
     }
+
+    @Override
+    protected void advanceToEndOfEventTime() throws Exception {
+        for (Output<StreamRecord<?>> sourceOutput : operatorChain.getChainedSourceOutputs()) {
+            sourceOutput.emitWatermark(Watermark.MAX_WATERMARK);
+        }
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index e6b3e7f..173cd32 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -484,6 +484,12 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>>
         return chainedSources.get(sourceInput).getSourceOutput();
     }
 
+    public List<Output<StreamRecord<?>>> getChainedSourceOutputs() {
+        return chainedSources.values().stream()
+                .map(ChainedSource::getSourceOutput)
+                .collect(Collectors.toList());
+    }
+
     public StreamTaskSourceInput<?> getSourceTaskInput(SourceInputConfig sourceInput) {
         checkArgument(
                 chainedSources.containsKey(sourceInput),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index 57f109a..e282e27 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -91,6 +91,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -511,20 +512,7 @@ public class MultipleInputStreamTaskTest {
     @Test
     public void testWatermark() throws Exception {
         try (StreamTaskMailboxTestHarness<String> testHarness =
-                new StreamTaskMailboxTestHarnessBuilder<>(
-                                MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
-                        .modifyExecutionConfig(config -> config.enableObjectReuse())
-                        .addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
-                        .addSourceInput(
-                                new SourceOperatorFactory<>(
-                                        new MockSource(
-                                                Boundedness.CONTINUOUS_UNBOUNDED, 2, true, false),
-                                        WatermarkStrategy.forGenerator(
-                                                ctx -> new RecordToWatermarkGenerator())))
-                        .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
-                        .setupOutputForSingletonOperatorChain(
-                                new MapToStringMultipleInputOperatorFactory(3))
-                        .build()) {
+                buildWatermarkTestHarness(2, false)) {
             ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
 
             int initialTime = 0;
@@ -601,20 +589,7 @@ public class MultipleInputStreamTaskTest {
     @Test
     public void testWatermarkAndStreamStatusForwarding() throws Exception {
         try (StreamTaskMailboxTestHarness<String> testHarness =
-                new StreamTaskMailboxTestHarnessBuilder<>(
-                                MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
-                        .modifyExecutionConfig(config -> config.enableObjectReuse())
-                        .addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
-                        .addSourceInput(
-                                new SourceOperatorFactory<>(
-                                        new MockSource(
-                                                Boundedness.CONTINUOUS_UNBOUNDED, 2, true, true),
-                                        WatermarkStrategy.forGenerator(
-                                                ctx -> new RecordToWatermarkGenerator())))
-                        .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
-                        .setupOutputForSingletonOperatorChain(
-                                new MapToStringMultipleInputOperatorFactory(3))
-                        .build()) {
+                buildWatermarkTestHarness(2, true)) {
             ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
 
             int initialTime = 0;
@@ -678,6 +653,24 @@ public class MultipleInputStreamTaskTest {
     }
 
     @Test
+    public void testAdvanceToEndOfEventTime() throws Exception {
+        try (StreamTaskMailboxTestHarness<String> testHarness =
+                buildWatermarkTestHarness(2, false)) {
+            testHarness.processElement(Watermark.MAX_WATERMARK, 0, 0);
+            testHarness.processElement(Watermark.MAX_WATERMARK, 0, 1);
+
+            testHarness.getStreamTask().advanceToEndOfEventTime();
+
+            testHarness.processElement(Watermark.MAX_WATERMARK, 1, 0);
+
+            assertThat(testHarness.getOutput(), not(contains(Watermark.MAX_WATERMARK)));
+
+            testHarness.processElement(Watermark.MAX_WATERMARK, 1, 1);
+            assertThat(testHarness.getOutput(), contains(Watermark.MAX_WATERMARK));
+        }
+    }
+
+    @Test
     @SuppressWarnings("unchecked")
     public void testWatermarkMetrics() throws Exception {
         OperatorID mainOperatorId = new OperatorID();
@@ -1027,6 +1020,27 @@ public class MultipleInputStreamTaskTest {
                 .dispatchOperatorEvent(sourceOperatorID, new SerializedValue<>(addSplitEvent));
     }
 
+    private static StreamTaskMailboxTestHarness<String> buildWatermarkTestHarness(
+            int inputChannels, boolean readerMarkIdleOnNoSplits) throws Exception {
+        return new StreamTaskMailboxTestHarnessBuilder<>(
+                        MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+                .modifyExecutionConfig(config -> config.enableObjectReuse())
+                .addInput(BasicTypeInfo.STRING_TYPE_INFO, inputChannels)
+                .addSourceInput(
+                        new SourceOperatorFactory<>(
+                                new MockSource(
+                                        Boundedness.CONTINUOUS_UNBOUNDED,
+                                        2,
+                                        true,
+                                        readerMarkIdleOnNoSplits),
+                                WatermarkStrategy.forGenerator(
+                                        ctx -> new RecordToWatermarkGenerator())))
+                .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, inputChannels)
+                .setupOutputForSingletonOperatorChain(
+                        new MapToStringMultipleInputOperatorFactory(3))
+                .build();
+    }
+
     private static OperatorID getSourceOperatorID(
             StreamTaskMailboxTestHarness<String> testHarness, int sourceId) {
         StreamConfig.InputConfig[] inputs =