You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/27 20:02:44 UTC

[flink] branch release-1.11 updated (05b9792 -> a23d5b0)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 05b9792  [FLINK-17820][checkpointing] Respect fileSizeThreshold in FsCheckpointStateOutputStream.flush() (#12351)
     new 907c494  [hotfix][runtime] Annotate ProcessingTimeCallback as a FunctionalInterface
     new 76c46de  [hotfix][core] Add a constant for the special value "NO_TIMESTAMP".
     new 400a984  [hotfix][DataStream API] Minor code cleanups
     new cb6d3c3  [FLINK-17898][core] Remove Exceptions from signatures of SourceOutput methods
     new db837a4  [FLINK-17897][core] Classify FLIP-27 source API to @Experimental / @PublicEvolving
     new 26e4305  [FLINK-17903][core] WatermarkOutputMultiplexer supports String IDs and de-registration of outputs
     new 5353c47  [FLINK-17096][core] Simple performance improvements in WatermarkOutputMultiplexer
     new b03bbf6  [FLINK-17904][runtime] Add scheduleWithFixedDelay to ProcessingTimeService
     new b3031ac  [FLINK-17899][runtime][refactor] Make ProcessingTimeService always available to operators.
     new c0163d2  [FLINK-17899][core][refactor] Add a utility NoWatermarksGenerator
     new ae596d5  [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources
     new ac78993  [FLINK-17899][runtime] Remove unnecessary implementation of SourceContext in SourceOperatorStreamTask
     new 2e4a7fe  [FLINK-17899][runtime] Add WatermarkStrategies to continuousSource() methods in the DataStream API
     new 0d2ac14  [FLINK-17950][Scala DataStream API] Fix StreamExecutionEnvironment.continuousSource(...) method
     new 527f02f  [hotfix][core] Improve JavaDocs for FLIP-27 sources.
     new a23d5b0  [hotfix] Adjust License Headers for FLIP-27 sources to be same as the remaining code base

The 16 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../base/source/event/NoMoreSplitsEvent.java       |  30 +--
 .../base/source/reader/RecordEmitter.java          |  30 +--
 .../base/source/reader/RecordsBySplits.java        |  30 +--
 .../base/source/reader/RecordsWithSplitIds.java    |  30 +--
 .../SingleThreadMultiplexSourceReaderBase.java     |  30 +--
 .../base/source/reader/SourceReaderBase.java       |  73 ++++--
 .../base/source/reader/SourceReaderOptions.java    |  30 +--
 .../base/source/reader/SplitsRecordIterator.java   |  30 +--
 .../base/source/reader/fetcher/AddSplitsTask.java  |  30 +--
 .../base/source/reader/fetcher/FetchTask.java      |  30 +--
 .../reader/fetcher/SingleThreadFetcherManager.java |  30 +--
 .../base/source/reader/fetcher/SplitFetcher.java   |  30 +--
 .../source/reader/fetcher/SplitFetcherManager.java |  30 +--
 .../source/reader/fetcher/SplitFetcherTask.java    |  30 +--
 .../source/reader/splitreader/SplitReader.java     |  30 +--
 .../source/reader/splitreader/SplitsAddition.java  |  30 +--
 .../source/reader/splitreader/SplitsChange.java    |  30 +--
 .../FutureCompletingBlockingQueue.java             |  30 +--
 .../reader/synchronization/FutureNotifier.java     |  30 +--
 .../source/reader/CoordinatedSourceITCase.java     |  46 ++--
 .../base/source/reader/SourceReaderBaseTest.java   |  30 +--
 .../base/source/reader/SourceReaderTestBase.java   |  45 ++--
 .../source/reader/fetcher/SplitFetcherTest.java    |  30 +--
 .../base/source/reader/mocks/MockBaseSource.java   |  30 +--
 .../source/reader/mocks/MockRecordEmitter.java     |  30 +--
 .../base/source/reader/mocks/MockSourceReader.java |  30 +--
 .../source/reader/mocks/MockSplitEnumerator.java   |  30 +--
 .../base/source/reader/mocks/MockSplitReader.java  |  30 +--
 .../reader/synchronization/FutureNotifierTest.java |  30 +--
 .../kafka/internals/AbstractFetcher.java           |  11 +-
 .../common/eventtime/NoWatermarksGenerator.java    |  26 +--
 .../api/common/eventtime/TimestampAssigner.java    |   9 +-
 .../eventtime/WatermarkOutputMultiplexer.java      |  62 +++--
 .../api/common/eventtime/WatermarkStrategies.java  |   8 +
 .../api/common/eventtime/WatermarkStrategy.java    |   2 +-
 .../flink/api/connector/source/Boundedness.java    |  34 +--
 .../flink/api/connector/source/ReaderInfo.java     |  34 +--
 .../flink/api/connector/source/ReaderOutput.java   | 117 ++++++++++
 .../apache/flink/api/connector/source/Source.java  |  34 +--
 .../flink/api/connector/source/SourceEvent.java    |  34 +--
 .../flink/api/connector/source/SourceOutput.java   |  68 ++++--
 .../flink/api/connector/source/SourceReader.java   |  36 +--
 .../api/connector/source/SourceReaderContext.java  |  34 +--
 .../flink/api/connector/source/SourceSplit.java    |  34 +--
 .../api/connector/source/SplitEnumerator.java      |  34 +--
 .../connector/source/SplitEnumeratorContext.java   |  34 +--
 .../api/connector/source/SplitsAssignment.java     |  34 +--
 .../eventtime/WatermarkOutputMultiplexerTest.java  | 101 +++++++-
 .../api/connector/source/mocks/MockSource.java     |  33 +--
 .../connector/source/mocks/MockSourceReader.java   |  34 +--
 .../connector/source/mocks/MockSourceSplit.java    |  30 +--
 .../source/mocks/MockSourceSplitSerializer.java    |  30 +--
 .../source/mocks/MockSplitEnumerator.java          |  30 +--
 .../MockSplitEnumeratorCheckpointSerializer.java   |  30 +--
 .../runtime/NeverFireProcessingTimeService.java    |   6 +
 .../streaming/api/datastream/DataStreamSource.java |  22 +-
 .../api/datastream/SingleOutputStreamOperator.java |   2 +-
 .../environment/StreamExecutionEnvironment.java    |  26 ++-
 .../streaming/api/operators/SourceOperator.java    |  60 ++++-
 .../api/operators/SourceOperatorFactory.java       |  34 ++-
 .../api/operators/StreamOperatorFactoryUtil.java   |  14 +-
 .../api/operators/StreamOperatorParameters.java    |  17 +-
 .../source/BatchTimestampsAndWatermarks.java       | 126 ++++++++++
 .../source/SourceOutputWithWatermarks.java         | 179 ++++++++++++++
 .../source/StreamingTimestampsAndWatermarks.java   | 247 ++++++++++++++++++++
 .../operators/source/TimestampsAndWatermarks.java  |  92 ++++++++
 .../source/TimestampsAndWatermarksContext.java}    |  34 +--
 .../operators/source/WatermarkToDataOutput.java    |  86 +++++++
 .../runtime/tasks/ProcessingTimeCallback.java      |   1 +
 .../runtime/tasks/ProcessingTimeService.java       |  17 ++
 .../runtime/tasks/ProcessingTimeServiceImpl.java   |  10 +
 .../runtime/tasks/SourceOperatorStreamTask.java    |  33 +--
 .../runtime/tasks/SystemProcessingTimeService.java |  20 +-
 .../runtime/tasks/TestProcessingTimeService.java   |   6 +
 .../api/graph/StreamingJobGraphGeneratorTest.java  |  13 +-
 .../api/operators/SourceOperatorTest.java          |  32 +--
 .../api/operators/source/CollectingDataOutput.java |  56 +++++
 .../source/OnEventTestWatermarkGenerator.java}     |  30 +--
 .../source/OnPeriodicTestWatermarkGenerator.java}  |  39 ++--
 .../source/SourceOperatorEventTimeTest.java        | 257 +++++++++++++++++++++
 .../source/SourceOutputWithWatermarksTest.java     |  85 +++++++
 .../operators/source/TestingSourceOperator.java    |  94 ++++++++
 .../source/WatermarkToDataOutputTest.java          |  77 ++++++
 .../tasks/SourceOperatorStreamTaskTest.java        |  10 +-
 flink-streaming-scala/pom.xml                      |   8 +
 .../api/scala/StreamExecutionEnvironment.scala     |  11 +-
 .../api/scala/StreamExecutionEnvironmentTest.scala |  57 +++++
 87 files changed, 2719 insertions(+), 959 deletions(-)
 copy flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java => flink-core/src/main/java/org/apache/flink/api/common/eventtime/NoWatermarksGenerator.java (52%)
 create mode 100644 flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/BatchTimestampsAndWatermarks.java
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/StreamingTimestampsAndWatermarks.java
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
 copy flink-streaming-java/src/main/java/org/apache/flink/streaming/{runtime/tasks/ProcessingTimeCallback.java => api/operators/source/TimestampsAndWatermarksContext.java} (51%)
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
 create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java
 copy flink-streaming-java/src/{main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java => test/java/org/apache/flink/streaming/api/operators/source/OnEventTestWatermarkGenerator.java} (52%)
 copy flink-streaming-java/src/{main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java => test/java/org/apache/flink/streaming/api/operators/source/OnPeriodicTestWatermarkGenerator.java} (52%)
 create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
 create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarksTest.java
 create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
 create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutputTest.java
 create mode 100644 flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala


[flink] 03/16: [hotfix][DataStream API] Minor code cleanups

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 400a9848f948e20fd881ada511632032b00a1450
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue May 26 17:25:08 2020 +0200

    [hotfix][DataStream API] Minor code cleanups
---
 .../flink/api/common/eventtime/WatermarkStrategy.java  |  2 +-
 .../streaming/api/datastream/DataStreamSource.java     | 18 +++++++++++++++---
 .../api/datastream/SingleOutputStreamOperator.java     |  2 +-
 .../api/environment/StreamExecutionEnvironment.java    |  1 -
 4 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java
index 8027797..254afc1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java
@@ -31,7 +31,7 @@ import java.io.Serializable;
  * to workers during distributed execution.
  */
 @Public
-public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{
+public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {
 
 	/**
 	 * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index 4e0d626..69c8658 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -37,9 +37,15 @@ public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {
 
 	boolean isParallel;
 
-	public DataStreamSource(StreamExecutionEnvironment environment,
-			TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
-			boolean isParallel, String sourceName) {
+	/**
+	 * The constructor used to create legacy sources.
+	 */
+	public DataStreamSource(
+			StreamExecutionEnvironment environment,
+			TypeInformation<T> outTypeInfo,
+			StreamSource<T, ?> operator,
+			boolean isParallel,
+			String sourceName) {
 		super(environment, new LegacySourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));
 
 		this.isParallel = isParallel;
@@ -48,11 +54,17 @@ public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {
 		}
 	}
 
+	/**
+	 * Constructor for "deep" sources that manually set up (one or more) custom configured complex operators.
+	 */
 	public DataStreamSource(SingleOutputStreamOperator<T> operator) {
 		super(operator.environment, operator.getTransformation());
 		this.isParallel = true;
 	}
 
+	/**
+	 * Constructor for new Sources (FLIP-27).
+	 */
 	public DataStreamSource(
 			StreamExecutionEnvironment environment,
 			Source<T, ?, ?> source,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 669fa9e..6a70803 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -55,7 +55,7 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
 	 * we can catch the case when a side output with a matching id is requested for a different
 	 * type because this would lead to problems at runtime.
 	 */
-	private Map<OutputTag<?>, TypeInformation> requestedSideOutputs = new HashMap<>();
+	private Map<OutputTag<?>, TypeInformation<?>> requestedSideOutputs = new HashMap<>();
 
 	private boolean wasSplitApplied = false;
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 680e285..c8d038a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1589,7 +1589,6 @@ public class StreamExecutionEnvironment {
 	 * 		the user defined type information for the stream
 	 * @return the data stream constructed
 	 */
-	@SuppressWarnings("unchecked")
 	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
 
 		TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);


[flink] 01/16: [hotfix][runtime] Annotate ProcessingTimeCallback as a FunctionalInterface

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 907c4945608ba7ae1341d32bac11b3be504a56e9
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 24 20:21:16 2020 +0200

    [hotfix][runtime] Annotate ProcessingTimeCallback as a FunctionalInterface
    
    The interface is used as such throughout the runtime already, but is missing the annotation
    to guard the SAM nature.
---
 .../org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
index 9bccae0..4604380 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
@@ -25,6 +25,7 @@ import org.apache.flink.annotation.Internal;
  * {@link ProcessingTimeService}.
  */
 @Internal
+@FunctionalInterface
 public interface ProcessingTimeCallback {
 
 	/**


[flink] 12/16: [FLINK-17899][runtime] Remove unnecessary implementation of SourceContext in SourceOperatorStreamTask

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ac78993878826cc3bd982940b7a115797f07895c
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 24 23:08:58 2020 +0200

    [FLINK-17899][runtime] Remove unnecessary implementation of SourceContext in SourceOperatorStreamTask
    
    Fewer implementations of the interface make de-virtualization of its methods easier.
---
 .../runtime/tasks/SourceOperatorStreamTask.java    | 33 +++-------------------
 1 file changed, 4 insertions(+), 29 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index 19bba2d..47a1d73 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.SourceOperator;
@@ -31,7 +30,6 @@ import org.apache.flink.streaming.runtime.io.StreamTaskInput;
 import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -49,7 +47,7 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
 	@Override
 	public void init() {
 		StreamTaskInput<T> input = new StreamTaskSourceInput<>(headOperator);
-		DataOutput<T> output = new StreamTaskSourceOutput<>(
+		DataOutput<T> output = new AsyncDataOutputToOutput<>(
 			operatorChain.getChainEntryPoint(),
 			getStreamStatusMaintainer());
 
@@ -60,14 +58,13 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
 	}
 
 	/**
-	 * Implementation of {@link DataOutput} that wraps a specific {@link Output} to emit
-	 * stream elements for {@link SourceOperator}.
+	 * Implementation of {@link DataOutput} that wraps a specific {@link Output}.
 	 */
-	private static class StreamTaskSourceOutput<T> extends AbstractDataOutput<T> implements SourceOutput<T> {
+	private static class AsyncDataOutputToOutput<T> extends AbstractDataOutput<T> {
 
 		private final Output<StreamRecord<T>> output;
 
-		StreamTaskSourceOutput(
+		AsyncDataOutputToOutput(
 				Output<StreamRecord<T>> output,
 				StreamStatusMaintainer streamStatusMaintainer) {
 			super(streamStatusMaintainer);
@@ -89,27 +86,5 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
 		public void emitWatermark(Watermark watermark) {
 			output.emitWatermark(watermark);
 		}
-
-		// ------------------- methods from SourceOutput -------------
-
-		@Override
-		public void collect(T record) {
-			output.collect(new StreamRecord<>(record));
-		}
-
-		@Override
-		public void collect(T record, long timestamp) {
-			output.collect(new StreamRecord<>(record, timestamp));
-		}
-
-		@Override
-		public void emitWatermark(org.apache.flink.api.common.eventtime.Watermark watermark) {
-			output.emitWatermark(new Watermark(watermark.getTimestamp()));
-		}
-
-		@Override
-		public void markIdle() {
-			emitStreamStatus(StreamStatus.IDLE);
-		}
 	}
 }


[flink] 15/16: [hotfix][core] Improve JavaDocs for FLIP-27 sources.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 527f02f9bc8e60e0bf21bbfb2a9c476b76500ea2
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 27 13:40:33 2020 +0200

    [hotfix][core] Improve JavaDocs for FLIP-27 sources.
---
 .../flink/api/connector/source/ReaderOutput.java   | 40 +++++++++++++++++++---
 .../flink/api/connector/source/SourceOutput.java   | 30 +++++++++++++---
 2 files changed, 61 insertions(+), 9 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
index 1774a7c..dbfcba8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
@@ -19,17 +19,38 @@
 package org.apache.flink.api.connector.source;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
 import org.apache.flink.api.common.eventtime.Watermark;
 
 /**
- * The interface provided by Flink task to the {@link SourceReader} to emit records
- * to downstream operators for message processing.
+ * The interface provided by the Flink runtime to the {@link SourceReader} to emit records, and
+ * optionally watermarks, to downstream operators for message processing.
+ *
+ * <p>The {@code ReaderOutput} is a {@link SourceOutput} and can be used directly to emit the stream
+ * of events from the source. This is recommended for sources where the SourceReader processes only a
+ * single split, or where NO split-specific characteristics are required (like per-split watermarks
+ * and idleness, split-specific event-time skew handling, etc.).
+ * As a special case, this is true for sources that are purely supporting bounded/batch data processing.
+ *
+ * <p>For most streaming sources, the {@code SourceReader} should use split-specific outputs, to allow
+ * the processing logic to run per-split watermark generators, idleness detection, etc.
+ * To create a split-specific {@code SourceOutput} use the {@link ReaderOutput#createOutputForSplit(String)}
+ * method, using the Source Split's ID. Make sure to release the output again once the source has finished
+ * processing that split.
  */
 @PublicEvolving
 public interface ReaderOutput<T> extends SourceOutput<T> {
 
 	/**
-	 * Emit a record without a timestamp. Equivalent to {@link #collect(Object, long) collect(timestamp, null)};
+	 * Emit a record without a timestamp.
+	 *
+	 * <p>Use this method if the source system does not have a notion of records with timestamps.
+	 *
+	 * <p>The events later pass through a {@link TimestampAssigner}, which attaches a timestamp
+	 * to the event based on the event's contents. For example a file source with JSON records would not
+	 * have a generic timestamp from the file reading and JSON parsing process, and thus use this
+	 * method to produce initially a record without a timestamp. The {@code TimestampAssigner} in
+	 * the next step would be used to extract the timestamp from a field of the JSON object.
 	 *
 	 * @param record the record to emit.
 	 */
@@ -37,9 +58,18 @@ public interface ReaderOutput<T> extends SourceOutput<T> {
 	void collect(T record);
 
 	/**
-	 * Emit a record with timestamp.
+	 * Emit a record with a timestamp.
 	 *
-	 * @param record the record to emit.
+	 * <p>Use this method if the source system has timestamps attached to records. Typical examples
+	 * would be Logs, PubSubs, or Message Queues, like Kafka or Kinesis, which store a timestamp with
+	 * each event.
+	 *
+	 * <p>The events typically still pass through a {@link TimestampAssigner}, which may decide to
+	 * either use this source-provided timestamp, or replace it with a timestamp stored within the
+	 * event (for example if the event was a JSON object one could configure a TimestampAssigner that
+	 * extracts one of the object's fields and uses that as a timestamp).
+	 *
+	 * @param record    the record to emit.
 	 * @param timestamp the timestamp of the record.
 	 */
 	@Override
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java
index ccf560b..ff088d6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java
@@ -19,24 +19,46 @@
 package org.apache.flink.api.connector.source;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkOutput;
 
 /**
- * The interface provided by Flink task to the {@link SourceReader} to emit records
- * to downstream operators for message processing.
+ * The {@code SourceOutput} is the gateway for a {@link SourceReader} to emit the produced
+ * records and watermarks.
+ *
+ * <p>A {@code SourceReader} may have multiple SourceOutputs, scoped to individual <i>Source Splits</i>. That
+ * way, streams of events from different splits can be identified and treated separately, for example
+ * for watermark generation, or event-time skew handling.
  */
 @PublicEvolving
 public interface SourceOutput<T> extends WatermarkOutput {
 
 	/**
-	 * Emit a record without a timestamp. Equivalent to {@link #collect(Object, long) collect(timestamp, null)};
+	 * Emit a record without a timestamp.
+	 *
+	 * <p>Use this method if the source system does not have a notion of records with timestamps.
+	 *
+	 * <p>The events later pass through a {@link TimestampAssigner}, which attaches a timestamp
+	 * to the event based on the event's contents. For example a file source with JSON records would not
+	 * have a generic timestamp from the file reading and JSON parsing process, and thus use this
+	 * method to produce initially a record without a timestamp. The {@code TimestampAssigner} in
+	 * the next step would be used to extract the timestamp from a field of the JSON object.
 	 *
 	 * @param record the record to emit.
 	 */
 	void collect(T record);
 
 	/**
-	 * Emit a record with timestamp.
+	 * Emit a record with a timestamp.
+	 *
+	 * <p>Use this method if the source system has timestamps attached to records. Typical examples
+	 * would be Logs, PubSubs, or Message Queues, like Kafka or Kinesis, which store a timestamp with
+	 * each event.
+	 *
+	 * <p>The events typically still pass through a {@link TimestampAssigner}, which may decide to
+	 * either use this source-provided timestamp, or replace it with a timestamp stored within the
+	 * event (for example if the event was a JSON object one could configure a TimestampAssigner that
+	 * extracts one of the object's fields and uses that as a timestamp).
 	 *
 	 * @param record the record to emit.
 	 * @param timestamp the timestamp of the record.


[flink] 14/16: [FLINK-17950][Scala DataStream API] Fix StreamExecutionEnvironment.continuousSource(...) method

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0d2ac14657298f1f749d7de9867f4d46008ce2ab
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue May 26 18:58:35 2020 +0200

    [FLINK-17950][Scala DataStream API] Fix StreamExecutionEnvironment.continuousSource(...) method
    
      - Fix return type from Unit to DataStream
      - Forward inferred TypeInformation
      - Add test
---
 .../api/connector/source/mocks/MockSource.java     |  3 ++
 flink-streaming-scala/pom.xml                      |  8 +++
 .../api/scala/StreamExecutionEnvironment.scala     |  6 ++-
 .../api/scala/StreamExecutionEnvironmentTest.scala | 57 ++++++++++++++++++++++
 4 files changed, 72 insertions(+), 2 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java
index 4d21776..7866e6e 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java
@@ -35,6 +35,9 @@ import java.util.Set;
  * A mock {@link Source} for unit tests.
  */
 public class MockSource implements Source<Integer, MockSourceSplit, Set<MockSourceSplit>> {
+
+	private static final long serialVersionUID = 1L;
+
 	private final Boundedness boundedness;
 	private final int numSplits;
 	private List<MockSourceReader> createdReaders;
diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml
index 4d575ad..b516254 100644
--- a/flink-streaming-scala/pom.xml
+++ b/flink-streaming-scala/pom.xml
@@ -85,6 +85,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 2b23845..925d571 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -669,8 +669,10 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   def continuousSource[T: TypeInformation](
       source: Source[T, _ <: SourceSplit, _],
       watermarkStrategy: WatermarkStrategy[T],
-      sourceName: String): Unit = {
-    asScalaStream(javaEnv.continuousSource(source, watermarkStrategy, sourceName))
+      sourceName: String): DataStream[T] = {
+
+    val typeInfo = implicitly[TypeInformation[T]]
+    asScalaStream(javaEnv.continuousSource(source, watermarkStrategy, sourceName, typeInfo))
   }
 
   /**
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
new file mode 100644
index 0000000..f493497
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategies
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.connector.source.Boundedness
+import org.apache.flink.api.connector.source.mocks.MockSource
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
+
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+/**
+ * Tests for the [[StreamExecutionEnvironment]].
+ */
+class StreamExecutionEnvironmentTest {
+
+  /**
+   * Verifies that continuousSource() passes the implicit
+   * TypeInformation on to the resulting DataStream.
+   */
+  @Test
+  def testAlignedWindowDeprecation(): Unit = {
+    implicit val typeInfo: TypeInformation[Integer] = new MockTypeInfo()
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val stream = env.continuousSource(
+      new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 1),
+      WatermarkStrategies.noWatermarks[Integer]().build(),
+      "test source")
+
+    assertEquals(typeInfo, stream.dataType)
+  }
+
+  // --------------------------------------------------------------------------
+  //  mocks
+  // --------------------------------------------------------------------------
+
+  private class MockTypeInfo extends GenericTypeInfo[Integer](classOf[Integer]) {}
+}


[flink] 13/16: [FLINK-17899][runtime] Add WatermarkStrategies to continuousSource() methods in the DataStream API

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2e4a7fe3ed46a729420d12db3e714f594f84d03d
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue May 26 18:47:48 2020 +0200

    [FLINK-17899][runtime] Add WatermarkStrategies to continuousSource() methods in the DataStream API
---
 .../base/source/reader/CoordinatedSourceITCase.java  | 16 +++++++++++++---
 .../streaming/api/datastream/DataStreamSource.java   |  4 +++-
 .../api/environment/StreamExecutionEnvironment.java  | 20 ++++++++++++++++----
 .../api/operators/SourceOperatorFactory.java         | 17 +++++++++++------
 .../api/graph/StreamingJobGraphGeneratorTest.java    | 13 +++++++++----
 .../api/scala/StreamExecutionEnvironment.scala       |  4 +++-
 6 files changed, 55 insertions(+), 19 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
index 1227e9f..4877def 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.base.source.reader;
 
 import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.api.common.eventtime.WatermarkStrategies;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.configuration.Configuration;
@@ -44,7 +45,10 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
 	public void testEnumeratorReaderCommunication() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		MockBaseSource source = new MockBaseSource(2, 10, Boundedness.BOUNDED);
-		DataStream<Integer> stream = env.continuousSource(source, "TestingSource");
+		DataStream<Integer> stream = env.continuousSource(
+				source,
+				WatermarkStrategies.<Integer>noWatermarks().build(),
+				"TestingSource");
 		executeAndVerify(env, stream, 20);
 	}
 
@@ -53,8 +57,14 @@ public class CoordinatedSourceITCase extends AbstractTestBase {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		MockBaseSource source1 = new MockBaseSource(2, 10, Boundedness.BOUNDED);
 		MockBaseSource source2 = new MockBaseSource(2, 10, 20, Boundedness.BOUNDED);
-		DataStream<Integer> stream1 = env.continuousSource(source1, "TestingSource1");
-		DataStream<Integer> stream2 = env.continuousSource(source2, "TestingSource2");
+		DataStream<Integer> stream1 = env.continuousSource(
+				source1,
+				WatermarkStrategies.<Integer>noWatermarks().build(),
+				"TestingSource1");
+		DataStream<Integer> stream2 = env.continuousSource(
+				source2,
+				WatermarkStrategies.<Integer>noWatermarks().build(),
+				"TestingSource2");
 		executeAndVerify(env, stream1.union(stream2), 40);
 	}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index 69c8658..9fb2c26 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Source;
@@ -68,12 +69,13 @@ public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {
 	public DataStreamSource(
 			StreamExecutionEnvironment environment,
 			Source<T, ?, ?> source,
+			WatermarkStrategy<T> timestampsAndWatermarks,
 			TypeInformation<T> outTypeInfo,
 			String sourceName) {
 		super(environment,
 				new SourceTransformation<>(
 						sourceName,
-						new SourceOperatorFactory<>(source),
+						new SourceOperatorFactory<>(source, timestampsAndWatermarks),
 						outTypeInfo,
 						environment.getParallelism()));
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 52204cf..a47eaed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.FilePathFilter;
@@ -1614,8 +1615,11 @@ public class StreamExecutionEnvironment {
 	 * @return the data stream constructed
 	 */
 	@Experimental
-	public <OUT> DataStreamSource<OUT> continuousSource(Source<OUT, ?, ?> source, String sourceName) {
-		return continuousSource(source, sourceName, null);
+	public <OUT> DataStreamSource<OUT> continuousSource(
+			Source<OUT, ?, ?> source,
+			WatermarkStrategy<OUT> timestampsAndWatermarks,
+			String sourceName) {
+		return continuousSource(source, timestampsAndWatermarks, sourceName, null);
 	}
 
 	/**
@@ -1634,10 +1638,18 @@ public class StreamExecutionEnvironment {
 	@Experimental
 	public <OUT> DataStreamSource<OUT> continuousSource(
 			Source<OUT, ?, ?> source,
+			WatermarkStrategy<OUT> timestampsAndWatermarks,
 			String sourceName,
 			TypeInformation<OUT> typeInfo) {
-		TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(source, sourceName, Source.class, typeInfo);
-		return new DataStreamSource<>(this, source, resolvedTypeInfo, sourceName);
+
+		final TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(source, sourceName, Source.class, typeInfo);
+
+		return new DataStreamSource<>(
+				this,
+				checkNotNull(source, "source"),
+				checkNotNull(timestampsAndWatermarks, "timestampsAndWatermarks"),
+				checkNotNull(resolvedTypeInfo),
+				checkNotNull(sourceName));
 	}
 
 	/**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index 02c7927..d1636cd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -28,12 +28,13 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
-import org.apache.flink.streaming.api.operators.source.NoOpWatermarkGenerator;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
 
 import java.util.function.Function;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The Factory class for {@link SourceOperator}.
  */
@@ -46,17 +47,21 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 	private final Source<OUT, ?, ?> source;
 
 	/** The event time setup (timestamp assigners, watermark generators, etc.). */
-	private final WatermarkStrategy<OUT> watermarkStrategy = (ctx) -> new NoOpWatermarkGenerator<>();
+	private final WatermarkStrategy<OUT> watermarkStrategy;
 
 	/** The number of worker thread for the source coordinator. */
 	private final int numCoordinatorWorkerThread;
 
-	public SourceOperatorFactory(Source<OUT, ?, ?> source) {
-		this(source, 1);
+	public SourceOperatorFactory(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> watermarkStrategy) {
+		this(source, watermarkStrategy, 1);
 	}
 
-	public SourceOperatorFactory(Source<OUT, ?, ?> source, int numCoordinatorWorkerThread) {
-		this.source = source;
+	public SourceOperatorFactory(
+			Source<OUT, ?, ?> source,
+			WatermarkStrategy<OUT> watermarkStrategy,
+			int numCoordinatorWorkerThread) {
+		this.source = checkNotNull(source);
+		this.watermarkStrategy = checkNotNull(watermarkStrategy);
 		this.numCoordinatorWorkerThread = numCoordinatorWorkerThread;
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 1fb977a..fec7b0d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.WatermarkStrategies;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -250,8 +251,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 	@Test
 	public void testOperatorCoordinatorAddedToJobVertex() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		DataStream<Integer> stream =
-				env.continuousSource(new MockSource(Boundedness.BOUNDED, 1), "TestingSource");
+		DataStream<Integer> stream = env.continuousSource(
+				new MockSource(Boundedness.BOUNDED, 1),
+				WatermarkStrategies.<Integer>noWatermarks().build(),
+				"TestingSource");
 
 		OneInputTransformation<Integer, Integer> resultTransform = new OneInputTransformation<Integer, Integer>(
 				stream.getTransformation(),
@@ -458,8 +461,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 	@Test
 	public void testCoordinatedOperator() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		DataStream<Integer> source =
-				env.continuousSource(new MockSource(Boundedness.BOUNDED, 1), "TestSource");
+		DataStream<Integer> source = env.continuousSource(
+				new MockSource(Boundedness.BOUNDED, 1),
+				WatermarkStrategies.<Integer>noWatermarks().build(),
+				"TestSource");
 		source.addSink(new DiscardingSink<>());
 
 		StreamGraph streamGraph = env.getStreamGraph();
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 509ab65..2b23845 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala
 
 import com.esotericsoftware.kryo.Serializer
 import org.apache.flink.annotation.{Experimental, Internal, Public, PublicEvolving}
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
 import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat}
 import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -667,8 +668,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   @Experimental
   def continuousSource[T: TypeInformation](
       source: Source[T, _ <: SourceSplit, _],
+      watermarkStrategy: WatermarkStrategy[T],
       sourceName: String): Unit = {
-    asScalaStream(javaEnv.continuousSource(source, sourceName))
+    asScalaStream(javaEnv.continuousSource(source, watermarkStrategy, sourceName))
   }
 
   /**


[flink] 04/16: [FLINK-17898][core] Remove Exceptions from signatures of SourceOutput methods

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cb6d3c390ac05f01162b6636735472162a8077ea
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sat May 23 23:00:15 2020 +0200

    [FLINK-17898][core] Remove Exceptions from signatures of SourceOutput methods
---
 .../main/java/org/apache/flink/api/connector/source/SourceOutput.java | 4 ++--
 .../flink/streaming/runtime/tasks/SourceOperatorStreamTask.java       | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java
index 4d9f065..3896b15 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java
@@ -33,7 +33,7 @@ public interface SourceOutput<T> extends WatermarkOutput {
 	 *
 	 * @param record the record to emit.
 	 */
-	void collect(T record) throws Exception;
+	void collect(T record);
 
 	/**
 	 * Emit a record with timestamp.
@@ -41,5 +41,5 @@ public interface SourceOutput<T> extends WatermarkOutput {
 	 * @param record the record to emit.
 	 * @param timestamp the timestamp of the record.
 	 */
-	void collect(T record, long timestamp) throws Exception;
+	void collect(T record, long timestamp);
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index 5898aa9..19bba2d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -93,12 +93,12 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
 		// ------------------- methods from SourceOutput -------------
 
 		@Override
-		public void collect(T record) throws Exception {
+		public void collect(T record) {
 			output.collect(new StreamRecord<>(record));
 		}
 
 		@Override
-		public void collect(T record, long timestamp) throws Exception {
+		public void collect(T record, long timestamp) {
 			output.collect(new StreamRecord<>(record, timestamp));
 		}
 


[flink] 10/16: [FLINK-17899][core][refactor] Add a utility NoWatermarksGenerator

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c0163d29d25555e4552b2b88d1ca0aabad46edbe
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue May 26 18:41:17 2020 +0200

    [FLINK-17899][core][refactor] Add a utility NoWatermarksGenerator
---
 .../common/eventtime/NoWatermarksGenerator.java    | 34 ++++++++++++++++++++++
 .../api/common/eventtime/WatermarkStrategies.java  |  8 +++++
 2 files changed, 42 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/NoWatermarksGenerator.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/NoWatermarksGenerator.java
new file mode 100644
index 0000000..b20a13f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/NoWatermarksGenerator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.eventtime;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * An implementation of a {@link WatermarkGenerator} that generates no Watermarks.
+ */
+@Public
+public final class NoWatermarksGenerator<E> implements WatermarkGenerator<E> {
+
+	@Override
+	public void onEvent(E event, long eventTimestamp, WatermarkOutput output) {}
+
+	@Override
+	public void onPeriodicEmit(WatermarkOutput output) {}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java
index 44b5474..880edc7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java
@@ -175,6 +175,14 @@ public final class WatermarkStrategies<T> {
 		return new WatermarkStrategies<>(new FromWatermarkGeneratorSupplier<>(generatorSupplier));
 	}
 
+	/**
+	 * Starts building a watermark strategy that generates no watermarks at all.
+	 * This may be useful in scenarios that do pure processing-time based stream processing.
+	 */
+	public static <T> WatermarkStrategies<T> noWatermarks() {
+		return new WatermarkStrategies<>((ctx) -> new NoWatermarksGenerator<>());
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static final class FromWatermarkGeneratorSupplier<T> implements WatermarkStrategy<T> {


[flink] 02/16: [hotfix][core] Add a constant for the special value "NO_TIMESTAMP".

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 76c46de5d1e7a7a506e8196a35a095e05dab1d4f
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue May 26 13:28:23 2020 +0200

    [hotfix][core] Add a constant for the special value "NO_TIMESTAMP".
---
 .../org/apache/flink/api/common/eventtime/TimestampAssigner.java | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java
index c5bc828..7265cea 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java
@@ -36,12 +36,19 @@ import org.apache.flink.annotation.Public;
 public interface TimestampAssigner<T> {
 
 	/**
+	 * The value that is passed to {@link #extractTimestamp} when there is no previous timestamp
+	 * attached to the record.
+	 */
+	long NO_TIMESTAMP = Long.MIN_VALUE;
+
+	/**
 	 * Assigns a timestamp to an element, in milliseconds since the Epoch. This is independent of
 	 * any particular time zone or calendar.
 	 *
 	 * <p>The method is passed the previously assigned timestamp of the element.
 	 * That previous timestamp may have been assigned from a previous assigner. If the element did
-	 * not carry a timestamp before, this value is {@code Long.MIN_VALUE}.
+	 * not carry a timestamp before, this value is {@link #NO_TIMESTAMP} (= {@code Long.MIN_VALUE}:
+	 * {@value Long#MIN_VALUE}).
 	 *
 	 * @param element The element that the timestamp will be assigned to.
 	 * @param recordTimestamp The current internal timestamp of the element,


[flink] 05/16: [FLINK-17897][core] Classify FLIP-27 source API to @Experimental / @PublicEvolving

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit db837a41c5f15dc689c7e3e9c27d99d98bd6be6e
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 24 00:36:30 2020 +0200

    [FLINK-17897][core] Classify FLIP-27 source API to @Experimental / @PublicEvolving
---
 .../main/java/org/apache/flink/api/connector/source/Boundedness.java | 4 ++--
 .../main/java/org/apache/flink/api/connector/source/ReaderInfo.java  | 4 ++--
 .../src/main/java/org/apache/flink/api/connector/source/Source.java  | 4 ++--
 .../main/java/org/apache/flink/api/connector/source/SourceEvent.java | 4 ++--
 .../java/org/apache/flink/api/connector/source/SourceOutput.java     | 4 ++--
 .../java/org/apache/flink/api/connector/source/SourceReader.java     | 4 ++--
 .../org/apache/flink/api/connector/source/SourceReaderContext.java   | 4 ++--
 .../main/java/org/apache/flink/api/connector/source/SourceSplit.java | 4 ++--
 .../java/org/apache/flink/api/connector/source/SplitEnumerator.java  | 4 ++--
 .../apache/flink/api/connector/source/SplitEnumeratorContext.java    | 4 ++--
 .../java/org/apache/flink/api/connector/source/SplitsAssignment.java | 4 ++--
 .../flink/streaming/api/environment/StreamExecutionEnvironment.java  | 5 +++--
 .../flink/streaming/api/scala/StreamExecutionEnvironment.scala       | 3 ++-
 13 files changed, 27 insertions(+), 25 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/Boundedness.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/Boundedness.java
index 5b5c4a6..d09318e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/Boundedness.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/Boundedness.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.api.connector.source;
 
-import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 
 /**
  * The boundedness of a stream. A stream could either be "bounded" (a stream with finite records) or
  * "unbounded" (a stream with infinite records).
  */
-@Public
+@PublicEvolving
 public enum Boundedness {
 	/**
 	 * A BOUNDED stream is a stream with finite records.
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
index f4a8f8d..f8535e59 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.connector.source;
 
-import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 
 import java.io.Serializable;
 import java.util.Objects;
@@ -26,7 +26,7 @@ import java.util.Objects;
 /**
  * A container class hosting the information of a {@link SourceReader}.
  */
-@Public
+@PublicEvolving
 public final class ReaderInfo implements Serializable {
 
 	private static final long serialVersionUID = 1L;
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java
index 1b8a1ca..53fc065 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.connector.source;
 
-import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
@@ -33,7 +33,7 @@ import java.io.Serializable;
  * @param <SplitT>   The type of splits handled by the source.
  * @param <EnumChkT> The type of the enumerator checkpoints.
  */
-@Public
+@PublicEvolving
 public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends Serializable {
 
 	/**
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceEvent.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceEvent.java
index 2377bb7..c21aaba 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceEvent.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceEvent.java
@@ -18,14 +18,14 @@
 
 package org.apache.flink.api.connector.source;
 
-import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 
 import java.io.Serializable;
 
 /**
  * An base class for the events passed between the SourceReaders and Enumerators.
  */
-@Public
+@PublicEvolving
 public interface SourceEvent extends Serializable {
 
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java
index 3896b15..ccf560b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java
@@ -18,14 +18,14 @@
 
 package org.apache.flink.api.connector.source;
 
-import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.eventtime.WatermarkOutput;
 
 /**
  * The interface provided by Flink task to the {@link SourceReader} to emit records
  * to downstream operators for message processing.
  */
-@Public
+@PublicEvolving
 public interface SourceOutput<T> extends WatermarkOutput {
 
 	/**
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
index 66a1229..6a26d70 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.connector.source;
 
-import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.InputStatus;
 
 import java.util.List;
@@ -31,7 +31,7 @@ import java.util.concurrent.CompletableFuture;
  * @param <T> The type of the record emitted by this source reader.
  * @param <SplitT> The type of the the source splits.
  */
-@Public
+@PublicEvolving
 public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseable {
 
 	/**
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
index 2936649..ead9ae2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.api.connector.source;
 
-import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.metrics.MetricGroup;
 
 /**
  * The class that expose some context from runtime to the {@link SourceReader}.
  */
-@Public
+@PublicEvolving
 public interface SourceReaderContext {
 
 	/**
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceSplit.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceSplit.java
index d317f0e..7f2040a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceSplit.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.api.connector.source;
 
-import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 
 /**
  * An interface for all the Split types to extend.
  */
-@Public
+@PublicEvolving
 public interface SourceSplit {
 
 	/**
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
index 16b3938..517a492 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.connector.source;
 
-import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 
 import java.io.IOException;
 import java.util.List;
@@ -28,7 +28,7 @@ import java.util.List;
  * 1. discover the splits for the {@link SourceReader} to read.
  * 2. assign the splits to the source reader.
  */
-@Public
+@PublicEvolving
 public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> extends AutoCloseable {
 
 	/**
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
index db6ccad..33f4a4c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.connector.source;
 
-import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.metrics.MetricGroup;
 
 import java.util.Map;
@@ -34,7 +34,7 @@ import java.util.function.BiConsumer;
  *
  * @param <SplitT> the type of the splits.
  */
-@Public
+@PublicEvolving
 public interface SplitEnumeratorContext<SplitT extends SourceSplit> {
 
 	MetricGroup metricGroup();
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java
index aad3577..33d8a13 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.connector.source;
 
-import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 
 import java.util.List;
 import java.util.Map;
@@ -29,7 +29,7 @@ import java.util.Map;
  * <p>The assignment is always incremental. In another word, splits in the assignment are simply
  * added to the existing assignment.
  */
-@Public
+@PublicEvolving
 public final class SplitsAssignment<SplitT extends SourceSplit> {
 	private final Map<Integer, List<SplitT>> assignment;
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index c8d038a..52204cf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.environment;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
@@ -1612,7 +1613,7 @@ public class StreamExecutionEnvironment {
 	 * 		type of the returned stream
 	 * @return the data stream constructed
 	 */
-	@PublicEvolving
+	@Experimental
 	public <OUT> DataStreamSource<OUT> continuousSource(Source<OUT, ?, ?> source, String sourceName) {
 		return continuousSource(source, sourceName, null);
 	}
@@ -1630,7 +1631,7 @@ public class StreamExecutionEnvironment {
 	 * 		the user defined type information for the stream
 	 * @return the data stream constructed
 	 */
-	@PublicEvolving
+	@Experimental
 	public <OUT> DataStreamSource<OUT> continuousSource(
 			Source<OUT, ?, ?> source,
 			String sourceName,
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 37bb4f9..509ab65 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.api.scala
 
 import com.esotericsoftware.kryo.Serializer
-import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
+import org.apache.flink.annotation.{Experimental, Internal, Public, PublicEvolving}
 import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat}
 import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -664,6 +664,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   /**
     * Create a DataStream using a [[Source]].
     */
+  @Experimental
   def continuousSource[T: TypeInformation](
       source: Source[T, _ <: SourceSplit, _],
       sourceName: String): Unit = {


[flink] 08/16: [FLINK-17904][runtime] Add scheduleWithFixedDelay to ProcessingTimeService

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b03bbf69305dc0a65214e66357126455162eaf9d
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 24 16:25:47 2020 +0200

    [FLINK-17904][runtime] Add scheduleWithFixedDelay to ProcessingTimeService
---
 .../api/runtime/NeverFireProcessingTimeService.java  |  6 ++++++
 .../runtime/tasks/ProcessingTimeService.java         | 17 +++++++++++++++++
 .../runtime/tasks/ProcessingTimeServiceImpl.java     | 10 ++++++++++
 .../runtime/tasks/SystemProcessingTimeService.java   | 20 ++++++++++++++------
 .../runtime/tasks/TestProcessingTimeService.java     |  6 ++++++
 5 files changed, 53 insertions(+), 6 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java
index 4e16fcc..5b32bd0 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java
@@ -52,6 +52,12 @@ public final class NeverFireProcessingTimeService implements TimerService {
 	}
 
 	@Override
+	public ScheduledFuture<?> scheduleWithFixedDelay(
+		ProcessingTimeCallback callback, long initialDelay, long period) {
+		return FUTURE;
+	}
+
+	@Override
 	public boolean isTerminated() {
 		return shutdown.get();
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
index 3f37ff5..9ce97f6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Defines the current processing time and handles all related actions,
@@ -48,6 +49,9 @@ public interface ProcessingTimeService {
 	/**
 	 * Registers a task to be executed repeatedly at a fixed rate.
 	 *
+	 * <p>This call behaves similarly to
+	 * {@link org.apache.flink.runtime.concurrent.ScheduledExecutor#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
+	 *
 	 * @param callback to be executed after the initial delay and then after each period
 	 * @param initialDelay initial delay to start executing callback
 	 * @param period after the initial delay after which the callback is executed
@@ -56,6 +60,19 @@ public interface ProcessingTimeService {
 	ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period);
 
 	/**
+	 * Registers a task to be executed repeatedly with a fixed delay.
+	 *
+	 * <p>This call behaves similarly to
+	 * {@link org.apache.flink.runtime.concurrent.ScheduledExecutor#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
+	 *
+	 * @param callback to be executed after the initial delay and then after each period
+	 * @param initialDelay initial delay to start executing callback
+	 * @param period delay between the end of one execution of the callback and the start of the next
+	 * @return Scheduled future representing the task to be executed repeatedly
+	 */
+	ScheduledFuture<?> scheduleWithFixedDelay(ProcessingTimeCallback callback, long initialDelay, long period);
+
+	/**
 	 * This method puts the service into a state where it does not register new timers, but
 	 * returns for each call to {@link #registerTimer} or {@link #scheduleAtFixedRate} a "mock"
 	 * future and the "mock" future will be never completed. Furthermore, the timers registered
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java
index 61ae9f2..456d0a7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java
@@ -77,6 +77,16 @@ class ProcessingTimeServiceImpl implements ProcessingTimeService {
 	}
 
 	@Override
+	public ScheduledFuture<?> scheduleWithFixedDelay(ProcessingTimeCallback callback, long initialDelay, long period) {
+		if (isQuiesced()) {
+			return new NeverCompleteFuture(initialDelay);
+		}
+
+		return timerService.scheduleWithFixedDelay(
+			addQuiesceProcessingToCallback(processingTimeCallbackWrapper.apply(callback)), initialDelay, period);
+	}
+
+	@Override
 	public CompletableFuture<Void> quiesce() {
 		if (!quiesced) {
 			quiesced = true;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index c0ecd85..fc53870 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -126,16 +126,24 @@ public class SystemProcessingTimeService implements TimerService {
 
 	@Override
 	public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
-		long nextTimestamp = getCurrentProcessingTime() + initialDelay;
+		return scheduleRepeatedly(callback, initialDelay, period, false);
+	}
+
+	@Override
+	public ScheduledFuture<?> scheduleWithFixedDelay(ProcessingTimeCallback callback, long initialDelay, long period) {
+		return scheduleRepeatedly(callback, initialDelay, period, true);
+	}
+
+	private ScheduledFuture<?> scheduleRepeatedly(ProcessingTimeCallback callback, long initialDelay, long period, boolean fixedDelay) {
+		final long nextTimestamp = getCurrentProcessingTime() + initialDelay;
+		final Runnable task = wrapOnTimerCallback(callback, nextTimestamp, period);
 
 		// we directly try to register the timer and only react to the status on exception
 		// that way we save unnecessary volatile accesses for each timer
 		try {
-			return timerService.scheduleAtFixedRate(
-				wrapOnTimerCallback(callback, nextTimestamp, period),
-				initialDelay,
-				period,
-				TimeUnit.MILLISECONDS);
+			return fixedDelay
+					? timerService.scheduleWithFixedDelay(task, initialDelay, period, TimeUnit.MILLISECONDS)
+					: timerService.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS);
 		} catch (RejectedExecutionException e) {
 			final int status = this.status.get();
 			if (status == STATUS_QUIESCED) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index 67a0ef7..7c5742c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -113,6 +113,12 @@ public class TestProcessingTimeService implements TimerService {
 	}
 
 	@Override
+	public ScheduledFuture<?> scheduleWithFixedDelay(ProcessingTimeCallback callback, long initialDelay, long period) {
+		// for all testing purposes, there is no difference between the fixed rate and fixed delay
+		return scheduleAtFixedRate(callback, initialDelay, period);
+	}
+
+	@Override
 	public boolean isTerminated() {
 		return isTerminated;
 	}


[flink] 16/16: [hotfix] Adjust License Headers for FLIP-27 sources to be same as the remaining code base

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a23d5b07de842e97e8907ccce87f5a59dc4ed752
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 27 13:47:41 2020 +0200

    [hotfix] Adjust License Headers for FLIP-27 sources to be same as the remaining code base
---
 .../base/source/event/NoMoreSplitsEvent.java       | 30 +++++++++++-----------
 .../base/source/reader/RecordEmitter.java          | 30 +++++++++++-----------
 .../base/source/reader/RecordsBySplits.java        | 30 +++++++++++-----------
 .../base/source/reader/RecordsWithSplitIds.java    | 30 +++++++++++-----------
 .../SingleThreadMultiplexSourceReaderBase.java     | 30 +++++++++++-----------
 .../base/source/reader/SourceReaderBase.java       | 30 +++++++++++-----------
 .../base/source/reader/SourceReaderOptions.java    | 30 +++++++++++-----------
 .../base/source/reader/SplitsRecordIterator.java   | 30 +++++++++++-----------
 .../base/source/reader/fetcher/AddSplitsTask.java  | 30 +++++++++++-----------
 .../base/source/reader/fetcher/FetchTask.java      | 30 +++++++++++-----------
 .../reader/fetcher/SingleThreadFetcherManager.java | 30 +++++++++++-----------
 .../base/source/reader/fetcher/SplitFetcher.java   | 30 +++++++++++-----------
 .../source/reader/fetcher/SplitFetcherManager.java | 30 +++++++++++-----------
 .../source/reader/fetcher/SplitFetcherTask.java    | 30 +++++++++++-----------
 .../source/reader/splitreader/SplitReader.java     | 30 +++++++++++-----------
 .../source/reader/splitreader/SplitsAddition.java  | 30 +++++++++++-----------
 .../source/reader/splitreader/SplitsChange.java    | 30 +++++++++++-----------
 .../FutureCompletingBlockingQueue.java             | 30 +++++++++++-----------
 .../reader/synchronization/FutureNotifier.java     | 30 +++++++++++-----------
 .../source/reader/CoordinatedSourceITCase.java     | 30 +++++++++++-----------
 .../base/source/reader/SourceReaderBaseTest.java   | 30 +++++++++++-----------
 .../base/source/reader/SourceReaderTestBase.java   | 30 +++++++++++-----------
 .../source/reader/fetcher/SplitFetcherTest.java    | 30 +++++++++++-----------
 .../base/source/reader/mocks/MockBaseSource.java   | 30 +++++++++++-----------
 .../source/reader/mocks/MockRecordEmitter.java     | 30 +++++++++++-----------
 .../base/source/reader/mocks/MockSourceReader.java | 30 +++++++++++-----------
 .../source/reader/mocks/MockSplitEnumerator.java   | 30 +++++++++++-----------
 .../base/source/reader/mocks/MockSplitReader.java  | 30 +++++++++++-----------
 .../reader/synchronization/FutureNotifierTest.java | 30 +++++++++++-----------
 .../flink/api/connector/source/Boundedness.java    | 30 +++++++++++-----------
 .../flink/api/connector/source/ReaderInfo.java     | 30 +++++++++++-----------
 .../flink/api/connector/source/ReaderOutput.java   | 30 +++++++++++-----------
 .../apache/flink/api/connector/source/Source.java  | 30 +++++++++++-----------
 .../flink/api/connector/source/SourceEvent.java    | 30 +++++++++++-----------
 .../flink/api/connector/source/SourceOutput.java   | 30 +++++++++++-----------
 .../flink/api/connector/source/SourceReader.java   | 30 +++++++++++-----------
 .../api/connector/source/SourceReaderContext.java  | 30 +++++++++++-----------
 .../flink/api/connector/source/SourceSplit.java    | 30 +++++++++++-----------
 .../api/connector/source/SplitEnumerator.java      | 30 +++++++++++-----------
 .../connector/source/SplitEnumeratorContext.java   | 30 +++++++++++-----------
 .../api/connector/source/SplitsAssignment.java     | 30 +++++++++++-----------
 .../api/connector/source/mocks/MockSource.java     | 30 +++++++++++-----------
 .../connector/source/mocks/MockSourceReader.java   | 30 +++++++++++-----------
 .../connector/source/mocks/MockSourceSplit.java    | 30 +++++++++++-----------
 .../source/mocks/MockSourceSplitSerializer.java    | 30 +++++++++++-----------
 .../source/mocks/MockSplitEnumerator.java          | 30 +++++++++++-----------
 .../MockSplitEnumeratorCheckpointSerializer.java   | 30 +++++++++++-----------
 47 files changed, 705 insertions(+), 705 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/NoMoreSplitsEvent.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/NoMoreSplitsEvent.java
index ed3663b..e2ca2e4 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/NoMoreSplitsEvent.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/NoMoreSplitsEvent.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.event;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordEmitter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordEmitter.java
index 6a26aa5..9dbd23a 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordEmitter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordEmitter.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
index bdd16fff..77cb594 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
index 813c6a5..f616125 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
index 0e22ef0..546e20a 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 1da08d1..e01180e 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java
index 10eb58b..508b347 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
index 47dfd87..d7b7b76 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
index 8f9fdc5..19b15b5 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader.fetcher;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
index 7db615f..aff21fd 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader.fetcher;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
index 2745a36..bd5879f 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader.fetcher;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 703945d..d006bb0 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader.fetcher;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index 3b04bac..61bada1 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader.fetcher;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
index 997cb65..716d2e2 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader.fetcher;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
index e8c1987..89cf81b 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader.splitreader;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java
index a6b6b4e..ebd2330 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader.splitreader;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsChange.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsChange.java
index 1af330bd..eae7ad0 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsChange.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsChange.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader.splitreader;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index 407f305..6a6dfac 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader.synchronization;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifier.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifier.java
index 53e60f3..9330407 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifier.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifier.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader.synchronization;
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
index 4877def..a7f4c22 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader;
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index d3c9bd7..26504cb 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader;
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
index 5f58836..2acd4e1 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader;
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index 7ded7e5..953d7aa 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader.fetcher;
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
index 137afd9..ae46286 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader.mocks;
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockRecordEmitter.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockRecordEmitter.java
index 66fa3bc..f1061dc 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockRecordEmitter.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockRecordEmitter.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader.mocks;
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
index aada92c..92a19ef 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader.mocks;
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
index b3334a2..2c84a70 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader.mocks;
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
index e0edabd..3c6d8df 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader.mocks;
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifierTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifierTest.java
index b775254..b257ebf 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifierTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifierTest.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.connector.base.source.reader.synchronization;
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/Boundedness.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/Boundedness.java
index d09318e..89e07c5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/Boundedness.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/Boundedness.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.api.connector.source;
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
index f8535e59..7e7e3f0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.api.connector.source;
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
index dbfcba8..6a45d03 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.api.connector.source;
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java
index 53fc065..fe81db8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.api.connector.source;
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceEvent.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceEvent.java
index c21aaba..d07c660 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceEvent.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceEvent.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.api.connector.source;
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java
index ff088d6..02d41f5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.api.connector.source;
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
index b09a724..3e623dd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.api.connector.source;
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
index ead9ae2..658034b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.api.connector.source;
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceSplit.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceSplit.java
index 7f2040a..95a5f8d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceSplit.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.api.connector.source;
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
index 517a492..1e429bb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.api.connector.source;
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
index 33f4a4c..5aee6dd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.api.connector.source;
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java
index 33d8a13..6331788 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.api.connector.source;
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java
index 7866e6e..c48b380 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.api.connector.source.mocks;
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
index 0bdcdec..bdccf88 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.api.connector.source.mocks;
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
index bafc5b2..dc8ce80 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.api.connector.source.mocks;
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplitSerializer.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplitSerializer.java
index 331e6ddd..621b4b9 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplitSerializer.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplitSerializer.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.api.connector.source.mocks;
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
index 64ec8af..93f6c77 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.api.connector.source.mocks;
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorCheckpointSerializer.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorCheckpointSerializer.java
index 4879b18..6763a62 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorCheckpointSerializer.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorCheckpointSerializer.java
@@ -1,19 +1,19 @@
 /*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.api.connector.source.mocks;


[flink] 07/16: [FLINK-17096][core] Simple performance improvements in WatermarkOutputMultiplexer

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5353c47b6627e067a5c18d81bc8ff389a7de18d8
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 24 23:55:24 2020 +0200

    [FLINK-17096][core] Simple performance improvements in WatermarkOutputMultiplexer
---
 .../eventtime/WatermarkOutputMultiplexer.java      | 27 ++++++++--------------
 1 file changed, 9 insertions(+), 18 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
index 44e1e35..545f78a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
@@ -111,11 +111,8 @@ public class WatermarkOutputMultiplexer {
 	 * outputs.
 	 */
 	public WatermarkOutput getImmediateOutput(String outputId) {
-		Preconditions.checkArgument(
-				watermarkPerOutputId.containsKey(outputId),
-				"no output registered under id " + outputId);
-
-		OutputState outputState = watermarkPerOutputId.get(outputId);
+		final OutputState outputState = watermarkPerOutputId.get(outputId);
+		Preconditions.checkArgument(outputState != null, "no output registered under id %s", outputId);
 		return new ImmediateOutput(outputState);
 	}
 
@@ -126,11 +123,8 @@ public class WatermarkOutputMultiplexer {
 	 * outputs.
 	 */
 	public WatermarkOutput getDeferredOutput(String outputId) {
-		Preconditions.checkArgument(
-				watermarkPerOutputId.containsKey(outputId),
-				"no output registered under id " + outputId);
-
-		OutputState outputState = watermarkPerOutputId.get(outputId);
+		final OutputState outputState = watermarkPerOutputId.get(outputId);
+		Preconditions.checkArgument(outputState != null, "no output registered under id %s", outputId);
 		return new DeferredOutput(outputState);
 	}
 
@@ -178,8 +172,8 @@ public class WatermarkOutputMultiplexer {
 	 * Per-output watermark state.
 	 */
 	private static class OutputState {
-		private volatile long watermark = Long.MIN_VALUE;
-		private volatile boolean idle = false;
+		private long watermark = Long.MIN_VALUE;
+		private boolean idle = false;
 
 		/**
 		 * Returns the current watermark timestamp. This will throw {@link IllegalStateException} if
@@ -198,12 +192,9 @@ public class WatermarkOutputMultiplexer {
 		 */
 		public boolean setWatermark(long watermark) {
 			this.idle = false;
-			if (watermark > this.watermark) {
-				this.watermark = watermark;
-				return true;
-			} else {
-				return false;
-			}
+			final boolean updated = watermark > this.watermark;
+			this.watermark = Math.max(watermark, this.watermark);
+			return updated;
 		}
 
 		public boolean isIdle() {


[flink] 06/16: [FLINK-17903][core] WatermarkOutputMultiplexer supports String IDs and de-registration of outputs

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 26e43051d74a261746cfdea0ccde356ed7dfeb53
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 24 23:14:05 2020 +0200

    [FLINK-17903][core] WatermarkOutputMultiplexer supports String IDs and de-registration of outputs
---
 .../kafka/internals/AbstractFetcher.java           |  11 ++-
 .../eventtime/WatermarkOutputMultiplexer.java      |  35 ++++---
 .../eventtime/WatermarkOutputMultiplexerTest.java  | 101 +++++++++++++++++++--
 3 files changed, 119 insertions(+), 28 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 978cd97..1a10582 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -394,15 +394,18 @@ public abstract class AbstractFetcher<T, KPH> {
 
 			case WITH_WATERMARK_GENERATOR: {
 				for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
-					KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
+					final KafkaTopicPartition kafkaTopicPartition = partitionEntry.getKey();
+					KPH kafkaHandle = createKafkaPartitionHandle(kafkaTopicPartition);
 					WatermarkStrategy<T> deserializedWatermarkStrategy = watermarkStrategy.deserializeValue(
 							userCodeClassLoader);
 
-					int outputId = watermarkOutputMultiplexer.registerNewOutput();
+					// the format of the ID does not matter, as long as it is unique
+					final String partitionId = kafkaTopicPartition.getTopic() + '-' + kafkaTopicPartition.getPartition();
+					watermarkOutputMultiplexer.registerNewOutput(partitionId);
 					WatermarkOutput immediateOutput =
-							watermarkOutputMultiplexer.getImmediateOutput(outputId);
+							watermarkOutputMultiplexer.getImmediateOutput(partitionId);
 					WatermarkOutput deferredOutput =
-							watermarkOutputMultiplexer.getDeferredOutput(outputId);
+							watermarkOutputMultiplexer.getDeferredOutput(partitionId);
 
 					KafkaTopicPartitionStateWithWatermarkGenerator<T, KPH> partitionState =
 							new KafkaTopicPartitionStateWithWatermarkGenerator<>(
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
index 83976e3..44e1e35 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
@@ -40,8 +40,8 @@ import static org.apache.flink.util.Preconditions.checkState;
  * #onPeriodicEmit()} is called will the deferred updates be combined and forwarded to the
  * underlying output.
  *
- * <p>For registering a new multiplexed output, you must first call {@link #registerNewOutput()}
- * and then call {@link #getImmediateOutput(int)} or {@link #getDeferredOutput(int)} with the output
+ * <p>For registering a new multiplexed output, you must first call {@link #registerNewOutput(String)}
+ * and then call {@link #getImmediateOutput(String)} or {@link #getDeferredOutput(String)} with the output
  * ID you get from that. You can get both an immediate and deferred output for a given output ID,
  * you can also call the getters multiple times.
  *
@@ -57,16 +57,13 @@ public class WatermarkOutputMultiplexer {
 	 */
 	private final WatermarkOutput underlyingOutput;
 
-	/** The id to use for the next registered output. */
-	private int nextOutputId = 0;
-
 	/** The combined watermark over the per-output watermarks. */
 	private long combinedWatermark = Long.MIN_VALUE;
 
 	/**
 	 * Map view, to allow finding them when requesting the {@link WatermarkOutput} for a given id.
 	 */
-	private final Map<Integer, OutputState> watermarkPerOutputId;
+	private final Map<String, OutputState> watermarkPerOutputId;
 
 	/**
 	 * List of all watermark outputs, for efficient access.
@@ -88,13 +85,23 @@ public class WatermarkOutputMultiplexer {
 	 * an output ID that can be used to get a deferred or immediate {@link WatermarkOutput} for that
 	 * output.
 	 */
-	public int registerNewOutput() {
-		int newOutputId = nextOutputId;
-		nextOutputId++;
-		OutputState outputState = new OutputState();
-		watermarkPerOutputId.put(newOutputId, outputState);
+	public void registerNewOutput(String id) {
+		final OutputState outputState = new OutputState();
+
+		final OutputState previouslyRegistered = watermarkPerOutputId.putIfAbsent(id, outputState);
+		checkState(previouslyRegistered == null, "Already contains an output for ID %s", id);
+
 		watermarkOutputs.add(outputState);
-		return newOutputId;
+	}
+
+	public boolean unregisterOutput(String id) {
+		final OutputState output = watermarkPerOutputId.remove(id);
+		if (output != null) {
+			watermarkOutputs.remove(output);
+			return true;
+		} else {
+			return false;
+		}
 	}
 
 	/**
@@ -103,7 +110,7 @@ public class WatermarkOutputMultiplexer {
 	 * <p>>See {@link WatermarkOutputMultiplexer} for a description of immediate and deferred
 	 * outputs.
 	 */
-	public WatermarkOutput getImmediateOutput(int outputId) {
+	public WatermarkOutput getImmediateOutput(String outputId) {
 		Preconditions.checkArgument(
 				watermarkPerOutputId.containsKey(outputId),
 				"no output registered under id " + outputId);
@@ -118,7 +125,7 @@ public class WatermarkOutputMultiplexer {
 	 * <p>>See {@link WatermarkOutputMultiplexer} for a description of immediate and deferred
 	 * outputs.
 	 */
-	public WatermarkOutput getDeferredOutput(int outputId) {
+	public WatermarkOutput getDeferredOutput(String outputId) {
 		Preconditions.checkArgument(
 				watermarkPerOutputId.containsKey(outputId),
 				"no output registered under id " + outputId);
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
index 224bf48..59bb6df 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexerTest.java
@@ -20,10 +20,15 @@ package org.apache.flink.api.common.eventtime;
 
 import org.junit.Test;
 
+import java.util.UUID;
+
 import static org.apache.flink.api.common.eventtime.WatermarkMatchers.watermark;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for the {@link WatermarkOutputMultiplexer}.
@@ -261,9 +266,10 @@ public class WatermarkOutputMultiplexerTest {
 		WatermarkOutputMultiplexer multiplexer =
 				new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
 
-		int outputId = multiplexer.registerNewOutput();
-		WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(outputId);
-		WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(outputId);
+		final String id = "test-id";
+		multiplexer.registerNewOutput(id);
+		WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(id);
+		WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(id);
 
 		deferredOutput.emitWatermark(new Watermark(5));
 		multiplexer.onPeriodicEmit();
@@ -284,9 +290,10 @@ public class WatermarkOutputMultiplexerTest {
 		WatermarkOutputMultiplexer multiplexer =
 				new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
 
-		int outputId = multiplexer.registerNewOutput();
-		WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(outputId);
-		WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(outputId);
+		final String id = "1234-test";
+		multiplexer.registerNewOutput(id);
+		WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(id);
+		WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(id);
 
 		deferredOutput.emitWatermark(new Watermark(5));
 		immediateOutput.emitWatermark(new Watermark(2));
@@ -294,13 +301,86 @@ public class WatermarkOutputMultiplexerTest {
 		assertThat(underlyingWatermarkOutput.lastWatermark(), is(nullValue()));
 	}
 
+	@Test
+	public void testRemoveUnblocksWatermarks() {
+		final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
+		final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
+		final long lowTimestamp = 156765L;
+		final long highTimestamp = lowTimestamp + 10;
+
+		multiplexer.registerNewOutput("lower");
+		multiplexer.registerNewOutput("higher");
+		multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));
+
+		multiplexer.unregisterOutput("lower");
+		multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));
+
+		assertEquals(highTimestamp, underlyingWatermarkOutput.lastWatermark().getTimestamp());
+	}
+
+	@Test
+	public void testRemoveOfLowestDoesNotImmediatelyAdvanceWatermark() {
+		final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
+		final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
+		final long lowTimestamp = -4343L;
+		final long highTimestamp = lowTimestamp + 10;
+
+		multiplexer.registerNewOutput("lower");
+		multiplexer.registerNewOutput("higher");
+		multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));
+		multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));
+
+		multiplexer.unregisterOutput("lower");
+
+		assertEquals(lowTimestamp, underlyingWatermarkOutput.lastWatermark().getTimestamp());
+	}
+
+	@Test
+	public void testRemoveOfHighestDoesNotRetractWatermark() {
+		final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
+		final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
+		final long lowTimestamp = 1L;
+		final long highTimestamp = 2L;
+
+		multiplexer.registerNewOutput("higher");
+		multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));
+		multiplexer.unregisterOutput("higher");
+
+		multiplexer.registerNewOutput("lower");
+		multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));
+
+		assertEquals(highTimestamp, underlyingWatermarkOutput.lastWatermark().getTimestamp());
+	}
+
+	@Test
+	public void testRemoveRegisteredReturnValue() {
+		final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
+		final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
+		multiplexer.registerNewOutput("does-exist");
+
+		final boolean unregistered = multiplexer.unregisterOutput("does-exist");
+
+		assertTrue(unregistered);
+	}
+
+	@Test
+	public void testRemoveNotRegisteredReturnValue() {
+		final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
+		final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
+
+		final boolean unregistered = multiplexer.unregisterOutput("does-not-exist");
+
+		assertFalse(unregistered);
+	}
+
 	/**
 	 * Convenience method so we don't have to go through the output ID dance when we only want an
 	 * immediate output for a given output ID.
 	 */
 	private static WatermarkOutput createImmediateOutput(WatermarkOutputMultiplexer multiplexer) {
-		int outputId = multiplexer.registerNewOutput();
-		return multiplexer.getImmediateOutput(outputId);
+		final String id = UUID.randomUUID().toString();
+		multiplexer.registerNewOutput(id);
+		return multiplexer.getImmediateOutput(id);
 	}
 
 	/**
@@ -308,8 +388,9 @@ public class WatermarkOutputMultiplexerTest {
 	 * deferred output for a given output ID.
 	 */
 	private static WatermarkOutput createDeferredOutput(WatermarkOutputMultiplexer multiplexer) {
-		int outputId = multiplexer.registerNewOutput();
-		return multiplexer.getDeferredOutput(outputId);
+		final String id = UUID.randomUUID().toString();
+		multiplexer.registerNewOutput(id);
+		return multiplexer.getDeferredOutput(id);
 	}
 
 	private static TestingWatermarkOutput createTestingWatermarkOutput() {


[flink] 11/16: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ae596d5338f8ff060f26641657239bacac2712b2
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue May 19 20:37:31 2020 +0200

    [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources
---
 .../base/source/reader/SourceReaderBase.java       |  43 +++-
 .../base/source/reader/SourceReaderTestBase.java   |  15 +-
 .../flink/api/connector/source/ReaderOutput.java   |  87 +++++++
 .../flink/api/connector/source/SourceReader.java   |   2 +-
 .../connector/source/mocks/MockSourceReader.java   |   4 +-
 .../streaming/api/operators/SourceOperator.java    |  60 ++++-
 .../api/operators/SourceOperatorFactory.java       |  21 +-
 .../source/BatchTimestampsAndWatermarks.java       | 126 ++++++++++
 .../source/SourceOutputWithWatermarks.java         | 179 ++++++++++++++
 .../source/StreamingTimestampsAndWatermarks.java   | 247 ++++++++++++++++++++
 .../operators/source/TimestampsAndWatermarks.java  |  92 ++++++++
 .../source/TimestampsAndWatermarksContext.java     |  46 ++++
 .../operators/source/WatermarkToDataOutput.java    |  86 +++++++
 .../api/operators/SourceOperatorTest.java          |  32 +--
 .../api/operators/source/CollectingDataOutput.java |  56 +++++
 .../source/OnEventTestWatermarkGenerator.java      |  34 +++
 .../source/OnPeriodicTestWatermarkGenerator.java   |  43 ++++
 .../source/SourceOperatorEventTimeTest.java        | 257 +++++++++++++++++++++
 .../source/SourceOutputWithWatermarksTest.java     |  85 +++++++
 .../operators/source/TestingSourceOperator.java    |  94 ++++++++
 .../source/WatermarkToDataOutputTest.java          |  77 ++++++
 .../tasks/SourceOperatorStreamTaskTest.java        |  10 +-
 22 files changed, 1637 insertions(+), 59 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index b5781de..1da08d1 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.base.source.reader;
 
+import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
@@ -63,7 +64,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
 
 	/** The state of the splits. */
-	private final Map<String, SplitStateT> splitStates;
+	private final Map<String, SplitContext<T, SplitStateT>> splitStates;
 
 	/** The record emitter to handle the records read by the SplitReaders. */
 	protected final RecordEmitter<E, T, SplitStateT> recordEmitter;
@@ -111,7 +112,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	}
 
 	@Override
-	public InputStatus pollNext(SourceOutput<T> sourceOutput) throws Exception {
+	public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
 		splitFetcherManager.checkErrors();
 		// poll from the queue if the last element was successfully handled. Otherwise
 		// just pass the last element again.
@@ -133,14 +134,19 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 			// Process one record.
 			if (splitIter.hasNext()) {
 				// emit the record.
-				E record = splitIter.next();
-				recordEmitter.emitRecord(record, sourceOutput, splitStates.get(splitIter.currentSplitId()));
+				final E record = splitIter.next();
+				final SplitContext<T, SplitStateT> splitContext = splitStates.get(splitIter.currentSplitId());
+				final SourceOutput<T> splitOutput = splitContext.getOrCreateSplitOutput(output);
+				recordEmitter.emitRecord(record, splitOutput, splitContext.state);
 				LOG.trace("Emitted record: {}", record);
 			}
 			// Do some cleanup if the all the records in the current splitIter have been processed.
 			if (!splitIter.hasNext()) {
 				// First remove the state of the split.
-				splitIter.finishedSplitIds().forEach(splitStates::remove);
+				splitIter.finishedSplitIds().forEach((id) -> {
+					splitStates.remove(id);
+					output.releaseOutputForSplit(id);
+				});
 				// Handle the finished splits.
 				onSplitFinished(splitIter.finishedSplitIds());
 				// Prepare the return status based on the availability of the next element.
@@ -173,7 +179,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	@Override
 	public List<SplitT> snapshotState() {
 		List<SplitT> splits = new ArrayList<>();
-		splitStates.forEach((id, state) -> splits.add(toSplitType(id, state)));
+		splitStates.forEach((id, context) -> splits.add(toSplitType(id, context.state)));
 		return splits;
 	}
 
@@ -181,7 +187,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	public void addSplits(List<SplitT> splits) {
 		LOG.trace("Adding splits {}", splits);
 		// Initialize the state for each split.
-		splits.forEach(s -> splitStates.put(s.splitId(), initializedState(s)));
+		splits.forEach(s -> splitStates.put(s.splitId(), new SplitContext<>(s.splitId(), initializedState(s))));
 		// Hand over the splits to the split fetcher to start fetch.
 		splitFetcherManager.addSplits(splits);
 	}
@@ -201,6 +207,8 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 		splitFetcherManager.close(options.sourceReaderCloseTimeout);
 	}
 
+
+
 	// -------------------- Abstract method to allow different implementations ------------------
 	/**
 	 * Handles the finished splits to clean the state if needed.
@@ -233,4 +241,25 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 			return InputStatus.NOTHING_AVAILABLE;
 		}
 	}
+
+	// ------------------ private helper classes ---------------------
+
+	private static final class SplitContext<T, SplitStateT> {
+
+		final String splitId;
+		final SplitStateT state;
+		SourceOutput<T> sourceOutput;
+
+		private SplitContext(String splitId, SplitStateT state) {
+			this.state = state;
+			this.splitId = splitId;
+		}
+
+		SourceOutput<T> getOrCreateSplitOutput(ReaderOutput<T> mainOutput) {
+			if (sourceOutput == null) {
+				sourceOutput = mainOutput.createOutputForSplit(splitId);
+			}
+			return sourceOutput;
+		}
+	}
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
index fcf44fa..5f58836 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.base.source.reader;
 
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceSplit;
@@ -170,7 +171,7 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T
 	/**
 	 * A source output that validates the output.
 	 */
-	protected static class ValidatingSourceOutput implements SourceOutput<Integer> {
+	protected static class ValidatingSourceOutput implements ReaderOutput<Integer> {
 		private Set<Integer> consumedValues = new HashSet<>();
 		private int max = Integer.MIN_VALUE;
 		private int min = Integer.MAX_VALUE;
@@ -204,13 +205,17 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T
 		}
 
 		@Override
-		public void emitWatermark(Watermark watermark) {
-
-		}
+		public void emitWatermark(Watermark watermark) {}
 
 		@Override
-		public void markIdle() {
+		public void markIdle() {}
 
+		@Override
+		public SourceOutput<Integer> createOutputForSplit(String splitId) {
+			return this;
 		}
+
+		@Override
+		public void releaseOutputForSplit(String splitId) {}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
new file mode 100644
index 0000000..1774a7c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
@@ -0,0 +1,87 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.Watermark;
+
+/**
+ * The interface provided by Flink task to the {@link SourceReader} to emit records
+ * to downstream operators for message processing.
+ */
+@PublicEvolving
+public interface ReaderOutput<T> extends SourceOutput<T> {
+
+	/**
+	 * Emit a record without a timestamp. To attach a timestamp, use {@link #collect(Object, long)} instead.
+	 *
+	 * @param record the record to emit.
+	 */
+	@Override
+	void collect(T record);
+
+	/**
+	 * Emit a record with timestamp.
+	 *
+	 * @param record the record to emit.
+	 * @param timestamp the timestamp of the record.
+	 */
+	@Override
+	void collect(T record, long timestamp);
+
+	/**
+	 * Emits the given watermark.
+	 *
+	 * <p>Emitting a watermark also implicitly marks the stream as <i>active</i>, ending
+	 * previously marked idleness.
+	 */
+	@Override
+	void emitWatermark(Watermark watermark);
+
+	/**
+	 * Marks this output as idle, meaning that downstream operations do not
+	 * wait for watermarks from this output.
+	 *
+	 * <p>An output becomes active again as soon as the next watermark is emitted.
+	 */
+	@Override
+	void markIdle();
+
+	/**
+	 * Creates a {@code SourceOutput} for a specific Source Split. Use these outputs if you want to
+	 * run split-local logic, like watermark generation.
+	 *
+	 * <p>If a split-local output was already created for this split-ID, the method will return that instance,
+	 * so that only one split-local output exists per split-ID.
+	 *
+	 * <p><b>IMPORTANT:</b> After the split has been finished, it is crucial to release the created
+	 * output again. Otherwise it will continue to contribute to the watermark generation like a
+	 * perpetually stalling source split, and may hold back the watermark indefinitely.
+	 *
+	 * @see #releaseOutputForSplit(String)
+	 */
+	SourceOutput<T> createOutputForSplit(String splitId);
+
+	/**
+	 * Releases the {@code SourceOutput} created for the split with the given ID.
+	 *
+	 * @see #createOutputForSplit(String)
+	 */
+	void releaseOutputForSplit(String splitId);
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
index 6a26d70..b09a724 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
@@ -51,7 +51,7 @@ public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseab
 	 *
 	 * @return The InputStatus of the SourceReader after the method invocation.
 	 */
-	InputStatus pollNext(SourceOutput<T> sourceOutput) throws Exception;
+	InputStatus pollNext(ReaderOutput<T> output) throws Exception;
 
 	/**
 	 * Checkpoint on the state of the source.
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
index 9a7a327..0bdcdec 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.api.connector.source.mocks;
 
+import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceEvent;
-import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.core.io.InputStatus;
 
@@ -53,7 +53,7 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit>
 	}
 
 	@Override
-	public InputStatus pollNext(SourceOutput<Integer> sourceOutput) throws Exception {
+	public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) throws Exception {
 		boolean finished = true;
 		currentSplitIndex = 0;
 		// Find first splits with available records.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 5d4b2d6..9ac80d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -19,11 +19,12 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceEvent;
-import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
@@ -38,8 +39,10 @@ import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.CollectionUtil;
 
 import java.util.List;
@@ -81,30 +84,47 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 	/** The event gateway through which this operator talks to its coordinator. */
 	private final OperatorEventGateway operatorEventGateway;
 
-	// ---- lazily initialized fields ----
+	/** The factory for timestamps and watermark generators. */
+	private final WatermarkStrategy<OUT> watermarkStrategy;
+
+	// ---- lazily initialized fields (these fields are the "hot" fields) ----
 
 	/** The source reader that does most of the work. */
 	private SourceReader<OUT, SplitT> sourceReader;
 
+	private ReaderOutput<OUT> currentMainOutput;
+
+	private DataOutput<OUT> lastInvokedOutput;
+
 	/** The state that holds the currently assigned splits. */
 	private ListState<SplitT> readerState;
 
+	/** The event time and watermarking logic. Ideally this would be eagerly passed into this operator,
+	 * but we currently need to instantiate this lazily, because the metric groups exist only later. */
+	private TimestampsAndWatermarks<OUT> eventTimeLogic;
+
 	public SourceOperator(
 			Function<SourceReaderContext, SourceReader<OUT, SplitT>> readerFactory,
 			OperatorEventGateway operatorEventGateway,
-			SimpleVersionedSerializer<SplitT> splitSerializer) {
+			SimpleVersionedSerializer<SplitT> splitSerializer,
+			WatermarkStrategy<OUT> watermarkStrategy,
+			ProcessingTimeService timeService) {
 
 		this.readerFactory = checkNotNull(readerFactory);
 		this.operatorEventGateway = checkNotNull(operatorEventGateway);
 		this.splitSerializer = checkNotNull(splitSerializer);
+		this.watermarkStrategy = checkNotNull(watermarkStrategy);
+		this.processingTimeService = timeService;
 	}
 
 	@Override
 	public void open() throws Exception {
+		final MetricGroup metricGroup = getMetricGroup();
+
 		final SourceReaderContext context = new SourceReaderContext() {
 			@Override
 			public MetricGroup metricGroup() {
-				return getRuntimeContext().getMetricGroup();
+				return metricGroup;
 			}
 
 			@Override
@@ -113,6 +133,15 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 			}
 		};
 
+		// in the future when we support both batch and streaming modes for the source operator,
+		// and when this one is migrated to the "eager initialization" operator (StreamOperatorV2),
+		// then we should evaluate this during operator construction.
+		eventTimeLogic = TimestampsAndWatermarks.createStreamingEventTimeLogic(
+				watermarkStrategy,
+				metricGroup,
+				getProcessingTimeService(),
+				getExecutionConfig().getAutoWatermarkInterval());
+
 		sourceReader = readerFactory.apply(context);
 
 		// restore the state if necessary.
@@ -125,12 +154,31 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 		sourceReader.start();
 		// Register the reader to the coordinator.
 		registerReader();
+
+		eventTimeLogic.startPeriodicWatermarkEmits();
+	}
+
+	@Override
+	public void close() throws Exception {
+		eventTimeLogic.stopPeriodicWatermarkEmits();
+		super.close();
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
 	public InputStatus emitNext(DataOutput<OUT> output) throws Exception {
-		return sourceReader.pollNext((SourceOutput<OUT>) output);
+		// guarding an assumption we currently make, because certain classes
+		// assume a constant output
+		assert lastInvokedOutput == output || lastInvokedOutput == null;
+
+		// short circuit the common case (every invocation except the first)
+		if (currentMainOutput != null) {
+			return sourceReader.pollNext(currentMainOutput);
+		}
+
+		// this creates a batch or streaming output based on the runtime mode
+		currentMainOutput = eventTimeLogic.createMainOutput(output);
+		lastInvokedOutput = output;
+		return sourceReader.pollNext(currentMainOutput);
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index c30a0a7..02c7927 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
@@ -27,6 +28,9 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
+import org.apache.flink.streaming.api.operators.source.NoOpWatermarkGenerator;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
 
 import java.util.function.Function;
 
@@ -34,13 +38,16 @@ import java.util.function.Function;
  * The Factory class for {@link SourceOperator}.
  */
 public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OUT>
-		implements CoordinatedOperatorFactory<OUT> {
+		implements CoordinatedOperatorFactory<OUT>, ProcessingTimeServiceAware {
 
 	private static final long serialVersionUID = 1L;
 
 	/** The {@link Source} to create the {@link SourceOperator}. */
 	private final Source<OUT, ?, ?> source;
 
+	/** The event time setup (timestamp assigners, watermark generators, etc.). */
+	private final WatermarkStrategy<OUT> watermarkStrategy = (ctx) -> new NoOpWatermarkGenerator<>();
+
 	/** The number of worker thread for the source coordinator. */
 	private final int numCoordinatorWorkerThread;
 
@@ -61,7 +68,9 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 		final SourceOperator<OUT, ?> sourceOperator = instantiateSourceOperator(
 				source::createReader,
 				gateway,
-				source.getSplitSerializer());
+				source.getSplitSerializer(),
+				watermarkStrategy,
+				parameters.getProcessingTimeService());
 
 		sourceOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
 		parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, sourceOperator);
@@ -99,7 +108,9 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 	private static <T, SplitT extends SourceSplit> SourceOperator<T, SplitT> instantiateSourceOperator(
 			Function<SourceReaderContext, SourceReader<T, ?>> readerFactory,
 			OperatorEventGateway eventGateway,
-			SimpleVersionedSerializer<?> splitSerializer) {
+			SimpleVersionedSerializer<?> splitSerializer,
+			WatermarkStrategy<T> watermarkStrategy,
+			ProcessingTimeService timeService) {
 
 		// jumping through generics hoops: cast the generics away to then cast them back more strictly typed
 		final Function<SourceReaderContext, SourceReader<T, SplitT>> typedReaderFactory =
@@ -110,6 +121,8 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 		return new SourceOperator<>(
 				typedReaderFactory,
 				eventGateway,
-				typedSplitSerializer);
+				typedSplitSerializer,
+				watermarkStrategy,
+				timeService);
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/BatchTimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/BatchTimestampsAndWatermarks.java
new file mode 100644
index 0000000..135cad9
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/BatchTimestampsAndWatermarks.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of {@link TimestampsAndWatermarks} to be used during batch execution of a
+ * program. Batch execution has no watermarks, so all watermark related operations in this
+ * implementation are no-ops.
+ *
+ * @param <T> The type of the emitted records.
+ */
+@Internal
+public class BatchTimestampsAndWatermarks<T> implements TimestampsAndWatermarks<T> {
+
+	private final TimestampAssigner<T> timestamps;
+
+	/**
+	 * Creates a new BatchTimestampsAndWatermarks with the given TimestampAssigner.
+	 */
+	public BatchTimestampsAndWatermarks(TimestampAssigner<T> timestamps) {
+		this.timestamps = checkNotNull(timestamps);
+	}
+
+	@Override
+	public ReaderOutput<T> createMainOutput(PushingAsyncDataInput.DataOutput<T> output) {
+		checkNotNull(output);
+		return new TimestampsOnlyOutput<>(output, timestamps);
+	}
+
+	@Override
+	public void startPeriodicWatermarkEmits() {
+		// no periodic watermarks in batch processing
+	}
+
+	@Override
+	public void stopPeriodicWatermarkEmits() {
+		// no periodic watermarks in batch processing
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A simple implementation of {@link SourceOutput} and {@link ReaderOutput} that extracts
+	 * timestamps but has no watermarking logic. Because only watermarking logic has state per
+	 * Source Split, the same instance gets shared across all Source Splits.
+	 *
+	 * @param <T> The type of the emitted records.
+	 */
+	private static final class TimestampsOnlyOutput<T> implements ReaderOutput<T> {
+
+		private final PushingAsyncDataInput.DataOutput<T> output;
+		private final TimestampAssigner<T> timestampAssigner;
+
+		private TimestampsOnlyOutput(
+				PushingAsyncDataInput.DataOutput<T> output,
+				TimestampAssigner<T> timestampAssigner) {
+
+			this.output = output;
+			this.timestampAssigner = timestampAssigner;
+		}
+
+		@Override
+		public void collect(T record) {
+			collect(record, TimestampAssigner.NO_TIMESTAMP);
+		}
+
+		@Override
+		public void collect(T record, long timestamp) {
+			try {
+				output.emitRecord(new StreamRecord<>(record, timestampAssigner.extractTimestamp(record, timestamp)));
+			} catch (ExceptionInChainedOperatorException e) {
+				throw e;
+			} catch (Exception e) {
+				throw new ExceptionInChainedOperatorException(e);
+			}
+		}
+
+		@Override
+		public void emitWatermark(Watermark watermark) {
+			// do nothing, this does not forward any watermarks manually emitted by the source directly
+		}
+
+		@Override
+		public void markIdle() {
+			// do nothing, because without watermarks there is no idleness
+		}
+
+		@Override
+		public SourceOutput<T> createOutputForSplit(String splitId) {
+			// we don't need per-split instances, because we do not generate watermarks
+			return this;
+		}
+
+		@Override
+		public void releaseOutputForSplit(String splitId) {
+			// nothing to release, because we do not create per-split instances
+		}
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
new file mode 100644
index 0000000..3a8396e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of the SourceOutput. The records emitted to this output are pushed into a given
+ * {@link PushingAsyncDataInput.DataOutput}. The watermarks are pushed into the same output, or
+ * into a separate {@link WatermarkOutput}, if one is provided.
+ *
+ * <h2>Periodic Watermarks</h2>
+ *
+ * <p>This output does not implement automatic periodic watermark emission. The
+ * method {@link SourceOutputWithWatermarks#emitPeriodicWatermark()} needs to be called periodically.
+ *
+ * <h2>Note on Performance Considerations</h2>
+ *
+ * <p>The methods {@link SourceOutput#collect(Object)} and {@link SourceOutput#collect(Object, long)}
+ * are highly performance-critical (part of the hot loop). To make the code as JIT friendly as possible,
+ * we want to have only a single implementation of these two methods, across all classes.
+ * That way, the JIT compiler can de-virtualize (and inline) them better.
+ *
+ * <p>Currently, we have one implementation of these methods in the batch case (see class
+ * {@link BatchTimestampsAndWatermarks}) and one for the streaming case (this class). When the JVM
+ * is dedicated to a single job (or type of job) only one of these classes will be loaded. In mixed
+ * job setups, we still have a bimorphic method (rather than a poly- or megamorphic method).
+ *
+ * @param <T> The type of emitted records.
+ */
+@Internal
+public class SourceOutputWithWatermarks<T> implements SourceOutput<T> {
+
+	private final PushingAsyncDataInput.DataOutput<T> recordsOutput;
+
+	private final TimestampAssigner<T> timestampAssigner;
+
+	private final WatermarkGenerator<T> watermarkGenerator;
+
+	private final WatermarkOutput onEventWatermarkOutput;
+
+	private final WatermarkOutput periodicWatermarkOutput;
+
+	/**
+	 * Creates a new SourceOutputWithWatermarks that emits records to the given DataOutput
+	 * and watermarks to the (possibly different) WatermarkOutput.
+	 */
+	protected SourceOutputWithWatermarks(
+			PushingAsyncDataInput.DataOutput<T> recordsOutput,
+			WatermarkOutput onEventWatermarkOutput,
+			WatermarkOutput periodicWatermarkOutput,
+			TimestampAssigner<T> timestampAssigner,
+			WatermarkGenerator<T> watermarkGenerator) {
+
+		this.recordsOutput = checkNotNull(recordsOutput);
+		this.onEventWatermarkOutput = checkNotNull(onEventWatermarkOutput);
+		this.periodicWatermarkOutput = checkNotNull(periodicWatermarkOutput);
+		this.timestampAssigner = checkNotNull(timestampAssigner);
+		this.watermarkGenerator = checkNotNull(watermarkGenerator);
+	}
+
+	// ------------------------------------------------------------------------
+	// SourceOutput Methods
+	//
+	// Note that the two methods below are final, as a partial enforcement
+	// of the performance design goal mentioned in the class-level comment.
+	// ------------------------------------------------------------------------
+
+	@Override
+	public final void collect(T record) {
+		collect(record, TimestampAssigner.NO_TIMESTAMP);
+	}
+
+	@Override
+	public final void collect(T record, long timestamp) {
+		try {
+			final long assignedTimestamp = timestampAssigner.extractTimestamp(record, timestamp);
+
+			// IMPORTANT: The event must be emitted before the watermark generator is called.
+			recordsOutput.emitRecord(new StreamRecord<>(record, assignedTimestamp));
+			watermarkGenerator.onEvent(record, assignedTimestamp, onEventWatermarkOutput);
+		} catch (ExceptionInChainedOperatorException e) {
+			throw e;
+		} catch (Exception e) {
+			throw new ExceptionInChainedOperatorException(e);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// WatermarkOutput Methods
+	//
+	// These two methods are final as well, to enforce the contract that the
+	// watermarks from emitWatermark(Watermark) go to the same output as the
+	// watermarks from the watermarkGenerator.onEvent(...) calls in the collect(...)
+	// methods.
+	// ------------------------------------------------------------------------
+
+	@Override
+	public final void emitWatermark(Watermark watermark) {
+		onEventWatermarkOutput.emitWatermark(watermark);
+	}
+
+	@Override
+	public final void markIdle() {
+		onEventWatermarkOutput.markIdle();
+	}
+
+	public final void emitPeriodicWatermark() {
+		watermarkGenerator.onPeriodicEmit(periodicWatermarkOutput);
+	}
+
+	// ------------------------------------------------------------------------
+	// Factories
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new SourceOutputWithWatermarks that emits both records and watermarks
+	 * to the same given DataOutput.
+	 */
+	public static <E> SourceOutputWithWatermarks<E> createWithSameOutputs(
+			PushingAsyncDataInput.DataOutput<E> recordsAndWatermarksOutput,
+			TimestampAssigner<E> timestampAssigner,
+			WatermarkGenerator<E> watermarkGenerator) {
+
+		final WatermarkOutput watermarkOutput = new WatermarkToDataOutput(recordsAndWatermarksOutput);
+
+		return new SourceOutputWithWatermarks<>(
+				recordsAndWatermarksOutput,
+				watermarkOutput,
+				watermarkOutput,
+				timestampAssigner,
+				watermarkGenerator);
+	}
+
+	/**
+	 * Creates a new SourceOutputWithWatermarks that emits records to the given DataOutput
+	 * and on-event and periodic watermarks to the two given (separate) WatermarkOutputs.
+	 */
+	public static <E> SourceOutputWithWatermarks<E> createWithSeparateOutputs(
+		PushingAsyncDataInput.DataOutput<E> recordsOutput,
+		WatermarkOutput onEventWatermarkOutput,
+		WatermarkOutput periodicWatermarkOutput,
+		TimestampAssigner<E> timestampAssigner,
+		WatermarkGenerator<E> watermarkGenerator) {
+
+		return new SourceOutputWithWatermarks<>(
+			recordsOutput,
+			onEventWatermarkOutput,
+			periodicWatermarkOutput,
+			timestampAssigner,
+			watermarkGenerator);
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/StreamingTimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/StreamingTimestampsAndWatermarks.java
new file mode 100644
index 0000000..af51f414
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/StreamingTimestampsAndWatermarks.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of timestamp extraction and watermark generation logic for streaming sources.
+ *
+ * @param <T> The type of the emitted records.
+ */
+@Internal
+public class StreamingTimestampsAndWatermarks<T> implements TimestampsAndWatermarks<T> {
+
+	private final TimestampAssigner<T> timestampAssigner;
+
+	private final WatermarkGeneratorSupplier<T> watermarksFactory;
+
+	private final WatermarkGeneratorSupplier.Context watermarksContext;
+
+	private final ProcessingTimeService timeService;
+
+	private final long periodicWatermarkInterval;
+
+	@Nullable
+	private SplitLocalOutputs<T> currentPerSplitOutputs;
+
+	@Nullable
+	private StreamingReaderOutput<T> currentMainOutput;
+
+	@Nullable
+	private ScheduledFuture<?> periodicEmitHandle;
+
+	public StreamingTimestampsAndWatermarks(
+			TimestampAssigner<T> timestampAssigner,
+			WatermarkGeneratorSupplier<T> watermarksFactory,
+			WatermarkGeneratorSupplier.Context watermarksContext,
+			ProcessingTimeService timeService,
+			Duration periodicWatermarkInterval) {
+
+		this.timestampAssigner = timestampAssigner;
+		this.watermarksFactory = watermarksFactory;
+		this.watermarksContext = watermarksContext;
+		this.timeService = timeService;
+
+		long periodicWatermarkIntervalMillis;
+		try {
+			periodicWatermarkIntervalMillis = periodicWatermarkInterval.toMillis();
+		} catch (ArithmeticException ignored) {
+			// long integer overflow
+			periodicWatermarkIntervalMillis = Long.MAX_VALUE;
+		}
+		this.periodicWatermarkInterval = periodicWatermarkIntervalMillis;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public ReaderOutput<T> createMainOutput(PushingAsyncDataInput.DataOutput<T> output) {
+		// At the moment, we assume only one output is ever created!
+		// This assumption is strict, currently, because many of the classes in this implementation do not
+		// support re-assigning the underlying output
+		checkState(currentMainOutput == null && currentPerSplitOutputs == null, "already created a main output");
+
+		final WatermarkOutput watermarkOutput = new WatermarkToDataOutput(output);
+		final WatermarkGenerator<T> watermarkGenerator = watermarksFactory.createWatermarkGenerator(watermarksContext);
+
+		currentPerSplitOutputs = new SplitLocalOutputs<>(
+				output,
+				watermarkOutput,
+				timestampAssigner,
+				watermarksFactory,
+				watermarksContext);
+
+		currentMainOutput = new StreamingReaderOutput<>(
+				output,
+				watermarkOutput,
+				timestampAssigner,
+				watermarkGenerator,
+				currentPerSplitOutputs);
+
+		return currentMainOutput;
+	}
+
+	@Override
+	public void startPeriodicWatermarkEmits() {
+		checkState(periodicEmitHandle == null, "periodic emitter already started");
+
+		if (periodicWatermarkInterval == 0) {
+			// a value of zero means not activated
+			return;
+		}
+
+		periodicEmitHandle = timeService.scheduleWithFixedDelay(
+				this::triggerPeriodicEmit,
+				periodicWatermarkInterval,
+				periodicWatermarkInterval);
+	}
+
+	@Override
+	public void stopPeriodicWatermarkEmits() {
+		if (periodicEmitHandle != null) {
+			periodicEmitHandle.cancel(false);
+			periodicEmitHandle = null;
+		}
+	}
+
+	void triggerPeriodicEmit(@SuppressWarnings("unused") long wallClockTimestamp) {
+		if (currentPerSplitOutputs != null) {
+			currentPerSplitOutputs.emitPeriodicWatermark();
+		}
+		if (currentMainOutput != null) {
+			currentMainOutput.emitPeriodicWatermark();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class StreamingReaderOutput<T> extends SourceOutputWithWatermarks<T> implements ReaderOutput<T> {
+
+		private final SplitLocalOutputs<T> splitLocalOutputs;
+
+		StreamingReaderOutput(
+				PushingAsyncDataInput.DataOutput<T> output,
+				WatermarkOutput watermarkOutput,
+				TimestampAssigner<T> timestampAssigner,
+				WatermarkGenerator<T> watermarkGenerator,
+				SplitLocalOutputs<T> splitLocalOutputs) {
+
+			super(output, watermarkOutput, watermarkOutput, timestampAssigner, watermarkGenerator);
+			this.splitLocalOutputs = splitLocalOutputs;
+		}
+
+		@Override
+		public SourceOutput<T> createOutputForSplit(String splitId) {
+			return splitLocalOutputs.createOutputForSplit(splitId);
+		}
+
+		@Override
+		public void releaseOutputForSplit(String splitId) {
+			splitLocalOutputs.releaseOutputForSplit(splitId);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A holder and factory for split-local {@link SourceOutput}s. The split-local outputs maintain
+	 * local watermark generators with their own state, to facilitate per-split watermarking logic.
+	 *
+	 * @param <T> The type of the emitted records.
+	 */
+	private static final class SplitLocalOutputs<T> {
+
+		private final WatermarkOutputMultiplexer watermarkMultiplexer;
+		private final Map<String, SourceOutputWithWatermarks<T>> localOutputs;
+		private final PushingAsyncDataInput.DataOutput<T> recordOutput;
+		private final TimestampAssigner<T> timestampAssigner;
+		private final WatermarkGeneratorSupplier<T> watermarksFactory;
+		private final WatermarkGeneratorSupplier.Context watermarkContext;
+
+		private SplitLocalOutputs(
+				PushingAsyncDataInput.DataOutput<T> recordOutput,
+				WatermarkOutput watermarkOutput,
+				TimestampAssigner<T> timestampAssigner,
+				WatermarkGeneratorSupplier<T> watermarksFactory,
+				WatermarkGeneratorSupplier.Context watermarkContext) {
+
+			this.recordOutput = recordOutput;
+			this.timestampAssigner = timestampAssigner;
+			this.watermarksFactory = watermarksFactory;
+			this.watermarkContext = watermarkContext;
+
+			this.watermarkMultiplexer = new WatermarkOutputMultiplexer(watermarkOutput);
+			this.localOutputs = new LinkedHashMap<>(); // we use a LinkedHashMap because it iterates faster
+		}
+
+		SourceOutput<T> createOutputForSplit(String splitId) {
+			final SourceOutputWithWatermarks<T> previous = localOutputs.get(splitId);
+			if (previous != null) {
+				return previous;
+			}
+
+			watermarkMultiplexer.registerNewOutput(splitId);
+			final WatermarkOutput onEventOutput = watermarkMultiplexer.getImmediateOutput(splitId);
+			final WatermarkOutput periodicOutput = watermarkMultiplexer.getDeferredOutput(splitId);
+
+			final WatermarkGenerator<T> watermarks = watermarksFactory.createWatermarkGenerator(watermarkContext);
+
+			final SourceOutputWithWatermarks<T> localOutput = SourceOutputWithWatermarks.createWithSeparateOutputs(
+					recordOutput, onEventOutput, periodicOutput, timestampAssigner, watermarks);
+
+			localOutputs.put(splitId, localOutput);
+			return localOutput;
+		}
+
+		void releaseOutputForSplit(String splitId) {
+			localOutputs.remove(splitId);
+			watermarkMultiplexer.unregisterOutput(splitId);
+		}
+
+		void emitPeriodicWatermark() {
+			// The call in the loop only records the next watermark candidate for each local output.
+			// The call to 'watermarkMultiplexer.onPeriodicEmit()' actually merges the watermarks.
+			// That way, we save inefficient repeated merging of (partially outdated) watermarks before
+			// all local generators have emitted their candidates.
+			for (SourceOutputWithWatermarks<?> output : localOutputs.values()) {
+				output.emitPeriodicWatermark();
+			}
+			watermarkMultiplexer.onPeriodicEmit();
+		}
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
new file mode 100644
index 0000000..f95ec55
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+import java.time.Duration;
+
+/**
+ * Basic interface for the timestamp extraction and watermark generation logic for the
+ * {@link org.apache.flink.api.connector.source.SourceReader}.
+ *
+ * <p>Implementations of this interface may or may not actually perform certain tasks, like watermark
+ * generation. For example, the batch-oriented implementation typically skips all watermark generation
+ * logic.
+ *
+ * @param <T> The type of the emitted records.
+ */
+@Internal
+public interface TimestampsAndWatermarks<T> {
+
+	/**
+	 * Creates the ReaderOutput for the source reader, that internally runs the timestamp extraction and
+	 * watermark generation.
+	 */
+	ReaderOutput<T> createMainOutput(PushingAsyncDataInput.DataOutput<T> output);
+
+	/**
+	 * Starts emitting periodic watermarks, if this implementation produces watermarks, and if
+	 * periodic watermarks are configured.
+	 *
+	 * <p>Periodic watermarks are produced by periodically calling the
+	 * {@link org.apache.flink.api.common.eventtime.WatermarkGenerator#onPeriodicEmit(WatermarkOutput)} method
+	 * of the underlying Watermark Generators.
+	 */
+	void startPeriodicWatermarkEmits();
+
+	/**
+	 * Stops emitting periodic watermarks.
+	 */
+	void stopPeriodicWatermarkEmits();
+
+	// ------------------------------------------------------------------------
+	//  factories
+	// ------------------------------------------------------------------------
+
+	static <E> TimestampsAndWatermarks<E> createStreamingEventTimeLogic(
+			WatermarkStrategy<E> watermarkStrategy,
+			MetricGroup metrics,
+			ProcessingTimeService timeService,
+			long periodicWatermarkIntervalMillis) {
+
+		final TimestampsAndWatermarksContext context = new TimestampsAndWatermarksContext(metrics);
+		final TimestampAssigner<E> timestampAssigner = watermarkStrategy.createTimestampAssigner(context);
+
+		return new StreamingTimestampsAndWatermarks<>(
+			timestampAssigner, watermarkStrategy, context, timeService, Duration.ofMillis(periodicWatermarkIntervalMillis));
+	}
+
+	static <E> TimestampsAndWatermarks<E> createBatchEventTimeLogic(
+			WatermarkStrategy<E> watermarkStrategy,
+			MetricGroup metrics) {
+
+		final TimestampsAndWatermarksContext context = new TimestampsAndWatermarksContext(metrics);
+		final TimestampAssigner<E> timestampAssigner = watermarkStrategy.createTimestampAssigner(context);
+
+		return new BatchTimestampsAndWatermarks<>(timestampAssigner);
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java
new file mode 100644
index 0000000..201b901
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.metrics.MetricGroup;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple implementation of a context that is both {@link TimestampAssignerSupplier.Context}
+ * and {@link WatermarkGeneratorSupplier.Context}.
+ */
+@Internal
+public final class TimestampsAndWatermarksContext
+		implements TimestampAssignerSupplier.Context, WatermarkGeneratorSupplier.Context {
+
+	private final MetricGroup metricGroup;
+
+	public TimestampsAndWatermarksContext(MetricGroup metricGroup) {
+		this.metricGroup = checkNotNull(metricGroup);
+	}
+
+	@Override
+	public MetricGroup getMetricGroup() {
+		return metricGroup;
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
new file mode 100644
index 0000000..3e3b255
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An adapter that exposes a {@link WatermarkOutput} based on a {@link PushingAsyncDataInput.DataOutput}.
+ */
+@Internal
+public final class WatermarkToDataOutput implements WatermarkOutput {
+
+	private final PushingAsyncDataInput.DataOutput<?> output;
+	private long maxWatermarkSoFar;
+	private boolean isIdle;
+
+	/**
+	 * Creates a new WatermarkOutput against the given DataOutput.
+	 */
+	public WatermarkToDataOutput(PushingAsyncDataInput.DataOutput<?> output) {
+		this.output = checkNotNull(output);
+		this.maxWatermarkSoFar = Long.MIN_VALUE;
+	}
+
+	@Override
+	public void emitWatermark(Watermark watermark) {
+		final long newWatermark = watermark.getTimestamp();
+		if (newWatermark <= maxWatermarkSoFar) {
+			return;
+		}
+
+		maxWatermarkSoFar = newWatermark;
+
+		try {
+			if (isIdle) {
+				output.emitStreamStatus(StreamStatus.ACTIVE);
+				isIdle = false;
+			}
+
+			output.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(newWatermark));
+		} catch (ExceptionInChainedOperatorException e) {
+			throw e;
+		} catch (Exception e) {
+			throw new ExceptionInChainedOperatorException(e);
+		}
+	}
+
+	@Override
+	public void markIdle() {
+		if (isIdle) {
+			return;
+		}
+
+		try {
+			output.emitStreamStatus(StreamStatus.IDLE);
+			isIdle = true;
+		} catch (ExceptionInChainedOperatorException e) {
+			throw e;
+		} catch (Exception e) {
+			throw new ExceptionInChainedOperatorException(e);
+		}
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 34b7962..93ced72 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.connector.source.SourceEvent;
-import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
@@ -28,7 +27,6 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
@@ -39,7 +37,7 @@ import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateInitializationContextImpl;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
 import org.apache.flink.util.CollectionUtil;
 
 import org.junit.Before;
@@ -165,32 +163,4 @@ public class SourceOperatorTest {
 		return abstractStateBackend.createOperatorStateBackend(
 				env, "test-operator", Collections.emptyList(), cancelStreamRegistry);
 	}
-
-	// -------------- Testing SourceOperator class -------------
-
-	/**
-	 * A testing class that overrides the getRuntimeContext() Method.
-	 */
-	private static class TestingSourceOperator<OUT> extends SourceOperator<OUT, MockSourceSplit> {
-
-		private final int subtaskIndex;
-
-		TestingSourceOperator(
-				SourceReader<OUT, MockSourceSplit> reader,
-				OperatorEventGateway eventGateway,
-				int subtaskIndex) {
-
-			super(
-					(context) -> reader,
-					eventGateway,
-					new MockSourceSplitSerializer());
-
-			this.subtaskIndex = subtaskIndex;
-		}
-
-		@Override
-		public StreamingRuntimeContext getRuntimeContext() {
-			return new MockStreamingRuntimeContext(false, 5, subtaskIndex);
-		}
-	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java
new file mode 100644
index 0000000..a536553
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A test utility implementation of {@link PushingAsyncDataInput.DataOutput} that collects all events (watermarks, stream statuses, records and latency markers) in one list, in the order they were emitted.
+ */
+final class CollectingDataOutput<E> implements PushingAsyncDataInput.DataOutput<E> {
+
+	final List<Object> events = new ArrayList<>(); // every emitted element, appended in emission order
+
+	@Override
+	public void emitWatermark(Watermark watermark) throws Exception {
+		events.add(watermark);
+	}
+
+	@Override
+	public void emitStreamStatus(StreamStatus streamStatus) throws Exception {
+		events.add(streamStatus);
+	}
+
+	@Override
+	public void emitRecord(StreamRecord<E> streamRecord) throws Exception {
+		events.add(streamRecord);
+	}
+
+	@Override
+	public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
+		events.add(latencyMarker);
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/OnEventTestWatermarkGenerator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/OnEventTestWatermarkGenerator.java
new file mode 100644
index 0000000..55141c1
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/OnEventTestWatermarkGenerator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+
+final class OnEventTestWatermarkGenerator<T> implements WatermarkGenerator<T> { // test generator: one watermark per event
+
+	@Override
+	public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
+		output.emitWatermark(new Watermark(eventTimestamp)); // the watermark carries the event's own timestamp
+	}
+
+	@Override
+	public void onPeriodicEmit(WatermarkOutput output) {} // intentionally empty: this generator only emits in onEvent
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/OnPeriodicTestWatermarkGenerator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/OnPeriodicTestWatermarkGenerator.java
new file mode 100644
index 0000000..1342457
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/OnPeriodicTestWatermarkGenerator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+
+import javax.annotation.Nullable;
+
+final class OnPeriodicTestWatermarkGenerator<T> implements WatermarkGenerator<T> { // test generator: watermarks only on periodic emit
+
+	@Nullable
+	private Long lastTimestamp; // timestamp of the most recent event; null until the first event arrives
+
+	@Override
+	public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
+		lastTimestamp = eventTimestamp; // only remember the timestamp, nothing is emitted here
+	}
+
+	@Override
+	public void onPeriodicEmit(WatermarkOutput output) {
+		if (lastTimestamp != null) { // emit nothing before the first event was seen
+			output.emitWatermark(new Watermark(lastTimestamp));
+		}
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
new file mode 100644
index 0000000..347975c
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkStrategies;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.SourceOperator;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests that validate correct handling of watermark generation in the {@link ReaderOutput} as created
+ * by the {@link StreamingTimestampsAndWatermarks}.
+ */
+public class SourceOperatorEventTimeTest {
+
+	@Test
+	public void testMainOutputPeriodicWatermarks() throws Exception {
+		final WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategies
+			.<Integer>forGenerator((ctx) -> new OnPeriodicTestWatermarkGenerator<>())
+			.build();
+
+		final List<Watermark> result = testSequenceOfWatermarks(watermarkStrategy,
+			(output) -> output.collect(0, 100L),
+			(output) -> output.collect(0, 120L),
+			(output) -> output.collect(0, 110L) // timestamp below the previous one: expected to yield no watermark
+		);
+
+		assertThat(result, contains(
+			new Watermark(100L),
+			new Watermark(120L)
+		));
+	}
+
+	@Test
+	public void testMainOutputEventWatermarks() throws Exception {
+		final WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategies
+			.<Integer>forGenerator((ctx) -> new OnEventTestWatermarkGenerator<>())
+			.build();
+
+		final List<Watermark> result = testSequenceOfWatermarks(watermarkStrategy,
+			(output) -> output.collect(0, 100L),
+			(output) -> output.collect(0, 120L),
+			(output) -> output.collect(0, 110L) // timestamp below the previous one: expected to yield no watermark
+		);
+
+		assertThat(result, contains(
+			new Watermark(100L),
+			new Watermark(120L)
+		));
+	}
+
+	@Test
+	public void testPerSplitOutputPeriodicWatermarks() throws Exception {
+		final WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategies
+			.<Integer>forGenerator((ctx) -> new OnPeriodicTestWatermarkGenerator<>())
+			.build();
+
+		final List<Watermark> result = testSequenceOfWatermarks(watermarkStrategy,
+			(output) -> { // create both split outputs up front, before any record is collected
+				output.createOutputForSplit("A");
+				output.createOutputForSplit("B");
+			},
+			(output) -> output.createOutputForSplit("A").collect(0, 100L),
+			(output) -> output.createOutputForSplit("B").collect(0, 200L),
+			(output) -> output.createOutputForSplit("A").collect(0, 150L),
+			(output) -> output.releaseOutputForSplit("A"), // once "A" is released, the expected watermark reaches 200, the timestamp of "B"
+			(output) -> output.createOutputForSplit("B").collect(0, 200L)
+		);
+
+		assertThat(result, contains(
+			new Watermark(100L),
+			new Watermark(150L),
+			new Watermark(200L)
+		));
+	}
+
+	@Test
+	public void testPerSplitOutputEventWatermarks() throws Exception {
+		final WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategies
+				.<Integer>forGenerator((ctx) -> new OnEventTestWatermarkGenerator<>())
+				.build();
+
+		final List<Watermark> result = testSequenceOfWatermarks(watermarkStrategy,
+			(output) -> { // create both split outputs up front, before any record is collected
+				output.createOutputForSplit("one");
+				output.createOutputForSplit("two");
+			},
+			(output) -> output.createOutputForSplit("one").collect(0, 100L),
+			(output) -> output.createOutputForSplit("two").collect(0, 200L),
+			(output) -> output.createOutputForSplit("one").collect(0, 150L),
+			(output) -> output.releaseOutputForSplit("one"), // once "one" is released, the expected watermark reaches 200, the timestamp of "two"
+			(output) -> output.createOutputForSplit("two").collect(0, 200L)
+		);
+
+		assertThat(result, contains(
+			new Watermark(100L),
+			new Watermark(150L),
+			new Watermark(200L)
+		));
+	}
+
+	// ------------------------------------------------------------------------
+	//   test execution helpers
+	// ------------------------------------------------------------------------
+
+	@SuppressWarnings("FinalPrivateMethod")
+	@SafeVarargs
+	private final List<Watermark> testSequenceOfWatermarks(
+			final WatermarkStrategy<Integer> watermarkStrategy,
+			final Consumer<ReaderOutput<Integer>>... actions) throws Exception {
+
+		final List<Object> allEvents = testSequenceOfEvents(watermarkStrategy, actions);
+
+		return allEvents.stream() // keep only the watermarks and convert them to the eventtime Watermark type used in the assertions
+				.filter((evt) -> evt instanceof org.apache.flink.streaming.api.watermark.Watermark)
+				.map((evt) -> new Watermark(((org.apache.flink.streaming.api.watermark.Watermark) evt).getTimestamp()))
+				.collect(Collectors.toList());
+	}
+
+	@SuppressWarnings("FinalPrivateMethod")
+	@SafeVarargs
+	private final List<Object> testSequenceOfEvents(
+			WatermarkStrategy<Integer> watermarkStrategy,
+			final Consumer<ReaderOutput<Integer>>... actions) throws Exception {
+
+		final CollectingDataOutput<Integer> out = new CollectingDataOutput<>();
+
+		final TestProcessingTimeService timeService = new TestProcessingTimeService();
+		timeService.setCurrentTime(Integer.MAX_VALUE); // start somewhere that is not zero
+
+		final SourceReader<Integer, MockSourceSplit> reader = new InterpretingSourceReader(actions);
+
+		final SourceOperator<Integer, MockSourceSplit> sourceOperator =
+				createTestOperator(reader, watermarkStrategy, timeService);
+
+		while (sourceOperator.emitNext(out) != InputStatus.END_OF_INPUT) { // one scripted action per iteration
+			timeService.setCurrentTime(timeService.getCurrentProcessingTime() + 100); // 100 is also the auto-watermark interval that TestingSourceOperator configures
+		}
+
+		return out.events;
+	}
+
+	// ------------------------------------------------------------------------
+	//   test setup helpers
+	// ------------------------------------------------------------------------
+
+	private static <T> SourceOperator<T, MockSourceSplit> createTestOperator(
+			SourceReader<T, MockSourceSplit> reader,
+			WatermarkStrategy<T> watermarkStrategy,
+			ProcessingTimeService timeService) throws Exception {
+
+		final OperatorStateStore operatorStateStore =
+				new MemoryStateBackend().createOperatorStateBackend(
+						new MockEnvironmentBuilder().build(),
+						"test-operator",
+						Collections.emptyList(),
+						new CloseableRegistry());
+
+		final StateInitializationContext stateContext = new StateInitializationContextImpl(
+			false, operatorStateStore, null, null, null);
+
+		final SourceOperator<T, MockSourceSplit> sourceOperator =
+				new TestingSourceOperator<>(reader, watermarkStrategy, timeService);
+		sourceOperator.initializeState(stateContext); // the operator is returned initialized and opened, ready for emitNext()
+		sourceOperator.open();
+
+		return sourceOperator;
+	}
+
+	// ------------------------------------------------------------------------
+	//   test mocks
+	// ------------------------------------------------------------------------
+
+	private static final class InterpretingSourceReader implements SourceReader<Integer, MockSourceSplit> { // runs one scripted action per pollNext() call
+
+		private final Iterator<Consumer<ReaderOutput<Integer>>> actions;
+
+		@SafeVarargs
+		private InterpretingSourceReader(Consumer<ReaderOutput<Integer>>... actions) {
+			this.actions = Arrays.asList(actions).iterator();
+		}
+
+		@Override
+		public void start() {}
+
+		@Override
+		public InputStatus pollNext(ReaderOutput<Integer> output) {
+			if (actions.hasNext()) { // run exactly one action per poll
+				actions.next().accept(output);
+				return InputStatus.MORE_AVAILABLE;
+			} else { // END_OF_INPUT is reported by the poll after the last action
+				return InputStatus.END_OF_INPUT;
+			}
+		}
+
+		@Override
+		public List<MockSourceSplit> snapshotState() {
+			throw new UnsupportedOperationException(); // not called by the tests in this class
+		}
+
+		@Override
+		public CompletableFuture<Void> isAvailable() {
+			return CompletableFuture.completedFuture(null); // always reports availability
+		}
+
+		@Override
+		public void addSplits(List<MockSourceSplit> splits) {}
+
+		@Override
+		public void handleSourceEvents(SourceEvent sourceEvent) {}
+
+		@Override
+		public void close() {}
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarksTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarksTest.java
new file mode 100644
index 0000000..b3f079c
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarksTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.NoWatermarksGenerator;
+import org.apache.flink.api.common.eventtime.RecordTimestampAssigner;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link SourceOutputWithWatermarks}.
+ */
+public class SourceOutputWithWatermarksTest {
+
+	@Test
+	public void testNoTimestampValue() {
+		final CollectingDataOutput<Integer> dataOutput = new CollectingDataOutput<>();
+		final SourceOutputWithWatermarks<Integer> out = SourceOutputWithWatermarks.createWithSameOutputs(
+				dataOutput, new RecordTimestampAssigner<>(), new NoWatermarksGenerator<>());
+
+		out.collect(17); // collect without an explicit timestamp
+
+		final Object event = dataOutput.events.get(0);
+		assertThat(event, instanceOf(StreamRecord.class));
+		assertEquals(TimestampAssigner.NO_TIMESTAMP, ((StreamRecord<?>) event).getTimestamp()); // the record must carry the NO_TIMESTAMP marker value
+	}
+
+	@Test
+	public void eventsAreBeforeWatermarks() {
+		final CollectingDataOutput<Integer> dataOutput = new CollectingDataOutput<>();
+		final SourceOutputWithWatermarks<Integer> out = SourceOutputWithWatermarks.createWithSameOutputs(
+				dataOutput, new RecordTimestampAssigner<>(), new TestWatermarkGenerator<>());
+
+		out.collect(42, 12345L);
+
+		assertThat(dataOutput.events, contains( // order matters: the record first, then the watermark it triggered
+				new StreamRecord<>(42, 12345L),
+				new org.apache.flink.streaming.api.watermark.Watermark(12345L)
+		));
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class TestWatermarkGenerator<T> implements WatermarkGenerator<T> { // emits a watermark per event and again on periodic emit
+
+		private long lastTimestamp; // timestamp of the most recent event, 0 before the first event
+
+		@Override
+		public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
+			lastTimestamp = eventTimestamp;
+			output.emitWatermark(new Watermark(eventTimestamp));
+		}
+
+		@Override
+		public void onPeriodicEmit(WatermarkOutput output) {
+			output.emitWatermark(new Watermark(lastTimestamp));
+		}
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
new file mode 100644
index 0000000..43992dc
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.WatermarkStrategies;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.operators.SourceOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+
+/**
+ * A SourceOperator extension to simplify test setup: it returns a mock runtime context and a fixed ExecutionConfig from its overridden getters.
+ */
+public class TestingSourceOperator<T>  extends SourceOperator<T, MockSourceSplit> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final int subtaskIndex; // passed to the mock runtime context
+	private final int parallelism; // passed to the mock runtime context
+
+	public TestingSourceOperator(
+			SourceReader<T, MockSourceSplit> reader,
+			WatermarkStrategy<T> watermarkStrategy,
+			ProcessingTimeService timeService) {
+
+		this(reader, watermarkStrategy, timeService, new MockOperatorEventGateway(), 1, 5); // defaults: mock event gateway, subtask index 1, parallelism 5
+	}
+
+	public TestingSourceOperator(
+			SourceReader<T, MockSourceSplit> reader,
+			OperatorEventGateway eventGateway,
+			int subtaskIndex) {
+
+		this(reader, WatermarkStrategies.<T>noWatermarks().build(), new TestProcessingTimeService(), eventGateway, subtaskIndex, 5); // defaults: no watermarks, new TestProcessingTimeService, parallelism 5
+	}
+
+	public TestingSourceOperator(
+			SourceReader<T, MockSourceSplit> reader,
+			WatermarkStrategy<T> watermarkStrategy,
+			ProcessingTimeService timeService,
+			OperatorEventGateway eventGateway,
+			int subtaskIndex,
+			int parallelism) {
+
+		super(
+			(context) -> reader, // the reader factory ignores the context and always hands out the given reader
+			eventGateway,
+			new MockSourceSplitSerializer(),
+			watermarkStrategy,
+			timeService);
+
+		this.subtaskIndex = subtaskIndex;
+		this.parallelism = parallelism;
+		this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); // set the metric group directly to an unregistered one
+	}
+
+	@Override
+	public StreamingRuntimeContext getRuntimeContext() {
+		return new MockStreamingRuntimeContext(false, parallelism, subtaskIndex); // a new mock context is created on every call
+	}
+
+	// this is overridden to avoid complex mock injection through the "containingTask"
+	@Override
+	public ExecutionConfig getExecutionConfig() {
+		ExecutionConfig cfg = new ExecutionConfig();
+		cfg.setAutoWatermarkInterval(100); // fixed auto-watermark interval used by every test built on this class
+		return cfg;
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutputTest.java
new file mode 100644
index 0000000..0fb3f81
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutputTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for the {@link WatermarkToDataOutput}.
+ */
+public class WatermarkToDataOutputTest {
+
+	@Test
+	public void testInitialZeroWatermark() {
+		final CollectingDataOutput<Object> testingOutput = new CollectingDataOutput<>();
+		final WatermarkToDataOutput wmOutput = new WatermarkToDataOutput(testingOutput);
+
+		wmOutput.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(0L)); // a first watermark of 0 must be emitted, not dropped
+
+		assertThat(testingOutput.events, contains(new Watermark(0L)));
+	}
+
+	@Test
+	public void testWatermarksDoNotRegress() {
+		final CollectingDataOutput<Object> testingOutput = new CollectingDataOutput<>();
+		final WatermarkToDataOutput wmOutput = new WatermarkToDataOutput(testingOutput);
+
+		wmOutput.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(12L));
+		wmOutput.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(17L));
+		wmOutput.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(10L)); // behind 17: expected to be dropped
+		wmOutput.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(18L));
+		wmOutput.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(17L)); // behind 18: expected to be dropped
+		wmOutput.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(18L)); // equal to the maximum: expected to be dropped
+
+		assertThat(testingOutput.events, contains(
+			new Watermark(12L),
+			new Watermark(17L),
+			new Watermark(18L)
+		));
+	}
+
+	@Test
+	public void becomingActiveEmitsStatus() {
+		final CollectingDataOutput<Object> testingOutput = new CollectingDataOutput<>();
+		final WatermarkToDataOutput wmOutput = new WatermarkToDataOutput(testingOutput);
+
+		wmOutput.markIdle();
+		wmOutput.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(100L)); // a watermark after markIdle() must be preceded by ACTIVE
+
+		assertThat(testingOutput.events, contains(
+			StreamStatus.IDLE,
+			StreamStatus.ACTIVE,
+			new Watermark(100L)
+		));
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index b1e75b2..515bd51 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategies;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.mocks.MockSource;
@@ -107,7 +109,7 @@ public class SourceOperatorStreamTaskTest {
 
 			// Build expected output to verify the results
 			Queue<Object> expectedOutput = new LinkedList<>();
-			expectedRecords.forEach(r -> expectedOutput.offer(new StreamRecord<>(r)));
+			expectedRecords.forEach(r -> expectedOutput.offer(new StreamRecord<>(r, TimestampAssigner.NO_TIMESTAMP)));
 			// Add barrier to the expected output.
 			expectedOutput.add(new CheckpointBarrier(checkpointId, checkpointId, checkpointOptions));
 
@@ -128,8 +130,10 @@ public class SourceOperatorStreamTaskTest {
 			long checkpointId,
 			TaskStateSnapshot snapshot) throws Exception {
 		// get a source operator.
-		SourceOperatorFactory<Integer> sourceOperatorFactory =
-				new SourceOperatorFactory<>(new MockSource(Boundedness.BOUNDED, 1));
+		SourceOperatorFactory<Integer> sourceOperatorFactory = new SourceOperatorFactory<>(
+				new MockSource(Boundedness.BOUNDED, 1),
+				WatermarkStrategies.<Integer>noWatermarks().build());
+
 		// build a test harness.
 		MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
 				new MultipleInputStreamTaskTestHarnessBuilder<>(SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO);


[flink] 09/16: [FLINK-17899][runtime][refactor] Make ProcessingTimeService always available to operators.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b3031accafc9ff1d6a3ee16e28f05d0df9c2da22
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue May 19 20:09:17 2020 +0200

    [FLINK-17899][runtime][refactor] Make ProcessingTimeService always available to operators.
    
    Previously, it was supplied only to operators that declared a need for it, so that it was not created unnecessarily.
    This change puts a lazy factory into the parameters supplied to all operators.
---
 .../api/operators/StreamOperatorFactoryUtil.java        | 14 ++++++++++----
 .../api/operators/StreamOperatorParameters.java         | 17 ++++++++++++++---
 2 files changed, 24 insertions(+), 7 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java
index fb0917f..8adb1c1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import java.util.Optional;
+import java.util.function.Supplier;
 
 /**
  * A utility to instantiate new operators with a given factory.
@@ -51,13 +52,18 @@ public class StreamOperatorFactoryUtil {
 		MailboxExecutor mailboxExecutor = containingTask.getMailboxExecutorFactory().createExecutor(configuration.getChainIndex());
 
 		if (operatorFactory instanceof YieldingOperatorFactory) {
-			((YieldingOperatorFactory) operatorFactory).setMailboxExecutor(mailboxExecutor);
+			((YieldingOperatorFactory<?>) operatorFactory).setMailboxExecutor(mailboxExecutor);
 		}
 
-		ProcessingTimeService processingTimeService = null;
+		final Supplier<ProcessingTimeService> processingTimeServiceFactory =
+				() -> containingTask.getProcessingTimeServiceFactory().createProcessingTimeService(mailboxExecutor);
+
+		final ProcessingTimeService processingTimeService;
 		if (operatorFactory instanceof ProcessingTimeServiceAware) {
-			processingTimeService = containingTask.getProcessingTimeServiceFactory().createProcessingTimeService(mailboxExecutor);
+			processingTimeService = processingTimeServiceFactory.get();
 			((ProcessingTimeServiceAware) operatorFactory).setProcessingTimeService(processingTimeService);
+		} else {
+			processingTimeService = null;
 		}
 
 		// TODO: what to do with ProcessingTimeServiceAware?
@@ -66,7 +72,7 @@ public class StreamOperatorFactoryUtil {
 				containingTask,
 				configuration,
 				output,
-				processingTimeService,
+				processingTimeService != null ? () -> processingTimeService : processingTimeServiceFactory,
 				operatorEventDispatcher));
 		return new Tuple2<>(op, Optional.ofNullable(processingTimeService));
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java
index 70df0e4..3213e8a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java
@@ -25,6 +25,10 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
+import javax.annotation.Nullable;
+
+import java.util.function.Supplier;
+
 /**
  * Helper  class to construct {@link AbstractStreamOperatorV2}. Wraps couple of internal parameters
  * to simplify for users construction of classes extending {@link AbstractStreamOperatorV2} and to
@@ -37,19 +41,23 @@ public class StreamOperatorParameters<OUT> {
 	private final StreamTask<?, ?> containingTask;
 	private final StreamConfig config;
 	private final Output<StreamRecord<OUT>> output;
-	private final ProcessingTimeService processingTimeService;
+	private final Supplier<ProcessingTimeService> processingTimeServiceFactory;
 	private final OperatorEventDispatcher operatorEventDispatcher;
 
+	/** The ProcessingTimeService, lazily created, but cached so that we don't create more than one. */
+	@Nullable
+	private ProcessingTimeService processingTimeService;
+
 	public StreamOperatorParameters(
 			StreamTask<?, ?> containingTask,
 			StreamConfig config,
 			Output<StreamRecord<OUT>> output,
-			ProcessingTimeService processingTimeService,
+			Supplier<ProcessingTimeService> processingTimeServiceFactory,
 			OperatorEventDispatcher operatorEventDispatcher) {
 		this.containingTask = containingTask;
 		this.config = config;
 		this.output = output;
-		this.processingTimeService = processingTimeService;
+		this.processingTimeServiceFactory = processingTimeServiceFactory;
 		this.operatorEventDispatcher = operatorEventDispatcher;
 	}
 
@@ -66,6 +74,9 @@ public class StreamOperatorParameters<OUT> {
 	}
 
 	public ProcessingTimeService getProcessingTimeService() {
+		if (processingTimeService == null) {
+			processingTimeService = processingTimeServiceFactory.get();
+		}
 		return processingTimeService;
 	}