You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/27 20:01:07 UTC

[flink] 11/16: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 54f1a4c8071a6d71111185449e795b2f00fa49e9
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue May 19 20:37:31 2020 +0200

    [FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources
---
 .../base/source/reader/SourceReaderBase.java       |  43 +++-
 .../base/source/reader/SourceReaderTestBase.java   |  15 +-
 .../flink/api/connector/source/ReaderOutput.java   |  87 +++++++
 .../flink/api/connector/source/SourceReader.java   |   2 +-
 .../connector/source/mocks/MockSourceReader.java   |   4 +-
 .../streaming/api/operators/SourceOperator.java    |  60 ++++-
 .../api/operators/SourceOperatorFactory.java       |  21 +-
 .../source/BatchTimestampsAndWatermarks.java       | 126 ++++++++++
 .../source/SourceOutputWithWatermarks.java         | 179 ++++++++++++++
 .../source/StreamingTimestampsAndWatermarks.java   | 247 ++++++++++++++++++++
 .../operators/source/TimestampsAndWatermarks.java  |  92 ++++++++
 .../source/TimestampsAndWatermarksContext.java     |  46 ++++
 .../operators/source/WatermarkToDataOutput.java    |  86 +++++++
 .../api/operators/SourceOperatorTest.java          |  32 +--
 .../api/operators/source/CollectingDataOutput.java |  56 +++++
 .../source/OnEventTestWatermarkGenerator.java      |  34 +++
 .../source/OnPeriodicTestWatermarkGenerator.java   |  43 ++++
 .../source/SourceOperatorEventTimeTest.java        | 257 +++++++++++++++++++++
 .../source/SourceOutputWithWatermarksTest.java     |  85 +++++++
 .../operators/source/TestingSourceOperator.java    |  94 ++++++++
 .../source/WatermarkToDataOutputTest.java          |  77 ++++++
 .../tasks/SourceOperatorStreamTaskTest.java        |  10 +-
 22 files changed, 1637 insertions(+), 59 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index b5781de..1da08d1 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.base.source.reader;
 
+import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
@@ -63,7 +64,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
 
 	/** The state of the splits. */
-	private final Map<String, SplitStateT> splitStates;
+	private final Map<String, SplitContext<T, SplitStateT>> splitStates;
 
 	/** The record emitter to handle the records read by the SplitReaders. */
 	protected final RecordEmitter<E, T, SplitStateT> recordEmitter;
@@ -111,7 +112,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	}
 
 	@Override
-	public InputStatus pollNext(SourceOutput<T> sourceOutput) throws Exception {
+	public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
 		splitFetcherManager.checkErrors();
 		// poll from the queue if the last element was successfully handled. Otherwise
 		// just pass the last element again.
@@ -133,14 +134,19 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 			// Process one record.
 			if (splitIter.hasNext()) {
 				// emit the record.
-				E record = splitIter.next();
-				recordEmitter.emitRecord(record, sourceOutput, splitStates.get(splitIter.currentSplitId()));
+				final E record = splitIter.next();
+				final SplitContext<T, SplitStateT> splitContext = splitStates.get(splitIter.currentSplitId());
+				final SourceOutput<T> splitOutput = splitContext.getOrCreateSplitOutput(output);
+				recordEmitter.emitRecord(record, splitOutput, splitContext.state);
 				LOG.trace("Emitted record: {}", record);
 			}
 			// Do some cleanup if the all the records in the current splitIter have been processed.
 			if (!splitIter.hasNext()) {
 				// First remove the state of the split.
-				splitIter.finishedSplitIds().forEach(splitStates::remove);
+				splitIter.finishedSplitIds().forEach((id) -> {
+					splitStates.remove(id);
+					output.releaseOutputForSplit(id);
+				});
 				// Handle the finished splits.
 				onSplitFinished(splitIter.finishedSplitIds());
 				// Prepare the return status based on the availability of the next element.
@@ -173,7 +179,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	@Override
 	public List<SplitT> snapshotState() {
 		List<SplitT> splits = new ArrayList<>();
-		splitStates.forEach((id, state) -> splits.add(toSplitType(id, state)));
+		splitStates.forEach((id, context) -> splits.add(toSplitType(id, context.state)));
 		return splits;
 	}
 
@@ -181,7 +187,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	public void addSplits(List<SplitT> splits) {
 		LOG.trace("Adding splits {}", splits);
 		// Initialize the state for each split.
-		splits.forEach(s -> splitStates.put(s.splitId(), initializedState(s)));
+		splits.forEach(s -> splitStates.put(s.splitId(), new SplitContext<>(s.splitId(), initializedState(s))));
 		// Hand over the splits to the split fetcher to start fetch.
 		splitFetcherManager.addSplits(splits);
 	}
@@ -201,6 +207,8 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 		splitFetcherManager.close(options.sourceReaderCloseTimeout);
 	}
 
+
+
 	// -------------------- Abstract method to allow different implementations ------------------
 	/**
 	 * Handles the finished splits to clean the state if needed.
@@ -233,4 +241,25 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 			return InputStatus.NOTHING_AVAILABLE;
 		}
 	}
+
+	// ------------------ private helper classes ---------------------
+
+	private static final class SplitContext<T, SplitStateT> {
+
+		final String splitId;
+		final SplitStateT state;
+		SourceOutput<T> sourceOutput;
+
+		private SplitContext(String splitId, SplitStateT state) {
+			this.state = state;
+			this.splitId = splitId;
+		}
+
+		SourceOutput<T> getOrCreateSplitOutput(ReaderOutput<T> mainOutput) {
+			if (sourceOutput == null) {
+				sourceOutput = mainOutput.createOutputForSplit(splitId);
+			}
+			return sourceOutput;
+		}
+	}
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
index fcf44fa..5f58836 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.base.source.reader;
 
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceSplit;
@@ -170,7 +171,7 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T
 	/**
 	 * A source output that validates the output.
 	 */
-	protected static class ValidatingSourceOutput implements SourceOutput<Integer> {
+	protected static class ValidatingSourceOutput implements ReaderOutput<Integer> {
 		private Set<Integer> consumedValues = new HashSet<>();
 		private int max = Integer.MIN_VALUE;
 		private int min = Integer.MAX_VALUE;
@@ -204,13 +205,17 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T
 		}
 
 		@Override
-		public void emitWatermark(Watermark watermark) {
-
-		}
+		public void emitWatermark(Watermark watermark) {}
 
 		@Override
-		public void markIdle() {
+		public void markIdle() {}
 
+		@Override
+		public SourceOutput<Integer> createOutputForSplit(String splitId) {
+			return this;
 		}
+
+		@Override
+		public void releaseOutputForSplit(String splitId) {}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
new file mode 100644
index 0000000..1774a7c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
@@ -0,0 +1,87 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.Watermark;
+
+/**
+ * The interface provided by Flink task to the {@link SourceReader} to emit records
+ * to downstream operators for message processing.
+ */
+@PublicEvolving
+public interface ReaderOutput<T> extends SourceOutput<T> {
+
+	/**
+	 * Emit a record without a timestamp. Equivalent to {@link #collect(Object, long) collect(timestamp, null)};
+	 *
+	 * @param record the record to emit.
+	 */
+	@Override
+	void collect(T record);
+
+	/**
+	 * Emit a record with timestamp.
+	 *
+	 * @param record the record to emit.
+	 * @param timestamp the timestamp of the record.
+	 */
+	@Override
+	void collect(T record, long timestamp);
+
+	/**
+	 * Emits the given watermark.
+	 *
+	 * <p>Emitting a watermark also implicitly marks the stream as <i>active</i>, ending
+	 * previously marked idleness.
+	 */
+	@Override
+	void emitWatermark(Watermark watermark);
+
+	/**
+	 * Marks this output as idle, meaning that downstream operations do not
+	 * wait for watermarks from this output.
+	 *
+	 * <p>An output becomes active again as soon as the next watermark is emitted.
+	 */
+	@Override
+	void markIdle();
+
+	/**
+	 * Creates a {@code SourceOutput} for a specific Source Split. Use these outputs if you want to
+	 * run split-local logic, like watermark generation.
+	 *
+	 * <p>If a split-local output was already created for this split-ID, the method will return that instance,
+	 * so that only one split-local output exists per split-ID.
+	 *
+	 * <p><b>IMPORTANT:</b> After the split has been finished, it is crucial to release the created
+	 * output again. Otherwise it will continue to contribute to the watermark generation like a
+	 * perpetually stalling source split, and may hold back the watermark indefinitely.
+	 *
+	 * @see #releaseOutputForSplit(String)
+	 */
+	SourceOutput<T> createOutputForSplit(String splitId);
+
+	/**
+	 * Releases the {@code SourceOutput} created for the split with the given ID.
+	 *
+	 * @see #createOutputForSplit(String)
+	 */
+	void releaseOutputForSplit(String splitId);
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
index 6a26d70..b09a724 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
@@ -51,7 +51,7 @@ public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseab
 	 *
 	 * @return The InputStatus of the SourceReader after the method invocation.
 	 */
-	InputStatus pollNext(SourceOutput<T> sourceOutput) throws Exception;
+	InputStatus pollNext(ReaderOutput<T> output) throws Exception;
 
 	/**
 	 * Checkpoint on the state of the source.
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
index 9a7a327..0bdcdec 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.api.connector.source.mocks;
 
+import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceEvent;
-import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.core.io.InputStatus;
 
@@ -53,7 +53,7 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit>
 	}
 
 	@Override
-	public InputStatus pollNext(SourceOutput<Integer> sourceOutput) throws Exception {
+	public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) throws Exception {
 		boolean finished = true;
 		currentSplitIndex = 0;
 		// Find first splits with available records.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 5d4b2d6..9ac80d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -19,11 +19,12 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceEvent;
-import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
@@ -38,8 +39,10 @@ import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.CollectionUtil;
 
 import java.util.List;
@@ -81,30 +84,47 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 	/** The event gateway through which this operator talks to its coordinator. */
 	private final OperatorEventGateway operatorEventGateway;
 
-	// ---- lazily initialized fields ----
+	/** The factory for timestamps and watermark generators. */
+	private final WatermarkStrategy<OUT> watermarkStrategy;
+
+	// ---- lazily initialized fields (these fields are the "hot" fields) ----
 
 	/** The source reader that does most of the work. */
 	private SourceReader<OUT, SplitT> sourceReader;
 
+	private ReaderOutput<OUT> currentMainOutput;
+
+	private DataOutput<OUT> lastInvokedOutput;
+
 	/** The state that holds the currently assigned splits. */
 	private ListState<SplitT> readerState;
 
+	/** The event time and watermarking logic. Ideally this would be eagerly passed into this operator,
+	 * but we currently need to instantiate this lazily, because the metric groups exist only later. */
+	private TimestampsAndWatermarks<OUT> eventTimeLogic;
+
 	public SourceOperator(
 			Function<SourceReaderContext, SourceReader<OUT, SplitT>> readerFactory,
 			OperatorEventGateway operatorEventGateway,
-			SimpleVersionedSerializer<SplitT> splitSerializer) {
+			SimpleVersionedSerializer<SplitT> splitSerializer,
+			WatermarkStrategy<OUT> watermarkStrategy,
+			ProcessingTimeService timeService) {
 
 		this.readerFactory = checkNotNull(readerFactory);
 		this.operatorEventGateway = checkNotNull(operatorEventGateway);
 		this.splitSerializer = checkNotNull(splitSerializer);
+		this.watermarkStrategy = checkNotNull(watermarkStrategy);
+		this.processingTimeService = timeService;
 	}
 
 	@Override
 	public void open() throws Exception {
+		final MetricGroup metricGroup = getMetricGroup();
+
 		final SourceReaderContext context = new SourceReaderContext() {
 			@Override
 			public MetricGroup metricGroup() {
-				return getRuntimeContext().getMetricGroup();
+				return metricGroup;
 			}
 
 			@Override
@@ -113,6 +133,15 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 			}
 		};
 
+		// in the future when we support both batch and streaming modes for the source operator,
+		// and when this one is migrated to the "eager initialization" operator (StreamOperatorV2),
+		// then we should evaluate this during operator construction.
+		eventTimeLogic = TimestampsAndWatermarks.createStreamingEventTimeLogic(
+				watermarkStrategy,
+				metricGroup,
+				getProcessingTimeService(),
+				getExecutionConfig().getAutoWatermarkInterval());
+
 		sourceReader = readerFactory.apply(context);
 
 		// restore the state if necessary.
@@ -125,12 +154,31 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 		sourceReader.start();
 		// Register the reader to the coordinator.
 		registerReader();
+
+		eventTimeLogic.startPeriodicWatermarkEmits();
+	}
+
+	@Override
+	public void close() throws Exception {
+		eventTimeLogic.stopPeriodicWatermarkEmits();
+		super.close();
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
 	public InputStatus emitNext(DataOutput<OUT> output) throws Exception {
-		return sourceReader.pollNext((SourceOutput<OUT>) output);
+		// guarding an assumptions we currently make due to the fact that certain classes
+		// assume a constant output
+		assert lastInvokedOutput == output || lastInvokedOutput == null;
+
+		// short circuit the common case (every invocation except the first)
+		if (currentMainOutput != null) {
+			return sourceReader.pollNext(currentMainOutput);
+		}
+
+		// this creates a batch or streaming output based on the runtime mode
+		currentMainOutput = eventTimeLogic.createMainOutput(output);
+		lastInvokedOutput = output;
+		return sourceReader.pollNext(currentMainOutput);
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index c30a0a7..02c7927 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
@@ -27,6 +28,9 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
+import org.apache.flink.streaming.api.operators.source.NoOpWatermarkGenerator;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
 
 import java.util.function.Function;
 
@@ -34,13 +38,16 @@ import java.util.function.Function;
  * The Factory class for {@link SourceOperator}.
  */
 public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OUT>
-		implements CoordinatedOperatorFactory<OUT> {
+		implements CoordinatedOperatorFactory<OUT>, ProcessingTimeServiceAware {
 
 	private static final long serialVersionUID = 1L;
 
 	/** The {@link Source} to create the {@link SourceOperator}. */
 	private final Source<OUT, ?, ?> source;
 
+	/** The event time setup (timestamp assigners, watermark generators, etc.). */
+	private final WatermarkStrategy<OUT> watermarkStrategy = (ctx) -> new NoOpWatermarkGenerator<>();
+
 	/** The number of worker thread for the source coordinator. */
 	private final int numCoordinatorWorkerThread;
 
@@ -61,7 +68,9 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 		final SourceOperator<OUT, ?> sourceOperator = instantiateSourceOperator(
 				source::createReader,
 				gateway,
-				source.getSplitSerializer());
+				source.getSplitSerializer(),
+				watermarkStrategy,
+				parameters.getProcessingTimeService());
 
 		sourceOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
 		parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, sourceOperator);
@@ -99,7 +108,9 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 	private static <T, SplitT extends SourceSplit> SourceOperator<T, SplitT> instantiateSourceOperator(
 			Function<SourceReaderContext, SourceReader<T, ?>> readerFactory,
 			OperatorEventGateway eventGateway,
-			SimpleVersionedSerializer<?> splitSerializer) {
+			SimpleVersionedSerializer<?> splitSerializer,
+			WatermarkStrategy<T> watermarkStrategy,
+			ProcessingTimeService timeService) {
 
 		// jumping through generics hoops: cast the generics away to then cast them back more strictly typed
 		final Function<SourceReaderContext, SourceReader<T, SplitT>> typedReaderFactory =
@@ -110,6 +121,8 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 		return new SourceOperator<>(
 				typedReaderFactory,
 				eventGateway,
-				typedSplitSerializer);
+				typedSplitSerializer,
+				watermarkStrategy,
+				timeService);
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/BatchTimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/BatchTimestampsAndWatermarks.java
new file mode 100644
index 0000000..135cad9
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/BatchTimestampsAndWatermarks.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of {@link TimestampsAndWatermarks} to be used during batch execution of a
+ * program. Batch execution has no watermarks, so all watermark related operations in this
+ * implementation are no-ops.
+ *
+ * @param <T> The type of the emitted records.
+ */
+@Internal
+public class BatchTimestampsAndWatermarks<T> implements TimestampsAndWatermarks<T> {
+
+	private final TimestampAssigner<T> timestamps;
+
+	/**
+	 * Creates a new BatchTimestampsAndWatermarks with the given TimestampAssigner.
+	 */
+	public BatchTimestampsAndWatermarks(TimestampAssigner<T> timestamps) {
+		this.timestamps = checkNotNull(timestamps);
+	}
+
+	@Override
+	public ReaderOutput<T> createMainOutput(PushingAsyncDataInput.DataOutput<T> output) {
+		checkNotNull(output);
+		return new TimestampsOnlyOutput<>(output, timestamps);
+	}
+
+	@Override
+	public void startPeriodicWatermarkEmits() {
+		// no periodic watermarks in batch processing
+	}
+
+	@Override
+	public void stopPeriodicWatermarkEmits() {
+		// no periodic watermarks in batch processing
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A simple implementation of {@link SourceOutput} and {@link ReaderOutput} that extracts
+	 * timestamps but has no watermarking logic. Because only watermarking logic has state per
+	 * Source Split, the same instance gets shared across all Source Splits.
+	 *
+	 * @param <T> The type of the emitted records.
+	 */
+	private static final class TimestampsOnlyOutput<T> implements ReaderOutput<T> {
+
+		private final PushingAsyncDataInput.DataOutput<T> output;
+		private final TimestampAssigner<T> timestampAssigner;
+
+		private TimestampsOnlyOutput(
+				PushingAsyncDataInput.DataOutput<T> output,
+				TimestampAssigner<T> timestampAssigner) {
+
+			this.output = output;
+			this.timestampAssigner = timestampAssigner;
+		}
+
+		@Override
+		public void collect(T record) {
+			collect(record, TimestampAssigner.NO_TIMESTAMP);
+		}
+
+		@Override
+		public void collect(T record, long timestamp) {
+			try {
+				output.emitRecord(new StreamRecord<>(record, timestampAssigner.extractTimestamp(record, timestamp)));
+			} catch (ExceptionInChainedOperatorException e) {
+				throw e;
+			} catch (Exception e) {
+				throw new ExceptionInChainedOperatorException(e);
+			}
+		}
+
+		@Override
+		public void emitWatermark(Watermark watermark) {
+			// do nothing, this does not forward any watermarks manually emitted by the source directly
+		}
+
+		@Override
+		public void markIdle() {
+			// do nothing, because without watermarks there is no idleness
+		}
+
+		@Override
+		public SourceOutput<T> createOutputForSplit(String splitId) {
+			// we don't need per-partition instances, because we do not generate watermarks
+			return this;
+		}
+
+		@Override
+		public void releaseOutputForSplit(String splitId) {
+			// nothing to release, because we do not create per-partition instances
+		}
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
new file mode 100644
index 0000000..3a8396e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of the SourceOutput. The records emitted to this output are pushed into a given
+ * {@link PushingAsyncDataInput.DataOutput}. The watermarks are pushed into the same output, or
+ * into a separate {@link WatermarkOutput}, if one is provided.
+ *
+ * <h2>Periodic Watermarks</h2>
+ *
+ * <p>This output does not implement automatic periodic watermark emission. The
+ * method {@link SourceOutputWithWatermarks#emitPeriodicWatermark()} needs to be called periodically.
+ *
+ * <h2>Note on Performance Considerations</h2>
+ *
+ * <p>The methods {@link SourceOutput#collect(Object)} and {@link SourceOutput#collect(Object, long)}
+ * are highly performance-critical (part of the hot loop). To make the code as JIT friendly as possible,
+ * we want to have only a single implementation of these two methods, across all classes.
+ * That way, the JIT compiler can de-virtualize (and inline) them better.
+ *
+ * <p>Currently, we have one implementation of these methods in the batch case (see class
+ * {@link BatchTimestampsAndWatermarks}) and one for the streaming case (this class). When the JVM
+ * is dedicated to a single job (or type of job) only one of these classes will be loaded. In mixed
+ * job setups, we still have a bimorphic method (rather than a poly/-/mega-morphic method).
+ *
+ * @param <T> The type of emitted records.
+ */
+@Internal
+public class SourceOutputWithWatermarks<T> implements SourceOutput<T> {
+
+	private final PushingAsyncDataInput.DataOutput<T> recordsOutput;
+
+	private final TimestampAssigner<T> timestampAssigner;
+
+	private final WatermarkGenerator<T> watermarkGenerator;
+
+	private final WatermarkOutput onEventWatermarkOutput;
+
+	private final WatermarkOutput periodicWatermarkOutput;
+
+	/**
+	 * Creates a new SourceOutputWithWatermarks that emits records to the given DataOutput
+	 * and watermarks to the (possibly different) WatermarkOutput.
+	 */
+	protected SourceOutputWithWatermarks(
+			PushingAsyncDataInput.DataOutput<T> recordsOutput,
+			WatermarkOutput onEventWatermarkOutput,
+			WatermarkOutput periodicWatermarkOutput,
+			TimestampAssigner<T> timestampAssigner,
+			WatermarkGenerator<T> watermarkGenerator) {
+
+		this.recordsOutput = checkNotNull(recordsOutput);
+		this.onEventWatermarkOutput = checkNotNull(onEventWatermarkOutput);
+		this.periodicWatermarkOutput = checkNotNull(periodicWatermarkOutput);
+		this.timestampAssigner = checkNotNull(timestampAssigner);
+		this.watermarkGenerator = checkNotNull(watermarkGenerator);
+	}
+
+	// ------------------------------------------------------------------------
+	// SourceOutput Methods
+	//
+	// Note that the two methods below are final, as a partial enforcement
+	// of the performance design goal mentioned in the class-level comment.
+	// ------------------------------------------------------------------------
+
+	@Override
+	public final void collect(T record) {
+		collect(record, TimestampAssigner.NO_TIMESTAMP);
+	}
+
+	@Override
+	public final void collect(T record, long timestamp) {
+		try {
+			final long assignedTimestamp = timestampAssigner.extractTimestamp(record, timestamp);
+
+			// IMPORTANT: The event must be emitted before the watermark generator is called.
+			recordsOutput.emitRecord(new StreamRecord<>(record, assignedTimestamp));
+			watermarkGenerator.onEvent(record, assignedTimestamp, onEventWatermarkOutput);
+		} catch (ExceptionInChainedOperatorException e) {
+			throw e;
+		} catch (Exception e) {
+			throw new ExceptionInChainedOperatorException(e);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// WatermarkOutput Methods
+	//
+	// These two methods are final as well, to enforce the contract that the
+	// watermarks from emitWatermark(Watermark) go to the same output as the
+	// watermarks from the watermarkGenerator.onEvent(...) calls in the collect(...)
+	// methods.
+	// ------------------------------------------------------------------------
+
+	@Override
+	public final void emitWatermark(Watermark watermark) {
+		onEventWatermarkOutput.emitWatermark(watermark);
+	}
+
+	@Override
+	public final void markIdle() {
+		onEventWatermarkOutput.markIdle();
+	}
+
+	public final void emitPeriodicWatermark() {
+		watermarkGenerator.onPeriodicEmit(periodicWatermarkOutput);
+	}
+
+	// ------------------------------------------------------------------------
+	// Factories
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new SourceOutputWithWatermarks that emits records to the given DataOutput
+	 * and watermarks to the (possibly different) WatermarkOutput.
+	 */
+	public static <E> SourceOutputWithWatermarks<E> createWithSameOutputs(
+			PushingAsyncDataInput.DataOutput<E> recordsAndWatermarksOutput,
+			TimestampAssigner<E> timestampAssigner,
+			WatermarkGenerator<E> watermarkGenerator) {
+
+		final WatermarkOutput watermarkOutput = new WatermarkToDataOutput(recordsAndWatermarksOutput);
+
+		return new SourceOutputWithWatermarks<>(
+				recordsAndWatermarksOutput,
+				watermarkOutput,
+				watermarkOutput,
+				timestampAssigner,
+				watermarkGenerator);
+	}
+
+	/**
+	 * Creates a new SourceOutputWithWatermarks that emits records to the given DataOutput
+	 * and watermarks to the different WatermarkOutputs.
+	 */
+	public static <E> SourceOutputWithWatermarks<E> createWithSeparateOutputs(
+		PushingAsyncDataInput.DataOutput<E> recordsOutput,
+		WatermarkOutput onEventWatermarkOutput,
+		WatermarkOutput periodicWatermarkOutput,
+		TimestampAssigner<E> timestampAssigner,
+		WatermarkGenerator<E> watermarkGenerator) {
+
+		return new SourceOutputWithWatermarks<>(
+			recordsOutput,
+			onEventWatermarkOutput,
+			periodicWatermarkOutput,
+			timestampAssigner,
+			watermarkGenerator);
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/StreamingTimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/StreamingTimestampsAndWatermarks.java
new file mode 100644
index 0000000..af51f414
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/StreamingTimestampsAndWatermarks.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of timestamp extraction and watermark generation logic for streaming sources.
+ *
+ * @param <T> The type of the emitted records.
+ */
+@Internal
+public class StreamingTimestampsAndWatermarks<T> implements TimestampsAndWatermarks<T> {
+
+	private final TimestampAssigner<T> timestampAssigner;
+
+	private final WatermarkGeneratorSupplier<T> watermarksFactory;
+
+	private final WatermarkGeneratorSupplier.Context watermarksContext;
+
+	private final ProcessingTimeService timeService;
+
+	private final long periodicWatermarkInterval;
+
+	@Nullable
+	private SplitLocalOutputs<T> currentPerSplitOutputs;
+
+	@Nullable
+	private StreamingReaderOutput<T> currentMainOutput;
+
+	@Nullable
+	private ScheduledFuture<?> periodicEmitHandle;
+
+	public StreamingTimestampsAndWatermarks(
+			TimestampAssigner<T> timestampAssigner,
+			WatermarkGeneratorSupplier<T> watermarksFactory,
+			WatermarkGeneratorSupplier.Context watermarksContext,
+			ProcessingTimeService timeService,
+			Duration periodicWatermarkInterval) {
+
+		this.timestampAssigner = timestampAssigner;
+		this.watermarksFactory = watermarksFactory;
+		this.watermarksContext = watermarksContext;
+		this.timeService = timeService;
+
+		long periodicWatermarkIntervalMillis;
+		try {
+			periodicWatermarkIntervalMillis = periodicWatermarkInterval.toMillis();
+		} catch (ArithmeticException ignored) {
+			// long integer overflow
+			periodicWatermarkIntervalMillis = Long.MAX_VALUE;
+		}
+		this.periodicWatermarkInterval = periodicWatermarkIntervalMillis;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public ReaderOutput<T> createMainOutput(PushingAsyncDataInput.DataOutput<T> output) {
+		// At the moment, we assume only one output is ever created!
+		// This assumption is strict, currently, because many of the classes in this implementation do not
+		// support re-assigning the underlying output
+		checkState(currentMainOutput == null && currentPerSplitOutputs == null, "already created a main output");
+
+		final WatermarkOutput watermarkOutput = new WatermarkToDataOutput(output);
+		final WatermarkGenerator<T> watermarkGenerator = watermarksFactory.createWatermarkGenerator(watermarksContext);
+
+		currentPerSplitOutputs = new SplitLocalOutputs<>(
+				output,
+				watermarkOutput,
+				timestampAssigner,
+				watermarksFactory,
+				watermarksContext);
+
+		currentMainOutput = new StreamingReaderOutput<>(
+				output,
+				watermarkOutput,
+				timestampAssigner,
+				watermarkGenerator,
+				currentPerSplitOutputs);
+
+		return currentMainOutput;
+	}
+
+	@Override
+	public void startPeriodicWatermarkEmits() {
+		checkState(periodicEmitHandle == null, "periodic emitter already started");
+
+		if (periodicWatermarkInterval == 0) {
+			// a value of zero means not activated
+			return;
+		}
+
+		periodicEmitHandle = timeService.scheduleWithFixedDelay(
+				this::triggerPeriodicEmit,
+				periodicWatermarkInterval,
+				periodicWatermarkInterval);
+	}
+
+	@Override
+	public void stopPeriodicWatermarkEmits() {
+		if (periodicEmitHandle != null) {
+			periodicEmitHandle.cancel(false);
+			periodicEmitHandle = null;
+		}
+	}
+
+	void triggerPeriodicEmit(@SuppressWarnings("unused") long wallClockTimestamp) {
+		if (currentPerSplitOutputs != null) {
+			currentPerSplitOutputs.emitPeriodicWatermark();
+		}
+		if (currentMainOutput != null) {
+			currentMainOutput.emitPeriodicWatermark();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class StreamingReaderOutput<T> extends SourceOutputWithWatermarks<T> implements ReaderOutput<T> {
+
+		private final SplitLocalOutputs<T> splitLocalOutputs;
+
+		StreamingReaderOutput(
+				PushingAsyncDataInput.DataOutput<T> output,
+				WatermarkOutput watermarkOutput,
+				TimestampAssigner<T> timestampAssigner,
+				WatermarkGenerator<T> watermarkGenerator,
+				SplitLocalOutputs<T> splitLocalOutputs) {
+
+			super(output, watermarkOutput, watermarkOutput, timestampAssigner, watermarkGenerator);
+			this.splitLocalOutputs = splitLocalOutputs;
+		}
+
+		@Override
+		public SourceOutput<T> createOutputForSplit(String splitId) {
+			return splitLocalOutputs.createOutputForSplit(splitId);
+		}
+
+		@Override
+		public void releaseOutputForSplit(String splitId) {
+			splitLocalOutputs.releaseOutputForSplit(splitId);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A holder and factory for split-local {@link SourceOutput}s. The split-local outputs maintain
+	 * local watermark generators with their own state, to facilitate per-split watermarking logic.
+	 *
+	 * @param <T> The type of the emitted records.
+	 */
+	private static final class SplitLocalOutputs<T> {
+
+		private final WatermarkOutputMultiplexer watermarkMultiplexer;
+		private final Map<String, SourceOutputWithWatermarks<T>> localOutputs;
+		private final PushingAsyncDataInput.DataOutput<T> recordOutput;
+		private final TimestampAssigner<T> timestampAssigner;
+		private final WatermarkGeneratorSupplier<T> watermarksFactory;
+		private final WatermarkGeneratorSupplier.Context watermarkContext;
+
+		private SplitLocalOutputs(
+				PushingAsyncDataInput.DataOutput<T> recordOutput,
+				WatermarkOutput watermarkOutput,
+				TimestampAssigner<T> timestampAssigner,
+				WatermarkGeneratorSupplier<T> watermarksFactory,
+				WatermarkGeneratorSupplier.Context watermarkContext) {
+
+			this.recordOutput = recordOutput;
+			this.timestampAssigner = timestampAssigner;
+			this.watermarksFactory = watermarksFactory;
+			this.watermarkContext = watermarkContext;
+
+			this.watermarkMultiplexer = new WatermarkOutputMultiplexer(watermarkOutput);
+			this.localOutputs = new LinkedHashMap<>(); // we use a LinkedHashMap because it iterates faster
+		}
+
+		SourceOutput<T> createOutputForSplit(String splitId) {
+			final SourceOutputWithWatermarks<T> previous = localOutputs.get(splitId);
+			if (previous != null) {
+				return previous;
+			}
+
+			watermarkMultiplexer.registerNewOutput(splitId);
+			final WatermarkOutput onEventOutput = watermarkMultiplexer.getImmediateOutput(splitId);
+			final WatermarkOutput periodicOutput = watermarkMultiplexer.getDeferredOutput(splitId);
+
+			final WatermarkGenerator<T> watermarks = watermarksFactory.createWatermarkGenerator(watermarkContext);
+
+			final SourceOutputWithWatermarks<T> localOutput = SourceOutputWithWatermarks.createWithSeparateOutputs(
+					recordOutput, onEventOutput, periodicOutput, timestampAssigner, watermarks);
+
+			localOutputs.put(splitId, localOutput);
+			return localOutput;
+		}
+
+		void releaseOutputForSplit(String splitId) {
+			localOutputs.remove(splitId);
+			watermarkMultiplexer.unregisterOutput(splitId);
+		}
+
+		void emitPeriodicWatermark() {
+			// The call in the loop only records the next watermark candidate for each local output.
+			// The call to 'watermarkMultiplexer.onPeriodicEmit()' actually merges the watermarks.
+			// That way, we save inefficient repeated merging of (partially outdated) watermarks before
+			// all local generators have emitted their candidates.
+			for (SourceOutputWithWatermarks<?> output : localOutputs.values()) {
+				output.emitPeriodicWatermark();
+			}
+			watermarkMultiplexer.onPeriodicEmit();
+		}
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
new file mode 100644
index 0000000..f95ec55
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+import java.time.Duration;
+
+/**
+ * Basic interface for the timestamp extraction and watermark generation logic for the
+ * {@link org.apache.flink.api.connector.source.SourceReader}.
+ *
+ * <p>Implementations of this class may or may not actually perform certain tasks, like watermark
+ * generation. For example, the batch-oriented implementation typically skips all watermark generation
+ * logic.
+ *
+ * @param <T> The type of the emitted records.
+ */
+@Internal
+public interface TimestampsAndWatermarks<T> {
+
+	/**
+	 * Creates the ReaderOutput for the source reader, than internally runs the timestamp extraction and
+	 * watermark generation.
+	 */
+	ReaderOutput<T> createMainOutput(PushingAsyncDataInput.DataOutput<T> output);
+
+	/**
+	 * Starts emitting periodic watermarks, if this implementation produces watermarks, and if
+	 * periodic watermarks are configured.
+	 *
+	 * <p>Periodic watermarks are produced by periodically calling the
+	 * {@link org.apache.flink.api.common.eventtime.WatermarkGenerator#onPeriodicEmit(WatermarkOutput)} method
+	 * of the underlying Watermark Generators.
+	 */
+	void startPeriodicWatermarkEmits();
+
+	/**
+	 * Stops emitting periodic watermarks.
+	 */
+	void stopPeriodicWatermarkEmits();
+
+	// ------------------------------------------------------------------------
+	//  factories
+	// ------------------------------------------------------------------------
+
+	static <E> TimestampsAndWatermarks<E> createStreamingEventTimeLogic(
+			WatermarkStrategy<E> watermarkStrategy,
+			MetricGroup metrics,
+			ProcessingTimeService timeService,
+			long periodicWatermarkIntervalMillis) {
+
+		final TimestampsAndWatermarksContext context = new TimestampsAndWatermarksContext(metrics);
+		final TimestampAssigner<E> timestampAssigner = watermarkStrategy.createTimestampAssigner(context);
+
+		return new StreamingTimestampsAndWatermarks<>(
+			timestampAssigner, watermarkStrategy, context, timeService, Duration.ofMillis(periodicWatermarkIntervalMillis));
+	}
+
+	static <E> TimestampsAndWatermarks<E> createBatchEventTimeLogic(
+			WatermarkStrategy<E> watermarkStrategy,
+			MetricGroup metrics) {
+
+		final TimestampsAndWatermarksContext context = new TimestampsAndWatermarksContext(metrics);
+		final TimestampAssigner<E> timestampAssigner = watermarkStrategy.createTimestampAssigner(context);
+
+		return new BatchTimestampsAndWatermarks<>(timestampAssigner);
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java
new file mode 100644
index 0000000..201b901
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.metrics.MetricGroup;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple implementation of a context that is both {@link TimestampAssignerSupplier.Context}
+ * and {@link WatermarkGeneratorSupplier.Context}.
+ */
+@Internal
+public final class TimestampsAndWatermarksContext
+		implements TimestampAssignerSupplier.Context, WatermarkGeneratorSupplier.Context {
+
+	private final MetricGroup metricGroup;
+
+	public TimestampsAndWatermarksContext(MetricGroup metricGroup) {
+		this.metricGroup = checkNotNull(metricGroup);
+	}
+
+	@Override
+	public MetricGroup getMetricGroup() {
+		return metricGroup;
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
new file mode 100644
index 0000000..3e3b255
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An adapter that exposes a {@link WatermarkOutput} based on a {@link PushingAsyncDataInput.DataOutput}.
+ */
+@Internal
+public final class WatermarkToDataOutput implements WatermarkOutput {
+
+	private final PushingAsyncDataInput.DataOutput<?> output;
+	private long maxWatermarkSoFar;
+	private boolean isIdle;
+
+	/**
+	 * Creates a new WatermarkOutput against the given DataOutput.
+	 */
+	public WatermarkToDataOutput(PushingAsyncDataInput.DataOutput<?> output) {
+		this.output = checkNotNull(output);
+		this.maxWatermarkSoFar = Long.MIN_VALUE;
+	}
+
+	@Override
+	public void emitWatermark(Watermark watermark) {
+		final long newWatermark = watermark.getTimestamp();
+		if (newWatermark <= maxWatermarkSoFar) {
+			return;
+		}
+
+		maxWatermarkSoFar = newWatermark;
+
+		try {
+			if (isIdle) {
+				output.emitStreamStatus(StreamStatus.ACTIVE);
+				isIdle = false;
+			}
+
+			output.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(newWatermark));
+		} catch (ExceptionInChainedOperatorException e) {
+			throw e;
+		} catch (Exception e) {
+			throw new ExceptionInChainedOperatorException(e);
+		}
+	}
+
+	@Override
+	public void markIdle() {
+		if (isIdle) {
+			return;
+		}
+
+		try {
+			output.emitStreamStatus(StreamStatus.IDLE);
+			isIdle = true;
+		} catch (ExceptionInChainedOperatorException e) {
+			throw e;
+		} catch (Exception e) {
+			throw new ExceptionInChainedOperatorException(e);
+		}
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 34b7962..93ced72 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.connector.source.SourceEvent;
-import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
@@ -28,7 +27,6 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
@@ -39,7 +37,7 @@ import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateInitializationContextImpl;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
 import org.apache.flink.util.CollectionUtil;
 
 import org.junit.Before;
@@ -165,32 +163,4 @@ public class SourceOperatorTest {
 		return abstractStateBackend.createOperatorStateBackend(
 				env, "test-operator", Collections.emptyList(), cancelStreamRegistry);
 	}
-
-	// -------------- Testing SourceOperator class -------------
-
-	/**
-	 * A testing class that overrides the getRuntimeContext() Method.
-	 */
-	private static class TestingSourceOperator<OUT> extends SourceOperator<OUT, MockSourceSplit> {
-
-		private final int subtaskIndex;
-
-		TestingSourceOperator(
-				SourceReader<OUT, MockSourceSplit> reader,
-				OperatorEventGateway eventGateway,
-				int subtaskIndex) {
-
-			super(
-					(context) -> reader,
-					eventGateway,
-					new MockSourceSplitSerializer());
-
-			this.subtaskIndex = subtaskIndex;
-		}
-
-		@Override
-		public StreamingRuntimeContext getRuntimeContext() {
-			return new MockStreamingRuntimeContext(false, 5, subtaskIndex);
-		}
-	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java
new file mode 100644
index 0000000..a536553
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A test utility implementation of {@link PushingAsyncDataInput.DataOutput} that collects all events.
+ */
+final class CollectingDataOutput<E> implements PushingAsyncDataInput.DataOutput<E> {
+
+	final List<Object> events = new ArrayList<>();
+
+	@Override
+	public void emitWatermark(Watermark watermark) throws Exception {
+		events.add(watermark);
+	}
+
+	@Override
+	public void emitStreamStatus(StreamStatus streamStatus) throws Exception {
+		events.add(streamStatus);
+	}
+
+	@Override
+	public void emitRecord(StreamRecord<E> streamRecord) throws Exception {
+		events.add(streamRecord);
+	}
+
+	@Override
+	public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
+		events.add(latencyMarker);
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/OnEventTestWatermarkGenerator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/OnEventTestWatermarkGenerator.java
new file mode 100644
index 0000000..55141c1
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/OnEventTestWatermarkGenerator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+
+final class OnEventTestWatermarkGenerator<T> implements WatermarkGenerator<T> {
+
+	@Override
+	public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
+		output.emitWatermark(new Watermark(eventTimestamp));
+	}
+
+	@Override
+	public void onPeriodicEmit(WatermarkOutput output) {}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/OnPeriodicTestWatermarkGenerator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/OnPeriodicTestWatermarkGenerator.java
new file mode 100644
index 0000000..1342457
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/OnPeriodicTestWatermarkGenerator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+
+import javax.annotation.Nullable;
+
+final class OnPeriodicTestWatermarkGenerator<T> implements WatermarkGenerator<T> {
+
+	@Nullable
+	private Long lastTimestamp;
+
+	@Override
+	public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
+		lastTimestamp = eventTimestamp;
+	}
+
+	@Override
+	public void onPeriodicEmit(WatermarkOutput output) {
+		if (lastTimestamp != null) {
+			output.emitWatermark(new Watermark(lastTimestamp));
+		}
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
new file mode 100644
index 0000000..347975c
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkStrategies;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.SourceOperator;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests that validate correct handling of watermark generation in the {@link ReaderOutput} as created
+ * by the {@link StreamingTimestampsAndWatermarks}.
+ */
+public class SourceOperatorEventTimeTest {
+
+	@Test
+	public void testMainOutputPeriodicWatermarks() throws Exception {
+		final WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategies
+			.<Integer>forGenerator((ctx) -> new OnPeriodicTestWatermarkGenerator<>())
+			.build();
+
+		final List<Watermark> result = testSequenceOfWatermarks(watermarkStrategy,
+			(output) -> output.collect(0, 100L),
+			(output) -> output.collect(0, 120L),
+			(output) -> output.collect(0, 110L)
+		);
+
+		assertThat(result, contains(
+			new Watermark(100L),
+			new Watermark(120L)
+		));
+	}
+
+	@Test
+	public void testMainOutputEventWatermarks() throws Exception {
+		final WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategies
+			.<Integer>forGenerator((ctx) -> new OnEventTestWatermarkGenerator<>())
+			.build();
+
+		final List<Watermark> result = testSequenceOfWatermarks(watermarkStrategy,
+			(output) -> output.collect(0, 100L),
+			(output) -> output.collect(0, 120L),
+			(output) -> output.collect(0, 110L)
+		);
+
+		assertThat(result, contains(
+			new Watermark(100L),
+			new Watermark(120L)
+		));
+	}
+
+	@Test
+	public void testPerSplitOutputPeriodicWatermarks() throws Exception {
+		final WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategies
+			.<Integer>forGenerator((ctx) -> new OnPeriodicTestWatermarkGenerator<>())
+			.build();
+
+		final List<Watermark> result = testSequenceOfWatermarks(watermarkStrategy,
+			(output) -> {
+				output.createOutputForSplit("A");
+				output.createOutputForSplit("B");
+			},
+			(output) -> output.createOutputForSplit("A").collect(0, 100L),
+			(output) -> output.createOutputForSplit("B").collect(0, 200L),
+			(output) -> output.createOutputForSplit("A").collect(0, 150L),
+			(output) -> output.releaseOutputForSplit("A"),
+			(output) -> output.createOutputForSplit("B").collect(0, 200L)
+		);
+
+		assertThat(result, contains(
+			new Watermark(100L),
+			new Watermark(150L),
+			new Watermark(200L)
+		));
+	}
+
+	@Test
+	public void testPerSplitOutputEventWatermarks() throws Exception {
+		final WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategies
+				.<Integer>forGenerator((ctx) -> new OnEventTestWatermarkGenerator<>())
+				.build();
+
+		final List<Watermark> result = testSequenceOfWatermarks(watermarkStrategy,
+			(output) -> {
+				output.createOutputForSplit("one");
+				output.createOutputForSplit("two");
+			},
+			(output) -> output.createOutputForSplit("one").collect(0, 100L),
+			(output) -> output.createOutputForSplit("two").collect(0, 200L),
+			(output) -> output.createOutputForSplit("one").collect(0, 150L),
+			(output) -> output.releaseOutputForSplit("one"),
+			(output) -> output.createOutputForSplit("two").collect(0, 200L)
+		);
+
+		assertThat(result, contains(
+			new Watermark(100L),
+			new Watermark(150L),
+			new Watermark(200L)
+		));
+	}
+
+	// ------------------------------------------------------------------------
+	//   test execution helpers
+	// ------------------------------------------------------------------------
+
+	@SuppressWarnings("FinalPrivateMethod")
+	@SafeVarargs
+	private final List<Watermark> testSequenceOfWatermarks(
+			final WatermarkStrategy<Integer> watermarkStrategy,
+			final Consumer<ReaderOutput<Integer>>... actions) throws Exception {
+
+		final List<Object> allEvents = testSequenceOfEvents(watermarkStrategy, actions);
+
+		return allEvents.stream()
+				.filter((evt) -> evt instanceof org.apache.flink.streaming.api.watermark.Watermark)
+				.map((evt) -> new Watermark(((org.apache.flink.streaming.api.watermark.Watermark) evt).getTimestamp()))
+				.collect(Collectors.toList());
+	}
+
+	@SuppressWarnings("FinalPrivateMethod")
+	@SafeVarargs
+	private final List<Object> testSequenceOfEvents(
+			WatermarkStrategy<Integer> watermarkStrategy,
+			final Consumer<ReaderOutput<Integer>>... actions) throws Exception {
+
+		final CollectingDataOutput<Integer> out = new CollectingDataOutput<>();
+
+		final TestProcessingTimeService timeService = new TestProcessingTimeService();
+		timeService.setCurrentTime(Integer.MAX_VALUE); // start somewhere that is not zero
+
+		final SourceReader<Integer, MockSourceSplit> reader = new InterpretingSourceReader(actions);
+
+		final SourceOperator<Integer, MockSourceSplit> sourceOperator =
+				createTestOperator(reader, watermarkStrategy, timeService);
+
+		while (sourceOperator.emitNext(out) != InputStatus.END_OF_INPUT) {
+			timeService.setCurrentTime(timeService.getCurrentProcessingTime() + 100);
+		}
+
+		return out.events;
+	}
+
+	// ------------------------------------------------------------------------
+	//   test setup helpers
+	// ------------------------------------------------------------------------
+
+	private static <T> SourceOperator<T, MockSourceSplit> createTestOperator(
+			SourceReader<T, MockSourceSplit> reader,
+			WatermarkStrategy<T> watermarkStrategy,
+			ProcessingTimeService timeService) throws Exception {
+
+		final OperatorStateStore operatorStateStore =
+				new MemoryStateBackend().createOperatorStateBackend(
+						new MockEnvironmentBuilder().build(),
+						"test-operator",
+						Collections.emptyList(),
+						new CloseableRegistry());
+
+		final StateInitializationContext stateContext = new StateInitializationContextImpl(
+			false, operatorStateStore, null, null, null);
+
+		final SourceOperator<T, MockSourceSplit> sourceOperator =
+				new TestingSourceOperator<>(reader, watermarkStrategy, timeService);
+		sourceOperator.initializeState(stateContext);
+		sourceOperator.open();
+
+		return sourceOperator;
+	}
+
+	// ------------------------------------------------------------------------
+	//   test mocks
+	// ------------------------------------------------------------------------
+
+	private static final class InterpretingSourceReader implements SourceReader<Integer, MockSourceSplit> {
+
+		private final Iterator<Consumer<ReaderOutput<Integer>>> actions;
+
+		@SafeVarargs
+		private InterpretingSourceReader(Consumer<ReaderOutput<Integer>>... actions) {
+			this.actions = Arrays.asList(actions).iterator();
+		}
+
+		@Override
+		public void start() {}
+
+		@Override
+		public InputStatus pollNext(ReaderOutput<Integer> output) {
+			if (actions.hasNext()) {
+				actions.next().accept(output);
+				return InputStatus.MORE_AVAILABLE;
+			} else {
+				return InputStatus.END_OF_INPUT;
+			}
+		}
+
+		@Override
+		public List<MockSourceSplit> snapshotState() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public CompletableFuture<Void> isAvailable() {
+			return CompletableFuture.completedFuture(null);
+		}
+
+		@Override
+		public void addSplits(List<MockSourceSplit> splits) {}
+
+		@Override
+		public void handleSourceEvents(SourceEvent sourceEvent) {}
+
+		@Override
+		public void close() {}
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarksTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarksTest.java
new file mode 100644
index 0000000..b3f079c
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarksTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.NoWatermarksGenerator;
+import org.apache.flink.api.common.eventtime.RecordTimestampAssigner;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link SourceOutputWithWatermarks}.
+ */
+public class SourceOutputWithWatermarksTest {
+
+	@Test
+	public void testNoTimestampValue() {
+		final CollectingDataOutput<Integer> dataOutput = new CollectingDataOutput<>();
+		final SourceOutputWithWatermarks<Integer> out = SourceOutputWithWatermarks.createWithSameOutputs(
+				dataOutput, new RecordTimestampAssigner<>(), new NoWatermarksGenerator<>());
+
+		out.collect(17);
+
+		final Object event = dataOutput.events.get(0);
+		assertThat(event, instanceOf(StreamRecord.class));
+		assertEquals(TimestampAssigner.NO_TIMESTAMP, ((StreamRecord<?>) event).getTimestamp());
+	}
+
+	@Test
+	public void eventsAreBeforeWatermarks() {
+		final CollectingDataOutput<Integer> dataOutput = new CollectingDataOutput<>();
+		final SourceOutputWithWatermarks<Integer> out = SourceOutputWithWatermarks.createWithSameOutputs(
+				dataOutput, new RecordTimestampAssigner<>(), new TestWatermarkGenerator<>());
+
+		out.collect(42, 12345L);
+
+		assertThat(dataOutput.events, contains(
+				new StreamRecord<>(42, 12345L),
+				new org.apache.flink.streaming.api.watermark.Watermark(12345L)
+		));
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class TestWatermarkGenerator<T> implements WatermarkGenerator<T> {
+
+		private long lastTimestamp;
+
+		@Override
+		public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
+			lastTimestamp = eventTimestamp;
+			output.emitWatermark(new Watermark(eventTimestamp));
+		}
+
+		@Override
+		public void onPeriodicEmit(WatermarkOutput output) {
+			output.emitWatermark(new Watermark(lastTimestamp));
+		}
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
new file mode 100644
index 0000000..43992dc
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.WatermarkStrategies;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.operators.SourceOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+
+/**
+ * A SourceOperator extension to simplify test setup.
+ */
+public class TestingSourceOperator<T>  extends SourceOperator<T, MockSourceSplit> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final int subtaskIndex;
+	private final int parallelism;
+
+	public TestingSourceOperator(
+			SourceReader<T, MockSourceSplit> reader,
+			WatermarkStrategy<T> watermarkStrategy,
+			ProcessingTimeService timeService) {
+
+		this(reader, watermarkStrategy, timeService, new MockOperatorEventGateway(), 1, 5);
+	}
+
+	public TestingSourceOperator(
+			SourceReader<T, MockSourceSplit> reader,
+			OperatorEventGateway eventGateway,
+			int subtaskIndex) {
+
+		this(reader, WatermarkStrategies.<T>noWatermarks().build(), new TestProcessingTimeService(), eventGateway, subtaskIndex, 5);
+	}
+
+	public TestingSourceOperator(
+			SourceReader<T, MockSourceSplit> reader,
+			WatermarkStrategy<T> watermarkStrategy,
+			ProcessingTimeService timeService,
+			OperatorEventGateway eventGateway,
+			int subtaskIndex,
+			int parallelism) {
+
+		super(
+			(context) -> reader,
+			eventGateway,
+			new MockSourceSplitSerializer(),
+			watermarkStrategy,
+			timeService);
+
+		this.subtaskIndex = subtaskIndex;
+		this.parallelism = parallelism;
+		this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
+	}
+
+	@Override
+	public StreamingRuntimeContext getRuntimeContext() {
+		return new MockStreamingRuntimeContext(false, parallelism, subtaskIndex);
+	}
+
+	// this is overridden to avoid complex mock injection through the "containingTask"
+	@Override
+	public ExecutionConfig getExecutionConfig() {
+		ExecutionConfig cfg = new ExecutionConfig();
+		cfg.setAutoWatermarkInterval(100);
+		return cfg;
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutputTest.java
new file mode 100644
index 0000000..0fb3f81
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutputTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for the {@link WatermarkToDataOutput}.
+ */
+public class WatermarkToDataOutputTest {
+
+	@Test
+	public void testInitialZeroWatermark() {
+		final CollectingDataOutput<Object> testingOutput = new CollectingDataOutput<>();
+		final WatermarkToDataOutput wmOutput = new WatermarkToDataOutput(testingOutput);
+
+		wmOutput.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(0L));
+
+		assertThat(testingOutput.events, contains(new Watermark(0L)));
+	}
+
+	@Test
+	public void testWatermarksDoNotRegress() {
+		final CollectingDataOutput<Object> testingOutput = new CollectingDataOutput<>();
+		final WatermarkToDataOutput wmOutput = new WatermarkToDataOutput(testingOutput);
+
+		wmOutput.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(12L));
+		wmOutput.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(17L));
+		wmOutput.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(10L));
+		wmOutput.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(18L));
+		wmOutput.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(17L));
+		wmOutput.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(18L));
+
+		assertThat(testingOutput.events, contains(
+			new Watermark(12L),
+			new Watermark(17L),
+			new Watermark(18L)
+		));
+	}
+
+	@Test
+	public void becomingActiveEmitsStatus() {
+		final CollectingDataOutput<Object> testingOutput = new CollectingDataOutput<>();
+		final WatermarkToDataOutput wmOutput = new WatermarkToDataOutput(testingOutput);
+
+		wmOutput.markIdle();
+		wmOutput.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark(100L));
+
+		assertThat(testingOutput.events, contains(
+			StreamStatus.IDLE,
+			StreamStatus.ACTIVE,
+			new Watermark(100L)
+		));
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index b1e75b2..515bd51 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategies;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.mocks.MockSource;
@@ -107,7 +109,7 @@ public class SourceOperatorStreamTaskTest {
 
 			// Build expected output to verify the results
 			Queue<Object> expectedOutput = new LinkedList<>();
-			expectedRecords.forEach(r -> expectedOutput.offer(new StreamRecord<>(r)));
+			expectedRecords.forEach(r -> expectedOutput.offer(new StreamRecord<>(r, TimestampAssigner.NO_TIMESTAMP)));
 			// Add barrier to the expected output.
 			expectedOutput.add(new CheckpointBarrier(checkpointId, checkpointId, checkpointOptions));
 
@@ -128,8 +130,10 @@ public class SourceOperatorStreamTaskTest {
 			long checkpointId,
 			TaskStateSnapshot snapshot) throws Exception {
 		// get a source operator.
-		SourceOperatorFactory<Integer> sourceOperatorFactory =
-				new SourceOperatorFactory<>(new MockSource(Boundedness.BOUNDED, 1));
+		SourceOperatorFactory<Integer> sourceOperatorFactory = new SourceOperatorFactory<>(
+				new MockSource(Boundedness.BOUNDED, 1),
+				WatermarkStrategies.<Integer>noWatermarks().build());
+
 		// build a test harness.
 		MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
 				new MultipleInputStreamTaskTestHarnessBuilder<>(SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO);