You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/27 20:01:07 UTC
[flink] 11/16: [FLINK-17899][runtime] Integrate FLIP-126 Watermarks
with FLIP-27 Sources
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 54f1a4c8071a6d71111185449e795b2f00fa49e9
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue May 19 20:37:31 2020 +0200
[FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sources
---
.../base/source/reader/SourceReaderBase.java | 43 +++-
.../base/source/reader/SourceReaderTestBase.java | 15 +-
.../flink/api/connector/source/ReaderOutput.java | 87 +++++++
.../flink/api/connector/source/SourceReader.java | 2 +-
.../connector/source/mocks/MockSourceReader.java | 4 +-
.../streaming/api/operators/SourceOperator.java | 60 ++++-
.../api/operators/SourceOperatorFactory.java | 21 +-
.../source/BatchTimestampsAndWatermarks.java | 126 ++++++++++
.../source/SourceOutputWithWatermarks.java | 179 ++++++++++++++
.../source/StreamingTimestampsAndWatermarks.java | 247 ++++++++++++++++++++
.../operators/source/TimestampsAndWatermarks.java | 92 ++++++++
.../source/TimestampsAndWatermarksContext.java | 46 ++++
.../operators/source/WatermarkToDataOutput.java | 86 +++++++
.../api/operators/SourceOperatorTest.java | 32 +--
.../api/operators/source/CollectingDataOutput.java | 56 +++++
.../source/OnEventTestWatermarkGenerator.java | 34 +++
.../source/OnPeriodicTestWatermarkGenerator.java | 43 ++++
.../source/SourceOperatorEventTimeTest.java | 257 +++++++++++++++++++++
.../source/SourceOutputWithWatermarksTest.java | 85 +++++++
.../operators/source/TestingSourceOperator.java | 94 ++++++++
.../source/WatermarkToDataOutputTest.java | 77 ++++++
.../tasks/SourceOperatorStreamTaskTest.java | 10 +-
22 files changed, 1637 insertions(+), 59 deletions(-)
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index b5781de..1da08d1 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.base.source.reader;
+import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
@@ -63,7 +64,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
/** The state of the splits. */
- private final Map<String, SplitStateT> splitStates;
+ private final Map<String, SplitContext<T, SplitStateT>> splitStates;
/** The record emitter to handle the records read by the SplitReaders. */
protected final RecordEmitter<E, T, SplitStateT> recordEmitter;
@@ -111,7 +112,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
}
@Override
- public InputStatus pollNext(SourceOutput<T> sourceOutput) throws Exception {
+ public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
splitFetcherManager.checkErrors();
// poll from the queue if the last element was successfully handled. Otherwise
// just pass the last element again.
@@ -133,14 +134,19 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
// Process one record.
if (splitIter.hasNext()) {
// emit the record.
- E record = splitIter.next();
- recordEmitter.emitRecord(record, sourceOutput, splitStates.get(splitIter.currentSplitId()));
+ final E record = splitIter.next();
+ final SplitContext<T, SplitStateT> splitContext = splitStates.get(splitIter.currentSplitId());
+ final SourceOutput<T> splitOutput = splitContext.getOrCreateSplitOutput(output);
+ recordEmitter.emitRecord(record, splitOutput, splitContext.state);
LOG.trace("Emitted record: {}", record);
}
// Do some cleanup if all the records in the current splitIter have been processed.
if (!splitIter.hasNext()) {
// First remove the state of the split.
- splitIter.finishedSplitIds().forEach(splitStates::remove);
+ splitIter.finishedSplitIds().forEach((id) -> {
+ splitStates.remove(id);
+ output.releaseOutputForSplit(id);
+ });
// Handle the finished splits.
onSplitFinished(splitIter.finishedSplitIds());
// Prepare the return status based on the availability of the next element.
@@ -173,7 +179,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
@Override
public List<SplitT> snapshotState() {
List<SplitT> splits = new ArrayList<>();
- splitStates.forEach((id, state) -> splits.add(toSplitType(id, state)));
+ splitStates.forEach((id, context) -> splits.add(toSplitType(id, context.state)));
return splits;
}
@@ -181,7 +187,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
public void addSplits(List<SplitT> splits) {
LOG.trace("Adding splits {}", splits);
// Initialize the state for each split.
- splits.forEach(s -> splitStates.put(s.splitId(), initializedState(s)));
+ splits.forEach(s -> splitStates.put(s.splitId(), new SplitContext<>(s.splitId(), initializedState(s))));
// Hand over the splits to the split fetcher to start fetch.
splitFetcherManager.addSplits(splits);
}
@@ -201,6 +207,8 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
splitFetcherManager.close(options.sourceReaderCloseTimeout);
}
+
+
// -------------------- Abstract method to allow different implementations ------------------
/**
* Handles the finished splits to clean the state if needed.
@@ -233,4 +241,25 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
return InputStatus.NOTHING_AVAILABLE;
}
}
+
+ // ------------------ private helper classes ---------------------
+
+ private static final class SplitContext<T, SplitStateT> {
+
+ final String splitId;
+ final SplitStateT state;
+ SourceOutput<T> sourceOutput;
+
+ private SplitContext(String splitId, SplitStateT state) {
+ this.state = state;
+ this.splitId = splitId;
+ }
+
+ SourceOutput<T> getOrCreateSplitOutput(ReaderOutput<T> mainOutput) {
+ if (sourceOutput == null) {
+ sourceOutput = mainOutput.createOutputForSplit(splitId);
+ }
+ return sourceOutput;
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
index fcf44fa..5f58836 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.base.source.reader;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;
@@ -170,7 +171,7 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T
/**
* A source output that validates the output.
*/
- protected static class ValidatingSourceOutput implements SourceOutput<Integer> {
+ protected static class ValidatingSourceOutput implements ReaderOutput<Integer> {
private Set<Integer> consumedValues = new HashSet<>();
private int max = Integer.MIN_VALUE;
private int min = Integer.MAX_VALUE;
@@ -204,13 +205,17 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T
}
@Override
- public void emitWatermark(Watermark watermark) {
-
- }
+ public void emitWatermark(Watermark watermark) {}
@Override
- public void markIdle() {
+ public void markIdle() {}
+ @Override
+ public SourceOutput<Integer> createOutputForSplit(String splitId) {
+ return this;
}
+
+ @Override
+ public void releaseOutputForSplit(String splitId) {}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
new file mode 100644
index 0000000..1774a7c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
@@ -0,0 +1,87 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.Watermark;
+
+/**
+ * The interface provided by the Flink task to the {@link SourceReader} to emit records
+ * to downstream operators for message processing.
+ */
+@PublicEvolving
+public interface ReaderOutput<T> extends SourceOutput<T> {
+
+ /**
+ * Emit a record without a timestamp. Conceptually equivalent to {@link #collect(Object, long)} with no timestamp attached.
+ *
+ * @param record the record to emit.
+ */
+ @Override
+ void collect(T record);
+
+ /**
+ * Emit a record with timestamp.
+ *
+ * @param record the record to emit.
+ * @param timestamp the timestamp of the record.
+ */
+ @Override
+ void collect(T record, long timestamp);
+
+ /**
+ * Emits the given watermark.
+ *
+ * <p>Emitting a watermark also implicitly marks the stream as <i>active</i>, ending
+ * previously marked idleness.
+ */
+ @Override
+ void emitWatermark(Watermark watermark);
+
+ /**
+ * Marks this output as idle, meaning that downstream operations do not
+ * wait for watermarks from this output.
+ *
+ * <p>An output becomes active again as soon as the next watermark is emitted.
+ */
+ @Override
+ void markIdle();
+
+ /**
+ * Creates a {@code SourceOutput} for a specific Source Split. Use these outputs if you want to
+ * run split-local logic, like watermark generation.
+ *
+ * <p>If a split-local output was already created for this split-ID, the method will return that instance,
+ * so that only one split-local output exists per split-ID.
+ *
+ * <p><b>IMPORTANT:</b> After the split has been finished, it is crucial to release the created
+ * output again. Otherwise it will continue to contribute to the watermark generation like a
+ * perpetually stalling source split, and may hold back the watermark indefinitely.
+ *
+ * @see #releaseOutputForSplit(String)
+ */
+ SourceOutput<T> createOutputForSplit(String splitId);
+
+ /**
+ * Releases the {@code SourceOutput} created for the split with the given ID.
+ *
+ * @see #createOutputForSplit(String)
+ */
+ void releaseOutputForSplit(String splitId);
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
index 6a26d70..b09a724 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
@@ -51,7 +51,7 @@ public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseab
*
* @return The InputStatus of the SourceReader after the method invocation.
*/
- InputStatus pollNext(SourceOutput<T> sourceOutput) throws Exception;
+ InputStatus pollNext(ReaderOutput<T> output) throws Exception;
/**
* Checkpoint on the state of the source.
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
index 9a7a327..0bdcdec 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
@@ -18,8 +18,8 @@
package org.apache.flink.api.connector.source.mocks;
+import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
-import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.core.io.InputStatus;
@@ -53,7 +53,7 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit>
}
@Override
- public InputStatus pollNext(SourceOutput<Integer> sourceOutput) throws Exception {
+ public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) throws Exception {
boolean finished = true;
currentSplitIndex = 0;
// Find first splits with available records.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 5d4b2d6..9ac80d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -19,11 +19,12 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
-import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
@@ -38,8 +39,10 @@ import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.CollectionUtil;
import java.util.List;
@@ -81,30 +84,47 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
/** The event gateway through which this operator talks to its coordinator. */
private final OperatorEventGateway operatorEventGateway;
- // ---- lazily initialized fields ----
+ /** The factory for timestamps and watermark generators. */
+ private final WatermarkStrategy<OUT> watermarkStrategy;
+
+ // ---- lazily initialized fields (these fields are the "hot" fields) ----
/** The source reader that does most of the work. */
private SourceReader<OUT, SplitT> sourceReader;
+ private ReaderOutput<OUT> currentMainOutput;
+
+ private DataOutput<OUT> lastInvokedOutput;
+
/** The state that holds the currently assigned splits. */
private ListState<SplitT> readerState;
+ /** The event time and watermarking logic. Ideally this would be eagerly passed into this operator,
+ * but we currently need to instantiate this lazily, because the metric groups exist only later. */
+ private TimestampsAndWatermarks<OUT> eventTimeLogic;
+
public SourceOperator(
Function<SourceReaderContext, SourceReader<OUT, SplitT>> readerFactory,
OperatorEventGateway operatorEventGateway,
- SimpleVersionedSerializer<SplitT> splitSerializer) {
+ SimpleVersionedSerializer<SplitT> splitSerializer,
+ WatermarkStrategy<OUT> watermarkStrategy,
+ ProcessingTimeService timeService) {
this.readerFactory = checkNotNull(readerFactory);
this.operatorEventGateway = checkNotNull(operatorEventGateway);
this.splitSerializer = checkNotNull(splitSerializer);
+ this.watermarkStrategy = checkNotNull(watermarkStrategy);
+ this.processingTimeService = timeService;
}
@Override
public void open() throws Exception {
+ final MetricGroup metricGroup = getMetricGroup();
+
final SourceReaderContext context = new SourceReaderContext() {
@Override
public MetricGroup metricGroup() {
- return getRuntimeContext().getMetricGroup();
+ return metricGroup;
}
@Override
@@ -113,6 +133,15 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
}
};
+ // in the future when we support both batch and streaming modes for the source operator,
+ // and when this one is migrated to the "eager initialization" operator (StreamOperatorV2),
+ // then we should evaluate this during operator construction.
+ eventTimeLogic = TimestampsAndWatermarks.createStreamingEventTimeLogic(
+ watermarkStrategy,
+ metricGroup,
+ getProcessingTimeService(),
+ getExecutionConfig().getAutoWatermarkInterval());
+
sourceReader = readerFactory.apply(context);
// restore the state if necessary.
@@ -125,12 +154,31 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
sourceReader.start();
// Register the reader to the coordinator.
registerReader();
+
+ eventTimeLogic.startPeriodicWatermarkEmits();
+ }
+
+ @Override
+ public void close() throws Exception {
+ eventTimeLogic.stopPeriodicWatermarkEmits();
+ super.close();
}
@Override
- @SuppressWarnings("unchecked")
public InputStatus emitNext(DataOutput<OUT> output) throws Exception {
- return sourceReader.pollNext((SourceOutput<OUT>) output);
+ // guarding an assumption we currently make due to the fact that certain classes
+ // assume a constant output
+ assert lastInvokedOutput == output || lastInvokedOutput == null;
+
+ // short circuit the common case (every invocation except the first)
+ if (currentMainOutput != null) {
+ return sourceReader.pollNext(currentMainOutput);
+ }
+
+ // this creates a batch or streaming output based on the runtime mode
+ currentMainOutput = eventTimeLogic.createMainOutput(output);
+ lastInvokedOutput = output;
+ return sourceReader.pollNext(currentMainOutput);
}
@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index c30a0a7..02c7927 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.operators;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
@@ -27,6 +28,9 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
+import org.apache.flink.streaming.api.operators.source.NoOpWatermarkGenerator;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;
import java.util.function.Function;
@@ -34,13 +38,16 @@ import java.util.function.Function;
* The Factory class for {@link SourceOperator}.
*/
public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OUT>
- implements CoordinatedOperatorFactory<OUT> {
+ implements CoordinatedOperatorFactory<OUT>, ProcessingTimeServiceAware {
private static final long serialVersionUID = 1L;
/** The {@link Source} to create the {@link SourceOperator}. */
private final Source<OUT, ?, ?> source;
+ /** The event time setup (timestamp assigners, watermark generators, etc.). */
+ private final WatermarkStrategy<OUT> watermarkStrategy = (ctx) -> new NoOpWatermarkGenerator<>();
+
/** The number of worker threads for the source coordinator. */
private final int numCoordinatorWorkerThread;
@@ -61,7 +68,9 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
final SourceOperator<OUT, ?> sourceOperator = instantiateSourceOperator(
source::createReader,
gateway,
- source.getSplitSerializer());
+ source.getSplitSerializer(),
+ watermarkStrategy,
+ parameters.getProcessingTimeService());
sourceOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, sourceOperator);
@@ -99,7 +108,9 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
private static <T, SplitT extends SourceSplit> SourceOperator<T, SplitT> instantiateSourceOperator(
Function<SourceReaderContext, SourceReader<T, ?>> readerFactory,
OperatorEventGateway eventGateway,
- SimpleVersionedSerializer<?> splitSerializer) {
+ SimpleVersionedSerializer<?> splitSerializer,
+ WatermarkStrategy<T> watermarkStrategy,
+ ProcessingTimeService timeService) {
// jumping through generics hoops: cast the generics away to then cast them back more strictly typed
final Function<SourceReaderContext, SourceReader<T, SplitT>> typedReaderFactory =
@@ -110,6 +121,8 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
return new SourceOperator<>(
typedReaderFactory,
eventGateway,
- typedSplitSerializer);
+ typedSplitSerializer,
+ watermarkStrategy,
+ timeService);
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/BatchTimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/BatchTimestampsAndWatermarks.java
new file mode 100644
index 0000000..135cad9
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/BatchTimestampsAndWatermarks.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of {@link TimestampsAndWatermarks} to be used during batch execution of a
+ * program. Batch execution has no watermarks, so all watermark related operations in this
+ * implementation are no-ops.
+ *
+ * @param <T> The type of the emitted records.
+ */
+@Internal
+public class BatchTimestampsAndWatermarks<T> implements TimestampsAndWatermarks<T> {
+
+ private final TimestampAssigner<T> timestamps;
+
+ /**
+ * Creates a new BatchTimestampsAndWatermarks with the given TimestampAssigner.
+ */
+ public BatchTimestampsAndWatermarks(TimestampAssigner<T> timestamps) {
+ this.timestamps = checkNotNull(timestamps);
+ }
+
+ @Override
+ public ReaderOutput<T> createMainOutput(PushingAsyncDataInput.DataOutput<T> output) {
+ checkNotNull(output);
+ return new TimestampsOnlyOutput<>(output, timestamps);
+ }
+
+ @Override
+ public void startPeriodicWatermarkEmits() {
+ // no periodic watermarks in batch processing
+ }
+
+ @Override
+ public void stopPeriodicWatermarkEmits() {
+ // no periodic watermarks in batch processing
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A simple implementation of {@link SourceOutput} and {@link ReaderOutput} that extracts
+ * timestamps but has no watermarking logic. Because only watermarking logic has state per
+ * Source Split, the same instance gets shared across all Source Splits.
+ *
+ * @param <T> The type of the emitted records.
+ */
+ private static final class TimestampsOnlyOutput<T> implements ReaderOutput<T> {
+
+ private final PushingAsyncDataInput.DataOutput<T> output;
+ private final TimestampAssigner<T> timestampAssigner;
+
+ private TimestampsOnlyOutput(
+ PushingAsyncDataInput.DataOutput<T> output,
+ TimestampAssigner<T> timestampAssigner) {
+
+ this.output = output;
+ this.timestampAssigner = timestampAssigner;
+ }
+
+ @Override
+ public void collect(T record) {
+ collect(record, TimestampAssigner.NO_TIMESTAMP);
+ }
+
+ @Override
+ public void collect(T record, long timestamp) {
+ try {
+ output.emitRecord(new StreamRecord<>(record, timestampAssigner.extractTimestamp(record, timestamp)));
+ } catch (ExceptionInChainedOperatorException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new ExceptionInChainedOperatorException(e);
+ }
+ }
+
+ @Override
+ public void emitWatermark(Watermark watermark) {
+ // do nothing, this does not forward any watermarks manually emitted by the source directly
+ }
+
+ @Override
+ public void markIdle() {
+ // do nothing, because without watermarks there is no idleness
+ }
+
+ @Override
+ public SourceOutput<T> createOutputForSplit(String splitId) {
+ // we don't need per-split instances, because we do not generate watermarks
+ return this;
+ }
+
+ @Override
+ public void releaseOutputForSplit(String splitId) {
+ // nothing to release, because we do not create per-split instances
+ }
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
new file mode 100644
index 0000000..3a8396e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of the SourceOutput. The records emitted to this output are pushed into a given
+ * {@link PushingAsyncDataInput.DataOutput}. The watermarks are pushed into the same output, or
+ * into a separate {@link WatermarkOutput}, if one is provided.
+ *
+ * <p>Watermarks and idleness signalled directly through {@link #emitWatermark(Watermark)} and
+ * {@link #markIdle()} are forwarded to the same output that receives the watermarks produced by
+ * the watermark generator's {@code onEvent(...)} calls.
+ *
+ * <h2>Periodic Watermarks</h2>
+ *
+ * <p>This output does not implement automatic periodic watermark emission. The
+ * method {@link SourceOutputWithWatermarks#emitPeriodicWatermark()} needs to be called periodically.
+ *
+ * <h2>Note on Performance Considerations</h2>
+ *
+ * <p>The methods {@link SourceOutput#collect(Object)} and {@link SourceOutput#collect(Object, long)}
+ * are highly performance-critical (part of the hot loop). To make the code as JIT friendly as possible,
+ * we want to have only a single implementation of these two methods, across all classes.
+ * That way, the JIT compiler can de-virtualize (and inline) them better.
+ *
+ * <p>Currently, we have one implementation of these methods in the batch case (see class
+ * {@link BatchTimestampsAndWatermarks}) and one for the streaming case (this class). When the JVM
+ * is dedicated to a single job (or type of job) only one of these classes will be loaded. In mixed
+ * job setups, we still have a bimorphic method (rather than a poly- or mega-morphic method).
+ *
+ * @param <T> The type of emitted records.
+ */
+@Internal
+public class SourceOutputWithWatermarks<T> implements SourceOutput<T> {
+
+ private final PushingAsyncDataInput.DataOutput<T> recordsOutput;
+
+ private final TimestampAssigner<T> timestampAssigner;
+
+ private final WatermarkGenerator<T> watermarkGenerator;
+
+ private final WatermarkOutput onEventWatermarkOutput;
+
+ private final WatermarkOutput periodicWatermarkOutput;
+
+ /**
+ * Creates a new SourceOutputWithWatermarks that emits records to the given DataOutput,
+ * per-event watermarks to {@code onEventWatermarkOutput}, and periodic watermarks to
+ * {@code periodicWatermarkOutput}. The two watermark outputs may be the same instance.
+ * All arguments must be non-null.
+ */
+ protected SourceOutputWithWatermarks(
+ PushingAsyncDataInput.DataOutput<T> recordsOutput,
+ WatermarkOutput onEventWatermarkOutput,
+ WatermarkOutput periodicWatermarkOutput,
+ TimestampAssigner<T> timestampAssigner,
+ WatermarkGenerator<T> watermarkGenerator) {
+
+ this.recordsOutput = checkNotNull(recordsOutput);
+ this.onEventWatermarkOutput = checkNotNull(onEventWatermarkOutput);
+ this.periodicWatermarkOutput = checkNotNull(periodicWatermarkOutput);
+ this.timestampAssigner = checkNotNull(timestampAssigner);
+ this.watermarkGenerator = checkNotNull(watermarkGenerator);
+ }
+
+ // ------------------------------------------------------------------------
+ // SourceOutput Methods
+ //
+ // Note that the two methods below are final, as a partial enforcement
+ // of the performance design goal mentioned in the class-level comment.
+ // ------------------------------------------------------------------------
+
+ @Override
+ public final void collect(T record) {
+ collect(record, TimestampAssigner.NO_TIMESTAMP); // the source supplied no timestamp of its own
+ }
+
+ @Override
+ public final void collect(T record, long timestamp) {
+ try {
+ final long assignedTimestamp = timestampAssigner.extractTimestamp(record, timestamp);
+
+ // IMPORTANT: The event must be emitted before the watermark generator is called.
+ recordsOutput.emitRecord(new StreamRecord<>(record, assignedTimestamp));
+ watermarkGenerator.onEvent(record, assignedTimestamp, onEventWatermarkOutput);
+ } catch (ExceptionInChainedOperatorException e) {
+ throw e; // already the wrapper type, rethrow as is rather than wrapping it a second time
+ } catch (Exception e) {
+ throw new ExceptionInChainedOperatorException(e);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // WatermarkOutput Methods
+ //
+ // These methods are final as well, to enforce the contract that the
+ // watermarks from emitWatermark(Watermark) go to the same output as the
+ // watermarks from the watermarkGenerator.onEvent(...) calls in the collect(...)
+ // methods. Periodic watermarks go to the separate periodic watermark output.
+ // ------------------------------------------------------------------------
+
+ @Override
+ public final void emitWatermark(Watermark watermark) {
+ onEventWatermarkOutput.emitWatermark(watermark);
+ }
+
+ @Override
+ public final void markIdle() {
+ onEventWatermarkOutput.markIdle();
+ }
+
+ public final void emitPeriodicWatermark() {
+ watermarkGenerator.onPeriodicEmit(periodicWatermarkOutput); // not self-scheduling: the owner of this output must call this method periodically
+ }
+
+ // ------------------------------------------------------------------------
+ // Factories
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new SourceOutputWithWatermarks that emits both records and watermarks to the
+ * given DataOutput. Per-event and periodic watermarks share one {@link WatermarkToDataOutput}.
+ */
+ public static <E> SourceOutputWithWatermarks<E> createWithSameOutputs(
+ PushingAsyncDataInput.DataOutput<E> recordsAndWatermarksOutput,
+ TimestampAssigner<E> timestampAssigner,
+ WatermarkGenerator<E> watermarkGenerator) {
+
+ final WatermarkOutput watermarkOutput = new WatermarkToDataOutput(recordsAndWatermarksOutput);
+
+ return new SourceOutputWithWatermarks<>(
+ recordsAndWatermarksOutput,
+ watermarkOutput,
+ watermarkOutput,
+ timestampAssigner,
+ watermarkGenerator);
+ }
+
+ /**
+ * Creates a new SourceOutputWithWatermarks that emits records to the given DataOutput,
+ * per-event watermarks to {@code onEventWatermarkOutput}, and periodic watermarks to
+ * {@code periodicWatermarkOutput}.
+ */
+ public static <E> SourceOutputWithWatermarks<E> createWithSeparateOutputs(
+ PushingAsyncDataInput.DataOutput<E> recordsOutput,
+ WatermarkOutput onEventWatermarkOutput,
+ WatermarkOutput periodicWatermarkOutput,
+ TimestampAssigner<E> timestampAssigner,
+ WatermarkGenerator<E> watermarkGenerator) {
+
+ return new SourceOutputWithWatermarks<>(
+ recordsOutput,
+ onEventWatermarkOutput,
+ periodicWatermarkOutput,
+ timestampAssigner,
+ watermarkGenerator);
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/StreamingTimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/StreamingTimestampsAndWatermarks.java
new file mode 100644
index 0000000..af51f414
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/StreamingTimestampsAndWatermarks.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of timestamp extraction and watermark generation logic for streaming sources.
+ *
+ * <p>This class creates a single main {@link ReaderOutput} and, on demand, split-local outputs
+ * that each own a separate {@link WatermarkGenerator}. The watermarks of the split-local outputs
+ * are combined through a {@link WatermarkOutputMultiplexer}. Periodic watermark emission is
+ * scheduled on the given {@link ProcessingTimeService}; an interval of zero disables it.
+ *
+ * @param <T> The type of the emitted records.
+ */
+@Internal
+public class StreamingTimestampsAndWatermarks<T> implements TimestampsAndWatermarks<T> {
+
+ private final TimestampAssigner<T> timestampAssigner;
+
+ private final WatermarkGeneratorSupplier<T> watermarksFactory;
+
+ private final WatermarkGeneratorSupplier.Context watermarksContext;
+
+ private final ProcessingTimeService timeService;
+
+ private final long periodicWatermarkInterval;
+
+ @Nullable
+ private SplitLocalOutputs<T> currentPerSplitOutputs;
+
+ @Nullable
+ private StreamingReaderOutput<T> currentMainOutput;
+
+ @Nullable
+ private ScheduledFuture<?> periodicEmitHandle;
+
+ public StreamingTimestampsAndWatermarks(
+ TimestampAssigner<T> timestampAssigner,
+ WatermarkGeneratorSupplier<T> watermarksFactory,
+ WatermarkGeneratorSupplier.Context watermarksContext,
+ ProcessingTimeService timeService,
+ Duration periodicWatermarkInterval) {
+
+ this.timestampAssigner = timestampAssigner;
+ this.watermarksFactory = watermarksFactory;
+ this.watermarksContext = watermarksContext;
+ this.timeService = timeService;
+
+ long periodicWatermarkIntervalMillis;
+ try {
+ periodicWatermarkIntervalMillis = periodicWatermarkInterval.toMillis();
+ } catch (ArithmeticException ignored) {
+ // the duration in milliseconds overflows a long, so clamp to the largest representable interval
+ periodicWatermarkIntervalMillis = Long.MAX_VALUE;
+ }
+ this.periodicWatermarkInterval = periodicWatermarkIntervalMillis;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public ReaderOutput<T> createMainOutput(PushingAsyncDataInput.DataOutput<T> output) {
+ // At the moment, we assume only one output is ever created!
+ // This assumption is strict, currently, because many of the classes in this implementation do not
+ // support re-assigning the underlying output
+ checkState(currentMainOutput == null && currentPerSplitOutputs == null, "already created a main output");
+
+ final WatermarkOutput watermarkOutput = new WatermarkToDataOutput(output); // shared by the main output and the split-local multiplexer
+ final WatermarkGenerator<T> watermarkGenerator = watermarksFactory.createWatermarkGenerator(watermarksContext);
+
+ currentPerSplitOutputs = new SplitLocalOutputs<>(
+ output,
+ watermarkOutput,
+ timestampAssigner,
+ watermarksFactory,
+ watermarksContext);
+
+ currentMainOutput = new StreamingReaderOutput<>(
+ output,
+ watermarkOutput,
+ timestampAssigner,
+ watermarkGenerator,
+ currentPerSplitOutputs);
+
+ return currentMainOutput;
+ }
+
+ @Override
+ public void startPeriodicWatermarkEmits() {
+ checkState(periodicEmitHandle == null, "periodic emitter already started");
+
+ if (periodicWatermarkInterval == 0) {
+ // a value of zero means not activated
+ return;
+ }
+
+ periodicEmitHandle = timeService.scheduleWithFixedDelay(
+ this::triggerPeriodicEmit,
+ periodicWatermarkInterval,
+ periodicWatermarkInterval);
+ }
+
+ @Override
+ public void stopPeriodicWatermarkEmits() {
+ if (periodicEmitHandle != null) {
+ periodicEmitHandle.cancel(false); // false: do not interrupt an emission that is currently running
+ periodicEmitHandle = null; // clearing the handle allows the emitter to be started again
+ }
+ }
+
+ void triggerPeriodicEmit(@SuppressWarnings("unused") long wallClockTimestamp) {
+ if (currentPerSplitOutputs != null) {
+ currentPerSplitOutputs.emitPeriodicWatermark();
+ }
+ if (currentMainOutput != null) {
+ currentMainOutput.emitPeriodicWatermark();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final class StreamingReaderOutput<T> extends SourceOutputWithWatermarks<T> implements ReaderOutput<T> {
+
+ private final SplitLocalOutputs<T> splitLocalOutputs;
+
+ StreamingReaderOutput(
+ PushingAsyncDataInput.DataOutput<T> output,
+ WatermarkOutput watermarkOutput,
+ TimestampAssigner<T> timestampAssigner,
+ WatermarkGenerator<T> watermarkGenerator,
+ SplitLocalOutputs<T> splitLocalOutputs) {
+
+ super(output, watermarkOutput, watermarkOutput, timestampAssigner, watermarkGenerator); // the same watermark output serves per-event and periodic watermarks
+ this.splitLocalOutputs = splitLocalOutputs;
+ }
+
+ @Override
+ public SourceOutput<T> createOutputForSplit(String splitId) {
+ return splitLocalOutputs.createOutputForSplit(splitId);
+ }
+
+ @Override
+ public void releaseOutputForSplit(String splitId) {
+ splitLocalOutputs.releaseOutputForSplit(splitId);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A holder and factory for split-local {@link SourceOutput}s. The split-local outputs maintain
+ * local watermark generators with their own state, to facilitate per-split watermarking logic.
+ *
+ * <p>Requesting an output for a split ID that already has one returns the existing instance.
+ * Releasing a split removes its output and unregisters it from the watermark multiplexer.
+ *
+ * @param <T> The type of the emitted records.
+ */
+ private static final class SplitLocalOutputs<T> {
+
+ private final WatermarkOutputMultiplexer watermarkMultiplexer;
+ private final Map<String, SourceOutputWithWatermarks<T>> localOutputs;
+ private final PushingAsyncDataInput.DataOutput<T> recordOutput;
+ private final TimestampAssigner<T> timestampAssigner;
+ private final WatermarkGeneratorSupplier<T> watermarksFactory;
+ private final WatermarkGeneratorSupplier.Context watermarkContext;
+
+ private SplitLocalOutputs(
+ PushingAsyncDataInput.DataOutput<T> recordOutput,
+ WatermarkOutput watermarkOutput,
+ TimestampAssigner<T> timestampAssigner,
+ WatermarkGeneratorSupplier<T> watermarksFactory,
+ WatermarkGeneratorSupplier.Context watermarkContext) {
+
+ this.recordOutput = recordOutput;
+ this.timestampAssigner = timestampAssigner;
+ this.watermarksFactory = watermarksFactory;
+ this.watermarkContext = watermarkContext;
+
+ this.watermarkMultiplexer = new WatermarkOutputMultiplexer(watermarkOutput);
+ this.localOutputs = new LinkedHashMap<>(); // we use a LinkedHashMap because it iterates faster
+ }
+
+ SourceOutput<T> createOutputForSplit(String splitId) {
+ final SourceOutputWithWatermarks<T> previous = localOutputs.get(splitId);
+ if (previous != null) {
+ return previous; // reuse the output (and its watermark generator) already created for this split
+ }
+
+ watermarkMultiplexer.registerNewOutput(splitId);
+ final WatermarkOutput onEventOutput = watermarkMultiplexer.getImmediateOutput(splitId);
+ final WatermarkOutput periodicOutput = watermarkMultiplexer.getDeferredOutput(splitId);
+
+ final WatermarkGenerator<T> watermarks = watermarksFactory.createWatermarkGenerator(watermarkContext); // a fresh generator per split
+
+ final SourceOutputWithWatermarks<T> localOutput = SourceOutputWithWatermarks.createWithSeparateOutputs(
+ recordOutput, onEventOutput, periodicOutput, timestampAssigner, watermarks);
+
+ localOutputs.put(splitId, localOutput);
+ return localOutput;
+ }
+
+ void releaseOutputForSplit(String splitId) {
+ localOutputs.remove(splitId);
+ watermarkMultiplexer.unregisterOutput(splitId);
+ }
+
+ void emitPeriodicWatermark() {
+ // The call in the loop only records the next watermark candidate for each local output.
+ // The call to 'watermarkMultiplexer.onPeriodicEmit()' actually merges the watermarks.
+ // That way, we save inefficient repeated merging of (partially outdated) watermarks before
+ // all local generators have emitted their candidates.
+ for (SourceOutputWithWatermarks<?> output : localOutputs.values()) {
+ output.emitPeriodicWatermark();
+ }
+ watermarkMultiplexer.onPeriodicEmit();
+ }
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
new file mode 100644
index 0000000..f95ec55
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+import java.time.Duration;
+
+/**
+ * Basic interface for the timestamp extraction and watermark generation logic for the
+ * {@link org.apache.flink.api.connector.source.SourceReader}.
+ *
+ * <p>Implementations of this class may or may not actually perform certain tasks, like watermark
+ * generation. For example, the batch-oriented implementation typically skips all watermark generation
+ * logic.
+ *
+ * @param <T> The type of the emitted records.
+ */
+@Internal
+public interface TimestampsAndWatermarks<T> {
+
+ /**
+ * Creates the ReaderOutput for the source reader, that internally runs the timestamp extraction and
+ * watermark generation.
+ */
+ ReaderOutput<T> createMainOutput(PushingAsyncDataInput.DataOutput<T> output);
+
+ /**
+ * Starts emitting periodic watermarks, if this implementation produces watermarks, and if
+ * periodic watermarks are configured.
+ *
+ * <p>Periodic watermarks are produced by periodically calling the
+ * {@link org.apache.flink.api.common.eventtime.WatermarkGenerator#onPeriodicEmit(WatermarkOutput)} method
+ * of the underlying Watermark Generators.
+ */
+ void startPeriodicWatermarkEmits();
+
+ /**
+ * Stops emitting periodic watermarks.
+ */
+ void stopPeriodicWatermarkEmits();
+
+ // ------------------------------------------------------------------------
+ // factories
+ // ------------------------------------------------------------------------
+
+ static <E> TimestampsAndWatermarks<E> createStreamingEventTimeLogic(
+ WatermarkStrategy<E> watermarkStrategy,
+ MetricGroup metrics,
+ ProcessingTimeService timeService,
+ long periodicWatermarkIntervalMillis) {
+
+ final TimestampsAndWatermarksContext context = new TimestampsAndWatermarksContext(metrics); // one context serves both the timestamp assigner and the watermark generators
+ final TimestampAssigner<E> timestampAssigner = watermarkStrategy.createTimestampAssigner(context);
+
+ return new StreamingTimestampsAndWatermarks<>(
+ timestampAssigner, watermarkStrategy, context, timeService, Duration.ofMillis(periodicWatermarkIntervalMillis)); // the strategy itself is passed as the watermark generator factory
+ }
+
+ static <E> TimestampsAndWatermarks<E> createBatchEventTimeLogic(
+ WatermarkStrategy<E> watermarkStrategy,
+ MetricGroup metrics) {
+
+ final TimestampsAndWatermarksContext context = new TimestampsAndWatermarksContext(metrics);
+ final TimestampAssigner<E> timestampAssigner = watermarkStrategy.createTimestampAssigner(context);
+
+ return new BatchTimestampsAndWatermarks<>(timestampAssigner); // only the timestamp assigner is handed over; no watermark generator is created here
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java
new file mode 100644
index 0000000..201b901
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.metrics.MetricGroup;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple implementation of a context that is both {@link TimestampAssignerSupplier.Context}
+ * and {@link WatermarkGeneratorSupplier.Context}.
+ *
+ * <p>The context only carries the {@link MetricGroup} passed to the constructor, which must be
+ * non-null, and returns it from {@link #getMetricGroup()}.
+ */
+@Internal
+public final class TimestampsAndWatermarksContext
+ implements TimestampAssignerSupplier.Context, WatermarkGeneratorSupplier.Context {
+
+ private final MetricGroup metricGroup;
+
+ public TimestampsAndWatermarksContext(MetricGroup metricGroup) {
+ this.metricGroup = checkNotNull(metricGroup);
+ }
+
+ @Override
+ public MetricGroup getMetricGroup() {
+ return metricGroup;
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
new file mode 100644
index 0000000..3e3b255
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An adapter that exposes a {@link WatermarkOutput} based on a {@link PushingAsyncDataInput.DataOutput}.
+ *
+ * <p>Only watermarks with a timestamp strictly greater than the highest timestamp seen so far are
+ * forwarded; all other watermarks are dropped silently. If the output was marked idle, an
+ * {@code ACTIVE} stream status is emitted before the next forwarded watermark. Marking an already
+ * idle output as idle again has no effect.
+ *
+ * <p>Exceptions thrown by the wrapped output are rethrown as
+ * {@link ExceptionInChainedOperatorException}, unless they already are of that type.
+ */
+@Internal
+public final class WatermarkToDataOutput implements WatermarkOutput {
+
+ private final PushingAsyncDataInput.DataOutput<?> output;
+ private long maxWatermarkSoFar;
+ private boolean isIdle;
+
+ /**
+ * Creates a new WatermarkOutput against the given DataOutput.
+ * The given output must be non-null.
+ */
+ public WatermarkToDataOutput(PushingAsyncDataInput.DataOutput<?> output) {
+ this.output = checkNotNull(output);
+ this.maxWatermarkSoFar = Long.MIN_VALUE;
+ }
+
+ @Override
+ public void emitWatermark(Watermark watermark) {
+ final long newWatermark = watermark.getTimestamp();
+ if (newWatermark <= maxWatermarkSoFar) {
+ return; // not an advance over what was already emitted, so drop it
+ }
+
+ maxWatermarkSoFar = newWatermark;
+
+ try {
+ if (isIdle) {
+ output.emitStreamStatus(StreamStatus.ACTIVE);
+ isIdle = false;
+ }
+
+ output.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(newWatermark)); // converts to the streaming runtime's Watermark class
+ } catch (ExceptionInChainedOperatorException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new ExceptionInChainedOperatorException(e);
+ }
+ }
+
+ @Override
+ public void markIdle() {
+ if (isIdle) {
+ return;
+ }
+
+ try {
+ output.emitStreamStatus(StreamStatus.IDLE);
+ isIdle = true; // set only after the status was emitted successfully
+ } catch (ExceptionInChainedOperatorException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new ExceptionInChainedOperatorException(e);
+ }
+ }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 34b7962..93ced72 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.connector.source.SourceEvent;
-import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
@@ -28,7 +27,6 @@ import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.source.event.AddSplitEvent;
@@ -39,7 +37,7 @@ import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
import org.apache.flink.util.CollectionUtil;
import org.junit.Before;
@@ -165,32 +163,4 @@ public class SourceOperatorTest {
return abstractStateBackend.createOperatorStateBackend(
env, "test-operator", Collections.emptyList(), cancelStreamRegistry);
}
-
- // -------------- Testing SourceOperator class -------------
-
- /**
- * A testing class that overrides the getRuntimeContext() Method.
- */
- private static class TestingSourceOperator<OUT> extends SourceOperator<OUT, MockSourceSplit> {
-
- private final int subtaskIndex;
-
- TestingSourceOperator(
- SourceReader<OUT, MockSourceSplit> reader,
- OperatorEventGateway eventGateway,
- int subtaskIndex) {
-
- super(
- (context) -> reader,
- eventGateway,
- new MockSourceSplitSerializer());
-
- this.subtaskIndex = subtaskIndex;
- }
-
- @Override
- public StreamingRuntimeContext getRuntimeContext() {
- return new MockStreamingRuntimeContext(false, 5, subtaskIndex);
- }
- }
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java
new file mode 100644
index 0000000..a536553
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/CollectingDataOutput.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A test utility implementation of {@link PushingAsyncDataInput.DataOutput} that collects all events.
+ *
+ * <p>Watermarks, stream statuses, records, and latency markers are all appended to the single
+ * {@code events} list, in the order in which they were emitted.
+ */
+final class CollectingDataOutput<E> implements PushingAsyncDataInput.DataOutput<E> {
+
+ final List<Object> events = new ArrayList<>();
+
+ @Override
+ public void emitWatermark(Watermark watermark) throws Exception {
+ events.add(watermark);
+ }
+
+ @Override
+ public void emitStreamStatus(StreamStatus streamStatus) throws Exception {
+ events.add(streamStatus);
+ }
+
+ @Override
+ public void emitRecord(StreamRecord<E> streamRecord) throws Exception {
+ events.add(streamRecord);
+ }
+
+ @Override
+ public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
+ events.add(latencyMarker);
+ }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/OnEventTestWatermarkGenerator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/OnEventTestWatermarkGenerator.java
new file mode 100644
index 0000000..55141c1
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/OnEventTestWatermarkGenerator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+
+/**
+ * A {@link WatermarkGenerator} for tests that emits one watermark per event, carrying that
+ * event's timestamp, and emits nothing on periodic invocations.
+ */
+final class OnEventTestWatermarkGenerator<T> implements WatermarkGenerator<T> {
+
+    @Override
+    public void onPeriodicEmit(WatermarkOutput output) {
+        // deliberately a no-op: this generator only reacts to events
+    }
+
+    @Override
+    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
+        final Watermark eventWatermark = new Watermark(eventTimestamp);
+        output.emitWatermark(eventWatermark);
+    }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/OnPeriodicTestWatermarkGenerator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/OnPeriodicTestWatermarkGenerator.java
new file mode 100644
index 0000000..1342457
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/OnPeriodicTestWatermarkGenerator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link WatermarkGenerator} for tests that merely remembers the timestamp of the latest event
+ * and emits it as a watermark on periodic invocations. Nothing is emitted before the first event.
+ */
+final class OnPeriodicTestWatermarkGenerator<T> implements WatermarkGenerator<T> {
+
+    /** Timestamp of the most recently seen event; {@code null} until the first event arrives. */
+    @Nullable
+    private Long lastTimestamp;
+
+    @Override
+    public void onPeriodicEmit(WatermarkOutput output) {
+        final Long latest = lastTimestamp;
+        if (latest == null) {
+            // no event seen yet, so there is nothing to report
+            return;
+        }
+        output.emitWatermark(new Watermark(latest));
+    }
+
+    @Override
+    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
+        lastTimestamp = eventTimestamp;
+    }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
new file mode 100644
index 0000000..347975c
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkStrategies;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.operators.SourceOperator;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests that validate correct handling of watermark generation in the {@link ReaderOutput} as created
+ * by the {@link StreamingTimestampsAndWatermarks}.
+ *
+ * <p>Each test scripts a sequence of actions against the {@link ReaderOutput}, runs them through a
+ * {@link SourceOperator} (one action per {@code emitNext()} call) and compares the watermarks that
+ * reached the operator's data output against the expected sequence.
+ */
+public class SourceOperatorEventTimeTest {
+
+ /**
+ * Periodic generator on the main output: records with timestamps 100, 120 and 110 are expected
+ * to yield exactly the watermarks 100 and 120, i.e. no additional watermark for the 110 record.
+ */
+ @Test
+ public void testMainOutputPeriodicWatermarks() throws Exception {
+ final WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategies
+ .<Integer>forGenerator((ctx) -> new OnPeriodicTestWatermarkGenerator<>())
+ .build();
+
+ final List<Watermark> result = testSequenceOfWatermarks(watermarkStrategy,
+ (output) -> output.collect(0, 100L),
+ (output) -> output.collect(0, 120L),
+ (output) -> output.collect(0, 110L)
+ );
+
+ assertThat(result, contains(
+ new Watermark(100L),
+ new Watermark(120L)
+ ));
+ }
+
+ /**
+ * Per-event generator on the main output: same record sequence and same expected watermarks
+ * (100, 120) as in the periodic variant of this test.
+ */
+ @Test
+ public void testMainOutputEventWatermarks() throws Exception {
+ final WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategies
+ .<Integer>forGenerator((ctx) -> new OnEventTestWatermarkGenerator<>())
+ .build();
+
+ final List<Watermark> result = testSequenceOfWatermarks(watermarkStrategy,
+ (output) -> output.collect(0, 100L),
+ (output) -> output.collect(0, 120L),
+ (output) -> output.collect(0, 110L)
+ );
+
+ assertThat(result, contains(
+ new Watermark(100L),
+ new Watermark(120L)
+ ));
+ }
+
+ /**
+ * Periodic generator with per-split outputs. Split outputs "A" and "B" are created up front, then
+ * A receives 100, B receives 200, A receives 150, A is released and B receives 200 again.
+ * The expected watermarks are 100, 150 and 200.
+ *
+ * <p>NOTE(review): this looks like the emitted watermark follows the minimum over the currently
+ * registered split outputs and catches up to B's value once A is released - confirm against
+ * StreamingTimestampsAndWatermarks.
+ */
+ @Test
+ public void testPerSplitOutputPeriodicWatermarks() throws Exception {
+ final WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategies
+ .<Integer>forGenerator((ctx) -> new OnPeriodicTestWatermarkGenerator<>())
+ .build();
+
+ final List<Watermark> result = testSequenceOfWatermarks(watermarkStrategy,
+ (output) -> {
+ output.createOutputForSplit("A");
+ output.createOutputForSplit("B");
+ },
+ (output) -> output.createOutputForSplit("A").collect(0, 100L),
+ (output) -> output.createOutputForSplit("B").collect(0, 200L),
+ (output) -> output.createOutputForSplit("A").collect(0, 150L),
+ (output) -> output.releaseOutputForSplit("A"),
+ (output) -> output.createOutputForSplit("B").collect(0, 200L)
+ );
+
+ assertThat(result, contains(
+ new Watermark(100L),
+ new Watermark(150L),
+ new Watermark(200L)
+ ));
+ }
+
+ /**
+ * Per-event generator with per-split outputs. Same scenario as the periodic per-split test, using
+ * the split ids "one" and "two"; the expected watermarks are again 100, 150 and 200.
+ */
+ @Test
+ public void testPerSplitOutputEventWatermarks() throws Exception {
+ final WatermarkStrategy<Integer> watermarkStrategy = WatermarkStrategies
+ .<Integer>forGenerator((ctx) -> new OnEventTestWatermarkGenerator<>())
+ .build();
+
+ final List<Watermark> result = testSequenceOfWatermarks(watermarkStrategy,
+ (output) -> {
+ output.createOutputForSplit("one");
+ output.createOutputForSplit("two");
+ },
+ (output) -> output.createOutputForSplit("one").collect(0, 100L),
+ (output) -> output.createOutputForSplit("two").collect(0, 200L),
+ (output) -> output.createOutputForSplit("one").collect(0, 150L),
+ (output) -> output.releaseOutputForSplit("one"),
+ (output) -> output.createOutputForSplit("two").collect(0, 200L)
+ );
+
+ assertThat(result, contains(
+ new Watermark(100L),
+ new Watermark(150L),
+ new Watermark(200L)
+ ));
+ }
+
+ // ------------------------------------------------------------------------
+ // test execution helpers
+ // ------------------------------------------------------------------------
+
+ /**
+ * Runs the given actions via {@link #testSequenceOfEvents} and returns only the watermarks among
+ * the emitted events, in emission order. The runtime watermark type
+ * ({@code org.apache.flink.streaming.api.watermark.Watermark}) is converted to the event-time
+ * {@link Watermark} type so that tests can compare against instances of the latter.
+ */
+ // the method is 'final' because @SafeVarargs is only permitted on static or final methods and
+ // constructors (private instance methods are accepted only from Java 9 on)
+ @SuppressWarnings("FinalPrivateMethod")
+ @SafeVarargs
+ private final List<Watermark> testSequenceOfWatermarks(
+ final WatermarkStrategy<Integer> watermarkStrategy,
+ final Consumer<ReaderOutput<Integer>>... actions) throws Exception {
+
+ final List<Object> allEvents = testSequenceOfEvents(watermarkStrategy, actions);
+
+ return allEvents.stream()
+ .filter((evt) -> evt instanceof org.apache.flink.streaming.api.watermark.Watermark)
+ .map((evt) -> new Watermark(((org.apache.flink.streaming.api.watermark.Watermark) evt).getTimestamp()))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Creates a source operator around a reader that executes the given actions, drives it until it
+ * reports {@link InputStatus#END_OF_INPUT} and returns everything that was emitted to the
+ * operator's data output, in emission order.
+ */
+ @SuppressWarnings("FinalPrivateMethod")
+ @SafeVarargs
+ private final List<Object> testSequenceOfEvents(
+ WatermarkStrategy<Integer> watermarkStrategy,
+ final Consumer<ReaderOutput<Integer>>... actions) throws Exception {
+
+ final CollectingDataOutput<Integer> out = new CollectingDataOutput<>();
+
+ final TestProcessingTimeService timeService = new TestProcessingTimeService();
+ timeService.setCurrentTime(Integer.MAX_VALUE); // start somewhere that is not zero
+
+ final SourceReader<Integer, MockSourceSplit> reader = new InterpretingSourceReader(actions);
+
+ final SourceOperator<Integer, MockSourceSplit> sourceOperator =
+ createTestOperator(reader, watermarkStrategy, timeService);
+
+ // after every emitNext() call that did not end the input, processing time is advanced by 100,
+ // the same value TestingSourceOperator sets as auto watermark interval; presumably this lets
+ // the periodic watermark emission fire between two actions - verify against SourceOperator
+ while (sourceOperator.emitNext(out) != InputStatus.END_OF_INPUT) {
+ timeService.setCurrentTime(timeService.getCurrentProcessingTime() + 100);
+ }
+
+ return out.events;
+ }
+
+ // ------------------------------------------------------------------------
+ // test setup helpers
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a {@link TestingSourceOperator} for the given reader, watermark strategy and time
+ * service, initializes its state from a fresh operator state store of a
+ * {@link MemoryStateBackend} and opens it.
+ */
+ private static <T> SourceOperator<T, MockSourceSplit> createTestOperator(
+ SourceReader<T, MockSourceSplit> reader,
+ WatermarkStrategy<T> watermarkStrategy,
+ ProcessingTimeService timeService) throws Exception {
+
+ final OperatorStateStore operatorStateStore =
+ new MemoryStateBackend().createOperatorStateBackend(
+ new MockEnvironmentBuilder().build(),
+ "test-operator",
+ Collections.emptyList(),
+ new CloseableRegistry());
+
+ // only the operator state store is supplied; all other arguments are false / null
+ final StateInitializationContext stateContext = new StateInitializationContextImpl(
+ false, operatorStateStore, null, null, null);
+
+ final SourceOperator<T, MockSourceSplit> sourceOperator =
+ new TestingSourceOperator<>(reader, watermarkStrategy, timeService);
+ sourceOperator.initializeState(stateContext);
+ sourceOperator.open();
+
+ return sourceOperator;
+ }
+
+ // ------------------------------------------------------------------------
+ // test mocks
+ // ------------------------------------------------------------------------
+
+ /**
+ * A {@link SourceReader} that executes a fixed script: every {@code pollNext()} call runs the next
+ * action against the given {@link ReaderOutput} and returns {@link InputStatus#MORE_AVAILABLE};
+ * once all actions have run, {@code pollNext()} returns {@link InputStatus#END_OF_INPUT}.
+ *
+ * <p>The reader always reports itself as available, does not support snapshots, and ignores
+ * splits, source events, start and close.
+ */
+ private static final class InterpretingSourceReader implements SourceReader<Integer, MockSourceSplit> {
+
+ private final Iterator<Consumer<ReaderOutput<Integer>>> actions;
+
+ @SafeVarargs
+ private InterpretingSourceReader(Consumer<ReaderOutput<Integer>>... actions) {
+ this.actions = Arrays.asList(actions).iterator();
+ }
+
+ @Override
+ public void start() {}
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<Integer> output) {
+ if (actions.hasNext()) {
+ actions.next().accept(output);
+ return InputStatus.MORE_AVAILABLE;
+ } else {
+ return InputStatus.END_OF_INPUT;
+ }
+ }
+
+ @Override
+ public List<MockSourceSplit> snapshotState() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Void> isAvailable() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public void addSplits(List<MockSourceSplit> splits) {}
+
+ @Override
+ public void handleSourceEvents(SourceEvent sourceEvent) {}
+
+ @Override
+ public void close() {}
+ }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarksTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarksTest.java
new file mode 100644
index 0000000..b3f079c
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarksTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.NoWatermarksGenerator;
+import org.apache.flink.api.common.eventtime.RecordTimestampAssigner;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for {@link SourceOutputWithWatermarks}, run against a {@link CollectingDataOutput}
+ * that receives both the records and the watermarks.
+ */
+public class SourceOutputWithWatermarksTest {
+
+    /** A record collected without an explicit timestamp is emitted with {@code NO_TIMESTAMP}. */
+    @Test
+    public void testNoTimestampValue() {
+        final CollectingDataOutput<Integer> collected = new CollectingDataOutput<>();
+        final SourceOutputWithWatermarks<Integer> sourceOutput =
+            createOutput(collected, new NoWatermarksGenerator<>());
+
+        sourceOutput.collect(17);
+
+        final Object firstEvent = collected.events.get(0);
+        assertThat(firstEvent, instanceOf(StreamRecord.class));
+
+        final StreamRecord<?> record = (StreamRecord<?>) firstEvent;
+        assertEquals(TimestampAssigner.NO_TIMESTAMP, record.getTimestamp());
+    }
+
+    /** The record must reach the output before the watermark that its arrival triggers. */
+    @Test
+    public void eventsAreBeforeWatermarks() {
+        final CollectingDataOutput<Integer> collected = new CollectingDataOutput<>();
+        final SourceOutputWithWatermarks<Integer> sourceOutput =
+            createOutput(collected, new TestWatermarkGenerator<>());
+
+        sourceOutput.collect(42, 12345L);
+
+        final StreamRecord<Integer> expectedRecord = new StreamRecord<>(42, 12345L);
+        final org.apache.flink.streaming.api.watermark.Watermark expectedWatermark =
+            new org.apache.flink.streaming.api.watermark.Watermark(12345L);
+
+        assertThat(collected.events, contains(expectedRecord, expectedWatermark));
+    }
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * Creates the output under test, writing records and watermarks to the same target and using
+     * a {@link RecordTimestampAssigner} together with the given watermark generator.
+     */
+    private static SourceOutputWithWatermarks<Integer> createOutput(
+            CollectingDataOutput<Integer> target,
+            WatermarkGenerator<Integer> generator) {
+
+        return SourceOutputWithWatermarks.createWithSameOutputs(
+            target, new RecordTimestampAssigner<>(), generator);
+    }
+
+    /**
+     * Watermark generator that emits the event's timestamp as a watermark on every event and
+     * re-emits the last seen timestamp on every periodic call.
+     */
+    private static final class TestWatermarkGenerator<T> implements WatermarkGenerator<T> {
+
+        private long lastTimestamp;
+
+        @Override
+        public void onPeriodicEmit(WatermarkOutput output) {
+            final Watermark latest = new Watermark(lastTimestamp);
+            output.emitWatermark(latest);
+        }
+
+        @Override
+        public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
+            lastTimestamp = eventTimestamp;
+
+            final Watermark current = new Watermark(eventTimestamp);
+            output.emitWatermark(current);
+        }
+    }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
new file mode 100644
index 0000000..43992dc
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.WatermarkStrategies;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.operators.SourceOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+
+/**
+ * A {@link SourceOperator} subclass that simplifies test setup: it is constructed directly from
+ * a {@link SourceReader} instance, uses unregistered metrics and answers runtime-context and
+ * execution-config lookups with locally created objects.
+ */
+public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Subtask index used when the caller does not specify one. */
+    private static final int DEFAULT_SUBTASK_INDEX = 1;
+
+    /** Parallelism used when the caller does not specify one. */
+    private static final int DEFAULT_PARALLELISM = 5;
+
+    /** Value passed to {@link ExecutionConfig#setAutoWatermarkInterval(long)}. */
+    private static final long AUTO_WATERMARK_INTERVAL = 100;
+
+    private final int subtaskIndex;
+    private final int parallelism;
+
+    /**
+     * Creates an operator for event-time tests, with a {@link MockOperatorEventGateway} and the
+     * default subtask index and parallelism.
+     */
+    public TestingSourceOperator(
+            SourceReader<T, MockSourceSplit> reader,
+            WatermarkStrategy<T> watermarkStrategy,
+            ProcessingTimeService timeService) {
+
+        this(
+            reader,
+            watermarkStrategy,
+            timeService,
+            new MockOperatorEventGateway(),
+            DEFAULT_SUBTASK_INDEX,
+            DEFAULT_PARALLELISM);
+    }
+
+    /**
+     * Creates an operator without watermark generation, with a fresh
+     * {@link TestProcessingTimeService}, the given event gateway and subtask index, and the
+     * default parallelism.
+     */
+    public TestingSourceOperator(
+            SourceReader<T, MockSourceSplit> reader,
+            OperatorEventGateway eventGateway,
+            int subtaskIndex) {
+
+        this(
+            reader,
+            WatermarkStrategies.<T>noWatermarks().build(),
+            new TestProcessingTimeService(),
+            eventGateway,
+            subtaskIndex,
+            DEFAULT_PARALLELISM);
+    }
+
+    /** Creates an operator with every collaborator specified explicitly. */
+    public TestingSourceOperator(
+            SourceReader<T, MockSourceSplit> reader,
+            WatermarkStrategy<T> watermarkStrategy,
+            ProcessingTimeService timeService,
+            OperatorEventGateway eventGateway,
+            int subtaskIndex,
+            int parallelism) {
+
+        // the reader factory ignores its context and always hands out the given reader
+        super(
+            (context) -> reader,
+            eventGateway,
+            new MockSourceSplitSerializer(),
+            watermarkStrategy,
+            timeService);
+
+        this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
+        this.parallelism = parallelism;
+        this.subtaskIndex = subtaskIndex;
+    }
+
+    /** Returns a new mock runtime context reporting this operator's parallelism and subtask index. */
+    @Override
+    public StreamingRuntimeContext getRuntimeContext() {
+        return new MockStreamingRuntimeContext(false, parallelism, subtaskIndex);
+    }
+
+    // this is overridden to avoid complex mock injection through the "containingTask"
+    @Override
+    public ExecutionConfig getExecutionConfig() {
+        final ExecutionConfig executionConfig = new ExecutionConfig();
+        executionConfig.setAutoWatermarkInterval(AUTO_WATERMARK_INTERVAL);
+        return executionConfig;
+    }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutputTest.java
new file mode 100644
index 0000000..0fb3f81
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutputTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for the {@link WatermarkToDataOutput}: event-time watermarks are pushed into it and
+ * the runtime events arriving at a {@link CollectingDataOutput} are verified.
+ */
+public class WatermarkToDataOutputTest {
+
+    /** A first watermark with timestamp zero is forwarded to the data output. */
+    @Test
+    public void testInitialZeroWatermark() {
+        final CollectingDataOutput<Object> collected = new CollectingDataOutput<>();
+        final WatermarkToDataOutput watermarkOutput = new WatermarkToDataOutput(collected);
+
+        watermarkOutput.emitWatermark(eventTimeWatermark(0L));
+
+        final Watermark expected = new Watermark(0L);
+        assertThat(collected.events, contains(expected));
+    }
+
+    /** Only watermarks that advance beyond the last forwarded one reach the data output. */
+    @Test
+    public void testWatermarksDoNotRegress() {
+        final CollectingDataOutput<Object> collected = new CollectingDataOutput<>();
+        final WatermarkToDataOutput watermarkOutput = new WatermarkToDataOutput(collected);
+
+        final long[] emittedTimestamps = {12L, 17L, 10L, 18L, 17L, 18L};
+        for (long timestamp : emittedTimestamps) {
+            watermarkOutput.emitWatermark(eventTimeWatermark(timestamp));
+        }
+
+        assertThat(collected.events, contains(
+            new Watermark(12L),
+            new Watermark(17L),
+            new Watermark(18L)
+        ));
+    }
+
+    /** A watermark emitted after {@code markIdle()} is preceded by an ACTIVE stream status. */
+    @Test
+    public void becomingActiveEmitsStatus() {
+        final CollectingDataOutput<Object> collected = new CollectingDataOutput<>();
+        final WatermarkToDataOutput watermarkOutput = new WatermarkToDataOutput(collected);
+
+        watermarkOutput.markIdle();
+        watermarkOutput.emitWatermark(eventTimeWatermark(100L));
+
+        assertThat(collected.events, contains(
+            StreamStatus.IDLE,
+            StreamStatus.ACTIVE,
+            new Watermark(100L)
+        ));
+    }
+
+    // ------------------------------------------------------------------------
+
+    /** Creates the event-time API watermark, which shares its simple name with the runtime class. */
+    private static org.apache.flink.api.common.eventtime.Watermark eventTimeWatermark(long timestamp) {
+        return new org.apache.flink.api.common.eventtime.Watermark(timestamp);
+    }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index b1e75b2..515bd51 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.streaming.runtime.tasks;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.mocks.MockSource;
@@ -107,7 +109,7 @@ public class SourceOperatorStreamTaskTest {
// Build expected output to verify the results
Queue<Object> expectedOutput = new LinkedList<>();
- expectedRecords.forEach(r -> expectedOutput.offer(new StreamRecord<>(r)));
+ expectedRecords.forEach(r -> expectedOutput.offer(new StreamRecord<>(r, TimestampAssigner.NO_TIMESTAMP)));
// Add barrier to the expected output.
expectedOutput.add(new CheckpointBarrier(checkpointId, checkpointId, checkpointOptions));
@@ -128,8 +130,10 @@ public class SourceOperatorStreamTaskTest {
long checkpointId,
TaskStateSnapshot snapshot) throws Exception {
// get a source operator.
- SourceOperatorFactory<Integer> sourceOperatorFactory =
- new SourceOperatorFactory<>(new MockSource(Boundedness.BOUNDED, 1));
+ SourceOperatorFactory<Integer> sourceOperatorFactory = new SourceOperatorFactory<>(
+ new MockSource(Boundedness.BOUNDED, 1),
+ WatermarkStrategies.<Integer>noWatermarks().build());
+
// build a test harness.
MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
new MultipleInputStreamTaskTestHarnessBuilder<>(SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO);