You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/25 09:43:46 UTC

[GitHub] [flink] aljoscha commented on a change in pull request #12306: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

aljoscha commented on a change in pull request #12306:
URL: https://github.com/apache/flink/pull/12306#discussion_r429840970



##########
File path: flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
##########
@@ -88,13 +85,17 @@ public WatermarkOutputMultiplexer(WatermarkOutput underlyingOutput) {
 	 * an output ID that can be used to get a deferred or immediate {@link WatermarkOutput} for that
 	 * output.
 	 */
-	public int registerNewOutput() {
-		int newOutputId = nextOutputId;
-		nextOutputId++;
-		OutputState outputState = new OutputState();
-		watermarkPerOutputId.put(newOutputId, outputState);
+	public void registerNewOutput(String id) {
+		final OutputState outputState = new OutputState();
+
+		final OutputState previouslyRegistered = watermarkPerOutputId.putIfAbsent(id, outputState);
+		checkState(previouslyRegistered == null, "Already contains an output for ID %s", id);
+
 		watermarkOutputs.add(outputState);
-		return newOutputId;
+	}
+
+	public boolean unregisterOutput(String id) {
+		return watermarkPerOutputId.remove(id) != null;

Review comment:
       This does not remove the output from `watermarkOutputs`. Please add a test that verifies correct behaviour when removing outputs.
   
   With the current design, it's actually not possible to remove the output from `watermarkOutputs`. One possible solution is to get rid of that list and always use the Map for iterating the outputs in `onPeriodicEmit()`. That would be a smidge slower but I think that's ok because periodic watermark emission does not happen super often.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org