You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by re...@apache.org on 2022/09/14 06:41:59 UTC
[flink] branch release-1.16 updated: [FLINK-28975][connector/base] Add IdlenessManager for main and per-split output in source operator to mark idleness only if both outputs are idle
This is an automated email from the ASF dual-hosted git repository.
renqs pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new a5be641a9a9 [FLINK-28975][connector/base] Add IdlenessManager for main and per-split output in source operator to mark idleness only if both outputs are idle
a5be641a9a9 is described below
commit a5be641a9a95de631dc697f3d2906e46cb126ee5
Author: Qingsheng Ren <re...@gmail.com>
AuthorDate: Mon Sep 5 18:13:06 2022 +0800
[FLINK-28975][connector/base] Add IdlenessManager for main and per-split output in source operator to mark idleness only if both outputs are idle
---
.../common/eventtime/WatermarksWithIdleness.java | 8 +-
.../source/ProgressiveTimestampsAndWatermarks.java | 70 +++++++++-
.../source/SourceOperatorEventTimeTest.java | 155 +++++++++++++++------
3 files changed, 187 insertions(+), 46 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java
index 8a4858b291f..3d61217ce04 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java
@@ -40,6 +40,8 @@ public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> {
private final IdlenessTimer idlenessTimer;
+ private boolean isIdleNow = false;
+
/**
* Creates a new WatermarksWithIdleness generator to the given generator idleness detection with
* the given timeout.
@@ -65,12 +67,16 @@ public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> {
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
watermarks.onEvent(event, eventTimestamp, output);
idlenessTimer.activity();
+ // Any event ends the current idle period; resetting the flag lets onPeriodicEmit() call
+ // markIdle() again the next time the idleness timer fires, instead of only once ever.
+ isIdleNow = false;
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
if (idlenessTimer.checkIfIdle()) {
- output.markIdle();
+ if (!isIdleNow) {
+ output.markIdle();
+ isIdleNow = true;
+ }
} else {
watermarks.onPeriodicEmit(output);
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
index 6f9fa9eeb61..5693f180d6f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.operators.source;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
@@ -100,13 +101,15 @@ public class ProgressiveTimestampsAndWatermarks<T> implements TimestampsAndWater
"already created a main output");
final WatermarkOutput watermarkOutput = new WatermarkToDataOutput(output, watermarkEmitted);
+ IdlenessManager idlenessManager = new IdlenessManager(watermarkOutput);
+
final WatermarkGenerator<T> watermarkGenerator =
watermarksFactory.createWatermarkGenerator(watermarksContext);
currentPerSplitOutputs =
new SplitLocalOutputs<>(
output,
- watermarkOutput,
+ idlenessManager.getSplitLocalOutput(),
timestampAssigner,
watermarksFactory,
watermarksContext);
@@ -114,7 +117,7 @@ public class ProgressiveTimestampsAndWatermarks<T> implements TimestampsAndWater
currentMainOutput =
new StreamingReaderOutput<>(
output,
- watermarkOutput,
+ idlenessManager.getMainOutput(),
timestampAssigner,
watermarkGenerator,
currentPerSplitOutputs);
@@ -260,4 +263,67 @@ public class ProgressiveTimestampsAndWatermarks<T> implements TimestampsAndWater
watermarkMultiplexer.onPeriodicEmit();
}
}
+
+ /**
+ * Helper for managing the idleness status of the underlying output.
+ *
+ * <p>This class tracks the idleness status of the main output and the split-local output
+ * separately, and only marks the underlying output as idle when both of them are idle.
+ *
+ * <p>This manager is needed because a source reader implementation may use only one of the
+ * main or split-local outputs for emitting records and watermarks. Without it, the watermark
+ * generator on the unused output could keep marking the underlying output as idle even though
+ * the other output is still active.
+ */
+ private static class IdlenessManager {
+ private final WatermarkOutput underlyingOutput;
+ private final IdlenessAwareWatermarkOutput splitLocalOutput;
+ private final IdlenessAwareWatermarkOutput mainOutput;
+
+ IdlenessManager(WatermarkOutput underlyingOutput) {
+ this.underlyingOutput = underlyingOutput;
+ // Both views forward to this manager's underlyingOutput; they only differ in the
+ // idleness flag they track.
+ this.splitLocalOutput = new IdlenessAwareWatermarkOutput();
+ this.mainOutput = new IdlenessAwareWatermarkOutput();
+ }
+
+ /** Returns the output view to be used for split-local (per-split) watermarks. */
+ IdlenessAwareWatermarkOutput getSplitLocalOutput() {
+ return splitLocalOutput;
+ }
+
+ /** Returns the output view to be used for main-output watermarks. */
+ IdlenessAwareWatermarkOutput getMainOutput() {
+ return mainOutput;
+ }
+
+ /** Marks the underlying output as idle only if both the main and split-local views are idle. */
+ void maybeMarkUnderlyingOutputAsIdle() {
+ if (splitLocalOutput.isIdle && mainOutput.isIdle) {
+ underlyingOutput.markIdle();
+ }
+ }
+
+ /**
+ * A {@link WatermarkOutput} view onto the enclosing manager's underlying output that records
+ * whether it is currently idle. Watermarks and active status are forwarded immediately; idle
+ * status is only forwarded once both views of the enclosing manager are idle.
+ */
+ private class IdlenessAwareWatermarkOutput implements WatermarkOutput {
+ // Starts idle so that a view which never emits anything does not prevent the other view
+ // from marking the underlying output as idle.
+ private boolean isIdle = true;
+
+ @Override
+ public void emitWatermark(Watermark watermark) {
+ underlyingOutput.emitWatermark(watermark);
+ isIdle = false;
+ }
+
+ @Override
+ public void markIdle() {
+ isIdle = true;
+ maybeMarkUnderlyingOutputAsIdle();
+ }
+
+ @Override
+ public void markActive() {
+ isIdle = false;
+ underlyingOutput.markActive();
+ }
+ }
+ }
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
index 959bc82402f..9e295abc50f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
@@ -28,17 +28,15 @@ import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -46,30 +44,17 @@ import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.apache.flink.streaming.api.operators.source.TestingSourceOperator.createTestOperator;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.hasSize;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests that validate correct handling of watermark generation in the {@link ReaderOutput} as
* created by the {@link ProgressiveTimestampsAndWatermarks}.
*/
-@RunWith(Parameterized.class)
-public class SourceOperatorEventTimeTest {
-
- @Parameterized.Parameters(name = "Emit progressive watermarks: {0}")
- public static Collection<Boolean> parameters() {
- return Arrays.asList(true, false);
- }
-
- private final boolean emitProgressiveWatermarks;
+class SourceOperatorEventTimeTest {
- public SourceOperatorEventTimeTest(boolean emitProgressiveWatermarks) {
- this.emitProgressiveWatermarks = emitProgressiveWatermarks;
- }
-
- @Test
- public void testMainOutputPeriodicWatermarks() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testMainOutputPeriodicWatermarks(boolean emitProgressiveWatermarks) throws Exception {
final WatermarkStrategy<Integer> watermarkStrategy =
WatermarkStrategy.forGenerator((ctx) -> new OnPeriodicTestWatermarkGenerator<>());
@@ -81,11 +66,13 @@ public class SourceOperatorEventTimeTest {
(output) -> output.collect(0, 120L),
(output) -> output.collect(0, 110L));
- assertWatermarksOrEmpty(result, new Watermark(100L), new Watermark(120L));
+ assertWatermarksOrEmpty(
+ emitProgressiveWatermarks, result, new Watermark(100L), new Watermark(120L));
}
- @Test
- public void testMainOutputEventWatermarks() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testMainOutputEventWatermarks(boolean emitProgressiveWatermarks) throws Exception {
final WatermarkStrategy<Integer> watermarkStrategy =
WatermarkStrategy.forGenerator((ctx) -> new OnEventTestWatermarkGenerator<>());
@@ -97,11 +84,13 @@ public class SourceOperatorEventTimeTest {
(output) -> output.collect(0, 120L),
(output) -> output.collect(0, 110L));
- assertWatermarksOrEmpty(result, new Watermark(100L), new Watermark(120L));
+ assertWatermarksOrEmpty(
+ emitProgressiveWatermarks, result, new Watermark(100L), new Watermark(120L));
}
- @Test
- public void testPerSplitOutputPeriodicWatermarks() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testPerSplitOutputPeriodicWatermarks(boolean emitProgressiveWatermarks) throws Exception {
final WatermarkStrategy<Integer> watermarkStrategy =
WatermarkStrategy.forGenerator((ctx) -> new OnPeriodicTestWatermarkGenerator<>());
@@ -120,11 +109,16 @@ public class SourceOperatorEventTimeTest {
(output) -> output.createOutputForSplit("B").collect(0, 200L));
assertWatermarksOrEmpty(
- result, new Watermark(100L), new Watermark(150L), new Watermark(200L));
+ emitProgressiveWatermarks,
+ result,
+ new Watermark(100L),
+ new Watermark(150L),
+ new Watermark(200L));
}
- @Test
- public void testPerSplitOutputEventWatermarks() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testPerSplitOutputEventWatermarks(boolean emitProgressiveWatermarks) throws Exception {
final WatermarkStrategy<Integer> watermarkStrategy =
WatermarkStrategy.forGenerator((ctx) -> new OnEventTestWatermarkGenerator<>());
@@ -143,11 +137,17 @@ public class SourceOperatorEventTimeTest {
(output) -> output.createOutputForSplit("two").collect(0, 200L));
assertWatermarksOrEmpty(
- result, new Watermark(100L), new Watermark(150L), new Watermark(200L));
+ emitProgressiveWatermarks,
+ result,
+ new Watermark(100L),
+ new Watermark(150L),
+ new Watermark(200L));
}
- @Test
- public void testCreatingPerSplitOutputOnSplitAddition() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testCreatingPerSplitOutputOnSplitAddition(boolean emitProgressiveWatermarks)
+ throws Exception {
final WatermarkStrategy<Integer> watermarkStrategy =
WatermarkStrategy.forGenerator((ctx) -> new OnEventTestWatermarkGenerator<>());
@@ -172,7 +172,76 @@ public class SourceOperatorEventTimeTest {
new MockSourceSplitSerializer()));
final List<Watermark> result = testSequenceOfWatermarks(sourceOperator);
- assertWatermarksOrEmpty(result, new Watermark(150L), new Watermark(300L));
+ assertWatermarksOrEmpty(
+ emitProgressiveWatermarks, result, new Watermark(150L), new Watermark(300L));
+ }
+
+ /**
+ * Verifies that the underlying output is only marked idle when the main output and all
+ * per-split outputs are idle, and that it becomes active again when either of them emits.
+ */
+ @Test
+ void testMainAndPerSplitWatermarkIdleness() throws Exception {
+ final WatermarkStrategy<Integer> watermarkStrategy =
+ WatermarkStrategy.forGenerator((ctx) -> new OnEventTestWatermarkGenerator<>());
+
+ InterpretingSourceReader reader =
+ new InterpretingSourceReader(
+ // Emit from main output
+ output -> output.collect(0, 100L),
+ // Mark main output as idle
+ // This generates an IDLE event, as no per-split output has emitted anything yet
+ ReaderOutput::markIdle,
+ // Emit from per-split outputs
+ // Expect an ACTIVE event before the watermark of split 1
+ output -> output.createOutputForSplit("1").collect(0, 150L),
+ output -> output.createOutputForSplit("2").collect(0, 200L),
+ // Mark output of split 1 as idle
+ // This should not generate IDLE event as output of split 2 is active
+ output -> output.createOutputForSplit("1").markIdle(),
+ // Mark output of split 2 as idle
+ // Expect to have an IDLE event as all outputs (main, split 1 and split 2)
+ // are idle
+ output -> output.createOutputForSplit("2").markIdle(),
+ // Emit from main output
+ // Expect to have an ACTIVE event
+ output -> output.collect(0, 250L),
+ // Mark output 1 and 2 as idle again
+ // This should not generate IDLE event as main output is active
+ output -> output.createOutputForSplit("1").markIdle(),
+ output -> output.createOutputForSplit("2").markIdle(),
+ // Mark main output as idle again to test active from per-split output
+ // Expect to have an IDLE event
+ ReaderOutput::markIdle,
+ // Emit from per-split output
+ // Expect to have an ACTIVE event
+ output -> output.createOutputForSplit("1").collect(0, 300L));
+
+ SourceOperator<Integer, MockSourceSplit> sourceOperator =
+ createTestOperator(reader, watermarkStrategy, true);
+
+ List<Object> events = testSequenceOfEvents(sourceOperator);
+
+ assertThat(events)
+ .containsExactly(
+ // Record and watermark from main output, followed by an IDLE event because
+ // the main output is marked idle before any per-split output has emitted
+ new StreamRecord<>(0, 100L),
+ new Watermark(100L),
+ new WatermarkStatus(WatermarkStatus.IDLE_STATUS),
+ // Record and watermark from split 1, together with an ACTIVE event
+ new StreamRecord<>(0, 150L),
+ new WatermarkStatus(WatermarkStatus.ACTIVE_STATUS),
+ new Watermark(150L),
+ // Record and watermark from split 2
+ new StreamRecord<>(0, 200L),
+ new Watermark(200L),
+ // IDLE event as output of main, split 1 and split 2 are idle
+ new WatermarkStatus(WatermarkStatus.IDLE_STATUS),
+ // Record and watermark from main output, together with an ACTIVE event
+ new StreamRecord<>(0, 250L),
+ new WatermarkStatus(WatermarkStatus.ACTIVE_STATUS),
+ new Watermark(250L),
+ // IDLE event as output of main, split 1 and split 2 are idle
+ new WatermarkStatus(WatermarkStatus.IDLE_STATUS),
+ // Record and watermark from split 1, together with an ACTIVE event
+ new StreamRecord<>(0, 300L),
+ new WatermarkStatus(WatermarkStatus.ACTIVE_STATUS),
+ new Watermark(300L));
}
// ------------------------------------------------------------------------
@@ -184,18 +253,18 @@ public class SourceOperatorEventTimeTest {
* mode. Otherwise, asserts that the list of actual watermarks is empty in BATCH mode.
*/
private void assertWatermarksOrEmpty(
- List<Watermark> actualWatermarks, Watermark... expectedWatermarks) {
+ boolean emitProgressiveWatermarks,
+ List<Watermark> actualWatermarks,
+ Watermark... expectedWatermarks) {
- // We add the expected Long.MAX_VALUE watermark to the end. We expect that for both
- // "STREAMING" and "BATCH" mode.
+ // STREAMING mode: the emitted watermarks must equal the expected ones exactly and in order.
+ // BATCH mode: no watermark may be emitted at all.
if (emitProgressiveWatermarks) {
- ArrayList<Watermark> watermarks = Lists.newArrayList(expectedWatermarks);
- assertThat(actualWatermarks, contains(watermarks.toArray()));
+ // containsExactly keeps the strictness of the previous Hamcrest contains() matcher
+ // (same elements, same order, nothing extra); AssertJ's contains() only checks that
+ // the expected elements appear somewhere in the list.
+ assertThat(actualWatermarks).containsExactly(expectedWatermarks);
} else {
- assertThat(actualWatermarks, hasSize(0));
+ assertThat(actualWatermarks).isEmpty();
}
}
- @SuppressWarnings("FinalPrivateMethod")
@SafeVarargs
private final List<Watermark> testSequenceOfWatermarks(
final boolean emitProgressiveWatermarks,