You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/27 20:01:02 UTC

[flink] 06/16: [FLINK-17903][core] WatermarkOutputMultiplexer supports String IDs and de-registration of outputs

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8e01f5284b2a9b1d41ef49507598e2912b839206
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 24 23:14:05 2020 +0200

    [FLINK-17903][core] WatermarkOutputMultiplexer supports String IDs and de-registration of outputs
---
 .../kafka/internals/AbstractFetcher.java           |  11 ++-
 .../eventtime/WatermarkOutputMultiplexer.java      |  35 ++++---
 .../eventtime/WatermarkOutputMultiplexerTest.java  | 101 +++++++++++++++++++--
 3 files changed, 119 insertions(+), 28 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 978cd97..1a10582 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -394,15 +394,18 @@ public abstract class AbstractFetcher<T, KPH> {
 
 			case WITH_WATERMARK_GENERATOR: {
 				for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
-					KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
+					final KafkaTopicPartition kafkaTopicPartition = partitionEntry.getKey();
+					KPH kafkaHandle = createKafkaPartitionHandle(kafkaTopicPartition);
 					WatermarkStrategy<T> deserializedWatermarkStrategy = watermarkStrategy.deserializeValue(
 							userCodeClassLoader);
 
-					int outputId = watermarkOutputMultiplexer.registerNewOutput();
+					// the format of the ID does not matter, as long as it is unique
+					final String partitionId = kafkaTopicPartition.getTopic() + '-' + kafkaTopicPartition.getPartition();
+					watermarkOutputMultiplexer.registerNewOutput(partitionId);
 					WatermarkOutput immediateOutput =
-							watermarkOutputMultiplexer.getImmediateOutput(outputId);
+							watermarkOutputMultiplexer.getImmediateOutput(partitionId);
 					WatermarkOutput deferredOutput =
-							watermarkOutputMultiplexer.getDeferredOutput(outputId);
+							watermarkOutputMultiplexer.getDeferredOutput(partitionId);
 
 					KafkaTopicPartitionStateWithWatermarkGenerator<T, KPH> partitionState =
 							new KafkaTopicPartitionStateWithWatermarkGenerator<>(
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
index 83976e3..44e1e35 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
@@ -40,8 +40,8 @@ import static org.apache.flink.util.Preconditions.checkState;
  * #onPeriodicEmit()} is called will the deferred updates be combined and forwarded to the
  * underlying output.
  *
- * <p>For registering a new multiplexed output, you must first call {@link #registerNewOutput()}
- * and then call {@link #getImmediateOutput(int)} or {@link #getDeferredOutput(int)} with the output
+ * <p>For registering a new multiplexed output, you must first call {@link #registerNewOutput(String)}
+ * and then call {@link #getImmediateOutput(String)} or {@link #getDeferredOutput(String)} with the output
  * ID you registered. You can get both an immediate and a deferred output for a given output ID,
  * and you can also call the getters multiple times.
  *
@@ -57,16 +57,13 @@ public class WatermarkOutputMultiplexer {
 	 */
 	private final WatermarkOutput underlyingOutput;
 
-	/** The id to use for the next registered output. */
-	private int nextOutputId = 0;
-
 	/** The combined watermark over the per-output watermarks. */
 	private long combinedWatermark = Long.MIN_VALUE;
 
 	/**
 	 * Map view, to allow finding them when requesting the {@link WatermarkOutput} for a given id.
 	 */
-	private final Map<Integer, OutputState> watermarkPerOutputId;
+	private final Map<String, OutputState> watermarkPerOutputId;
 
 	/**
 	 * List of all watermark outputs, for efficient access.
@@ -88,13 +85,23 @@ public class WatermarkOutputMultiplexer {
 	 * an output ID that can be used to get a deferred or immediate {@link WatermarkOutput} for that
 	 * output.
 	 */
-	public int registerNewOutput() {
-		int newOutputId = nextOutputId;
-		nextOutputId++;
-		OutputState outputState = new OutputState();
-		watermarkPerOutputId.put(newOutputId, outputState);
+	public void registerNewOutput(String id) {
+		final OutputState outputState = new OutputState();
+
+		final OutputState previouslyRegistered = watermarkPerOutputId.putIfAbsent(id, outputState);
+		checkState(previouslyRegistered == null, "Already contains an output for ID %s", id);
+
 		watermarkOutputs.add(outputState);
-		return newOutputId;
+	}
+
+	public boolean unregisterOutput(String id) {
+		final OutputState output = watermarkPerOutputId.remove(id);
+		if (output != null) {
+			watermarkOutputs.remove(output);
+			return true;
+		} else {
+			return false;
+		}
 	}
 
 	/**
@@ -103,7 +110,7 @@ public class WatermarkOutputMultiplexer {
 	 * <p>See {@link WatermarkOutputMultiplexer} for a description of immediate and deferred
 	 * outputs.
 	 */
-	public WatermarkOutput getImmediateOutput(int outputId) {
+	public WatermarkOutput getImmediateOutput(String outputId) {
 		Preconditions.checkArgument(
 				watermarkPerOutputId.containsKey(outputId),
 				"no output registered under id " + outputId);
@@ -118,7 +125,7 @@ public class WatermarkOutputMultiplexer {
 	 * <p>See {@link WatermarkOutputMultiplexer} for a description of immediate and deferred
 	 * outputs.
 	 */
-	public WatermarkOutput getDeferredOutput(int outputId) {
+	public WatermarkOutput getDeferredOutput(String outputId) {
 		Preconditions.checkArgument(
 				watermarkPerOutputId.containsKey(outputId),
 				"no output registered under id " + outputId);
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
index 224bf48..59bb6df 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
@@ -20,10 +20,15 @@ package org.apache.flink.api.common.eventtime;
 
 import org.junit.Test;
 
+import java.util.UUID;
+
 import static org.apache.flink.api.common.eventtime.WatermarkMatchers.watermark;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for the {@link WatermarkOutputMultiplexer}.
@@ -261,9 +266,10 @@ public class WatermarkOutputMultiplexerTest {
 		WatermarkOutputMultiplexer multiplexer =
 				new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
 
-		int outputId = multiplexer.registerNewOutput();
-		WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(outputId);
-		WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(outputId);
+		final String id = "test-id";
+		multiplexer.registerNewOutput(id);
+		WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(id);
+		WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(id);
 
 		deferredOutput.emitWatermark(new Watermark(5));
 		multiplexer.onPeriodicEmit();
@@ -284,9 +290,10 @@ public class WatermarkOutputMultiplexerTest {
 		WatermarkOutputMultiplexer multiplexer =
 				new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
 
-		int outputId = multiplexer.registerNewOutput();
-		WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(outputId);
-		WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(outputId);
+		final String id = "1234-test";
+		multiplexer.registerNewOutput(id);
+		WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(id);
+		WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(id);
 
 		deferredOutput.emitWatermark(new Watermark(5));
 		immediateOutput.emitWatermark(new Watermark(2));
@@ -294,13 +301,86 @@ public class WatermarkOutputMultiplexerTest {
 		assertThat(underlyingWatermarkOutput.lastWatermark(), is(nullValue()));
 	}
 
+	@Test
+	public void testRemoveUnblocksWatermarks() {
+		final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
+		final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
+		final long lowTimestamp = 156765L;
+		final long highTimestamp = lowTimestamp + 10;
+
+		multiplexer.registerNewOutput("lower");
+		multiplexer.registerNewOutput("higher");
+		multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));
+
+		multiplexer.unregisterOutput("lower");
+		multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));
+
+		assertEquals(highTimestamp, underlyingWatermarkOutput.lastWatermark().getTimestamp());
+	}
+
+	@Test
+	public void testRemoveOfLowestDoesNotImmediatelyAdvanceWatermark() {
+		final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
+		final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
+		final long lowTimestamp = -4343L;
+		final long highTimestamp = lowTimestamp + 10;
+
+		multiplexer.registerNewOutput("lower");
+		multiplexer.registerNewOutput("higher");
+		multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));
+		multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));
+
+		multiplexer.unregisterOutput("lower");
+
+		assertEquals(lowTimestamp, underlyingWatermarkOutput.lastWatermark().getTimestamp());
+	}
+
+	@Test
+	public void testRemoveOfHighestDoesNotRetractWatermark() {
+		final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
+		final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
+		final long lowTimestamp = 1L;
+		final long highTimestamp = 2L;
+
+		multiplexer.registerNewOutput("higher");
+		multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));
+		multiplexer.unregisterOutput("higher");
+
+		multiplexer.registerNewOutput("lower");
+		multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));
+
+		assertEquals(highTimestamp, underlyingWatermarkOutput.lastWatermark().getTimestamp());
+	}
+
+	@Test
+	public void testRemoveRegisteredReturnValue() {
+		final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
+		final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
+		multiplexer.registerNewOutput("does-exist");
+
+		final boolean unregistered = multiplexer.unregisterOutput("does-exist");
+
+		assertTrue(unregistered);
+	}
+
+	@Test
+	public void testRemoveNotRegisteredReturnValue() {
+		final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
+		final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
+
+		final boolean unregistered = multiplexer.unregisterOutput("does-not-exist");
+
+		assertFalse(unregistered);
+	}
+
 	/**
 	 * Convenience method so we don't have to go through the output ID dance when we only want an
 	 * immediate output for a given output ID.
 	 */
 	private static WatermarkOutput createImmediateOutput(WatermarkOutputMultiplexer multiplexer) {
-		int outputId = multiplexer.registerNewOutput();
-		return multiplexer.getImmediateOutput(outputId);
+		final String id = UUID.randomUUID().toString();
+		multiplexer.registerNewOutput(id);
+		return multiplexer.getImmediateOutput(id);
 	}
 
 	/**
@@ -308,8 +388,9 @@ public class WatermarkOutputMultiplexerTest {
 	 * deferred output for a given output ID.
 	 */
 	private static WatermarkOutput createDeferredOutput(WatermarkOutputMultiplexer multiplexer) {
-		int outputId = multiplexer.registerNewOutput();
-		return multiplexer.getDeferredOutput(outputId);
+		final String id = UUID.randomUUID().toString();
+		multiplexer.registerNewOutput(id);
+		return multiplexer.getDeferredOutput(id);
 	}
 
 	private static TestingWatermarkOutput createTestingWatermarkOutput() {