You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/05/12 12:08:41 UTC
[flink] branch master updated: [FLINK-21469][runtime] Implement
advanceToEndOfEventTime for MultipleInputStreamTask
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 62f91de [FLINK-21469][runtime] Implement advanceToEndOfEventTime for MultipleInputStreamTask
62f91de is described below
commit 62f91de993eb1c74ef92a42608e688f5cb711724
Author: Yuan Mei <yu...@gmail.com>
AuthorDate: Mon May 10 13:46:43 2021 +0800
[FLINK-21469][runtime] Implement advanceToEndOfEventTime for MultipleInputStreamTask
For stop-with-savepoint, StreamTask#advanceToEndOfEventTime() is called (in source tasks)
to advance to the max watermark. This PR implements advanceToEndOfEventTime for the
chained sources of MultipleInputStreamTask.
---
.../runtime/tasks/MultipleInputStreamTask.java | 10 ++++
.../streaming/runtime/tasks/OperatorChain.java | 6 ++
.../runtime/tasks/MultipleInputStreamTaskTest.java | 70 +++++++++++++---------
3 files changed, 58 insertions(+), 28 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
index 1337632..20daeaf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
@@ -31,6 +31,8 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessorFactory;
import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
@@ -39,6 +41,7 @@ import org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil;
import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import javax.annotation.Nullable;
@@ -264,4 +267,11 @@ public class MultipleInputStreamTask<OUT>
}
super.abortCheckpointOnBarrier(checkpointId, cause);
}
+
+ /**
+ * Advances the chained sources of this task to the end of event time by emitting
+ * {@link Watermark#MAX_WATERMARK} on the output of every chained source.
+ *
+ * <p>Invoked for stop-with-savepoint to advance to the max watermark. Only chained source
+ * outputs are touched here; inputs arriving over the network are not advanced by this method.
+ */
+ @Override
+ protected void advanceToEndOfEventTime() throws Exception {
+ // One MAX_WATERMARK per chained source output.
+ for (Output<StreamRecord<?>> sourceOutput : operatorChain.getChainedSourceOutputs()) {
+ sourceOutput.emitWatermark(Watermark.MAX_WATERMARK);
+ }
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index e6b3e7f..173cd32 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -484,6 +484,12 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>>
return chainedSources.get(sourceInput).getSourceOutput();
}
+ /**
+ * Returns the source output of every chained source in this operator chain.
+ *
+ * @return a newly collected list with one output per entry of {@code chainedSources}, in the
+ * iteration order of {@code chainedSources.values()}
+ */
+ public List<Output<StreamRecord<?>>> getChainedSourceOutputs() {
+ return chainedSources.values().stream()
+ .map(ChainedSource::getSourceOutput)
+ .collect(Collectors.toList());
+ }
+
public StreamTaskSourceInput<?> getSourceTaskInput(SourceInputConfig sourceInput) {
checkArgument(
chainedSources.containsKey(sourceInput),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index 57f109a..e282e27 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -91,6 +91,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -511,20 +512,7 @@ public class MultipleInputStreamTaskTest {
@Test
public void testWatermark() throws Exception {
try (StreamTaskMailboxTestHarness<String> testHarness =
- new StreamTaskMailboxTestHarnessBuilder<>(
- MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
- .modifyExecutionConfig(config -> config.enableObjectReuse())
- .addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
- .addSourceInput(
- new SourceOperatorFactory<>(
- new MockSource(
- Boundedness.CONTINUOUS_UNBOUNDED, 2, true, false),
- WatermarkStrategy.forGenerator(
- ctx -> new RecordToWatermarkGenerator())))
- .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
- .setupOutputForSingletonOperatorChain(
- new MapToStringMultipleInputOperatorFactory(3))
- .build()) {
+ buildWatermarkTestHarness(2, false)) {
ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
int initialTime = 0;
@@ -601,20 +589,7 @@ public class MultipleInputStreamTaskTest {
@Test
public void testWatermarkAndStreamStatusForwarding() throws Exception {
try (StreamTaskMailboxTestHarness<String> testHarness =
- new StreamTaskMailboxTestHarnessBuilder<>(
- MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
- .modifyExecutionConfig(config -> config.enableObjectReuse())
- .addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
- .addSourceInput(
- new SourceOperatorFactory<>(
- new MockSource(
- Boundedness.CONTINUOUS_UNBOUNDED, 2, true, true),
- WatermarkStrategy.forGenerator(
- ctx -> new RecordToWatermarkGenerator())))
- .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
- .setupOutputForSingletonOperatorChain(
- new MapToStringMultipleInputOperatorFactory(3))
- .build()) {
+ buildWatermarkTestHarness(2, true)) {
ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
int initialTime = 0;
@@ -678,6 +653,24 @@ public class MultipleInputStreamTaskTest {
}
+ /**
+ * Verifies that advanceToEndOfEventTime() only advances the chained source: MAX_WATERMARK is
+ * emitted downstream only after every network input channel has also received it.
+ */
@Test
+ public void testAdvanceToEndOfEventTime() throws Exception {
+ try (StreamTaskMailboxTestHarness<String> testHarness =
+ buildWatermarkTestHarness(2, false)) {
+ // NOTE(review): presumably processElement(element, inputIndex, channelIndex) indexes only
+ // the network inputs (0 = String input, 1 = Double input) — confirm in the harness.
+ // Move both channels of network input 0 to MAX_WATERMARK.
+ testHarness.processElement(Watermark.MAX_WATERMARK, 0, 0);
+ testHarness.processElement(Watermark.MAX_WATERMARK, 0, 1);
+
+ // Advance the chained source input to the end of event time.
+ testHarness.getStreamTask().advanceToEndOfEventTime();
+
+ // Only channel 0 of input 1 reaches MAX_WATERMARK; channel 1 is still behind, so no
+ // MAX_WATERMARK may be emitted yet.
+ testHarness.processElement(Watermark.MAX_WATERMARK, 1, 0);
+
+ // NOTE(review): hamcrest contains() matches the whole sequence, so this only rejects an
+ // output of exactly [MAX_WATERMARK]; not(hasItem(...)) would be a stricter check.
+ assertThat(testHarness.getOutput(), not(contains(Watermark.MAX_WATERMARK)));
+
+ // The last channel reaches MAX_WATERMARK: exactly one MAX_WATERMARK is expected.
+ testHarness.processElement(Watermark.MAX_WATERMARK, 1, 1);
+ assertThat(testHarness.getOutput(), contains(Watermark.MAX_WATERMARK));
+ }
+ }
+
+ @Test
@SuppressWarnings("unchecked")
public void testWatermarkMetrics() throws Exception {
OperatorID mainOperatorId = new OperatorID();
@@ -1027,6 +1020,27 @@ public class MultipleInputStreamTaskTest {
.dispatchOperatorEvent(sourceOperatorID, new SerializedValue<>(addSplitEvent));
}
+ /**
+ * Builds a mailbox test harness for a {@link MultipleInputStreamTask} whose inputs are added
+ * in this order: a String network input, a chained {@code MockSource} source input whose
+ * watermarks come from {@code RecordToWatermarkGenerator}, and a Double network input. The
+ * inputs feed {@code MapToStringMultipleInputOperatorFactory(3)}; object reuse is enabled.
+ *
+ * @param inputChannels number of channels for each of the two network inputs
+ * @param readerMarkIdleOnNoSplits forwarded as the last {@code MockSource} constructor argument;
+ * presumably makes the source reader mark itself idle when it has no splits — TODO confirm
+ * against MockSource
+ * @return the built harness; callers close it (they use it in try-with-resources)
+ */
+ private static StreamTaskMailboxTestHarness<String> buildWatermarkTestHarness(
+ int inputChannels, boolean readerMarkIdleOnNoSplits) throws Exception {
+ return new StreamTaskMailboxTestHarnessBuilder<>(
+ MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+ .modifyExecutionConfig(config -> config.enableObjectReuse())
+ .addInput(BasicTypeInfo.STRING_TYPE_INFO, inputChannels)
+ .addSourceInput(
+ new SourceOperatorFactory<>(
+ new MockSource(
+ Boundedness.CONTINUOUS_UNBOUNDED,
+ 2,
+ true,
+ readerMarkIdleOnNoSplits),
+ WatermarkStrategy.forGenerator(
+ ctx -> new RecordToWatermarkGenerator())))
+ .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, inputChannels)
+ .setupOutputForSingletonOperatorChain(
+ new MapToStringMultipleInputOperatorFactory(3))
+ .build();
+ }
+
private static OperatorID getSourceOperatorID(
StreamTaskMailboxTestHarness<String> testHarness, int sourceId) {
StreamConfig.InputConfig[] inputs =