You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/27 20:01:02 UTC
[flink] 06/16: [FLINK-17903][core] WatermarkOutputMultiplexer
supports String IDs and de-registration of outputs
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8e01f5284b2a9b1d41ef49507598e2912b839206
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 24 23:14:05 2020 +0200
[FLINK-17903][core] WatermarkOutputMultiplexer supports String IDs and de-registration of outputs
---
.../kafka/internals/AbstractFetcher.java | 11 ++-
.../eventtime/WatermarkOutputMultiplexer.java | 35 ++++---
.../eventtime/WatermarkOutputMultiplexerTest.java | 101 +++++++++++++++++++--
3 files changed, 119 insertions(+), 28 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 978cd97..1a10582 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -394,15 +394,18 @@ public abstract class AbstractFetcher<T, KPH> {
case WITH_WATERMARK_GENERATOR: {
for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
- KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
+ final KafkaTopicPartition kafkaTopicPartition = partitionEntry.getKey();
+ KPH kafkaHandle = createKafkaPartitionHandle(kafkaTopicPartition);
WatermarkStrategy<T> deserializedWatermarkStrategy = watermarkStrategy.deserializeValue(
userCodeClassLoader);
- int outputId = watermarkOutputMultiplexer.registerNewOutput();
+ // the format of the ID does not matter, as long as it is unique
+ final String partitionId = kafkaTopicPartition.getTopic() + '-' + kafkaTopicPartition.getPartition();
+ watermarkOutputMultiplexer.registerNewOutput(partitionId);
WatermarkOutput immediateOutput =
- watermarkOutputMultiplexer.getImmediateOutput(outputId);
+ watermarkOutputMultiplexer.getImmediateOutput(partitionId);
WatermarkOutput deferredOutput =
- watermarkOutputMultiplexer.getDeferredOutput(outputId);
+ watermarkOutputMultiplexer.getDeferredOutput(partitionId);
KafkaTopicPartitionStateWithWatermarkGenerator<T, KPH> partitionState =
new KafkaTopicPartitionStateWithWatermarkGenerator<>(
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
index 83976e3..44e1e35 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
@@ -40,8 +40,8 @@ import static org.apache.flink.util.Preconditions.checkState;
* #onPeriodicEmit()} is called will the deferred updates be combined and forwarded to the
* underlying output.
*
- * <p>For registering a new multiplexed output, you must first call {@link #registerNewOutput()}
- * and then call {@link #getImmediateOutput(int)} or {@link #getDeferredOutput(int)} with the output
+ * <p>For registering a new multiplexed output, you must first call {@link #registerNewOutput(String)}
+ * and then call {@link #getImmediateOutput(String)} or {@link #getDeferredOutput(String)} with the output
* ID you passed to it. You can get both an immediate and deferred output for a given output ID,
* you can also call the getters multiple times.
*
@@ -57,16 +57,13 @@ public class WatermarkOutputMultiplexer {
*/
private final WatermarkOutput underlyingOutput;
- /** The id to use for the next registered output. */
- private int nextOutputId = 0;
-
/** The combined watermark over the per-output watermarks. */
private long combinedWatermark = Long.MIN_VALUE;
/**
* Map view, to allow finding them when requesting the {@link WatermarkOutput} for a given id.
*/
- private final Map<Integer, OutputState> watermarkPerOutputId;
+ private final Map<String, OutputState> watermarkPerOutputId;
/**
* List of all watermark outputs, for efficient access.
@@ -88,13 +85,23 @@ public class WatermarkOutputMultiplexer {
* an output ID that can be used to get a deferred or immediate {@link WatermarkOutput} for that
* output.
*/
- public int registerNewOutput() {
- int newOutputId = nextOutputId;
- nextOutputId++;
- OutputState outputState = new OutputState();
- watermarkPerOutputId.put(newOutputId, outputState);
+ public void registerNewOutput(String id) {
+ final OutputState outputState = new OutputState();
+
+ final OutputState previouslyRegistered = watermarkPerOutputId.putIfAbsent(id, outputState);
+ checkState(previouslyRegistered == null, "Already contains an output for ID %s", id);
+
watermarkOutputs.add(outputState);
- return newOutputId;
+ }
+
+ public boolean unregisterOutput(String id) {
+ final OutputState output = watermarkPerOutputId.remove(id);
+ if (output != null) {
+ watermarkOutputs.remove(output);
+ return true;
+ } else {
+ return false;
+ }
}
/**
@@ -103,7 +110,7 @@ public class WatermarkOutputMultiplexer {
* <p>See {@link WatermarkOutputMultiplexer} for a description of immediate and deferred
* outputs.
*/
- public WatermarkOutput getImmediateOutput(int outputId) {
+ public WatermarkOutput getImmediateOutput(String outputId) {
Preconditions.checkArgument(
watermarkPerOutputId.containsKey(outputId),
"no output registered under id " + outputId);
@@ -118,7 +125,7 @@ public class WatermarkOutputMultiplexer {
* <p>See {@link WatermarkOutputMultiplexer} for a description of immediate and deferred
* outputs.
*/
- public WatermarkOutput getDeferredOutput(int outputId) {
+ public WatermarkOutput getDeferredOutput(String outputId) {
Preconditions.checkArgument(
watermarkPerOutputId.containsKey(outputId),
"no output registered under id " + outputId);
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
index 224bf48..59bb6df 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
@@ -20,10 +20,15 @@ package org.apache.flink.api.common.eventtime;
import org.junit.Test;
+import java.util.UUID;
+
import static org.apache.flink.api.common.eventtime.WatermarkMatchers.watermark;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
/**
* Tests for the {@link WatermarkOutputMultiplexer}.
@@ -261,9 +266,10 @@ public class WatermarkOutputMultiplexerTest {
WatermarkOutputMultiplexer multiplexer =
new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
- int outputId = multiplexer.registerNewOutput();
- WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(outputId);
- WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(outputId);
+ final String id = "test-id";
+ multiplexer.registerNewOutput(id);
+ WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(id);
+ WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(id);
deferredOutput.emitWatermark(new Watermark(5));
multiplexer.onPeriodicEmit();
@@ -284,9 +290,10 @@ public class WatermarkOutputMultiplexerTest {
WatermarkOutputMultiplexer multiplexer =
new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
- int outputId = multiplexer.registerNewOutput();
- WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(outputId);
- WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(outputId);
+ final String id = "1234-test";
+ multiplexer.registerNewOutput(id);
+ WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(id);
+ WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(id);
deferredOutput.emitWatermark(new Watermark(5));
immediateOutput.emitWatermark(new Watermark(2));
@@ -294,13 +301,86 @@ public class WatermarkOutputMultiplexerTest {
assertThat(underlyingWatermarkOutput.lastWatermark(), is(nullValue()));
}
+ @Test
+ public void testRemoveUnblocksWatermarks() {
+ final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
+ final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
+ final long lowTimestamp = 156765L;
+ final long highTimestamp = lowTimestamp + 10;
+
+ multiplexer.registerNewOutput("lower");
+ multiplexer.registerNewOutput("higher");
+ multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));
+
+ multiplexer.unregisterOutput("lower");
+ multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));
+
+ assertEquals(highTimestamp, underlyingWatermarkOutput.lastWatermark().getTimestamp());
+ }
+
+ @Test
+ public void testRemoveOfLowestDoesNotImmediatelyAdvanceWatermark() {
+ final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
+ final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
+ final long lowTimestamp = -4343L;
+ final long highTimestamp = lowTimestamp + 10;
+
+ multiplexer.registerNewOutput("lower");
+ multiplexer.registerNewOutput("higher");
+ multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));
+ multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));
+
+ multiplexer.unregisterOutput("lower");
+
+ assertEquals(lowTimestamp, underlyingWatermarkOutput.lastWatermark().getTimestamp());
+ }
+
+ @Test
+ public void testRemoveOfHighestDoesNotRetractWatermark() {
+ final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
+ final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
+ final long lowTimestamp = 1L;
+ final long highTimestamp = 2L;
+
+ multiplexer.registerNewOutput("higher");
+ multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));
+ multiplexer.unregisterOutput("higher");
+
+ multiplexer.registerNewOutput("lower");
+ multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));
+
+ assertEquals(highTimestamp, underlyingWatermarkOutput.lastWatermark().getTimestamp());
+ }
+
+ @Test
+ public void testRemoveRegisteredReturnValue() {
+ final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
+ final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
+ multiplexer.registerNewOutput("does-exist");
+
+ final boolean unregistered = multiplexer.unregisterOutput("does-exist");
+
+ assertTrue(unregistered);
+ }
+
+ @Test
+ public void testRemoveNotRegisteredReturnValue() {
+ final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
+ final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
+
+ final boolean unregistered = multiplexer.unregisterOutput("does-not-exist");
+
+ assertFalse(unregistered);
+ }
+
/**
* Convenience method so we don't have to go through the output ID dance when we only want an
* immediate output for a given output ID.
*/
private static WatermarkOutput createImmediateOutput(WatermarkOutputMultiplexer multiplexer) {
- int outputId = multiplexer.registerNewOutput();
- return multiplexer.getImmediateOutput(outputId);
+ final String id = UUID.randomUUID().toString();
+ multiplexer.registerNewOutput(id);
+ return multiplexer.getImmediateOutput(id);
}
/**
@@ -308,8 +388,9 @@ public class WatermarkOutputMultiplexerTest {
* deferred output for a given output ID.
*/
private static WatermarkOutput createDeferredOutput(WatermarkOutputMultiplexer multiplexer) {
- int outputId = multiplexer.registerNewOutput();
- return multiplexer.getDeferredOutput(outputId);
+ final String id = UUID.randomUUID().toString();
+ multiplexer.registerNewOutput(id);
+ return multiplexer.getDeferredOutput(id);
}
private static TestingWatermarkOutput createTestingWatermarkOutput() {