Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/24 22:12:28 UTC

[GitHub] [flink] StephanEwen opened a new pull request #12306: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

StephanEwen opened a new pull request #12306:
URL: https://github.com/apache/flink/pull/12306


   ## What is the purpose of the change
   
   This PR integrates the new Watermark Generators introduced as part of [FLIP-126](https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners) with the new Source Interface introduced in [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface).
   
   A discussion of the design approach can be found in the JIRA issue [FLINK-17899](https://issues.apache.org/jira/browse/FLINK-17899).
   
   ## Brief change log
   
   The main changes in this PR are:
   
     1. The introduction of `SourceOutputWithWatermarks`, a `SourceOutput` that internally runs the `TimestampAssigner` and `WatermarkGenerator`.
   
     2. The extension of the `SourceReader`'s output parameter from `SourceOutput` to `ReaderOutput`. The `ReaderOutput` extends the `SourceOutput` and additionally allows the creation of *"split-local"* Source Outputs, to run the watermark generation logic per split.
   
     3. Using the `WatermarkOutputMultiplexer` to combine the watermarks from the split-local outputs.
   
     4. A set of utilities to create and set up these classes, covering both streaming (generate watermarks) and batch (no watermarks).
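A minimal sketch of the record/watermark ordering described in item 1, assuming simplified stand-in interfaces. `SimpleTimestampAssigner`, `SimpleWatermarkGenerator`, `SimpleWatermarkOutput` and `SketchSourceOutput` are hypothetical names, not the Flink API. The sketch only illustrates the sequence: assign the timestamp, emit the record, and only then let the watermark generator see the event, as the comment in `SourceOutputWithWatermarks#collect` requires.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for TimestampAssigner / WatermarkGenerator / WatermarkOutput.
interface SimpleTimestampAssigner<T> {
    long extractTimestamp(T record, long recordTimestamp);
}

interface SimpleWatermarkOutput {
    void emitWatermark(long watermark);
}

interface SimpleWatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, SimpleWatermarkOutput output);
}

// Hypothetical sketch of an output that runs timestamp assignment and
// watermark generation inline with record emission.
final class SketchSourceOutput<T> {
    /** Named marker for "no timestamp attached" (assumed to be Long.MIN_VALUE). */
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    /** Records and watermarks in emission order, e.g. "abc@3" and "W2". */
    final List<String> emitted = new ArrayList<>();

    private final SimpleTimestampAssigner<T> assigner;
    private final SimpleWatermarkGenerator<T> generator;
    private final SimpleWatermarkOutput watermarkOutput = wm -> emitted.add("W" + wm);

    SketchSourceOutput(SimpleTimestampAssigner<T> assigner, SimpleWatermarkGenerator<T> generator) {
        this.assigner = assigner;
        this.generator = generator;
    }

    void collect(T record) {
        collect(record, NO_TIMESTAMP);
    }

    void collect(T record, long timestamp) {
        long assigned = assigner.extractTimestamp(record, timestamp);
        // Emit the record before calling the generator, so in the emitted
        // sequence an event always precedes any watermark it triggers.
        emitted.add(record + "@" + assigned);
        generator.onEvent(record, assigned, watermarkOutput);
    }
}
```

For example, with an assigner that falls back to the record's length when no timestamp is attached and a generator that emits `timestamp - 1`, calling `collect("abc")` and then `collect("x", 10L)` logs `abc@3`, `W2`, `x@10`, `W9`.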
   
   This PR also includes `[FLINK-17898]` (Remove Exceptions from signatures of SourceOutput) and `[FLINK-17897]` (Classify FLIP-27 source API as @PublicEvolving) as preparatory changes, to avoid merge conflicts later.
   
   ## Verifying this change
   
   This PR is covered only superficially by existing tests; more tests are needed.
   For now, this PR serves only as a basis for reviewing the production code implementation.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **yes**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **yes**
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **no**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **yes**
     - If yes, how is the feature documented? **not documented**
   
   **Documentation of the new features is still undecided - it is possible that this feature stays a "hidden" feature in the upcoming release**
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] StephanEwen commented on a change in pull request #12306: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

StephanEwen commented on a change in pull request #12306:
URL: https://github.com/apache/flink/pull/12306#discussion_r430354715



##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
##########
@@ -0,0 +1,85 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.Watermark;
+
+/**
+ * The interface provided by the Flink task to the {@link SourceReader} to emit records
+ * to downstream operators for message processing.
+ */
+@PublicEvolving
+public interface ReaderOutput<T> extends SourceOutput<T> {
+
+	/**
+	 * Emit a record without a timestamp. Equivalent to {@link #collect(Object, long) collect(record, Long.MIN_VALUE)}.
+	 *
+	 * @param record the record to emit.
+	 */
+	@Override
+	void collect(T record);

Review comment:
       I often do that to spare users from navigating to parent classes to see the methods and read their JavaDocs. But it is purely cosmetic, and I don't feel strongly about it.

##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
##########
@@ -0,0 +1,85 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.Watermark;
+
+/**
+ * The interface provided by the Flink task to the {@link SourceReader} to emit records
+ * to downstream operators for message processing.
+ */
+@PublicEvolving
+public interface ReaderOutput<T> extends SourceOutput<T> {
+
+	/**
+	 * Emit a record without a timestamp. Equivalent to {@link #collect(Object, long) collect(record, Long.MIN_VALUE)}.
+	 *
+	 * @param record the record to emit.
+	 */
+	@Override
+	void collect(T record);
+
+	/**
+	 * Emit a record with timestamp.
+	 *
+	 * @param record the record to emit.
+	 * @param timestamp the timestamp of the record.
+	 */
+	@Override
+	void collect(T record, long timestamp);
+
+	/**
+	 * Emits the given watermark.
+	 *
+	 * <p>Emitting a watermark also implicitly marks the stream as <i>active</i>, ending
+	 * previously marked idleness.
+	 */
+	@Override
+	void emitWatermark(Watermark watermark);
+
+	/**
+	 * Marks this output as idle, meaning that downstream operations do not
+	 * wait for watermarks from this output.
+	 *
+	 * <p>An output becomes active again as soon as the next watermark is emitted.
+	 */
+	@Override
+	void markIdle();
+
+	/**
+	 * Creates a {@code SourceOutput} for a specific Source Split. Use these outputs if you want to
+	 * run split-local logic, like watermark generation.
+	 * Only one split-local output may be created per split.
+	 *
+	 * <p><b>IMPORTANT:</b> After the split has been finished, it is crucial to release the created
+	 * output again. Otherwise it will continue to contribute to the watermark generation like a
+	 * perpetually stalling source split, and may hold back the watermark indefinitely.
+	 *
+	 * @see #releaseOutputForSplit(String)
+	 */
+	SourceOutput<T> createOutputForSplit(String splitId);

Review comment:
       Yes, that is better. Will change this.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of the SourceOutput. The records emitted to this output are pushed into a given
+ * {@link PushingAsyncDataInput.DataOutput}. The watermarks are pushed into the same output, or
+ * into a separate {@link WatermarkOutput}, if one is provided.
+ *
+ * <h2>Periodic Watermarks</h2>
+ *
+ * <p>This output does not implement periodic watermarks.
+ *
+ * <h2>Note on Performance Considerations</h2>
+ *
+ * <p>The methods {@link SourceOutput#collect(Object)} and {@link SourceOutput#collect(Object, long)}
+ * are highly performance-critical (part of the hot loop). To make the code as JIT friendly as possible,
+ * we want to have only a single implementation of these two methods, across all classes.
+ * That way, the JIT compiler can de-virtualize (and inline) them better.
+ *
+ * <p>Currently, we have one implementation of these methods in the batch case (see class
+ * {@link BatchTimestampsAndWatermarks}) and one for the streaming case (this class). When the JVM
+ * is dedicated to a single job (or type of job) only one of these classes will be loaded. In mixed
+ * job setups, we still have a bimorphic method (rather than a polymorphic or megamorphic method).
+ *
+ * @param <T> The type of emitted records.
+ */
+public class SourceOutputWithWatermarks<T> implements SourceOutput<T> {
+
+	private final PushingAsyncDataInput.DataOutput<T> recordsOutput;
+
+	private final TimestampAssigner<T> timestampAssigner;
+
+	private final WatermarkGenerator<T> watermarkGenerator;
+
+	private final WatermarkOutput onEventWatermarkOutput;
+
+	private final WatermarkOutput periodicWatermarkOutput;
+
+	/**
+	 * Creates a new SourceOutputWithWatermarks that emits records to the given DataOutput
+	 * and watermarks to the (possibly different) WatermarkOutput.
+	 */
+	protected SourceOutputWithWatermarks(
+			PushingAsyncDataInput.DataOutput<T> recordsOutput,
+			WatermarkOutput onEventWatermarkOutput,
+			WatermarkOutput periodicWatermarkOutput,
+			TimestampAssigner<T> timestampAssigner,
+			WatermarkGenerator<T> watermarkGenerator) {
+
+		this.recordsOutput = checkNotNull(recordsOutput);
+		this.onEventWatermarkOutput = checkNotNull(onEventWatermarkOutput);
+		this.periodicWatermarkOutput = checkNotNull(periodicWatermarkOutput);
+		this.timestampAssigner = checkNotNull(timestampAssigner);
+		this.watermarkGenerator = checkNotNull(watermarkGenerator);
+	}
+
+	// ------------------------------------------------------------------------
+	// SourceOutput Methods
+	//
+	// Note that the two methods below are final, as a partial enforcement
+	// of the performance design goal mentioned in the class-level comment.
+	// ------------------------------------------------------------------------
+
+	@Override
+	public final void collect(T record) {
+		collect(record, Long.MIN_VALUE);

Review comment:
       Yes, it is, see comments in https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java
   
       Will replace this with a named constant, so that this "magic value" no longer appears in the code.
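A hypothetical sketch of the suggested change: a named constant in place of the bare `Long.MIN_VALUE`. The class and method names are invented for illustration; the value itself is taken from the `collect(record, Long.MIN_VALUE)` call quoted above.

```java
// Hypothetical class and method names, for illustration only.
final class NoTimestampExample {
    /** Named marker replacing the bare Long.MIN_VALUE "magic value". */
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    private NoTimestampExample() {}

    /** Keeps the source-provided timestamp, or uses the fallback if none was attached. */
    static long timestampOrFallback(long recordTimestamp, long fallback) {
        return recordTimestamp == NO_TIMESTAMP ? fallback : recordTimestamp;
    }
}
```

A reader of `collect(record, NO_TIMESTAMP)` can then see the intent without looking up what `Long.MIN_VALUE` means in this context.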

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of the SourceOutput. The records emitted to this output are pushed into a given
+ * {@link PushingAsyncDataInput.DataOutput}. The watermarks are pushed into the same output, or
+ * into a separate {@link WatermarkOutput}, if one is provided.
+ *
+ * <h2>Periodic Watermarks</h2>
+ *
+ * <p>This output does not implement periodic watermarks.
+ *
+ * <h2>Note on Performance Considerations</h2>
+ *
+ * <p>The methods {@link SourceOutput#collect(Object)} and {@link SourceOutput#collect(Object, long)}
+ * are highly performance-critical (part of the hot loop). To make the code as JIT friendly as possible,
+ * we want to have only a single implementation of these two methods, across all classes.
+ * That way, the JIT compiler can de-virtualize (and inline) them better.
+ *
+ * <p>Currently, we have one implementation of these methods in the batch case (see class
+ * {@link BatchTimestampsAndWatermarks}) and one for the streaming case (this class). When the JVM
+ * is dedicated to a single job (or type of job) only one of these classes will be loaded. In mixed
+ * job setups, we still have a bimorphic method (rather than a polymorphic or megamorphic method).
+ *
+ * @param <T> The type of emitted records.
+ */
+public class SourceOutputWithWatermarks<T> implements SourceOutput<T> {
+
+	private final PushingAsyncDataInput.DataOutput<T> recordsOutput;
+
+	private final TimestampAssigner<T> timestampAssigner;
+
+	private final WatermarkGenerator<T> watermarkGenerator;
+
+	private final WatermarkOutput onEventWatermarkOutput;
+
+	private final WatermarkOutput periodicWatermarkOutput;
+
+	/**
+	 * Creates a new SourceOutputWithWatermarks that emits records to the given DataOutput
+	 * and watermarks to the (possibly different) WatermarkOutput.
+	 */
+	protected SourceOutputWithWatermarks(
+			PushingAsyncDataInput.DataOutput<T> recordsOutput,
+			WatermarkOutput onEventWatermarkOutput,
+			WatermarkOutput periodicWatermarkOutput,
+			TimestampAssigner<T> timestampAssigner,
+			WatermarkGenerator<T> watermarkGenerator) {
+
+		this.recordsOutput = checkNotNull(recordsOutput);
+		this.onEventWatermarkOutput = checkNotNull(onEventWatermarkOutput);
+		this.periodicWatermarkOutput = checkNotNull(periodicWatermarkOutput);
+		this.timestampAssigner = checkNotNull(timestampAssigner);
+		this.watermarkGenerator = checkNotNull(watermarkGenerator);
+	}
+
+	// ------------------------------------------------------------------------
+	// SourceOutput Methods
+	//
+	// Note that the two methods below are final, as a partial enforcement
+	// of the performance design goal mentioned in the class-level comment.
+	// ------------------------------------------------------------------------
+
+	@Override
+	public final void collect(T record) {
+		collect(record, Long.MIN_VALUE);
+	}
+
+	@Override
+	public final void collect(T record, long timestamp) {
+		try {
+			final long assignedTimestamp = timestampAssigner.extractTimestamp(record, timestamp);
+
+			// IMPORTANT: The event must be emitted before the watermark generator is called.
+			recordsOutput.emitRecord(new StreamRecord<>(record, assignedTimestamp));
+			watermarkGenerator.onEvent(record, assignedTimestamp, onEventWatermarkOutput);
+		} catch (ExceptionInChainedOperatorException e) {
+			throw e;
+		} catch (Exception e) {
+			throw new ExceptionInChainedOperatorException(e);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// WatermarkOutput Methods
+	//
+	// These two methods are final as well, to enforce the contract that the
+	// watermarks from emitWatermark(Watermark) go to the same output as the
+	// watermarks from the watermarkGenerator.onEvent(...) calls in the collect(...)
+	// methods.
+	// ------------------------------------------------------------------------
+
+	@Override
+	public final void emitWatermark(Watermark watermark) {
+		onEventWatermarkOutput.emitWatermark(watermark);
+	}
+
+	@Override
+	public final void markIdle() {
+		onEventWatermarkOutput.markIdle();
+	}
+
+	public final void emitPeriodicWatermark() {
+		watermarkGenerator.onPeriodicEmit(periodicWatermarkOutput);
+	}
+
+	// ------------------------------------------------------------------------
+	// Factories
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new SourceOutputWithWatermarks that emits records to the given DataOutput
+	 * and watermarks to the (possibly different) WatermarkOutput.

Review comment:
       This is for per-partition watermarking: the records flow directly downstream, but the watermarks go to the `WatermarkOutputMultiplexer`, which merges them.
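A rough sketch of how per-split watermarks could be merged, assuming the combined watermark is the minimum over all non-idle splits and that the combined output counts as idle when every split is idle. `SketchWatermarkMultiplexer` and its methods are hypothetical, not the `WatermarkOutputMultiplexer` API. It also shows why releasing a finished split matters: a registered split that never advances keeps the minimum at `Long.MIN_VALUE`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical, simplified merger of per-split watermarks (not the WatermarkOutputMultiplexer API).
final class SketchWatermarkMultiplexer {
    /** Per-split state; a fresh split starts active at Long.MIN_VALUE. */
    private static final class SplitState {
        long watermark = Long.MIN_VALUE;
        boolean idle = false;
    }

    private final Map<String, SplitState> splits = new HashMap<>();

    void registerSplit(String splitId) {
        splits.putIfAbsent(splitId, new SplitState());
    }

    /** A released split no longer contributes to the combined watermark. */
    void releaseSplit(String splitId) {
        splits.remove(splitId);
    }

    void emitWatermark(String splitId, long watermark) {
        SplitState state = stateOf(splitId);
        state.watermark = Math.max(state.watermark, watermark); // never moves backwards in this sketch
        state.idle = false; // emitting a watermark ends idleness
    }

    void markIdle(String splitId) {
        stateOf(splitId).idle = true;
    }

    /** Minimum over active splits; empty means every split is idle (or none is registered). */
    OptionalLong combinedWatermark() {
        return splits.values().stream()
                .filter(s -> !s.idle)
                .mapToLong(s -> s.watermark)
                .min();
    }

    private SplitState stateOf(String splitId) {
        SplitState state = splits.get(splitId);
        if (state == null) {
            throw new IllegalStateException("Unknown split: " + splitId);
        }
        return state;
    }
}
```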

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+
+/**
+ * Basic interface for the timestamp extraction and watermark generation logic for the
+ * {@link org.apache.flink.api.connector.source.SourceReader}.
+ *
+ * <p>Implementations of this class may or may not actually perform certain tasks, like watermark
+ * generation. For example, the batch-oriented implementation typically skips all watermark generation
+ * logic.
+ *
+ * @param <T> The type of the emitted records.
+ */
+public interface TimestampsAndWatermarks<T> {

Review comment:
       Good point, I will annotate all classes in that package with `@Internal`.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
##########
@@ -61,9 +67,11 @@ public SourceOperatorFactory(Source<OUT, ?, ?> source, int numCoordinatorWorkerT
 		final SourceOperator<OUT, ?> sourceOperator = instantiateSourceOperator(
 				source::createReader,
 				gateway,
-				source.getSplitSerializer());
+				source.getSplitSerializer(),
+				eventTimeConfig);
 
 		sourceOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
+		sourceOperator.processingTimeService = parameters.getProcessingTimeService();

Review comment:
       Will do

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/EventTimeConfig.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.RecordTimestampAssigner;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/**
+ * A collection of all information relevant to setting up timestamp extraction and watermark
+ * generation in a data source operator.
+ */
+public final class EventTimeConfig<T> implements Serializable {

Review comment:
       Maybe. I think this may actually serve a "config-style" purpose (at least if configs can be more than key/value pairs).




[GitHub] [flink] flinkbot commented on pull request #12306: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

flinkbot commented on pull request #12306:
URL: https://github.com/apache/flink/pull/12306#issuecomment-633308345


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 7b43664f2f90c8291bd2d5175c91714eaddf6285 (Sun May 24 22:14:16 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


[GitHub] [flink] becketqin commented on a change in pull request #12306: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

becketqin commented on a change in pull request #12306:
URL: https://github.com/apache/flink/pull/12306#discussion_r429942801



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
##########
@@ -61,9 +67,11 @@ public SourceOperatorFactory(Source<OUT, ?, ?> source, int numCoordinatorWorkerT
 		final SourceOperator<OUT, ?> sourceOperator = instantiateSourceOperator(
 				source::createReader,
 				gateway,
-				source.getSplitSerializer());
+				source.getSplitSerializer(),
+				eventTimeConfig);
 
 		sourceOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
+		sourceOperator.processingTimeService = parameters.getProcessingTimeService();

Review comment:
       Might it be better to pass this in through the constructor of `SourceOperator`?
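One way the suggestion could look, sketched with hypothetical names: `TimeService` stands in for `ProcessingTimeService` and `SketchSourceOperator` for `SourceOperator`. Passing the dependency through the constructor lets the field be `final` and null-checked once, instead of being assigned from outside after `setup(...)`.

```java
import java.util.Objects;

// Hypothetical stand-in for ProcessingTimeService.
interface TimeService {
    long currentProcessingTime();
}

// Hypothetical stand-in for SourceOperator, receiving its dependency at construction time.
final class SketchSourceOperator {
    private final TimeService timeService; // final: assigned exactly once, never null

    SketchSourceOperator(TimeService timeService) {
        this.timeService = Objects.requireNonNull(timeService, "timeService");
    }

    long currentTime() {
        return timeService.currentProcessingTime();
    }
}
```

The trade-off is that the factory must have the time service available when it instantiates the operator, which the quoted `SourceOperatorFactory` code already does via `parameters.getProcessingTimeService()`.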

##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
##########
@@ -0,0 +1,85 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.Watermark;
+
+/**
+ * The interface provided by the Flink task to the {@link SourceReader} to emit records
+ * to downstream operators for message processing.
+ */
+@PublicEvolving
+public interface ReaderOutput<T> extends SourceOutput<T> {
+
+	/**
+	 * Emit a record without a timestamp. Equivalent to {@link #collect(Object, long) collect(record, Long.MIN_VALUE)}.
+	 *
+	 * @param record the record to emit.
+	 */
+	@Override
+	void collect(T record);

Review comment:
       Is there a reason to override the method in the parent class here?

##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
##########
@@ -0,0 +1,85 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.Watermark;
+
+/**
+ * The interface provided by the Flink task to the {@link SourceReader} to emit records
+ * to downstream operators for message processing.
+ */
+@PublicEvolving
+public interface ReaderOutput<T> extends SourceOutput<T> {
+
+	/**
+	 * Emit a record without a timestamp. Equivalent to {@link #collect(Object, long) collect(record, Long.MIN_VALUE)}.
+	 *
+	 * @param record the record to emit.
+	 */
+	@Override
+	void collect(T record);
+
+	/**
+	 * Emit a record with timestamp.
+	 *
+	 * @param record the record to emit.
+	 * @param timestamp the timestamp of the record.
+	 */
+	@Override
+	void collect(T record, long timestamp);
+
+	/**
+	 * Emits the given watermark.
+	 *
+	 * <p>Emitting a watermark also implicitly marks the stream as <i>active</i>, ending
+	 * previously marked idleness.
+	 */
+	@Override
+	void emitWatermark(Watermark watermark);
+
+	/**
+	 * Marks this output as idle, meaning that downstream operations do not
+	 * wait for watermarks from this output.
+	 *
+	 * <p>An output becomes active again as soon as the next watermark is emitted.
+	 */
+	@Override
+	void markIdle();
+
+	/**
+	 * Creates a {@code SourceOutput} for a specific Source Split. Use these outputs if you want to
+	 * run split-local logic, like watermark generation.
+	 * Only one split-local output may be created per split.
+	 *
+	 * <p><b>IMPORTANT:</b> After the split has been finished, it is crucial to release the created
+	 * output again. Otherwise it will continue to contribute to the watermark generation like a
+	 * perpetually stalling source split, and may hold back the watermark indefinitely.
+	 *
+	 * @see #releaseOutputForSplit(String)
+	 */
+	SourceOutput<T> createOutputForSplit(String splitId);

Review comment:
       Should we specify the behavior of this method to just return the existing `SourceOutput` if one is already created for the given split?
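   A minimal sketch of the idempotent variant, assuming a hypothetical `SplitOutputs` registry keyed by split ID (this is not Flink's implementation). The alternative, which `WatermarkOutputMultiplexer.registerNewOutput(String)` in this PR follows, is to fail fast on a duplicate ID with `checkState`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical registry of split-local outputs, keyed by split ID.
class SplitOutputs<O> {
    private final Map<String, O> outputs = new HashMap<>();
    private final Function<String, O> factory;

    SplitOutputs(Function<String, O> factory) {
        this.factory = factory;
    }

    // Idempotent: returns the existing output if one was already created for the split.
    O createOutputForSplit(String splitId) {
        return outputs.computeIfAbsent(splitId, factory);
    }

    // Releases the split's output; returns false if none was registered.
    boolean releaseOutputForSplit(String splitId) {
        return outputs.remove(splitId) != null;
    }

    int size() {
        return outputs.size();
    }
}
```

   Returning the existing output makes repeated calls harmless, while failing fast surfaces reader bugs that create the same split output twice.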

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+
+/**
+ * Basic interface for the timestamp extraction and watermark generation logic for the
+ * {@link org.apache.flink.api.connector.source.SourceReader}.
+ *
+ * <p>Implementations of this class may or may not actually perform certain tasks, like watermark
+ * generation. For example, the batch-oriented implementation typically skips all watermark generation
+ * logic.
+ *
+ * @param <T> The type of the emitted records.
+ */
+public interface TimestampsAndWatermarks<T> {

Review comment:
       Is this an `@Internal` class? I am wondering when the annotations should be added.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of the SourceOutput. The records emitted to this output are pushed into a given
+ * {@link PushingAsyncDataInput.DataOutput}. The watermarks are pushed into the same output, or
+ * into a separate {@link WatermarkOutput}, if one is provided.
+ *
+ * <h2>Periodic Watermarks</h2>
+ *
+ * <p>This output does not implement periodic watermarks.
+ * <h2>Note on Performance Considerations</h2>
+ *
+ * <p>The methods {@link SourceOutput#collect(Object)} and {@link SourceOutput#collect(Object, long)}
+ * are highly performance-critical (part of the hot loop). To make the code as JIT friendly as possible,
+ * we want to have only a single implementation of these two methods, across all classes.
+ * That way, the JIT compiler can de-virtualize (and inline) them better.
+ *
+ * <p>Currently, we have one implementation of these methods in the batch case (see class
+ * {@link BatchTimestampsAndWatermarks}) and one for the streaming case (this class). When the JVM
+ * is dedicated to a single job (or type of job) only one of these classes will be loaded. In mixed
+ * job setups, we still have a bimorphic method (rather than a polymorphic or megamorphic method).
+ *
+ * @param <T> The type of emitted records.
+ */
+public class SourceOutputWithWatermarks<T> implements SourceOutput<T> {
+
+	private final PushingAsyncDataInput.DataOutput<T> recordsOutput;
+
+	private final TimestampAssigner<T> timestampAssigner;
+
+	private final WatermarkGenerator<T> watermarkGenerator;
+
+	private final WatermarkOutput onEventWatermarkOutput;
+
+	private final WatermarkOutput periodicWatermarkOutput;
+
+	/**
+	 * Creates a new SourceOutputWithWatermarks that emits records to the given DataOutput
+	 * and watermarks to the (possibly different) WatermarkOutput.
+	 */
+	protected SourceOutputWithWatermarks(
+			PushingAsyncDataInput.DataOutput<T> recordsOutput,
+			WatermarkOutput onEventWatermarkOutput,
+			WatermarkOutput periodicWatermarkOutput,
+			TimestampAssigner<T> timestampAssigner,
+			WatermarkGenerator<T> watermarkGenerator) {
+
+		this.recordsOutput = checkNotNull(recordsOutput);
+		this.onEventWatermarkOutput = checkNotNull(onEventWatermarkOutput);
+		this.periodicWatermarkOutput = checkNotNull(periodicWatermarkOutput);
+		this.timestampAssigner = checkNotNull(timestampAssigner);
+		this.watermarkGenerator = checkNotNull(watermarkGenerator);
+	}
+
+	// ------------------------------------------------------------------------
+	// SourceOutput Methods
+	//
+	// Note that the two methods below are final, as a partial enforcement
+	// of the performance design goal mentioned in the class-level comment.
+	// ------------------------------------------------------------------------
+
+	@Override
+	public final void collect(T record) {
+		collect(record, Long.MIN_VALUE);

Review comment:
       Is this a symbolic special value meaning "NO_TIMESTAMP", or might it trigger retraction in the downstream operators?
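   To make the question concrete: if `Long.MIN_VALUE` is meant as a "no timestamp" marker, a timestamp assigner can test for it and fall back to a timestamp read from the record itself. A minimal sketch; `NO_TIMESTAMP`, `Assigner`, `Event`, and `EventTimeAssigner` are hypothetical names, and the sketch says nothing about how downstream operators treat the value.

```java
// Hypothetical sentinel meaning "the source attached no timestamp".
final class Timestamps {
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    private Timestamps() {}
}

// Simplified stand-in for a timestamp assigner.
interface Assigner<T> {
    long extractTimestamp(T record, long recordTimestamp);
}

// Hypothetical record type carrying its own event time.
final class Event {
    final long eventTime;

    Event(long eventTime) {
        this.eventTime = eventTime;
    }
}

// Keeps a timestamp supplied by the source; otherwise reads it from the record.
final class EventTimeAssigner implements Assigner<Event> {
    @Override
    public long extractTimestamp(Event record, long recordTimestamp) {
        return recordTimestamp == Timestamps.NO_TIMESTAMP ? record.eventTime : recordTimestamp;
    }
}
```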

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of the SourceOutput. The records emitted to this output are pushed into a given
+ * {@link PushingAsyncDataInput.DataOutput}. The watermarks are pushed into the same output, or
+ * into a separate {@link WatermarkOutput}, if one is provided.
+ *
+ * <h2>Periodic Watermarks</h2>
+ *
+ * <p>This output does not implement periodic watermarks.
+ * <h2>Note on Performance Considerations</h2>
+ *
+ * <p>The methods {@link SourceOutput#collect(Object)} and {@link SourceOutput#collect(Object, long)}
+ * are highly performance-critical (part of the hot loop). To make the code as JIT friendly as possible,
+ * we want to have only a single implementation of these two methods, across all classes.
+ * That way, the JIT compiler can de-virtualize (and inline) them better.
+ *
+ * <p>Currently, we have one implementation of these methods in the batch case (see class
+ * {@link BatchTimestampsAndWatermarks}) and one for the streaming case (this class). When the JVM
+ * is dedicated to a single job (or type of job) only one of these classes will be loaded. In mixed
+ * job setups, we still have a bimorphic method (rather than a polymorphic or megamorphic method).
+ *
+ * @param <T> The type of emitted records.
+ */
+public class SourceOutputWithWatermarks<T> implements SourceOutput<T> {
+
+	private final PushingAsyncDataInput.DataOutput<T> recordsOutput;
+
+	private final TimestampAssigner<T> timestampAssigner;
+
+	private final WatermarkGenerator<T> watermarkGenerator;
+
+	private final WatermarkOutput onEventWatermarkOutput;
+
+	private final WatermarkOutput periodicWatermarkOutput;
+
+	/**
+	 * Creates a new SourceOutputWithWatermarks that emits records to the given DataOutput
+	 * and watermarks to the (possibly different) WatermarkOutput.
+	 */
+	protected SourceOutputWithWatermarks(
+			PushingAsyncDataInput.DataOutput<T> recordsOutput,
+			WatermarkOutput onEventWatermarkOutput,
+			WatermarkOutput periodicWatermarkOutput,
+			TimestampAssigner<T> timestampAssigner,
+			WatermarkGenerator<T> watermarkGenerator) {
+
+		this.recordsOutput = checkNotNull(recordsOutput);
+		this.onEventWatermarkOutput = checkNotNull(onEventWatermarkOutput);
+		this.periodicWatermarkOutput = checkNotNull(periodicWatermarkOutput);
+		this.timestampAssigner = checkNotNull(timestampAssigner);
+		this.watermarkGenerator = checkNotNull(watermarkGenerator);
+	}
+
+	// ------------------------------------------------------------------------
+	// SourceOutput Methods
+	//
+	// Note that the two methods below are final, as a partial enforcement
+	// of the performance design goal mentioned in the class-level comment.
+	// ------------------------------------------------------------------------
+
+	@Override
+	public final void collect(T record) {
+		collect(record, Long.MIN_VALUE);
+	}
+
+	@Override
+	public final void collect(T record, long timestamp) {
+		try {
+			final long assignedTimestamp = timestampAssigner.extractTimestamp(record, timestamp);
+
+			// IMPORTANT: The event must be emitted before the watermark generator is called.
+			recordsOutput.emitRecord(new StreamRecord<>(record, assignedTimestamp));
+			watermarkGenerator.onEvent(record, assignedTimestamp, onEventWatermarkOutput);
+		} catch (ExceptionInChainedOperatorException e) {
+			throw e;
+		} catch (Exception e) {
+			throw new ExceptionInChainedOperatorException(e);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// WatermarkOutput Methods
+	//
+	// These two methods are final as well, to enforce the contract that the
+	// watermarks from emitWatermark(Watermark) go to the same output as the
+	// watermarks from the watermarkGenerator.onEvent(...) calls in the collect(...)
+	// methods.
+	// ------------------------------------------------------------------------
+
+	@Override
+	public final void emitWatermark(Watermark watermark) {
+		onEventWatermarkOutput.emitWatermark(watermark);
+	}
+
+	@Override
+	public final void markIdle() {
+		onEventWatermarkOutput.markIdle();
+	}
+
+	public final void emitPeriodicWatermark() {
+		watermarkGenerator.onPeriodicEmit(periodicWatermarkOutput);
+	}
+
+	// ------------------------------------------------------------------------
+	// Factories
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new SourceOutputWithWatermarks that emits records to the given DataOutput
+	 * and watermarks to the (possibly different) WatermarkOutput.

Review comment:
       Why is it a possibly different `WatermarkOutput`?
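   The constructor above takes two outputs: `onEventWatermarkOutput` (passed to `watermarkGenerator.onEvent(...)` in `collect`) and `periodicWatermarkOutput` (passed to `onPeriodicEmit` in `emitPeriodicWatermark()`). The simplified sketch below shows what that split permits: watermarks from the two paths can land in different places, for example an immediate and a deferred output. `WmOutput`, `WmGenerator`, and the other classes are hypothetical stand-ins, not the FLIP-126 interfaces.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for a watermark output and a watermark generator.
interface WmOutput {
    void emitWatermark(long watermark);
}

interface WmGenerator<T> {
    void onEvent(T event, long timestamp, WmOutput output);

    void onPeriodicEmit(WmOutput output);
}

// Tracks the maximum timestamp seen and emits it only on the periodic call.
final class MaxTimestampGenerator<T> implements WmGenerator<T> {
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public void onEvent(T event, long timestamp, WmOutput output) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    @Override
    public void onPeriodicEmit(WmOutput output) {
        output.emitWatermark(maxTimestamp);
    }
}

// Records emitted watermarks so the two destinations can be told apart.
final class RecordingOutput implements WmOutput {
    final List<Long> watermarks = new ArrayList<>();

    @Override
    public void emitWatermark(long watermark) {
        watermarks.add(watermark);
    }
}

// Routes on-event and periodic watermarks to two (possibly different) outputs.
// Record emission is omitted; only the watermark routing is shown.
final class RoutingSourceOutput<T> {
    private final WmGenerator<T> generator;
    private final WmOutput onEventOutput;
    private final WmOutput periodicOutput;

    RoutingSourceOutput(WmGenerator<T> generator, WmOutput onEventOutput, WmOutput periodicOutput) {
        this.generator = generator;
        this.onEventOutput = onEventOutput;
        this.periodicOutput = periodicOutput;
    }

    void collect(T record, long timestamp) {
        generator.onEvent(record, timestamp, onEventOutput);
    }

    void emitPeriodicWatermark() {
        generator.onPeriodicEmit(periodicOutput);
    }
}
```

   With this generator, the on-event output receives nothing and the periodic output receives the maximum timestamp seen so far.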

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/EventTimeConfig.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.RecordTimestampAssigner;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/**
+ * A collection of all information relevant to setting up timestamp extraction and watermark
+ * generation in a data source operator.
+ */
+public final class EventTimeConfig<T> implements Serializable {

Review comment:
       `TimestampAndWatermarksFactory` seems a more consistent class name.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] StephanEwen commented on a change in pull request #12306: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

Posted by GitBox <gi...@apache.org>.
StephanEwen commented on a change in pull request #12306:
URL: https://github.com/apache/flink/pull/12306#discussion_r429846873



##########
File path: flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
##########
@@ -88,13 +85,17 @@ public WatermarkOutputMultiplexer(WatermarkOutput underlyingOutput) {
 	 * an output ID that can be used to get a deferred or immediate {@link WatermarkOutput} for that
 	 * output.
 	 */
-	public int registerNewOutput() {
-		int newOutputId = nextOutputId;
-		nextOutputId++;
-		OutputState outputState = new OutputState();
-		watermarkPerOutputId.put(newOutputId, outputState);
+	public void registerNewOutput(String id) {
+		final OutputState outputState = new OutputState();
+
+		final OutputState previouslyRegistered = watermarkPerOutputId.putIfAbsent(id, outputState);
+		checkState(previouslyRegistered == null, "Already contains an output for ID %s", id);
+
 		watermarkOutputs.add(outputState);
-		return newOutputId;
+	}
+
+	public boolean unregisterOutput(String id) {
+		return watermarkPerOutputId.remove(id) != null;

Review comment:
       Why is it not possible to remove from `watermarkOutputs`? It is a linear operation (`List.remove()`), but then again, it doesn't happen very often.







[GitHub] [flink] flinkbot edited a comment on pull request #12306: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12306:
URL: https://github.com/apache/flink/pull/12306#issuecomment-633310352


   ## CI report:
   
   * 7b43664f2f90c8291bd2d5175c91714eaddf6285 UNKNOWN
   * 1cf9d57642f568baf3a92d22ed7d320cd5b146b6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2087) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] StephanEwen commented on a change in pull request #12306: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

Posted by GitBox <gi...@apache.org>.
StephanEwen commented on a change in pull request #12306:
URL: https://github.com/apache/flink/pull/12306#discussion_r431057279



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/EventTimeConfig.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.RecordTimestampAssigner;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/**
+ * A collection of all information relevant to setting up timestamp extraction and watermark
+ * generation in a data source operator.
+ */
+public final class EventTimeConfig<T> implements Serializable {

Review comment:
       The class got dropped in the latest rework anyways. So, no more need to find a good name ;-)







[GitHub] [flink] aljoscha commented on a change in pull request #12306: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #12306:
URL: https://github.com/apache/flink/pull/12306#discussion_r429840970



##########
File path: flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
##########
@@ -88,13 +85,17 @@ public WatermarkOutputMultiplexer(WatermarkOutput underlyingOutput) {
 	 * an output ID that can be used to get a deferred or immediate {@link WatermarkOutput} for that
 	 * output.
 	 */
-	public int registerNewOutput() {
-		int newOutputId = nextOutputId;
-		nextOutputId++;
-		OutputState outputState = new OutputState();
-		watermarkPerOutputId.put(newOutputId, outputState);
+	public void registerNewOutput(String id) {
+		final OutputState outputState = new OutputState();
+
+		final OutputState previouslyRegistered = watermarkPerOutputId.putIfAbsent(id, outputState);
+		checkState(previouslyRegistered == null, "Already contains an output for ID %s", id);
+
 		watermarkOutputs.add(outputState);
-		return newOutputId;
+	}
+
+	public boolean unregisterOutput(String id) {
+		return watermarkPerOutputId.remove(id) != null;

Review comment:
       This does not remove the output from `watermarkOutputs`. Please add a test that verifies correct behaviour when removing outputs.
   
   With the current design, it's actually not possible to remove the output from `watermarkOutputs`. One possible solution is to get rid of that list and always use the Map for iterating the outputs in `onPeriodicEmit()`. That would be a smidge slower but I think that's ok because periodic watermark emission does not happen super often.







[GitHub] [flink] flinkbot edited a comment on pull request #12306: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12306:
URL: https://github.com/apache/flink/pull/12306#issuecomment-633310352


   ## CI report:
   
   * 7b43664f2f90c8291bd2d5175c91714eaddf6285 UNKNOWN
   * 1cf9d57642f568baf3a92d22ed7d320cd5b146b6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2087) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12306: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12306:
URL: https://github.com/apache/flink/pull/12306#issuecomment-633310352


   ## CI report:
   
   * 7b43664f2f90c8291bd2d5175c91714eaddf6285 UNKNOWN
   * 4e18dbd8b3fce888e83e2f3519f107e3043dd9cb UNKNOWN
   * b0954d3b81b93e68cd3bae202c7813d08a216d89 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2234) 
   * 5365e40b59fec77de126054746257f17a718fb5b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] aljoscha commented on a change in pull request #12306: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

Posted by GitBox <gi...@apache.org>.
aljoscha commented on a change in pull request #12306:
URL: https://github.com/apache/flink/pull/12306#discussion_r430206700



##########
File path: flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
##########
@@ -88,13 +85,17 @@ public WatermarkOutputMultiplexer(WatermarkOutput underlyingOutput) {
 	 * an output ID that can be used to get a deferred or immediate {@link WatermarkOutput} for that
 	 * output.
 	 */
-	public int registerNewOutput() {
-		int newOutputId = nextOutputId;
-		nextOutputId++;
-		OutputState outputState = new OutputState();
-		watermarkPerOutputId.put(newOutputId, outputState);
+	public void registerNewOutput(String id) {
+		final OutputState outputState = new OutputState();
+
+		final OutputState previouslyRegistered = watermarkPerOutputId.putIfAbsent(id, outputState);
+		checkState(previouslyRegistered == null, "Already contains an output for ID %s", id);
+
 		watermarkOutputs.add(outputState);
-		return newOutputId;
+	}
+
+	public boolean unregisterOutput(String id) {
+		return watermarkPerOutputId.remove(id) != null;

Review comment:
       Ah no, I didn't think. It's of course possible because we're removing the same instance and `equals` therefore works...
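   The identity-based removal can be sketched as follows. `MinWatermarkMultiplexer` is a simplified, hypothetical stand-in that reuses the field names from the diff; it is not Flink's `WatermarkOutputMultiplexer`. Because `OutputState` does not override `equals`, `List.remove(Object)` matches the exact instance taken out of the map, and after removal the combined watermark no longer waits on that output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in: combines per-output watermarks by taking their minimum.
final class MinWatermarkMultiplexer {

    // No equals() override, so List.remove(Object) matches by identity.
    private static final class OutputState {
        long watermark = Long.MIN_VALUE;
    }

    private final Map<String, OutputState> watermarkPerOutputId = new HashMap<>();
    private final List<OutputState> watermarkOutputs = new ArrayList<>();

    void registerNewOutput(String id) {
        OutputState state = new OutputState();
        if (watermarkPerOutputId.putIfAbsent(id, state) != null) {
            throw new IllegalStateException("Already contains an output for ID " + id);
        }
        watermarkOutputs.add(state);
    }

    boolean unregisterOutput(String id) {
        OutputState state = watermarkPerOutputId.remove(id);
        // Linear scan of the list; acceptable because unregistering is rare.
        return state != null && watermarkOutputs.remove(state);
    }

    // Advances the watermark of a registered output (never moves it backwards).
    void updateWatermark(String id, long watermark) {
        OutputState state = watermarkPerOutputId.get(id);
        if (state == null) {
            throw new IllegalArgumentException("Unknown output ID " + id);
        }
        state.watermark = Math.max(state.watermark, watermark);
    }

    // The combined watermark is held back by the slowest registered output.
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        for (OutputState state : watermarkOutputs) {
            min = Math.min(min, state.watermark);
        }
        return watermarkOutputs.isEmpty() ? Long.MIN_VALUE : min;
    }
}
```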







[GitHub] [flink] flinkbot edited a comment on pull request #12306: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12306:
URL: https://github.com/apache/flink/pull/12306#issuecomment-633310352


   ## CI report:
   
   * 7b43664f2f90c8291bd2d5175c91714eaddf6285 UNKNOWN
   * 1cf9d57642f568baf3a92d22ed7d320cd5b146b6 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12306: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12306:
URL: https://github.com/apache/flink/pull/12306#issuecomment-633310352


   ## CI report:
   
   * 7b43664f2f90c8291bd2d5175c91714eaddf6285 UNKNOWN
   * 4e18dbd8b3fce888e83e2f3519f107e3043dd9cb UNKNOWN
   * b0954d3b81b93e68cd3bae202c7813d08a216d89 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2234) 
   * 5365e40b59fec77de126054746257f17a718fb5b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2300) 
   





[GitHub] [flink] asfgit merged pull request #12306: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

Posted by GitBox <gi...@apache.org>.
asfgit merged pull request #12306:
URL: https://github.com/apache/flink/pull/12306


   





[GitHub] [flink] flinkbot edited a comment on pull request #12306: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12306:
URL: https://github.com/apache/flink/pull/12306#issuecomment-633310352









[GitHub] [flink] flinkbot commented on pull request #12306: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12306:
URL: https://github.com/apache/flink/pull/12306#issuecomment-633310352


   ## CI report:
   
   * 7b43664f2f90c8291bd2d5175c91714eaddf6285 UNKNOWN
   





[GitHub] [flink] becketqin commented on pull request #12306: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

Posted by GitBox <gi...@apache.org>.
becketqin commented on pull request #12306:
URL: https://github.com/apache/flink/pull/12306#issuecomment-633606359


   BTW, I think we do need more tests for the patch.





[GitHub] [flink] becketqin commented on pull request #12306: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

Posted by GitBox <gi...@apache.org>.
becketqin commented on pull request #12306:
URL: https://github.com/apache/flink/pull/12306#issuecomment-634444121


   Thanks for updating the patch and fixing the bugs. +1 to merge the PR.

