Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/25 09:55:49 UTC

[GitHub] [flink] StephanEwen commented on a change in pull request #12306: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

StephanEwen commented on a change in pull request #12306:
URL: https://github.com/apache/flink/pull/12306#discussion_r429846873



##########
File path: flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
##########
@@ -88,13 +85,17 @@ public WatermarkOutputMultiplexer(WatermarkOutput underlyingOutput) {
 	 * an output ID that can be used to get a deferred or immediate {@link WatermarkOutput} for that
 	 * output.
 	 */
-	public int registerNewOutput() {
-		int newOutputId = nextOutputId;
-		nextOutputId++;
-		OutputState outputState = new OutputState();
-		watermarkPerOutputId.put(newOutputId, outputState);
+	public void registerNewOutput(String id) {
+		final OutputState outputState = new OutputState();
+
+		final OutputState previouslyRegistered = watermarkPerOutputId.putIfAbsent(id, outputState);
+		checkState(previouslyRegistered == null, "Already contains an output for ID %s", id);
+
 		watermarkOutputs.add(outputState);
-		return newOutputId;
+	}
+
+	public boolean unregisterOutput(String id) {
+		return watermarkPerOutputId.remove(id) != null;

Review comment:
       Why is it not possible to remove from `watermarkOutputs`? It is a linear operation (`List.remove()`), but then again, it doesn't happen very often.
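
A minimal sketch of what this comment suggests, under stated assumptions: `watermarkPerOutputId` is taken to be a `Map<String, OutputState>` and `watermarkOutputs` a `List<OutputState>`. The class `OutputRegistrySketch`, its empty `OutputState` placeholder, and the `trackedOutputs()` helper are hypothetical stand-ins for illustration, not the actual `WatermarkOutputMultiplexer` code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for the multiplexer's bookkeeping. */
class OutputRegistrySketch {

	/** Placeholder for the per-output state (assumption: the real class holds more). */
	static final class OutputState {}

	private final Map<String, OutputState> watermarkPerOutputId = new HashMap<>();
	private final List<OutputState> watermarkOutputs = new ArrayList<>();

	void registerNewOutput(String id) {
		final OutputState outputState = new OutputState();
		final OutputState previouslyRegistered = watermarkPerOutputId.putIfAbsent(id, outputState);
		if (previouslyRegistered != null) {
			throw new IllegalStateException("Already contains an output for ID " + id);
		}
		watermarkOutputs.add(outputState);
	}

	boolean unregisterOutput(String id) {
		final OutputState removed = watermarkPerOutputId.remove(id);
		if (removed == null) {
			return false;
		}
		// Linear scan over the list. The placeholder OutputState does not override
		// equals(), so List.remove(Object) matches by identity here.
		watermarkOutputs.remove(removed);
		return true;
	}

	/** Hypothetical helper, only here so the list size can be inspected. */
	int trackedOutputs() {
		return watermarkOutputs.size();
	}
}
```

With this shape the map and the list stay in sync, at the cost of an O(n) scan per unregistration, which should be acceptable if outputs are removed rarely.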



